Files
mumble-web2/client/src/mainloop.rs
T

443 lines
16 KiB
Rust

use crate::msghtml::process_message_html;
use crate::AudioSettings;
use crate::Chat;
use crate::Command;
use crate::ConnectionState;
use asynchronous_codec::FramedRead;
use asynchronous_codec::FramedWrite;
use color_eyre::eyre::{bail, Error};
use dioxus_signals::ReadableExt as _;
use dioxus_signals::WritableExt as _;
use futures::select;
use futures::AsyncRead;
use futures::AsyncWrite;
use futures::FutureExt as _;
use futures::SinkExt as _;
use futures::StreamExt as _;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlCodec;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::voice::VoicePacketPayload;
use mumble_protocol::Clientbound;
use mumble_protocol::Serverbound;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use tracing::error;
use tracing::info;
use crate::app::SharedState;
use crate::app::State;
use crate::effects::AudioProcessor;
use crate::imp::{
spawn, AudioPlayer, AudioPlayerInterface as _, AudioSystem, AudioSystemInterface as _,
Platform, PlatformInterface as _,
};
pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>, state: SharedState) {
loop {
let Some(Command::Connect {
target,
username,
config,
}) = event_rx.next().await
else {
panic!("did not receive connect command")
};
*state.server.write_unchecked() = Default::default();
*state.status.write_unchecked() = ConnectionState::Connecting;
if let Err(error) =
Platform::network_connect(target, username, &mut event_rx, &config, state.clone())
.await
{
error!("could not connect {:?}", error);
*state.status.write_unchecked() = ConnectionState::Failed(error.to_string());
} else {
*state.status.write_unchecked() = ConnectionState::Disconnected;
}
}
}
/// Forwards queued control packets to the server.
///
/// Pulls packets from `outgoing` and writes each one through `writer`. Every
/// packet except `Ping` and `UDPTunnel` is logged before it is written. The
/// loop finishes when the queue yields no more packets or when a write fails
/// (the failure is logged).
pub(crate) async fn sender_loop<W: AsyncWrite + Unpin + 'static>(
    mut outgoing: UnboundedReceiver<ControlPacket<Serverbound>>,
    mut writer: FramedWrite<W, ControlCodec<Serverbound, Clientbound>>,
) {
    loop {
        let Some(packet) = outgoing.next().await else {
            return;
        };

        // Pings and tunneled voice are too frequent to be worth logging.
        let is_high_volume =
            matches!(packet, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_));
        if !is_high_volume {
            info!("sending packet {:#?}", packet);
        }

        match writer.send(packet).await {
            Ok(()) => {}
            Err(e) => {
                error!("error sending packet {:?}", e);
                return;
            }
        }
    }
}
/// Drives one connection from handshake to shutdown.
///
/// Steps, in order:
/// 1. read the server's version packet,
/// 2. queue our own `Version` and `Authenticate` packets on `outgoing`,
/// 3. spawn a worker that queues a `Ping` every 3000 ms,
/// 4. create the audio system and start recording, queueing every encoded
///    frame as a tunneled voice packet,
/// 5. dispatch incoming packets to `accept_packet` and UI commands to
///    `accept_command` until the server stream ends or
///    [`Command::Disconnect`] is received.
///
/// The outgoing channel is closed before returning once the ping worker has
/// been started, so that the worker (which holds its own sender clone) stops
/// instead of pinging forever.
///
/// # Errors
///
/// Fails when the version packet is missing or malformed, when a handshake
/// packet cannot be queued, or when the audio system cannot be created.
/// Errors while handling an individual packet or command are only logged.
pub(crate) async fn network_loop<R: AsyncRead + Unpin + 'static>(
    username: String,
    state: SharedState,
    event_rx: &mut UnboundedReceiver<Command>,
    mut outgoing: UnboundedSender<ControlPacket<Serverbound>>,
    mut reader: FramedRead<R, ControlCodec<Serverbound, Clientbound>>,
) -> Result<(), Error> {
    let audio_settings = state.audio.read().clone();

    // Get version packet
    let version = match reader.next().await {
        Some(Ok(v)) => v,
        Some(Err(err)) => bail!("bad version packet: {err:?}"),
        None => bail!("no version was recieved"),
    };
    info!("got version packet {:#?}", version);

    // Send version packet
    let mut msg = msgs::Version::new();
    // Value is 0x10204 — presumably the packed protocol version 1.2.4; TODO confirm.
    msg.set_version(0x000010204);
    msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
    // A closed channel here means the writer side is already gone; report it
    // as a failed connection instead of panicking.
    outgoing.send(msg.into()).await?;

    // Send authenticate packet
    let mut msg = msgs::Authenticate::new();
    msg.set_username(username);
    msg.set_opus(true);
    outgoing.send(msg.into()).await?;

    // Spawn worker to send pings; it stops as soon as the channel is closed.
    {
        let mut send_chan = outgoing.clone();
        spawn(async move {
            loop {
                if send_chan.send(msgs::Ping::new().into()).await.is_err() {
                    break;
                }
                Platform::sleep(Duration::from_millis(3000)).await;
            }
        });
    }

    let audio = AudioSystem::new().await;
    if audio.is_err() {
        // The ping worker is already running; closing the channel stops it.
        outgoing.close_channel();
    }
    let mut audio = audio?;
    audio.set_processor(AudioProcessor::new(audio_settings.denoise));

    {
        let send_chan = outgoing.clone();
        let mut sequence_num = 0;
        if let Err(err) = audio.start_recording(move |opus_frame, is_terminator| {
            // Best effort: a frame that cannot be queued is simply dropped.
            let _ =
                send_chan.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio {
                    _dst: std::marker::PhantomData,
                    target: 0,
                    session_id: (),
                    seq_num: sequence_num,
                    payload: VoicePacketPayload::Opus(opus_frame.into(), is_terminator),
                    position_info: None,
                })));
            sequence_num = sequence_num.wrapping_add(2);
        }) {
            error!("could not begin recording: {err:?}")
        }
    }

    // Map of session id -> AudioPlayer for remote users
    let mut decoder_map = HashMap::new();

    let mut reader_future = reader.next().fuse();
    let mut command_future = event_rx.next();
    loop {
        select! {
            packet = reader_future => {
                // Re-arm the branch before handling the packet.
                reader_future = reader.next().fuse();
                match packet {
                    Some(Ok(msg)) => {
                        if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) {
                            info!("receiving packet {:#?}", msg);
                        }
                        let res = accept_packet(msg, &mut audio, &mut decoder_map, &state);
                        if let Err(err) = res {
                            error!("error accepting packet {:?}", err)
                        }
                    },
                    Some(Err(err)) => {
                        error!("error receiving packet {:?}", err)
                    },
                    // Server side of the stream ended.
                    None => break,
                }
            }
            command = command_future => {
                command_future = event_rx.next();
                if let Some(command) = &command {
                    info!("issuing command {:#?}", command);
                }
                match command {
                    Some(Command::Disconnect) => break,
                    Some(command) => {
                        let res = accept_command(command, &mut outgoing, &mut audio, &state);
                        if let Err(err) = res {
                            error!("error accepting command {:?}", err)
                        }
                    }
                    None => continue,
                }
            }
        }
    }

    // `SinkExt::close` only builds a future and does nothing unless awaited;
    // `close_channel` closes the channel immediately for every sender clone.
    outgoing.close_channel();
    Ok(())
}
/// Applies one UI [`Command`] to the active connection.
///
/// * `SendChat` / `SendFile` append the message to the local chat log and
///   queue a `TextMessage` for the given channels.
/// * `SetMute` / `SetDeaf` queue a `UserState` for our own session.
/// * `EnterChannel` queues a `UserState` moving `user` into `channel`.
/// * `UpdateAudioSettings` replaces the audio processor; it does not talk to
///   the server and therefore works before a session id is known.
/// * `Connect` / `Disconnect` are no-ops here.
///
/// Queueing is best effort: a packet that cannot be queued is dropped.
///
/// # Errors
///
/// Returns an error when a command that addresses the server is issued before
/// the server has assigned us a session id.
fn accept_command(
    command: Command,
    send_chan: &mut UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
    audio: &mut AudioSystem,
    state: &State,
) -> Result<(), Error> {
    use Command::*;

    // Looked up on demand so that purely local commands are not rejected
    // while the session id is still unknown.
    let require_session = || -> Result<_, Error> {
        let Some(session) = state.server.read().session else {
            bail!("no session id")
        };
        Ok(session)
    };

    match command {
        SendChat { markdown, channels } => {
            use markdown::*;

            let me = require_session()?;
            let blocks = tokenize(&markdown);
            let html_text = match blocks.as_slice() {
                [Block::Paragraph(par)] => match par.as_slice() {
                    // NOTE(review): plain text is used verbatim as HTML here,
                    // without going through `to_html` — confirm that this is
                    // intended for input containing markup characters.
                    [Span::Text(text)] => text.to_string(),
                    _ => {
                        // A single paragraph is sent without its <p> wrapper.
                        // If the renderer output is not wrapped as expected,
                        // send it unchanged instead of panicking.
                        let rendered = to_html(&markdown);
                        let rendered = rendered.trim();
                        rendered
                            .strip_prefix("<p>")
                            .and_then(|inner| inner.strip_suffix("</p>"))
                            .unwrap_or(rendered)
                            .to_string()
                    }
                },
                _ => to_html(&markdown).trim().to_string(),
            };

            {
                let mut server = state.server.write_unchecked();
                server.chat.push(Chat {
                    raw: markdown,
                    dangerous_html: html_text.clone(),
                    sender: Some(me),
                })
            }

            let mut u = msgs::TextMessage::new();
            u.set_message(html_text);
            u.set_channel_id(channels);
            let _ = send_chan.unbounded_send(u.into());
        }
        SendFile {
            ref bytes,
            name,
            mime,
            channels,
        } => {
            use base64::{display::Base64Display, prelude::BASE64_STANDARD};

            let me = require_session()?;
            // The file travels inline as a base64 data URL: images are shown,
            // everything else becomes a download link.
            // NOTE(review): `name` is inserted into the HTML unescaped —
            // confirm that file names are sanitised by the caller.
            let html = match mime {
                Some(mime) if mime.type_() == "image" => format!(
                    "<img src=\"data:{};base64,{}\" />",
                    mime,
                    Base64Display::new(bytes, &BASE64_STANDARD)
                ),
                Some(mime) => format!(
                    "<a href=\"data:{};base64,{}\" download>{name}</a>",
                    mime,
                    Base64Display::new(bytes, &BASE64_STANDARD)
                ),
                None => format!(
                    "<a href=\"data:application/octet-stream;base64,{}\" download>{name}</a>",
                    Base64Display::new(bytes, &BASE64_STANDARD)
                ),
            };

            {
                let mut server = state.server.write_unchecked();
                server.chat.push(Chat {
                    raw: "".to_string(),
                    dangerous_html: html.clone(),
                    sender: Some(me),
                })
            }

            let mut u = msgs::TextMessage::new();
            u.set_message(html);
            u.set_channel_id(channels);
            let _ = send_chan.unbounded_send(u.into());
        }
        SetMute { mute } => {
            let mut u = msgs::UserState::new();
            u.set_session(require_session()?);
            u.set_self_mute(mute);
            let _ = send_chan.unbounded_send(u.into());
        }
        SetDeaf { deaf } => {
            let mut u = msgs::UserState::new();
            u.set_session(require_session()?);
            u.set_self_deaf(deaf);
            let _ = send_chan.unbounded_send(u.into());
        }
        EnterChannel { channel, user } => {
            // The packet targets `user`, but moving anyone still requires
            // that we have a session of our own.
            require_session()?;
            let mut u = msgs::UserState::new();
            u.set_session(user);
            u.set_channel_id(channel);
            let _ = send_chan.unbounded_send(u.into());
        }
        Connect { .. } | Disconnect => (),
        UpdateAudioSettings(AudioSettings { denoise }) => {
            audio.set_processor(AudioProcessor::new(denoise));
        }
    }
    Ok(())
}
fn accept_packet(
msg: ControlPacket<mumble_protocol::Clientbound>,
audio_context: &mut AudioSystem,
player_map: &mut HashMap<u32, AudioPlayer>,
state: &State,
) -> Result<(), Error> {
match msg {
ControlPacket::UDPTunnel(u) => {
match *u.clone() {
mumble_protocol::voice::VoicePacket::Audio {
_dst,
target,
session_id,
seq_num,
payload,
position_info,
} => {
// Get or create audio decoder for this user
let audio_player = match player_map.entry(session_id) {
Entry::Occupied(occupied_entry) => occupied_entry.into_mut(),
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(audio_context.create_player()?)
}
};
// This will over time (as users join and leave) leak
// AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes.
// A better way to handle this would be to delete and create all the audio
// infra on channel join and update it as new users join the channel, dropping
// any audio packets that come in the meantime.
if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload {
audio_player.play_opus(audio_payload.as_ref());
//console::log_1(&"Oueued audio chunk for decoding".into());
}
}
_ => {
unreachable!("TCP tunnels UDP packets should not contain pings");
// I think?
}
}
}
ControlPacket::ChannelState(u) => {
let mut server = state.server.write_unchecked();
server.channels_state.update_from_channel_state(&u);
}
ControlPacket::ChannelRemove(u) => {
let mut server = state.server.write_unchecked();
server.channels_state.update_from_channel_remove(&u);
}
ControlPacket::UserState(u) => {
let mut server = state.server.write_unchecked();
let server = &mut *server;
let id = u.get_session();
let state_entry = server.users.entry(id);
let new = matches!(state_entry, std::collections::hash_map::Entry::Vacant(_));
let state = state_entry.or_default();
// the server might now send a channel_id if the user is in channel=0
if u.has_channel_id() || new {
if let Some(parent) = server.channels_state.channels.get_mut(&state.channel) {
parent.users.remove(&id);
}
let channel_id = u.get_channel_id();
server
.channels_state
.channels
.entry(channel_id)
.or_default()
.users
.insert(id);
state.channel = channel_id;
}
if u.has_name() {
state.name = u.get_name().to_string();
}
if u.has_mute() {
state.mute = u.get_mute();
}
if u.has_deaf() {
state.deaf = u.get_deaf();
}
if u.has_suppress() {
state.suppress = u.get_suppress();
}
if u.has_self_mute() {
state.self_mute = u.get_self_mute();
}
if u.has_self_deaf() {
state.self_deaf = u.get_self_deaf();
}
}
ControlPacket::UserRemove(u) => {
let mut server = state.server.write_unchecked();
let id = u.get_session();
if let Some(state) = server.users.remove(&id) {
if let Some(parent) = server.channels_state.channels.get_mut(&state.channel) {
parent.users.remove(&id);
}
}
}
ControlPacket::TextMessage(u) => {
let mut server = state.server.write_unchecked();
if u.has_message() {
let text = u.get_message().to_string();
server.chat.push(Chat {
sender: if u.has_actor() {
Some(u.get_actor())
} else {
None
},
dangerous_html: process_message_html(&text),
raw: text,
});
}
}
ControlPacket::ServerSync(u) => {
*state.status.write_unchecked() = ConnectionState::Connected;
let mut server = state.server.write_unchecked();
if u.has_welcome_text() {
let text = u.get_welcome_text().to_string();
server.chat.push(Chat {
sender: None,
dangerous_html: process_message_html(&text),
raw: text,
});
}
if u.has_session() {
server.session = Some(u.get_session());
}
}
_ => {}
}
Ok(())
}