pub mod app; use app::ChannelId; use app::Chat; use app::Command; use app::ConnectionState; use dioxus::prelude::*; use app::STATE; use futures::select; use futures::FutureExt; use futures::SinkExt; use futures::StreamExt; use futures_channel::mpsc::UnboundedSender; use gloo_timers::future::TimeoutFuture; use manganis::{file, mg}; use markdown; use mumble_protocol::control::ControlPacket; use mumble_protocol::control::{msgs, ClientControlCodec}; use mumble_protocol::voice::VoicePacket; use mumble_protocol::voice::VoicePacketPayload; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::spawn_local as spawn; use wasm_bindgen_futures::{future_to_promise, JsFuture}; use web_sys::console; use web_sys::js_sys::Promise; use web_sys::js_sys::Reflect; use web_sys::js_sys::Uint8Array; use web_sys::window; use web_sys::AudioContext; use web_sys::AudioContextOptions; use web_sys::AudioData; use web_sys::AudioDecoder; use web_sys::AudioDecoderConfig; use web_sys::AudioDecoderInit; use web_sys::AudioEncoder; use web_sys::AudioEncoderConfig; use web_sys::AudioEncoderInit; use web_sys::AudioWorkletNode; use web_sys::EncodedAudioChunk; use web_sys::EncodedAudioChunkInit; use web_sys::EncodedAudioChunkType; use web_sys::MediaStream; use web_sys::MediaStreamConstraints; use web_sys::MediaStreamTrackGenerator; use web_sys::MediaStreamTrackGeneratorInit; use web_sys::MessageEvent; use web_sys::WebTransport; use web_sys::WebTransportBidirectionalStream; use web_sys::WebTransportOptions; use web_sys::WorkletOptions; mod ass { use byteorder::{ByteOrder, LittleEndian}; use ogg::PacketWriter; const VER: &str = "ballz"; const fn to_samples(ms: u32) -> usize { ((S_PS * ms) / 1000) as usize } const fn calc_sr_u64(val: u64, from: u32, to: u32) -> u64 { (val * to as u64) / from as u64 } pub fn encode(pre_encoded_frames: Vec>, frame_size: usize, skip: u16) -> Vec { let mut buffer: Vec = Vec::new(); let mut packet_writer = PacketWriter::new(&mut buffer); // Hardcoded serial number let serial = 12345; let skip_48 = calc_sr_u64(skip.into(), 48000, 48000); let opus_head: [u8; 19] = [ b'O', b'p', b'u', b's', b'H', b'e', b'a', b'd', 1, 1, // NUM_CHANNELS = 1 0, 0, 0, 0, 0, 0, 0, 0, 0, ]; let mut head = opus_head; LittleEndian::write_u16(&mut head[10..12], skip_48 as u16); LittleEndian::write_u32(&mut head[12..16], 48000); let mut opus_tags: Vec = Vec::with_capacity(60); let vendor_str = format!("ogg-opus {}", VER); opus_tags.extend(b"OpusTags"); let mut len_bf = [0u8; 4]; LittleEndian::write_u32(&mut len_bf, vendor_str.len() as u32); opus_tags.extend(&len_bf); opus_tags.extend(vendor_str.bytes()); opus_tags.extend(&[0u8; 4]); let _ = packet_writer.write_packet(&head, serial, ogg::PacketWriteEndInfo::EndPage, 0); let _ = packet_writer.write_packet(&opus_tags, serial, ogg::PacketWriteEndInfo::EndPage, 0); for (i, frame) in pre_encoded_frames.iter().enumerate() { let is_last = i == pre_encoded_frames.len() - 1; //let granule_pos = 0; let granule_pos = calc_sr_u64((skip as usize + (i + 1) * frame_size) as u64, 48000, 48000); let _ = packet_writer.write_packet( frame.clone(), serial, if is_last { ogg::PacketWriteEndInfo::EndStream } else { ogg::PacketWriteEndInfo::NormalPacket }, granule_pos, ); } buffer } } // Function to download data as a file pub fn download_data(data: Vec, filename: &str) -> Result<(), JsValue> { use wasm_bindgen::prelude::*; use web_sys::{window, Blob, HtmlAnchorElement, Url}; // Create a new Blob from the data let array = web_sys::js_sys::Uint8Array::from(&data[..]); let blob = Blob::new_with_u8_array_sequence(&vec![array].into())?; // Create a URL for the Blob let url = Url::create_object_url_with_blob(&blob)?; // Create an anchor element and set its href to the Blob URL let document = window().unwrap().document().unwrap(); let a = document .create_element("a")? .dyn_into::()?; a.set_href(&url); a.set_download(filename); // Append the anchor to the document body, click it, and remove it document.body().unwrap().append_child(&a)?; a.click(); document.body().unwrap().remove_child(&a)?; // Revoke the object URL to free resources Url::revoke_object_url(&url)?; Ok(()) } // Borrowed from // https://github.com/security-union/videocall-rs/blob/main/videocall-client/src/decode/config.rs#L6 fn configure_audio_context() -> AudioContext { let mut audio_context_options = AudioContextOptions::new(); audio_context_options.sample_rate(48000 as f32); let audio_context = AudioContext::new_with_context_options(&audio_context_options).unwrap(); audio_context } trait PromiseExt { fn into_future(self) -> JsFuture; } impl PromiseExt for Promise { fn into_future(self) -> JsFuture { self.into() } } async fn create_encoder_worklet( audio_context: &AudioContext, packets: UnboundedSender>, ) -> Result { let stream = window() .unwrap() .navigator() .media_devices()? .get_user_media_with_constraints(MediaStreamConstraints::new().audio(&JsValue::TRUE))? .into_future() .await? .dyn_into() .map_err(|e| JsError::new(&format!("not a stream: {e:?}")))?; let options = WorkletOptions::new(); Reflect::set( &options, &"processorOptions".into(), &wasm_bindgen::module(), )?; let module = "rust_mic_worklet.js"; console::log_1(&format!("Loading mic worklet from {module:?}").into()); audio_context .audio_worklet()? .add_module_with_options(module, &options)? .into_future() .await?; let source = audio_context.create_media_stream_source(&stream)?; let worklet_node = AudioWorkletNode::new(audio_context, "rust_mic_worklet")?; let error: Closure = Closure::new(|e| console::error_1(&e)); let mut download_buffer = std::cell::RefCell::new(Vec::new()); // This knows what MediaStreamTrackGenerator to use as it closes around it let mut sequence_num = 0; let output: Closure = Closure::new(move |audio_data: EncodedAudioChunk| { let mut array = vec![0u8; audio_data.byte_length() as usize]; audio_data.copy_to_with_u8_array(&mut array); download_buffer.borrow_mut().push(array.clone()); if download_buffer.borrow().len() > 200 { //download_data(download_buffer.borrow().to_vec(), "download_buffer.opus"); //download_data( // ass::encode(download_buffer.borrow().to_vec(), 960, 0), // "download_buffer.opus", //); download_buffer.borrow_mut().clear(); } let _ = packets.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio { _dst: std::marker::PhantomData, target: 0, session_id: (), seq_num: sequence_num, payload: VoicePacketPayload::Opus(array.into(), false), position_info: None, }))); sequence_num = sequence_num.wrapping_add(2); }); let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new( error.as_ref().unchecked_ref(), output.as_ref().unchecked_ref(), )) .unwrap(); // This is required to prevent these from being deallocated error.forget(); output.forget(); let encoder_config = AudioEncoderConfig::new("opus"); encoder_config.set_number_of_channels(1); encoder_config.set_sample_rate(48000); encoder_config.set_bitrate(72_000.0); audio_encoder.configure(&encoder_config); console::log_1(&"Created Audio Encoder".into()); let mut download_buffer = std::cell::RefCell::new(Vec::new()); let onmessage: Closure = Closure::new(move |event: MessageEvent| { match AudioData::new(event.data().unchecked_ref()) { Ok(data) => { let x = web_sys::AudioDataCopyToOptions::new(0); x.set_format(web_sys::AudioSampleFormat::F32); let mut sub_buffer = vec![0; data.allocation_size(&x).unwrap() as usize]; data.copy_to_with_u8_array(&mut sub_buffer, &x); download_buffer.borrow_mut().append(&mut sub_buffer); if download_buffer.borrow().len() > 48000 * 10 * 4 { //pub fn download_data(data: Vec, filename: &str) -> Result<(), JsValue> { //download_data(download_buffer.borrow().to_vec(), "download_buffer.pcm32"); download_buffer.borrow_mut().clear(); } audio_encoder.encode(&data); } Err(err) => { console::error_1(&err); console::debug_1(&event); } } }); Reflect::set( &Reflect::get(&worklet_node, &"port".into())?, &"onmessage".into(), onmessage.as_ref(), )?; onmessage.forget(); source.connect_with_audio_node(&worklet_node)?; worklet_node.connect_with_audio_node(&audio_context.destination())?; Ok(worklet_node) } fn create_decoder(audio_context: &AudioContext) -> Result { let audio_stream_generator = MediaStreamTrackGenerator::new(&MediaStreamTrackGeneratorInit::new("audio"))?; // Create MediaStream from MediaStreamTrackGenerator let js_tracks = web_sys::js_sys::Array::new(); js_tracks.push(&audio_stream_generator); let media_stream = MediaStream::new_with_tracks(&js_tracks)?; // Create MediaStreamAudioSourceNode let audio_source = audio_context.create_media_stream_source(&media_stream)?; // Connect output of audio_source to audio_context (browser audio) audio_source.connect_with_audio_node(&audio_context.destination())?; // Create callback functions for AudioDecoder let error = Closure::wrap(Box::new(move |e: JsValue| { console::error_1(&e); }) as Box); // This knows what MediaStreamTrackGenerator to use as it closes around it let output = Closure::wrap(Box::new(move |audio_data: AudioData| { let writable = audio_stream_generator.writable(); if writable.locked() { return; } if let Err(e) = writable.get_writer().map(|writer| { spawn(async move { if let Err(e) = JsFuture::from(writer.ready()).await { console::error_1(&format!("write chunk ready error {:?}", e).into()); } if let Err(e) = JsFuture::from(writer.write_with_chunk(&audio_data)).await { console::error_1(&format!("write chunk error {:?}", e).into()); }; writer.release_lock(); }); }) { console::error_1(&e); } }) as Box); let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new( error.as_ref().unchecked_ref(), output.as_ref().unchecked_ref(), ))?; audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000)); console::log_1(&"Created Audio Decoder".into()); // This is required to prevent these from being deallocated error.forget(); output.forget(); Ok(audio_decoder) } pub async fn network_entrypoint(mut event_rx: UnboundedReceiver) { loop { let Some(Command::Connect { address, username, hash, }) = event_rx.next().await else { panic!("Did not receive connect command") }; if let Err(error) = network_connect(address, username, &mut event_rx).await { console::error_1(&error); } } } macro_rules! bail { ($($x:tt)*) => { return Err(wasm_bindgen::JsError::new(&format!($($x)*)).into()); }; } async fn network_connect( address: String, username: String, event_rx: &mut UnboundedReceiver, ) -> Result<(), JsValue> { *STATE.server.write() = Default::default(); *STATE.status.write() = ConnectionState::Connecting; console::log_1(&"Rust via WASM!".into()); let Ok(server_hash): Result, _> = include_str!("../server_hash.txt") .trim() .trim_matches(&['[', ']']) .split(',') .map(|x| x.trim().parse()) .collect() else { bail!("could not parse server hash"); }; let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice()); let object = web_sys::js_sys::Object::new(); Reflect::set( &object, &JsValue::from_str("algorithm"), &JsValue::from_str("sha-256"), )?; web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash)?; let array = web_sys::js_sys::Array::new(); array.push(&object); console::log_1(&object.clone().into()); console::log_1(&"Created option object!".into()); let mut options = WebTransportOptions::new(); options.server_certificate_hashes(&array); console::log_1(&"Created WebTransportOptions!".into()); let transport = WebTransport::new_with_options(&address, &options)?; console::log_1(&"Created WebTransport connection object.".into()); console::log_1(&transport.clone().into()); if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await { console::log_1(&e.into()); panic!(); } console::log_1(&"Transport is ready.".into()); let stream: WebTransportBidirectionalStream = wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream()) .await? .into(); let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into()); let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into()); let read_codec = ClientControlCodec::new(); let write_codec = ClientControlCodec::new(); let mut reader = asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec); let mut writer = asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec); let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded(); // Spawn worker to send packets. spawn(async move { while let Some(msg) = writer_recv_chan.next().await { if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) { console::log_1(&format!("sending {:#?}", msg).into()); } if let Err(e) = writer.send(msg).await { console::error_1(&e.to_string().into()); break; } } }); // Get version packet let version = match reader.next().await { Some(Ok(v)) => v, Some(Err(err)) => bail!("bad version packet: {err:?}"), None => bail!("no version was recieved"), }; console::log_1(&"Got version packet".into()); console::log_1(&format!("{:#?}", version).into()); // Send version packet let mut msg = msgs::Version::new(); msg.set_version(0x000010204); msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0")); //msg.set_os("Chrome".to_string()); send_chan.send(msg.into()).await.unwrap(); // Send authenticate packet let mut msg = msgs::Authenticate::new(); msg.set_username(username); msg.set_opus(true); send_chan.send(msg.into()).await.unwrap(); // Spawn worker to send pings { let mut send_chan = send_chan.clone(); spawn(async move { loop { if let Err(e) = send_chan.send(msgs::Ping::new().into()).await { console::log_1(&"could not ping".into()); console::log_1(&e.to_string().into()); break; } TimeoutFuture::new(3000).await; } }); } // Create MediaStreams to playback decoded audio // The audio context is used to reproduce audio. let audio_context = configure_audio_context(); let audio_context_worklet = audio_context.clone(); let packet_sender_worklet = send_chan.clone(); spawn(async move { match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await { Ok(node) => console::log_2(&"Created audio worklet:".into(), &node), Err(err) => console::error_1(&err), } }); // Create map of session_id -> AudioDecoder let mut decoder_map = HashMap::new(); let mut reader_future = reader.next().fuse(); let mut command_future = event_rx.next(); loop { select! { packet = reader_future => { reader_future = reader.next().fuse(); match packet { Some(Ok(msg)) => { let res =accept_packet(msg, &audio_context, &mut decoder_map); if let Err(err) = res { console::error_1(&err.into()); } }, Some(Err(err)) => console::error_1(&err.to_string().into()), None => break, } } command = command_future => { command_future = event_rx.next(); if let Some(command) = &command { console::log_1(&format!("commanding {:#?}", command).into()); } match command { Some(Command::Disconnect) => break, Some(command) => { let res = accept_command(command, &mut send_chan); if let Err(err) = res { console::error_1(&err.into()); } } None => continue, } } } } let _ = send_chan.close(); *STATE.status.write() = ConnectionState::Disconnected; Ok(()) } fn accept_command( command: Command, send_chan: &mut UnboundedSender>, ) -> Result<(), JsValue> { match command { Command::SendChat { markdown, channels } => { use markdown::*; let blocks = tokenize(&markdown); let html_text = match blocks.as_slice() { [Block::Paragraph(par)] => match par.as_slice() { [Span::Text(text)] => text.to_string(), _ => to_html(&markdown) .trim() .strip_prefix("

") .unwrap() .strip_suffix("

") .unwrap() .to_string(), }, _ => to_html(&markdown).trim().to_string(), }; { let mut server = STATE.server.write(); let Some(me) = server.session else { bail!("not signed in with a session id") }; server.chat.push(Chat { raw: markdown, dangerous_html: html_text.clone(), sender: Some(me), }) } let mut u = msgs::TextMessage::new(); u.set_message(html_text.to_string()); u.set_channel_id(channels); let _ = send_chan.unbounded_send(u.into()); } _ => (), } Ok(()) } fn accept_packet( msg: ControlPacket, audio_context: &AudioContext, decoder_map: &mut HashMap, ) -> Result<(), JsValue> { if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) { console::log_1(&format!("receiving {:#?}", msg).into()); } match msg { ControlPacket::UDPTunnel(u) => { match *u.clone() { mumble_protocol::voice::VoicePacket::Audio { _dst, target, session_id, seq_num, payload, position_info, } => { // Get or create audio decoder for this user let audio_decoder = match decoder_map.entry(session_id) { Entry::Occupied(occupied_entry) => occupied_entry.into_mut(), Entry::Vacant(vacant_entry) => { vacant_entry.insert(create_decoder(&audio_context)?) } }; // This will over time (as users join and leave) leak // AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes. // A better way to handle this would be to delete and create all the audio // infra on channel join and update it as new users join the channel, dropping // any audio packets that come in the meantime. if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload { let js_audio_payload = Uint8Array::from(audio_payload.as_ref()); let _ = audio_decoder.decode( &EncodedAudioChunk::new(&EncodedAudioChunkInit::new( &js_audio_payload.into(), 0.0, EncodedAudioChunkType::Key, )) .unwrap(), ); //console::log_1(&"Oueued audio chunk for decoding".into()); } } _ => { unreachable!("TCP tunnels UDP packets should not contain pings"); // I think? } } } ControlPacket::ChannelState(u) => { let mut server = STATE.server.write(); let id = u.get_channel_id(); let state = server.channels.entry(id).or_default(); let new_parent = if u.has_parent() { if let Some(parent) = state.parent.and_then(|p| server.channels.get_mut(&p)) { parent.children.remove(&id); } let parent_id = u.get_parent(); let parent = server.channels.entry(parent_id).or_default(); if u.has_position() && u.get_position() as usize <= parent.children.len() { // TODO: what if positions are received out of order? we need to sort afterwards? parent.children.insert_before(u.get_position() as usize, id); } else { parent.children.insert(id); } Some(parent_id) } else { None }; let state = server.channels.entry(id).or_default(); state.parent = new_parent; if u.has_name() { state.name = u.get_name().to_string(); } } ControlPacket::ChannelRemove(u) => { let mut server = STATE.server.write(); let id = u.get_channel_id(); if let Some(channel) = server.channels.remove(&id) { if let Some(parent) = channel.parent.and_then(|p| server.channels.get_mut(&p)) { parent.children.remove(&id); } } } ControlPacket::UserState(u) => { let mut server = STATE.server.write(); let server = &mut *server; let id = u.get_session(); let state = server.users.entry(id).or_default(); if u.has_channel_id() { if let Some(parent) = server.channels.get_mut(&state.channel) { parent.users.remove(&id); } let channel_id = u.get_channel_id(); server .channels .entry(channel_id) .or_default() .users .insert(id); state.channel = channel_id; } if u.has_name() { state.name = u.get_name().to_string(); } } ControlPacket::UserRemove(u) => { let mut server = STATE.server.write(); let id = u.get_session(); if let Some(state) = server.users.remove(&id) { if let Some(parent) = server.channels.get_mut(&state.channel) { parent.users.remove(&id); } } } ControlPacket::TextMessage(u) => { let mut server = STATE.server.write(); if u.has_message() { let text = u.get_message().to_string(); server.chat.push(Chat { sender: if u.has_actor() { Some(u.get_actor()) } else { None }, dangerous_html: html_purifier::purifier(&text, Default::default()), raw: text, }); } } ControlPacket::ServerSync(u) => { *STATE.status.write() = ConnectionState::Connected; let mut server = STATE.server.write(); if u.has_welcome_text() { let text = u.get_welcome_text().to_string(); server.chat.push(Chat { sender: None, dangerous_html: html_purifier::purifier(&text, Default::default()), raw: text, }); } if u.has_session() { server.session = Some(u.get_session()); } } _ => {} } Ok(()) }