desktop almost working

This commit is contained in:
2024-11-08 18:00:39 -07:00
parent 0f0218159d
commit 86fc23b3cb
6 changed files with 603 additions and 209 deletions
+111 -8
View File
@@ -1,6 +1,22 @@
use crate::app::Command;
use dioxus::hooks::UnboundedReceiver;
use std::{fmt, io};
use anyhow::Result;
use dioxus::hooks::{UnboundedReceiver, UnboundedSender};
use mumble_protocol::control::{ClientControlCodec, ControlPacket};
use mumble_protocol::Serverbound;
use std::net::ToSocketAddrs;
use std::{fmt, io, sync::Arc};
use tokio::net::TcpStream;
use tokio::task::LocalSet;
use tokio_rustls::rustls;
use tokio_rustls::rustls::client::danger::{HandshakeSignatureValid, ServerCertVerifier};
use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use tokio_rustls::rustls::ClientConfig;
use tokio_rustls::rustls::DigitallySignedStruct;
use tokio_rustls::TlsConnector;
use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};
pub use tokio::task::spawn_local as spawn;
pub use tokio::time::sleep;
pub struct Error(anyhow::Error);
@@ -40,7 +56,19 @@ impl fmt::Debug for Error {
}
}
pub struct AudioContext();
pub struct AudioSystem();
impl AudioSystem {
pub fn new(sender: UnboundedSender<ControlPacket<Serverbound>>) -> Result<Self, Error> {
dbg!("todo");
Ok(AudioSystem())
}
pub fn create_player(&mut self) -> Result<AudioPlayer, Error> {
dbg!("todo");
Ok(AudioPlayer())
}
}
pub struct AudioPlayer();
@@ -50,15 +78,90 @@ impl AudioPlayer {
}
}
#[derive(Debug)]
struct NoCertificateVerification;
impl ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp: &[u8],
_now: UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA1,
rustls::SignatureScheme::ECDSA_SHA1_Legacy,
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::RSA_PKCS1_SHA384,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::RSA_PKCS1_SHA512,
rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
rustls::SignatureScheme::RSA_PSS_SHA256,
rustls::SignatureScheme::RSA_PSS_SHA384,
rustls::SignatureScheme::RSA_PSS_SHA512,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::ED448,
]
}
}
pub async fn network_connect(
address: String,
username: String,
event_rx: &mut UnboundedReceiver<Command>,
) -> Result<(), Error> {
dbg!("todo");
Ok(())
}
let localset = LocalSet::new();
let _guard = localset.enter();
pub fn create_player(ctx: &AudioContext) -> Result<AudioPlayer, Error> {
Ok(AudioPlayer())
let config = ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(NoCertificateVerification))
.with_no_client_auth();
let connector = TlsConnector::from(Arc::new(config));
let addr = format!("{}:{}", address, 64738)
.to_socket_addrs()?
.next()
.unwrap();
let server_tcp = TcpStream::connect(addr).await?;
let server_stream = connector
//.connect("127.0.0.1".try_into()?, server_tcp)
.connect(address.try_into().map_err(anyhow::Error::from)?, server_tcp)
.await?;
let (read_server, write_server) = tokio::io::split(server_stream);
let read_codec = ClientControlCodec::new();
let write_codec = ClientControlCodec::new();
let mut reader = asynchronous_codec::FramedRead::new(read_server.compat(), read_codec);
let mut writer = asynchronous_codec::FramedWrite::new(write_server.compat_write(), write_codec);
super::network_loop(username, event_rx, reader, writer).await
}
+96 -174
View File
@@ -1,26 +1,24 @@
use crate::app::Command;
use crate::bail;
use dioxus::prelude::*;
use futures::select;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use futures_channel::mpsc::UnboundedSender;
use gloo_timers::future::TimeoutFuture;
use mumble_protocol::control::ClientControlCodec;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::control::{msgs, ClientControlCodec};
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::HashMap;
use mumble_protocol::Serverbound;
use std::fmt;
use std::io;
use std::time::Duration;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local as spawn;
use wasm_bindgen_futures::JsFuture;
use web_sys::console;
use web_sys::js_sys::Promise;
use web_sys::js_sys::Reflect;
use web_sys::js_sys::Uint8Array;
use web_sys::window;
use web_sys::AudioContext;
use web_sys::AudioContextOptions;
use web_sys::AudioData;
use web_sys::AudioDecoder;
@@ -43,7 +41,11 @@ use web_sys::WebTransportBidirectionalStream;
use web_sys::WebTransportOptions;
use web_sys::WorkletOptions;
pub use web_sys::AudioContext;
pub use wasm_bindgen_futures::spawn_local as spawn;
pub async fn sleep(d: Duration) {
TimeoutFuture::new(d.as_millis() as u32).await
}
pub struct Error(JsValue);
@@ -53,6 +55,12 @@ impl From<anyhow::Error> for Error {
}
}
impl From<io::Error> for Error {
fn from(value: io::Error) -> Self {
Error(JsError::new(&value.to_string()).into())
}
}
impl From<JsValue> for Error {
fn from(value: JsValue) -> Self {
Error(value)
@@ -91,6 +99,83 @@ impl fmt::Debug for Error {
}
}
pub struct AudioSystem(AudioContext);
impl AudioSystem {
pub fn new(sender: UnboundedSender<ControlPacket<Serverbound>>) -> Result<Self, Error> {
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
let audio_context_worklet = audio_context.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, sender).await {
Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
Err(err) => err.log(),
}
});
Ok(AudioSystem(audio_context))
}
pub fn create_player(&mut self) -> Result<AudioPlayer, Error> {
let audio_context = &self.0;
let audio_stream_generator =
MediaStreamTrackGenerator::new(&MediaStreamTrackGeneratorInit::new("audio"))?;
// Create MediaStream from MediaStreamTrackGenerator
let js_tracks = web_sys::js_sys::Array::new();
js_tracks.push(&audio_stream_generator);
let media_stream = MediaStream::new_with_tracks(&js_tracks)?;
// Create MediaStreamAudioSourceNode
let audio_source = audio_context.create_media_stream_source(&media_stream)?;
// Connect output of audio_source to audio_context (browser audio)
audio_source.connect_with_audio_node(&audio_context.destination())?;
// Create callback functions for AudioDecoder
let error = Closure::wrap(Box::new(move |e: JsValue| {
console::error_1(&e);
}) as Box<dyn FnMut(JsValue)>);
// This knows what MediaStreamTrackGenerator to use as it closes around it
let output = Closure::wrap(Box::new(move |audio_data: AudioData| {
let writable = audio_stream_generator.writable();
if writable.locked() {
return;
}
if let Err(e) = writable.get_writer().map(|writer| {
spawn(async move {
if let Err(e) = JsFuture::from(writer.ready()).await {
console::error_1(&format!("write chunk ready error {:?}", e).into());
}
if let Err(e) = JsFuture::from(writer.write_with_chunk(&audio_data)).await {
console::error_1(&format!("write chunk error {:?}", e).into());
};
writer.release_lock();
});
}) {
console::error_1(&e);
}
}) as Box<dyn FnMut(AudioData)>);
let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new(
error.as_ref().unchecked_ref(),
output.as_ref().unchecked_ref(),
))?;
audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000));
console::log_1(&"Created Audio Decoder".into());
// This is required to prevent these from being deallocated
error.forget();
output.forget();
Ok(AudioPlayer(audio_decoder))
}
}
pub struct AudioPlayer(AudioDecoder);
impl AudioPlayer {
@@ -245,61 +330,6 @@ async fn create_encoder_worklet(
Ok(worklet_node)
}
pub fn create_player(audio_context: &AudioContext) -> Result<AudioPlayer, Error> {
let audio_stream_generator =
MediaStreamTrackGenerator::new(&MediaStreamTrackGeneratorInit::new("audio"))?;
// Create MediaStream from MediaStreamTrackGenerator
let js_tracks = web_sys::js_sys::Array::new();
js_tracks.push(&audio_stream_generator);
let media_stream = MediaStream::new_with_tracks(&js_tracks)?;
// Create MediaStreamAudioSourceNode
let audio_source = audio_context.create_media_stream_source(&media_stream)?;
// Connect output of audio_source to audio_context (browser audio)
audio_source.connect_with_audio_node(&audio_context.destination())?;
// Create callback functions for AudioDecoder
let error = Closure::wrap(Box::new(move |e: JsValue| {
console::error_1(&e);
}) as Box<dyn FnMut(JsValue)>);
// This knows what MediaStreamTrackGenerator to use as it closes around it
let output = Closure::wrap(Box::new(move |audio_data: AudioData| {
let writable = audio_stream_generator.writable();
if writable.locked() {
return;
}
if let Err(e) = writable.get_writer().map(|writer| {
spawn(async move {
if let Err(e) = JsFuture::from(writer.ready()).await {
console::error_1(&format!("write chunk ready error {:?}", e).into());
}
if let Err(e) = JsFuture::from(writer.write_with_chunk(&audio_data)).await {
console::error_1(&format!("write chunk error {:?}", e).into());
};
writer.release_lock();
});
}) {
console::error_1(&e);
}
}) as Box<dyn FnMut(AudioData)>);
let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new(
error.as_ref().unchecked_ref(),
output.as_ref().unchecked_ref(),
))?;
audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000));
console::log_1(&"Created Audio Decoder".into());
// This is required to prevent these from being deallocated
error.forget();
output.forget();
Ok(AudioPlayer(audio_decoder))
}
pub async fn network_connect(
address: String,
username: String,
@@ -360,118 +390,10 @@ pub async fn network_connect(
let read_codec = ClientControlCodec::new();
let write_codec = ClientControlCodec::new();
let mut reader =
let reader =
asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
let mut writer =
let writer =
asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec);
let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded();
// Spawn worker to send packets.
spawn(async move {
while let Some(msg) = writer_recv_chan.next().await {
if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) {
console::log_1(&format!("sending {:#?}", msg).into());
}
if let Err(e) = writer.send(msg).await {
console::error_1(&e.to_string().into());
break;
}
}
});
// Get version packet
let version = match reader.next().await {
Some(Ok(v)) => v,
Some(Err(err)) => bail!("bad version packet: {err:?}"),
None => bail!("no version was recieved"),
};
console::log_1(&"Got version packet".into());
console::log_1(&format!("{:#?}", version).into());
// Send version packet
let mut msg = msgs::Version::new();
msg.set_version(0x000010204);
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap();
// Send authenticate packet
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap();
// Spawn worker to send pings
{
let mut send_chan = send_chan.clone();
spawn(async move {
loop {
if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
console::log_1(&"could not ping".into());
console::log_1(&e.to_string().into());
break;
}
TimeoutFuture::new(3000).await;
}
});
}
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
let audio_context_worklet = audio_context.clone();
let packet_sender_worklet = send_chan.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await {
Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
Err(err) => err.log(),
}
});
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();
let mut reader_future = reader.next().fuse();
let mut command_future = event_rx.next();
loop {
select! {
packet = reader_future => {
reader_future = reader.next().fuse();
match packet {
Some(Ok(msg)) => {
if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) {
console::log_1(&format!("receiving {:#?}", msg).into());
}
let res = super::accept_packet(msg, &audio_context, &mut decoder_map);
if let Err(err) = res {
err.log();
}
},
Some(Err(err)) => console::error_1(&err.to_string().into()),
None => break,
}
}
command = command_future => {
command_future = event_rx.next();
if let Some(command) = &command {
console::log_1(&format!("commanding {:#?}", command).into());
}
match command {
Some(Command::Disconnect) => break,
Some(command) => {
let res = super::accept_command(command, &mut send_chan);
if let Err(err) = res {
err.log();
}
}
None => continue,
}
}
}
}
let _ = send_chan.close();
Ok(())
super::network_loop(username, event_rx, reader, writer).await
}
+127 -2
View File
@@ -2,15 +2,32 @@ use app::Chat;
use app::Command;
use app::ConnectionState;
use app::STATE;
use asynchronous_codec::FramedRead;
use asynchronous_codec::FramedWrite;
use dioxus::prelude::*;
use futures::io::AsyncRead;
use futures::io::AsyncWrite;
use futures::select;
use futures::FutureExt as _;
use futures::Sink;
use futures::SinkExt as _;
use futures::Stream;
use futures::StreamExt;
use futures_channel::mpsc::UnboundedSender;
pub use imp::spawn;
pub use imp::Error;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlCodec;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::voice::VoicePacketPayload;
use mumble_protocol::Clientbound;
use mumble_protocol::Serverbound;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::time::Duration;
pub mod app;
@@ -51,6 +68,114 @@ pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>) {
}
}
pub async fn network_loop<R, W>(
username: String,
event_rx: &mut UnboundedReceiver<Command>,
mut reader: FramedRead<R, ControlCodec<Serverbound, Clientbound>>,
mut writer: FramedWrite<W, ControlCodec<Serverbound, Clientbound>>,
) -> Result<(), Error>
where
R: AsyncRead + Unpin + 'static,
W: AsyncWrite + Unpin + 'static,
{
let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded();
spawn(async move {
while let Some(msg) = writer_recv_chan.next().await {
if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) {
eprintln!("sending {:#?}", msg);
}
if let Err(e) = writer.send(msg).await {
eprintln!("ERROR: {}", e);
break;
}
}
});
// Get version packet
let version = match reader.next().await {
Some(Ok(v)) => v,
Some(Err(err)) => bail!("bad version packet: {err:?}"),
None => bail!("no version was recieved"),
};
println!("Got version packet");
println!("{:#?}", version);
// Send version packet
let mut msg = msgs::Version::new();
msg.set_version(0x000010204);
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap();
// Send authenticate packet
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap();
// Spawn worker to send pings
{
let mut send_chan = send_chan.clone();
spawn(async move {
loop {
if let Err(_) = send_chan.send(msgs::Ping::new().into()).await {
break;
}
imp::sleep(Duration::from_millis(3000)).await;
}
});
}
let mut audio = imp::AudioSystem::new(send_chan.clone())?;
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();
let mut reader_future = reader.next().fuse();
let mut command_future = event_rx.next();
loop {
select! {
packet = reader_future => {
reader_future = reader.next().fuse();
match packet {
Some(Ok(msg)) => {
if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) {
println!("receiving {:#?}", msg);
}
let res = accept_packet(msg, &mut audio, &mut decoder_map);
if let Err(err) = res {
err.log();
}
},
Some(Err(err)) => Error::from(err).log(),
None => break,
}
}
command = command_future => {
command_future = event_rx.next();
if let Some(command) = &command {
println!("commanding {:#?}", command);
}
match command {
Some(Command::Disconnect) => break,
Some(command) => {
let res = accept_command(command, &mut send_chan);
if let Err(err) = res {
err.log();
}
}
None => continue,
}
}
}
}
let _ = send_chan.close();
Ok(())
}
fn accept_command(
command: Command,
send_chan: &mut UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
@@ -98,7 +223,7 @@ fn accept_command(
fn accept_packet(
msg: ControlPacket<mumble_protocol::Clientbound>,
audio_context: &imp::AudioContext,
audio_context: &mut imp::AudioSystem,
player_map: &mut HashMap<u32, imp::AudioPlayer>,
) -> Result<(), Error> {
match msg {
@@ -116,7 +241,7 @@ fn accept_packet(
let audio_player = match player_map.entry(session_id) {
Entry::Occupied(occupied_entry) => occupied_entry.into_mut(),
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(imp::create_player(&audio_context)?)
vacant_entry.insert(audio_context.create_player()?)
}
};
// This will over time (as users join and leave) leak
+7 -1
View File
@@ -1,5 +1,11 @@
use mumble_webtransport::app;
use mumble_web2::app;
pub fn main() {
#[cfg(feature = "desktop")]
let _guard = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.enter();
dioxus::launch(app::app);
}