use crate::app::{Command, ConnectTarget, SharedState};
use crate::effects::{AudioProcessor, AudioProcessorSender, TransmitState};
use color_eyre::eyre::{bail, eyre, Error};
use crossbeam::atomic::AtomicCell;
use futures_channel::mpsc::UnboundedReceiver;
use gloo_timers::future::TimeoutFuture;
use js_sys::Float32Array;
use manganis::asset;
use mumble_protocol::control::ClientControlCodec;
use mumble_web2_common::{ProxyOverrides, ServerStatus};
use reqwest::Url;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing::{debug, error, info, instrument};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::js_sys::{Promise, Reflect, Uint8Array};
use web_sys::AudioContextOptions;
use web_sys::AudioData;
use web_sys::AudioDecoder;
use web_sys::AudioDecoderConfig;
use web_sys::AudioDecoderInit;
use web_sys::AudioEncoder;
use web_sys::AudioEncoderConfig;
use web_sys::AudioEncoderInit;
use web_sys::AudioWorkletNode;
use web_sys::EncodedAudioChunk;
use web_sys::EncodedAudioChunkInit;
use web_sys::EncodedAudioChunkType;
use web_sys::MediaStreamConstraints;
use web_sys::MessageEvent;
use web_sys::WebTransport;
use web_sys::WebTransportBidirectionalStream;
use web_sys::WebTransportOptions;
use web_sys::WorkletOptions;
use web_sys::{console, window};
use web_sys::{AudioContext, AudioDataCopyToOptions};

/// Re-export of `wasm_bindgen_futures::spawn_local` under the name `spawn`.
// Not every build configuration uses this re-export.
#[allow(unused)]
pub use wasm_bindgen_futures::spawn_local as spawn;

/// Zero-sized handle that spawns futures through
/// `wasm_bindgen_futures::spawn_local`.
// Not every build configuration uses this handle.
#[allow(unused)]
#[derive(Clone)]
pub struct SpawnHandle;

impl SpawnHandle {
    /// Spawns `future` via `wasm_bindgen_futures::spawn_local`.
    ///
    /// The future must produce `()` and be `'static`, as `spawn_local` requires.
    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + 'static,
    {
        wasm_bindgen_futures::spawn_local(future);
    }

    /// Returns a handle; the handle carries no state.
    pub fn current() -> Self {
        SpawnHandle
    }
}

/// Web platform implementation using WebTransport and Web Audio API.
pub struct WebPlatform; impl super::PlatformInterface for WebPlatform { type AudioSystem = WebAudioSystem; type ConfigSystem = WebConfigSystem; fn init_logging() { // copied from tracing_web example usage use tracing_subscriber::fmt::format::Pretty; use tracing_subscriber::prelude::*; use tracing_web::{performance_layer, MakeWebConsoleWriter}; let fmt_layer = tracing_subscriber::fmt::layer() .with_ansi(false) // Only partially supported across browsers .without_time() // std::time is not available in browsers .with_writer(MakeWebConsoleWriter::new()) // write events to the console .with_filter(LevelFilter::DEBUG); let perf_layer = performance_layer().with_details_from_fields(Pretty::default()); tracing_subscriber::registry() .with(fmt_layer) .with(perf_layer) .init(); info!("logging initiated"); } fn request_permissions() { // No-op on web } async fn load_proxy_overrides() -> color_eyre::Result { let overrides = match option_env!("MUMBLE_WEB2_PROXY_OVERRIDES_URL") { Some(url) => Url::parse(url)?, None => absolute_url("overrides")?, }; info!("loading config from {}", overrides); let config = reqwest::get(overrides) .await? .json::() .await?; Ok(config) } async fn network_connect( target: ConnectTarget, username: String, event_rx: &mut UnboundedReceiver, overrides: &ProxyOverrides, state: SharedState, ) -> Result<(), Error> { let url = match target { ConnectTarget::Proxy(url) => url, ConnectTarget::Direct { .. } => { bail!("web platform requires a proxy URL, not a direct host:port") } }; network_connect(url, username, event_rx, overrides, state).await } async fn get_status( client: &reqwest::Client, _address: &str, ) -> color_eyre::Result { Ok(client .get(absolute_url("status")?) .send() .await? .json::() .await?) 
} async fn sleep(duration: Duration) { TimeoutFuture::new(duration.as_millis() as u32).await; } } trait ResultExt { fn ey(self) -> Result; } impl ResultExt for Result { fn ey(self) -> Result { match self { Ok(x) => Ok(x), Err(e) => match e.dyn_into::() { Ok(e) => Err(eyre!("{}: {}", e.name(), e.message())), Err(e) => Err(eyre!("{:?}", e)), }, } } } impl ResultExt for Result { fn ey(self) -> Result { self.map_err(|e| JsValue::from(e)).ey() } } pub struct WebAudioSystem { webctx: AudioContext, processors: AudioProcessorSender, } async fn attach_worklet(audio_context: &AudioContext, worklet_url: &str) -> Result<(), Error> { // Create worklets to process mic and speaker audio // Speaker audio processing worklet only required on // browsers that don't support MediaStreamTrackGenerator let options = WorkletOptions::new(); Reflect::set( &options, &"processorOptions".into(), &wasm_bindgen::module(), ) .ey()?; info!("loading mic worklet from {worklet_url:?}"); audio_context .audio_worklet() .ey()? .add_module_with_options(worklet_url, &options) .ey()? .into_future() .await .ey()?; Ok(()) } impl super::AudioSystemInterface for WebAudioSystem { type AudioPlayer = WebAudioPlayer; async fn new() -> Result { // Create MediaStreams to playback decoded audio // The audio context is used to reproduce audio. 
let webctx = configure_audio_context(); attach_worklet( &webctx, &asset!("/assets/rust_audio_worklet.js").to_string(), ) .await?; let processors = AudioProcessorSender::default(); Ok(WebAudioSystem { webctx, processors }) } fn set_processor(&self, processor: AudioProcessor) { self.processors.store(Some(processor)) } fn start_recording(&mut self, each: impl FnMut(Vec, bool) + 'static) -> Result<(), Error> { let audio_context_worklet = self.webctx.clone(); let processors = self.processors.clone(); spawn(async move { match run_encoder_worklet(&audio_context_worklet, each, processors).await { Ok(node) => info!("created encoder worklet: {:?}", &node), Err(err) => error!("could not create encoder worklet: {err}"), } }); Ok(()) } fn create_player(&mut self) -> Result { let sink_node = AudioWorkletNode::new(&self.webctx, "rust_speaker_worklet").ey()?; // Connect worklet to destination sink_node .connect_with_audio_node(&self.webctx.destination()) .ey()?; // Create callback functions for AudioDecoder let decoder_error = Closure::wrap(Box::new(move |e: JsValue| { error!("error decoding audio {:?}", e); }) as Box); let sink_port = sink_node.port().ey()?; let output = Closure::wrap(Box::new(move |audio_data: AudioData| { // Extract planar PCM from AudioData into an ArrayBuffer or Float32Array // Here we assume f32 samples, 1 channel for brevity. let number_of_frames = audio_data.number_of_frames(); let js_buffer = Float32Array::new_with_length(number_of_frames); let audio_data_copy_to_options = &AudioDataCopyToOptions::new(0); audio_data_copy_to_options.set_format(web_sys::AudioSampleFormat::F32); if let Err(e) = audio_data .copy_to_with_buffer_source(&js_buffer.buffer(), &audio_data_copy_to_options) { error!("could not copy audio data to array {:?}", e); } // Post to the worklet; include sampleRate and channel count if needed. 
let msg = js_sys::Object::new(); js_sys::Reflect::set(&msg, &"samples".into(), &js_buffer).unwrap(); sink_port.post_message(&msg).unwrap(); audio_data.close(); }) as Box); let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new( decoder_error.as_ref().unchecked_ref(), output.as_ref().unchecked_ref(), )) .ey()?; audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000)); info!("created audio decoder"); // This is required to prevent these from being deallocated decoder_error.forget(); output.forget(); Ok(WebAudioPlayer(audio_decoder)) } } pub struct WebAudioPlayer(AudioDecoder); impl super::AudioPlayerInterface for WebAudioPlayer { fn play_opus(&mut self, payload: &[u8]) { let js_audio_payload = Uint8Array::from(payload); let _ = self.0.decode( &EncodedAudioChunk::new(&EncodedAudioChunkInit::new( &js_audio_payload.into(), 0.0, EncodedAudioChunkType::Key, )) .unwrap(), ); } } // Borrowed from // https://github.com/security-union/videocall-rs/blob/main/videocall-client/src/decode/config.rs#L6 fn configure_audio_context() -> AudioContext { let audio_context_options = AudioContextOptions::new(); audio_context_options.set_sample_rate(48000 as f32); let audio_context = AudioContext::new_with_context_options(&audio_context_options).unwrap(); audio_context } trait PromiseExt { fn into_future(self) -> JsFuture; } impl PromiseExt for Promise { fn into_future(self) -> JsFuture { self.into() } } fn process_audio(frame: &JsValue, processor: &mut AudioProcessor) -> TransmitState { let Ok(samples) = Reflect::get(&frame, &"data".into()) else { return TransmitState::Silent; }; let Ok(samples) = samples.dyn_into::() else { return TransmitState::Silent; }; let input = samples.to_vec(); let mut output = Vec::with_capacity(input.len()); let state = processor.process(&input, 1, &mut output); if !output.is_empty() { samples.copy_from(&output); } state } async fn run_encoder_worklet( audio_context: &AudioContext, mut each: impl FnMut(Vec, bool) + 'static, processors: 
AudioProcessorSender, ) -> Result { let constraints = MediaStreamConstraints::new(); constraints.set_audio(&JsValue::TRUE); let stream = window() .unwrap() .navigator() .media_devices() .ey()? .get_user_media_with_constraints(&constraints) .ey()? .into_future() .await .ey()? .dyn_into() .map_err(|e| JsError::new(&format!("not a stream: {e:?}"))) .ey()?; let source = audio_context.create_media_stream_source(&stream).ey()?; let worklet_node = AudioWorkletNode::new(audio_context, "rust_mic_worklet").ey()?; let encoder_error: Closure = Closure::new(|e| error!("error encoding audio {:?}", e)); // Shared state to signal terminator between onmessage and output closures // The output closure runs asynchronously after encoding completes let pending_terminator = Arc::new(AtomicCell::new(false)); let pending_terminator_output = pending_terminator.clone(); // This knows what MediaStreamTrackGenerator to use as it closes around it let output: Closure = Closure::new(move |audio_data: EncodedAudioChunk| { let mut array = vec![0u8; audio_data.byte_length() as usize]; audio_data.copy_to_with_u8_slice(&mut array); // Check if this frame was marked as a terminator let is_terminator = pending_terminator_output.swap(false); each(array, is_terminator); }); let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new( encoder_error.as_ref().unchecked_ref(), output.as_ref().unchecked_ref(), )) .unwrap(); // This is required to prevent these from being deallocated encoder_error.forget(); output.forget(); let encoder_config = AudioEncoderConfig::new("opus"); encoder_config.set_number_of_channels(1); encoder_config.set_sample_rate(48000); encoder_config.set_bitrate(72_000.0); audio_encoder.configure(&encoder_config); info!("created audio encoder"); let mut current_processor = AudioProcessor::new(false); let onmessage: Closure = Closure::new(move |event: MessageEvent| { if let Some(new_processor) = processors.take() { current_processor = new_processor; } let frame = event.data(); let state = 
process_audio(&frame, &mut current_processor); match state { TransmitState::Silent => { // Don't encode or send anything return; } TransmitState::Transmitting => (), // Normal transmission TransmitState::Terminator => { // Mark this as a terminator before encoding pending_terminator.store(true); } } match AudioData::new(frame.unchecked_ref()) { Ok(data) => { let _ = audio_encoder.encode(&data); } Err(err) => { error!( "error creating AudioData object {:?} during event {:?}", err, event, ); } } }); Reflect::set( &Reflect::get(&worklet_node, &"port".into()).ey()?, &"onmessage".into(), onmessage.as_ref(), ) .ey()?; onmessage.forget(); source.connect_with_audio_node(&worklet_node).ey()?; worklet_node .connect_with_audio_node(&audio_context.destination()) .ey()?; Ok(worklet_node) } #[instrument] pub async fn network_connect( address: String, username: String, event_rx: &mut UnboundedReceiver, overrides: &ProxyOverrides, state: SharedState, ) -> Result<(), Error> { info!("connecting"); let object = web_sys::js_sys::Object::new(); Reflect::set( &object, &JsValue::from_str("algorithm"), &JsValue::from_str("sha-256"), ) .ey()?; if let Some(server_hash) = &overrides.cert_hash { let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice()); web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).ey()?; } let array = web_sys::js_sys::Array::new(); array.push(&object); debug!("created option object: {:?}", &object); let mut options = WebTransportOptions::new(); options.set_server_certificate_hashes(&array); debug!("created WebTransportOptions"); console::log_1(&options.clone().into()); let transport = WebTransport::new_with_options(&address, &options).ey()?; debug!("created WebTransport connection object"); console::log_1(&transport.clone().into()); if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()) .await .ey() { bail!("could not connect to transport: {e}"); } info!("transport is ready"); let stream: WebTransportBidirectionalStream = 
wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream()) .await .ey()? .into(); let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into()); let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into()); let read_codec = ClientControlCodec::new(); let write_codec = ClientControlCodec::new(); let reader = asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec); let writer = asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec); let (outgoing_send, outgoing_recv) = futures_channel::mpsc::unbounded(); spawn(crate::sender_loop(outgoing_recv, writer)); crate::network_loop(username, state, event_rx, outgoing_send, reader).await } pub fn absolute_url(path: &str) -> Result { let window: web_sys::Window = web_sys::window().expect("no global `window` exists"); let location = window.location(); Ok(Url::parse(&location.href().ey()?)?.join(path)?) } #[derive(Clone, PartialEq)] pub struct WebConfigSystem {} impl super::ConfigSystemInterface for WebConfigSystem { fn new() -> Result { return Ok(WebConfigSystem {}); } fn config_get(&self, key: &str) -> Option where T: serde::de::DeserializeOwned, { // Get Storage let storage = web_sys::window()?.local_storage().ok()??; // Try localStorage first if let Ok(Some(raw)) = storage.get_item(key) { if let Ok(parsed) = serde_json::from_str::(&raw) { return Some(parsed); } } // Fallback to default if deserialization fails or key missing let default_value = config_get_default(key)?; serde_json::from_value::(default_value).ok() } fn config_set(&self, key: &str, value: &T) where T: serde::Serialize, { let storage = window() .and_then(|w| w.local_storage().ok().flatten()) .expect("localStorage not available"); let json_value = serde_json::to_string(value).expect("failed to serialize config value to JSON string"); storage .set_item(key, &json_value) .expect("failed to write to localStorage"); } } 
fn config_get_default(key: &str) -> Option { let default_config = platform_default_config(); default_config .get(key) .cloned() .or(super::global_default_config().get(key).cloned()) } fn platform_default_config() -> HashMap { serde_json::json!({}) .as_object() .unwrap() .clone() .into_iter() .collect() }