diff --git a/Cargo.lock b/Cargo.lock index 9f96b5d..062cfc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,9 +523,9 @@ dependencies = [ [[package]] name = "dioxus" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e50735a28b303b0d67e1b5637fb57e4711bf2776266290cbc987c0adfdabb55" +checksum = "b8e7fe217b50d43b27528b0f24c89b411f742a3e7564d1cfbf85253f967954db" dependencies = [ "dioxus-config-macro", "dioxus-core", @@ -541,9 +541,9 @@ dependencies = [ [[package]] name = "dioxus-cli-config" -version = "0.5.0" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d4661064bad2f0b12929faf6c9cea4d94e60217ba6b11ff4146b505a57124b" +checksum = "c7dffc452ed91af6ef772b0d9a5899573f6785314e97c533733ec55413c01df3" dependencies = [ "once_cell", "serde", @@ -553,9 +553,9 @@ dependencies = [ [[package]] name = "dioxus-config-macro" -version = "0.5.0" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebe244197b320dec9e9f38742985fe98c058136ada770df73e9429878ed92863" +checksum = "cb1a1aa34cc04c1f7fcbb7a10791ba773cc02d834fe3ec1fe05647699f3b101f" dependencies = [ "proc-macro2", "quote", @@ -563,9 +563,9 @@ dependencies = [ [[package]] name = "dioxus-core" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "088daa3f45aaa729e9eef32dc0a9393dd709ee906b092089e5839cad1cad7c85" +checksum = "3730d2459ab66951cedf10b09eb84141a6eda7f403c28057cbe010495be156b7" dependencies = [ "futures-channel", "futures-util", @@ -574,15 +574,16 @@ dependencies = [ "rustc-hash", "serde", "slab", + "slotmap", "tracing", "tracing-subscriber", ] [[package]] name = "dioxus-core-macro" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e29a07448245451334eec2883a394e207f28caedf0a57fd1a903e9ccea0b9531" +checksum = "0d9c0dfe0e6a46626fa716c4aa1d2ccb273441337909cfeacad5bb6fcfb947d2" dependencies = [ "constcat", "convert_case", @@ -601,9 +602,9 @@ checksum = "2ea539174bb236e0e7dc9c12b19b88eae3cb574dedbd0252a2d43ea7e6de13e2" [[package]] name = "dioxus-fullstack" -version = "0.5.2" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db7fb1a22ff7af8756bc9506eebfbecf374b1c8c57f087c85c752ba8bd767fce" +checksum = "b80f0ac18166302341164e681322e0385131c08a11c3cc1c51ee8df799ab0d3d" dependencies = [ "async-trait", "base64", @@ -624,9 +625,9 @@ dependencies = [ [[package]] name = "dioxus-hooks" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a8c7019308a6d8381fce84a51006f207407af265aebc5425871399c98d788e4" +checksum = "fa8f9c661eea82295219d25555d5c0b597e74186b029038ceb5e3700ccbd4380" dependencies = [ "dioxus-core", "dioxus-debug-cell", @@ -641,9 +642,9 @@ dependencies = [ [[package]] name = "dioxus-hot-reload" -version = "0.5.0" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d5a28a2af6655473c6521fb5a428538807b985e8e5f1a8c30e2ab71bd54e637" +checksum = "77d01246cb1b93437fb0bbd0dd11cfc66342d86b4311819e76654f2017ce1473" dependencies = [ "dioxus-core", "dioxus-html", @@ -655,9 +656,9 @@ dependencies = [ [[package]] name = "dioxus-html" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8d612d9732f32adc1852e13e1387a9d5baa710b0b004641b5123def53065c8d" +checksum = "f01a0826f179adad6ea8d6586746e8edde0c602cc86f4eb8e5df7a3b204c4018" dependencies = [ "async-trait", "dioxus-core", @@ -678,9 +679,9 @@ dependencies = [ [[package]] name = "dioxus-html-internal-macro" -version = "0.5.0" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1799f34affdb158f6ebec23b46b11f9e65de0bbadbbb781dc68c3eddfe6fd32b" +checksum = "0b96f35a608d0ab8f4ca6f66ce1828354e4ebd41580b12454f490221a11da93c" dependencies = [ "convert_case", "proc-macro2", @@ -690,23 +691,24 @@ dependencies = [ [[package]] name = "dioxus-interpreter-js" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc68a22e33562317b40ccc7b2d140017e510745c5d7e062e911c6a4f9042e4b1" +checksum = "351fad098c657d14f3ac2900362d2b86e83c22c4c620a404839e1ab628f3395b" dependencies = [ "js-sys", "md5", "sledgehammer_bindgen", "sledgehammer_utils", "wasm-bindgen", + "wasm-bindgen-futures", "web-sys", ] [[package]] name = "dioxus-lib" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9af36a9f985ad68783acf62dc276b0a8f0aa928f1c3b42f78e2ae222b19d445c" +checksum = "8bd39b2c41dd1915dcb91d914ea72d8b646f1f8995aaeff82816b862ec586ecd" dependencies = [ "dioxus-core", "dioxus-core-macro", @@ -718,9 +720,9 @@ dependencies = [ [[package]] name = "dioxus-router" -version = "0.5.0" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c4cbbc1aff811aa3715c94a7ca0375be0be34356aba7f3897328d844323519b" +checksum = "c235c5dbeb528c0c2b0424763da812e7500df69b82eddac54db6f4975e065c5f" dependencies = [ "dioxus-cli-config", "dioxus-lib", @@ -737,9 +739,9 @@ dependencies = [ [[package]] name = "dioxus-router-macro" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67fcb6e4a203dc816bca720c638562d5a782695dc71d6598de088ce50ba2a0f8" +checksum = "2e7cd1c5137ba361f2150cdea6b3bc9ddda7b1af84b22c9ee6b5499bf43e1381" dependencies = [ "proc-macro2", "quote", @@ -749,9 +751,9 @@ dependencies = [ [[package]] name = "dioxus-rsx" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faa83056104f63fdc6f7f9fc1137208c7b7648bf88d6c86db1e095f15297a0f3" +checksum = "15c400bc8a779107d8f3a67b14375db07dbd2bc31163bf085a8e9097f36f7179" dependencies = [ "dioxus-core", "internment", @@ -764,9 +766,9 @@ dependencies = [ [[package]] name = "dioxus-signals" -version = "0.5.1" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fd04e2b3739d5c12255005cbf3185446e750fc2b3eeee9fa4e83c989132415c" +checksum = "7e3e224cd3d3713f159f0199fc088c292a0f4adb94996b48120157f6a8f8342d" dependencies = [ "dioxus-core", "futures-channel", @@ -780,9 +782,9 @@ dependencies = [ [[package]] name = "dioxus-web" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75cfbe115b193a05c649a80a54a90a6bdd88779694a617daf27d287d6fb944f" +checksum = "e0855ac81fcc9252a0863930a7a7cbb2504fc1b6efe893489c8d0e23aaeb2cb9" dependencies = [ "async-trait", "console_error_panic_hook", @@ -805,9 +807,9 @@ dependencies = [ [[package]] name = "dioxus_server_macro" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b723da95503f739f9fc5fb23b6ad1e456f1438a496a0427210fa94e4e0d5fe9" +checksum = "b5ef2cad17001c1155f019cb69adbacd620644566d78a77d0778807bb106a337" dependencies = [ "convert_case", "proc-macro2", @@ -1069,9 +1071,9 @@ dependencies = [ [[package]] name = "generational-box" -version = "0.5.1" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f132919c96b85c02a067ceae965fd50ace57111e2f55c7384b95ac191f4d966b" +checksum = "557cf2cbacd0504c6bf8c29f52f8071e0de1d9783346713dc6121d7fa1e5d0e0" dependencies = [ "parking_lot", ] @@ -2427,6 +2429,16 @@ dependencies = [ "rustc-hash", ] +[[package]] +name = "slotmap" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbff4acf519f630b3a3ddcfaea6c06b42174d9a44bc70c620e9ed1649d58b82a" +dependencies = [ + "serde", + "version_check", +] + [[package]] name = "smallvec" version = "1.13.2" diff --git a/Cargo.toml b/Cargo.toml index 4ef6625..08f0843 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] -dioxus = { version = "0.5.1", features = ["web"] } -dioxus-web = "0.5.1" +dioxus = { version = "0.5.6", features = ["web"] } +dioxus-web = "0.5.6" manganis = "0.2.2" once_cell = "1.19.0" asynchronous-codec = "0.6.2" diff --git a/public/rust_mic_worklet.js b/public/rust_mic_worklet.js index 1b40f1f..2b77709 100644 --- a/public/rust_mic_worklet.js +++ b/public/rust_mic_worklet.js @@ -1,5 +1,5 @@ const SAMPLE_RATE = 48000; -const PACKET_SAMPLES = 480; +const PACKET_SAMPLES = 960; class RustWorklet extends AudioWorkletProcessor { constructor(options) { @@ -20,7 +20,7 @@ class RustWorklet extends AudioWorkletProcessor { const data = { format: 'f32', sampleRate: SAMPLE_RATE, - numberOfFrames: this.buffer_offset, + numberOfFrames: PACKET_SAMPLES, numberOfChannels: 1, timestamp: this.timestamp, data: this.buffer.slice(0, PACKET_SAMPLES), diff --git a/src/app.rs b/src/app.rs index 2450577..577330f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -2,23 +2,59 @@ use dioxus::prelude::*; +use super::Command; +use super::Command::*; +use super::ConnectionState::{self, *}; + pub struct State { - pub status: GlobalSignal, + pub status: GlobalSignal, } pub static STATE: State = State { - status: Signal::global(|| "Starting...".to_owned()), + status: Signal::global(|| Disconnected), }; pub fn app() -> Element { - use_coroutine(|_: UnboundedReceiver<()>| super::network_entrypoint()); + let net = use_coroutine(|rx: UnboundedReceiver| super::network_entrypoint(rx)); + let mut username = use_signal(|| "".to_string()); + let default_address = option_env!("MUMBLEWEB2_WEBTRANSPORT_SERVER_ADDRESS").unwrap_or(""); + let mut address = use_signal(|| default_address.to_string()); let status = &STATE.status; rsx!( div { - "Hello, World!" + input { + placeholder: "username", + value: "{username.read()}", + autofocus: "true", + oninput: move |evt| username.set(evt.value().clone()), + } + br {} + input { + placeholder: "server address", + value: "{address.read()}", + autofocus: "true", + oninput: move |evt| address.set(evt.value().clone()), + } + br {} + match *status.read() { + Disconnected => rsx!{ + button { + onclick: move |event| net.send(Connect{address: address.read().clone(), username: username.read().clone(), hash: "[39, 96, 204, 127, 26, 59, 35, 209, 197, 103, 192, 6, 3, 98, 203, 228, 124, 46, 247, 72, 44, 224, 123, 238, 218, 140, 128, 100, 115, 14, 23, 233]".to_string()}), + "Connect!" + } + }, + Connecting => rsx!{ + "Connecting..." + }, + Connected => rsx!{ + button { + onclick: move |event| net.send(Disconnect), + "Disconnect" + } + } + } br {} - "{status}" } ) } diff --git a/src/lib.rs b/src/lib.rs index acb095a..80dd684 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,12 @@ pub mod app; +use dioxus::prelude::*; + use anyhow::Error; use app::STATE; use async_std::channel::Sender; +use futures::select; +use futures::FutureExt; use futures::SinkExt; use futures::StreamExt; use manganis::{file, mg}; @@ -206,10 +210,10 @@ async fn create_encoder_worklet( download_buffer.borrow_mut().push(array.clone()); if download_buffer.borrow().len() > 200 { //download_data(download_buffer.borrow().to_vec(), "download_buffer.opus"); - download_data( - ass::encode(download_buffer.borrow().to_vec(), 960, 0), - "download_buffer.opus", - ); + //download_data( + // ass::encode(download_buffer.borrow().to_vec(), 960, 0), + // "download_buffer.opus", + //); download_buffer.borrow_mut().clear(); } @@ -221,7 +225,7 @@ async fn create_encoder_worklet( payload: VoicePacketPayload::Opus(array.into(), false), position_info: None, }))); - sequence_num = sequence_num.wrapping_add(1); + sequence_num = sequence_num.wrapping_add(2); }); let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new( @@ -338,209 +342,245 @@ fn create_decoder(audio_context: &AudioContext) -> AudioDecoder { audio_decoder } -pub async fn network_entrypoint() { - // This sleep is to allow the user to interact with the window so the MediaStream - // can be created. This works around Chrome's autoplay policy rules. This will - // eventually be unnecessary when we have a proper GUI. - async_std::task::sleep(Duration::from_millis(3000)).await; +pub enum ConnectionState { + Disconnected, + Connecting, + Connected, +} - console::log_1(&"Rust via WASM!".into()); +pub enum Command { + Connect { + address: String, + username: String, + hash: String, + }, + Disconnect, +} - let Ok(server_hash): Result, _> = env!("WEBTRANSPORT_SERVER_HASH") - .trim_matches(&['[', ']']) - .split(',') - .map(|x| x.trim().parse()) - .collect() - else { - panic!("could not parse server hash") - }; - let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice()); +pub async fn network_entrypoint(mut event_rx: UnboundedReceiver) { + loop { + let Some(Command::Connect { + address, + username, + hash, + }) = event_rx.next().await + else { + panic!("Did not receive connect command") + }; - let object = web_sys::js_sys::Object::new(); + *STATE.status.write() = ConnectionState::Connecting; - Reflect::set( - &object, - &JsValue::from_str("algorithm"), - &JsValue::from_str("sha-256"), - ) - .unwrap(); + console::log_1(&"Rust via WASM!".into()); - web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).unwrap(); + let Ok(server_hash): Result, _> = env!("WEBTRANSPORT_SERVER_HASH") + .trim_matches(&['[', ']']) + .split(',') + .map(|x| x.trim().parse()) + .collect() + else { + panic!("could not parse server hash") + }; + let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice()); - let array = web_sys::js_sys::Array::new(); - array.push(&object); + let object = web_sys::js_sys::Object::new(); - console::log_1(&object.clone().into()); - console::log_1(&"Created option object!".into()); + Reflect::set( + &object, + &JsValue::from_str("algorithm"), + &JsValue::from_str("sha-256"), + ) + .unwrap(); - let mut options = WebTransportOptions::new(); - options.server_certificate_hashes(&array); + web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).unwrap(); - console::log_1(&"Created WebTransportOptions!".into()); + let array = web_sys::js_sys::Array::new(); + array.push(&object); - *STATE.status.write() = "Connecting to WebTransport...".into(); + console::log_1(&object.clone().into()); + console::log_1(&"Created option object!".into()); - let transport = match WebTransport::new_with_options( - "https://localhost:4433/?hostname=ohea.xyz&port=64738&username=test", - &options, - ) { - Ok(x) => x, - Err(e) => { - console::log_1(&e.into()); - panic!(); - } - }; + let mut options = WebTransportOptions::new(); + options.server_certificate_hashes(&array); - console::log_1(&"Created WebTransport connection object.".into()); + console::log_1(&"Created WebTransportOptions!".into()); - console::log_1(&transport.clone().into()); - - if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await { - console::log_1(&e.into()); - panic!(); - } - - console::log_1(&"Transport is ready.".into()); - - *STATE.status.write() = "Creating stream...".into(); - - let stream: web_sys::WebTransportBidirectionalStream = - match wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream()).await { - Ok(x) => x.into(), + let transport = match WebTransport::new_with_options(&address, &options) { + Ok(x) => x, Err(e) => { console::log_1(&e.into()); panic!(); } }; - let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into()); - let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into()); + console::log_1(&"Created WebTransport connection object.".into()); - let read_codec = ClientControlCodec::new(); - let write_codec = ClientControlCodec::new(); + console::log_1(&transport.clone().into()); - let mut reader = - asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec); - let mut writer = - asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec); - - let (send_chan, writer_recv_chan) = async_std::channel::unbounded(); - - spawn(async move { - while let Ok(msg) = writer_recv_chan.recv().await { - if let Err(e) = writer.send(msg).await { - console::log_1(&e.to_string().into()); - break; - } + if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await { + console::log_1(&e.into()); + panic!(); } - }); - // Get version packet - let version = reader.next().await.unwrap().unwrap(); - console::log_1(&"Got version packet".into()); - console::log_1(&format!("{:#?}", version).into()); + console::log_1(&"Transport is ready.".into()); - // Send version packet - let mut msg = msgs::Version::new(); - msg.set_version(0x000010204); - msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0")); - //msg.set_os("Chrome".to_string()); - send_chan.send(msg.into()).await.unwrap(); - console::log_1(&"Sent version packet".into()); + let stream: web_sys::WebTransportBidirectionalStream = + match wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream()) + .await + { + Ok(x) => x.into(), + Err(e) => { + console::log_1(&e.into()); + panic!(); + } + }; - // Send authenticate packet - let mut msg = msgs::Authenticate::new(); - msg.set_username("mumbleweb2".to_string()); - msg.set_opus(true); - send_chan.send(msg.into()).await.unwrap(); - console::log_1(&"Sent authenticate packet".into()); + let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into()); + let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into()); + + let read_codec = ClientControlCodec::new(); + let write_codec = ClientControlCodec::new(); + + let mut reader = + asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec); + let mut writer = asynchronous_codec::FramedWrite::new( + wasm_stream_writable.into_async_write(), + write_codec, + ); + + let (send_chan, writer_recv_chan) = async_std::channel::unbounded(); - // Spawn worker to send pings - { - let send_chan = send_chan.clone(); spawn(async move { - loop { - console::log_1(&"Sending ping".into()); - if let Err(e) = send_chan.send(msgs::Ping::new().into()).await { + while let Ok(msg) = writer_recv_chan.recv().await { + if let Err(e) = writer.send(msg).await { console::log_1(&e.to_string().into()); break; } - - async_std::task::sleep(Duration::from_millis(3000)).await; } }); - } - // Create MediaStreams to playback decoded audio - // The audio context is used to reproduce audio. - let audio_context = configure_audio_context(); + // Get version packet + let version = reader.next().await.unwrap().unwrap(); + console::log_1(&"Got version packet".into()); + console::log_1(&format!("{:#?}", version).into()); - let audio_context_worklet = audio_context.clone(); - let packet_sender_worklet = send_chan.clone(); - spawn(async move { - match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await { - Ok(node) => console::log_2(&"Created audio worklet:".into(), &node), - Err(err) => console::error_1(&err), - } - }); + // Send version packet + let mut msg = msgs::Version::new(); + msg.set_version(0x000010204); + msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0")); + //msg.set_os("Chrome".to_string()); + send_chan.send(msg.into()).await.unwrap(); + console::log_1(&"Sent version packet".into()); - // Create map of session_id -> AudioDecoder - let mut decoder_map = HashMap::new(); + // Send authenticate packet + let mut msg = msgs::Authenticate::new(); + msg.set_username(username); + msg.set_opus(true); + send_chan.send(msg.into()).await.unwrap(); + console::log_1(&"Sent authenticate packet".into()); - loop { - match reader.next().await { - Some(Ok(msg)) => { - match msg { - ControlPacket::UDPTunnel(u) => { - match *u.clone() { - mumble_protocol::voice::VoicePacket::Audio { - _dst, - target, - session_id, - seq_num, - payload, - position_info, - } => { - // Get or create audio decoder for this user - let audio_decoder = decoder_map - .entry(session_id) - .or_insert_with(|| create_decoder(&audio_context)); - // This will over time (as users join and leave) leak - // AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes. - // A better way to handle this would be to delete and create all the audio - // infra on channel join and update it as new users join the channel, dropping - // any audio packets that come in the meantime. - if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload { - let js_audio_payload = Uint8Array::from(audio_payload.as_ref()); - audio_decoder.decode( - &EncodedAudioChunk::new(&EncodedAudioChunkInit::new( - &js_audio_payload.into(), - 0.0, - EncodedAudioChunkType::Key, - )) - .unwrap(), - ); - //console::log_1(&"Oueued audio chunk for decoding".into()); - } - } - _ => { - unreachable!("TCP tunnels UDP packets should not contain pings"); - // I think? - } - } + // Spawn worker to send pings + { + let send_chan = send_chan.clone(); + spawn(async move { + loop { + console::log_1(&"Sending ping".into()); + if let Err(e) = send_chan.send(msgs::Ping::new().into()).await { + console::log_1(&"could not ping".into()); + console::log_1(&e.to_string().into()); + break; } - _ => { - console::log_1(&format!("{:#?}", msg).into()); + + async_std::task::sleep(Duration::from_millis(3000)).await; + } + }); + } + + // Create MediaStreams to playback decoded audio + // The audio context is used to reproduce audio. + let audio_context = configure_audio_context(); + + let audio_context_worklet = audio_context.clone(); + let packet_sender_worklet = send_chan.clone(); + spawn(async move { + match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await { + Ok(node) => console::log_2(&"Created audio worklet:".into(), &node), + Err(err) => console::error_1(&err), + } + }); + + *STATE.status.write() = ConnectionState::Connected; + + // Create map of session_id -> AudioDecoder + let mut decoder_map = HashMap::new(); + + loop { + select! { + packet = reader.next().fuse() => { + match packet { + Some(Ok(msg)) => accept_packet(msg, &audio_context, &mut decoder_map), + Some(Err(err)) => panic!("{err}"), + None => break, + } + } + command = event_rx.next() => { + match command { + Some(Command::Disconnect) => break, + _ => continue, } } } - None => { - break; - } - Some(Err(e)) => { - console::log_1(&e.to_string().into()); - break; + } + send_chan.close(); + *STATE.status.write() = ConnectionState::Disconnected; + } +} + +fn accept_packet( + msg: ControlPacket, + audio_context: &AudioContext, + decoder_map: &mut HashMap, +) { + match msg { + ControlPacket::UDPTunnel(u) => { + match *u.clone() { + mumble_protocol::voice::VoicePacket::Audio { + _dst, + target, + session_id, + seq_num, + payload, + position_info, + } => { + // Get or create audio decoder for this user + let audio_decoder = decoder_map + .entry(session_id) + .or_insert_with(|| create_decoder(&audio_context)); + // This will over time (as users join and leave) leak + // AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes. + // A better way to handle this would be to delete and create all the audio + // infra on channel join and update it as new users join the channel, dropping + // any audio packets that come in the meantime. + if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload { + let js_audio_payload = Uint8Array::from(audio_payload.as_ref()); + audio_decoder.decode( + &EncodedAudioChunk::new(&EncodedAudioChunkInit::new( + &js_audio_payload.into(), + 0.0, + EncodedAudioChunkType::Key, + )) + .unwrap(), + ); + //console::log_1(&"Oueued audio chunk for decoding".into()); + } + } + _ => { + unreachable!("TCP tunnels UDP packets should not contain pings"); + // I think? + } } } + _ => { + console::log_1(&format!("{:#?}", msg).into()); + } } }