use crate::msghtml::process_message_html; use crate::AudioSettings; use crate::Chat; use crate::Command; use crate::ConnectionState; use asynchronous_codec::FramedRead; use asynchronous_codec::FramedWrite; use color_eyre::eyre::{bail, Error}; use dioxus_signals::ReadableExt as _; use dioxus_signals::WritableExt as _; use futures::select; use futures::AsyncRead; use futures::AsyncWrite; use futures::FutureExt as _; use futures::SinkExt as _; use futures::StreamExt as _; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use mumble_protocol::control::msgs; use mumble_protocol::control::ControlCodec; use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::VoicePacket; use mumble_protocol::voice::VoicePacketPayload; use mumble_protocol::Clientbound; use mumble_protocol::Serverbound; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::time::Duration; use tracing::error; use tracing::info; use crate::app::SharedState; use crate::app::State; use crate::effects::AudioProcessor; use crate::imp::{ spawn, AudioPlayer, AudioPlayerInterface as _, AudioSystem, AudioSystemInterface as _, Platform, PlatformInterface as _, }; pub async fn network_entrypoint(mut event_rx: UnboundedReceiver, state: SharedState) { loop { let Some(Command::Connect { target, username, config, }) = event_rx.next().await else { panic!("did not receive connect command") }; *state.server.write_unchecked() = Default::default(); *state.status.write_unchecked() = ConnectionState::Connecting; if let Err(error) = Platform::network_connect(target, username, &mut event_rx, &config, state.clone()) .await { error!("could not connect {:?}", error); *state.status.write_unchecked() = ConnectionState::Failed(error.to_string()); } else { *state.status.write_unchecked() = ConnectionState::Disconnected; } } } pub(crate) async fn sender_loop( mut outgoing: UnboundedReceiver>, mut writer: FramedWrite>, ) { while let Some(msg) = outgoing.next().await { if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) { info!("sending packet {:#?}", msg); } if let Err(e) = writer.send(msg).await { error!("error sending packet {:?}", e); break; } } } pub(crate) async fn network_loop( username: String, state: SharedState, event_rx: &mut UnboundedReceiver, mut outgoing: UnboundedSender>, mut reader: FramedRead>, ) -> Result<(), Error> { let audio_settings = state.audio.read().clone(); // Get version packet let version = match reader.next().await { Some(Ok(v)) => v, Some(Err(err)) => bail!("bad version packet: {err:?}"), None => bail!("no version was recieved"), }; info!("got version packet {:#?}", version); // Send version packet let mut msg = msgs::Version::new(); msg.set_version(0x000010204); msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0")); //msg.set_os("Chrome".to_string()); outgoing.send(msg.into()).await.unwrap(); // Send authenticate packet let mut msg = msgs::Authenticate::new(); msg.set_username(username); msg.set_opus(true); outgoing.send(msg.into()).await.unwrap(); // Spawn worker to send pings { let mut send_chan = outgoing.clone(); spawn(async move { loop { if let Err(_) = send_chan.send(msgs::Ping::new().into()).await { break; } Platform::sleep(Duration::from_millis(3000)).await; } }); } let mut audio = AudioSystem::new().await?; audio.set_processor(AudioProcessor::new(audio_settings.denoise)); { let send_chan = outgoing.clone(); let mut sequence_num = 0; if let Err(err) = audio.start_recording(move |opus_frame, is_terminator| { let _ = send_chan.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio { _dst: std::marker::PhantomData, target: 0, session_id: (), seq_num: sequence_num, payload: VoicePacketPayload::Opus(opus_frame.into(), is_terminator), position_info: None, }))); sequence_num = sequence_num.wrapping_add(2); }) { error!("could not begin recording: {err:?}") } } // Create map of session_id -> AudioDecoder let mut decoder_map = HashMap::new(); let mut reader_future = reader.next().fuse(); let mut command_future = event_rx.next(); loop { select! { packet = reader_future => { reader_future = reader.next().fuse(); match packet { Some(Ok(msg)) => { if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) { info!("receiving packet {:#?}", msg); } let res = accept_packet(msg, &mut audio, &mut decoder_map, &state); if let Err(err) = res { error!("error accepting packet {:?}", err) } }, Some(Err(err)) => { error!("error receiving packet {:?}", err) }, None => break, } } command = command_future => { command_future = event_rx.next(); if let Some(command) = &command { info!("issuing command {:#?}", command); } match command { Some(Command::Disconnect) => break, Some(command) => { let res = accept_command(command, &mut outgoing, &mut audio, &state); if let Err(err) = res { info!("error accepting command {:?}", err) } } None => continue, } } } } let _ = outgoing.close(); Ok(()) } fn accept_command( command: Command, send_chan: &mut UnboundedSender>, audio: &mut AudioSystem, state: &State, ) -> Result<(), Error> { use Command::*; let Some(session) = state.server.read().session else { bail!("no session id") }; match command { SendChat { markdown, channels } => { use markdown::*; let blocks = tokenize(&markdown); let html_text = match blocks.as_slice() { [Block::Paragraph(par)] => match par.as_slice() { [Span::Text(text)] => text.to_string(), _ => to_html(&markdown) .trim() .strip_prefix("

") .unwrap() .strip_suffix("

") .unwrap() .to_string(), }, _ => to_html(&markdown).trim().to_string(), }; { let mut server = state.server.write_unchecked(); let Some(me) = server.session else { bail!("not signed in with a session id") }; server.chat.push(Chat { raw: markdown, dangerous_html: html_text.clone(), sender: Some(me), }) } let mut u = msgs::TextMessage::new(); u.set_message(html_text.to_string()); u.set_channel_id(channels); let _ = send_chan.unbounded_send(u.into()); } SendFile { ref bytes, name, mime, channels, } => { use base64::{display::Base64Display, prelude::BASE64_STANDARD}; let html = match mime { Some(mime) if mime.type_() == "image" => format!( "", mime, Base64Display::new(bytes, &BASE64_STANDARD) ), Some(mime) => format!( "{name}", mime, Base64Display::new(bytes, &BASE64_STANDARD) ), None => format!( "{name}", Base64Display::new(bytes, &BASE64_STANDARD) ), }; { let mut server = state.server.write_unchecked(); let Some(me) = server.session else { bail!("not signed in with a session id") }; server.chat.push(Chat { raw: "".to_string(), dangerous_html: html.clone(), sender: Some(me), }) } let mut u = msgs::TextMessage::new(); u.set_message(html); u.set_channel_id(channels); let _ = send_chan.unbounded_send(u.into()); } SetMute { mute } => { let mut u = msgs::UserState::new(); u.set_session(session); u.set_self_mute(mute); let _ = send_chan.unbounded_send(u.into()); } SetDeaf { deaf } => { let mut u = msgs::UserState::new(); u.set_session(session); u.set_self_deaf(deaf); let _ = send_chan.unbounded_send(u.into()); } EnterChannel { channel, user } => { let mut u = msgs::UserState::new(); u.set_session(user); u.set_channel_id(channel); let _ = send_chan.unbounded_send(u.into()); } Connect { .. } | Disconnect => (), UpdateAudioSettings(AudioSettings { denoise }) => { audio.set_processor(AudioProcessor::new(denoise)); } } Ok(()) } fn accept_packet( msg: ControlPacket, audio_context: &mut AudioSystem, player_map: &mut HashMap, state: &State, ) -> Result<(), Error> { match msg { ControlPacket::UDPTunnel(u) => { match *u.clone() { mumble_protocol::voice::VoicePacket::Audio { _dst, target, session_id, seq_num, payload, position_info, } => { // Get or create audio decoder for this user let audio_player = match player_map.entry(session_id) { Entry::Occupied(occupied_entry) => occupied_entry.into_mut(), Entry::Vacant(vacant_entry) => { vacant_entry.insert(audio_context.create_player()?) } }; // This will over time (as users join and leave) leak // AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes. // A better way to handle this would be to delete and create all the audio // infra on channel join and update it as new users join the channel, dropping // any audio packets that come in the meantime. if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload { audio_player.play_opus(audio_payload.as_ref()); //console::log_1(&"Oueued audio chunk for decoding".into()); } } _ => { unreachable!("TCP tunnels UDP packets should not contain pings"); // I think? } } } ControlPacket::ChannelState(u) => { let mut server = state.server.write_unchecked(); server.channels_state.update_from_channel_state(&u); } ControlPacket::ChannelRemove(u) => { let mut server = state.server.write_unchecked(); server.channels_state.update_from_channel_remove(&u); } ControlPacket::UserState(u) => { let mut server = state.server.write_unchecked(); let server = &mut *server; let id = u.get_session(); let state_entry = server.users.entry(id); let new = matches!(state_entry, std::collections::hash_map::Entry::Vacant(_)); let state = state_entry.or_default(); // the server might now send a channel_id if the user is in channel=0 if u.has_channel_id() || new { if let Some(parent) = server.channels_state.channels.get_mut(&state.channel) { parent.users.remove(&id); } let channel_id = u.get_channel_id(); server .channels_state .channels .entry(channel_id) .or_default() .users .insert(id); state.channel = channel_id; } if u.has_name() { state.name = u.get_name().to_string(); } if u.has_mute() { state.mute = u.get_mute(); } if u.has_deaf() { state.deaf = u.get_deaf(); } if u.has_suppress() { state.suppress = u.get_suppress(); } if u.has_self_mute() { state.self_mute = u.get_self_mute(); } if u.has_self_deaf() { state.self_deaf = u.get_self_deaf(); } } ControlPacket::UserRemove(u) => { let mut server = state.server.write_unchecked(); let id = u.get_session(); if let Some(state) = server.users.remove(&id) { if let Some(parent) = server.channels_state.channels.get_mut(&state.channel) { parent.users.remove(&id); } } } ControlPacket::TextMessage(u) => { let mut server = state.server.write_unchecked(); if u.has_message() { let text = u.get_message().to_string(); server.chat.push(Chat { sender: if u.has_actor() { Some(u.get_actor()) } else { None }, dangerous_html: process_message_html(&text), raw: text, }); } } ControlPacket::ServerSync(u) => { *state.status.write_unchecked() = ConnectionState::Connected; let mut server = state.server.write_unchecked(); if u.has_welcome_text() { let text = u.get_welcome_text().to_string(); server.chat.push(Chat { sender: None, dangerous_html: process_message_html(&text), raw: text, }); } if u.has_session() { server.session = Some(u.get_session()); } } _ => {} } Ok(()) }