some error handling improvements & start on chat send

This commit is contained in:
2024-09-28 14:42:29 -06:00
parent de17960335
commit 8f420e6efa
5 changed files with 355 additions and 498 deletions
+50 -7
View File
@@ -5,13 +5,31 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use dioxus::prelude::*;
use ordermap::OrderSet;
use super::Command;
use super::Command::*;
use super::ConnectionState::{self, *};
pub type ChannelId = u32;
pub type UserId = u32;
pub enum ConnectionState {
Disconnected,
Connecting,
Connected,
}
pub enum Command {
Connect {
address: String,
username: String,
hash: String,
},
SendChat {
markdown: String,
channels: Vec<ChannelId>,
},
Disconnect,
}
use Command::*;
use ConnectionState::*;
#[derive(Default)]
pub struct ChannelState {
pub name: String,
@@ -27,7 +45,7 @@ pub struct UserState {
}
pub struct Chat {
pub text: String,
pub raw: String,
pub dangerous_html: String,
pub sender: Option<UserId>,
}
@@ -37,6 +55,13 @@ pub struct ServerState {
pub channels: HashMap<ChannelId, ChannelState>,
pub users: HashMap<UserId, UserState>,
pub chat: Vec<Chat>,
pub session: Option<UserId>,
}
impl ServerState {
pub fn this_user(&self) -> Option<&UserState> {
self.users.get(&self.session?)
}
}
pub struct State {
@@ -80,8 +105,10 @@ pub fn Channel(id: ChannelId) -> Element {
}
#[component]
pub fn ChatHistory() -> Element {
pub fn ChatView() -> Element {
let server = STATE.server.read();
let mut draft = use_signal(|| "".to_string());
//let net: Coroutine<Command> = use_coroutine_handle();
rsx!(
div {
style: "margin: 16px; padding: 16px; border: solid black 1px;",
@@ -98,6 +125,22 @@ pub fn ChatHistory() -> Element {
}
hr {}
}
input {
placeholder: "say something",
value: "{draft.read()}",
oninput: move |evt| draft.set(evt.value().clone()),
}
button {
onclick: move |_| {
/*if let Some(user) = server.this_user() {
net.send(SendChat {
markdown: draft.write().split_off(0),
channels: vec![user.channel],
});
}*/
},
"Send"
}
}
)
}
@@ -114,7 +157,7 @@ pub fn ServerView() -> Element {
}
}
}
ChatHistory {}
ChatView {}
)
}
+223 -205
View File
@@ -1,25 +1,27 @@
pub mod app;
use app::ChannelState;
use app::ChannelId;
use app::Chat;
use app::UserState;
use app::Command;
use app::ConnectionState;
use dioxus::prelude::*;
use anyhow::Error;
use app::STATE;
use async_std::channel::Sender;
use futures::select;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use futures_channel::mpsc::UnboundedSender;
use gloo_timers::future::TimeoutFuture;
use manganis::{file, mg};
use markdown;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::control::{msgs, ClientControlCodec};
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::voice::VoicePacketDst;
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Duration;
use std::fmt;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local as spawn;
use wasm_bindgen_futures::{future_to_promise, JsFuture};
@@ -47,11 +49,11 @@ use web_sys::MediaStreamTrackGenerator;
use web_sys::MediaStreamTrackGeneratorInit;
use web_sys::MessageEvent;
use web_sys::WebTransport;
use web_sys::WebTransportBidirectionalStream;
use web_sys::WebTransportOptions;
use web_sys::WorkletOptions;
mod ass {
use byteorder::{ByteOrder, LittleEndian};
use ogg::PacketWriter;
@@ -168,7 +170,7 @@ impl PromiseExt for Promise {
async fn create_encoder_worklet(
audio_context: &AudioContext,
packets: Sender<ControlPacket<mumble_protocol::Serverbound>>,
packets: UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
) -> Result<AudioWorkletNode, JsValue> {
let stream = window()
.unwrap()
@@ -219,14 +221,15 @@ async fn create_encoder_worklet(
download_buffer.borrow_mut().clear();
}
let _ = packets.try_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio {
_dst: std::marker::PhantomData,
target: 0,
session_id: (),
seq_num: sequence_num,
payload: VoicePacketPayload::Opus(array.into(), false),
position_info: None,
})));
let _ =
packets.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio {
_dst: std::marker::PhantomData,
target: 0,
session_id: (),
seq_num: sequence_num,
payload: VoicePacketPayload::Opus(array.into(), false),
position_info: None,
})));
sequence_num = sequence_num.wrapping_add(2);
});
@@ -284,27 +287,23 @@ async fn create_encoder_worklet(
Ok(worklet_node)
}
fn create_decoder(audio_context: &AudioContext) -> AudioDecoder {
fn create_decoder(audio_context: &AudioContext) -> Result<AudioDecoder, JsValue> {
let audio_stream_generator =
MediaStreamTrackGenerator::new(&MediaStreamTrackGeneratorInit::new("audio")).unwrap();
MediaStreamTrackGenerator::new(&MediaStreamTrackGeneratorInit::new("audio"))?;
// Create MediaStream from MediaStreamTrackGenerator
let js_tracks = web_sys::js_sys::Array::new();
js_tracks.push(&audio_stream_generator);
let media_stream = MediaStream::new_with_tracks(&js_tracks).unwrap();
let media_stream = MediaStream::new_with_tracks(&js_tracks)?;
// Create MediaStreamAudioSourceNode
let audio_source = audio_context
.create_media_stream_source(&media_stream)
.unwrap();
let audio_source = audio_context.create_media_stream_source(&media_stream)?;
// Connect output of audio_source to audio_context (browser audio)
audio_source
.connect_with_audio_node(&audio_context.destination())
.unwrap();
audio_source.connect_with_audio_node(&audio_context.destination())?;
// Create callback functions for AudioDecoder
let error = Closure::wrap(Box::new(move |e: JsValue| {
console::log_1(&e);
console::error_1(&e);
}) as Box<dyn FnMut(JsValue)>);
// This knows what MediaStreamTrackGenerator to use as it closes around it
@@ -316,23 +315,22 @@ fn create_decoder(audio_context: &AudioContext) -> AudioDecoder {
if let Err(e) = writable.get_writer().map(|writer| {
spawn(async move {
if let Err(e) = JsFuture::from(writer.ready()).await {
console::log_1(&format!("write chunk error {:?}", e).into());
console::error_1(&format!("write chunk ready error {:?}", e).into());
}
if let Err(e) = JsFuture::from(writer.write_with_chunk(&audio_data)).await {
console::log_1(&format!("write chunk error {:?}", e).into());
console::error_1(&format!("write chunk error {:?}", e).into());
};
writer.release_lock();
});
}) {
console::log_1(&e);
console::error_1(&e);
}
}) as Box<dyn FnMut(AudioData)>);
let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new(
error.as_ref().unchecked_ref(),
output.as_ref().unchecked_ref(),
))
.unwrap();
))?;
audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000));
console::log_1(&"Created Audio Decoder".into());
@@ -341,22 +339,7 @@ fn create_decoder(audio_context: &AudioContext) -> AudioDecoder {
error.forget();
output.forget();
audio_decoder
}
pub enum ConnectionState {
Disconnected,
Connecting,
Connected,
}
pub enum Command {
Connect {
address: String,
username: String,
hash: String,
},
Disconnect,
Ok(audio_decoder)
}
pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>) {
@@ -370,180 +353,195 @@ pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>) {
panic!("Did not receive connect command")
};
*STATE.server.write() = Default::default();
*STATE.status.write() = ConnectionState::Connecting;
console::log_1(&"Rust via WASM!".into());
let Ok(server_hash): Result<Vec<u8>, _> = env!("WEBTRANSPORT_SERVER_HASH")
.trim_matches(&['[', ']'])
.split(',')
.map(|x| x.trim().parse())
.collect()
else {
panic!("could not parse server hash")
};
let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
let object = web_sys::js_sys::Object::new();
Reflect::set(
&object,
&JsValue::from_str("algorithm"),
&JsValue::from_str("sha-256"),
)
.unwrap();
web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).unwrap();
let array = web_sys::js_sys::Array::new();
array.push(&object);
console::log_1(&object.clone().into());
console::log_1(&"Created option object!".into());
let mut options = WebTransportOptions::new();
options.server_certificate_hashes(&array);
console::log_1(&"Created WebTransportOptions!".into());
let transport = match WebTransport::new_with_options(&address, &options) {
Ok(x) => x,
Err(e) => {
console::log_1(&e.into());
panic!();
}
};
console::log_1(&"Created WebTransport connection object.".into());
console::log_1(&transport.clone().into());
if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await {
console::log_1(&e.into());
panic!();
if let Err(error) = network_connect(address, username, &mut event_rx).await {
console::error_1(&error);
}
}
}
console::log_1(&"Transport is ready.".into());
macro_rules! bail {
($($x:tt)*) => {
return Err(wasm_bindgen::JsError::new(&format!($($x)*)).into());
};
}
let stream: web_sys::WebTransportBidirectionalStream =
match wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream())
.await
{
Ok(x) => x.into(),
Err(e) => {
console::log_1(&e.into());
panic!();
}
};
async fn network_connect(
address: String,
username: String,
event_rx: &mut UnboundedReceiver<Command>,
) -> Result<(), JsValue> {
*STATE.server.write() = Default::default();
*STATE.status.write() = ConnectionState::Connecting;
let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());
console::log_1(&"Rust via WASM!".into());
let read_codec = ClientControlCodec::new();
let write_codec = ClientControlCodec::new();
let Ok(server_hash): Result<Vec<u8>, _> = include_str!("../server_hash.txt")
.trim()
.trim_matches(&['[', ']'])
.split(',')
.map(|x| x.trim().parse())
.collect()
else {
bail!("could not parse server hash");
};
let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
let mut reader =
asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
let mut writer = asynchronous_codec::FramedWrite::new(
wasm_stream_writable.into_async_write(),
write_codec,
);
let object = web_sys::js_sys::Object::new();
let (send_chan, writer_recv_chan) = async_std::channel::unbounded();
Reflect::set(
&object,
&JsValue::from_str("algorithm"),
&JsValue::from_str("sha-256"),
)?;
web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash)?;
let array = web_sys::js_sys::Array::new();
array.push(&object);
console::log_1(&object.clone().into());
console::log_1(&"Created option object!".into());
let mut options = WebTransportOptions::new();
options.server_certificate_hashes(&array);
console::log_1(&"Created WebTransportOptions!".into());
let transport = WebTransport::new_with_options(&address, &options)?;
console::log_1(&"Created WebTransport connection object.".into());
console::log_1(&transport.clone().into());
if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await {
console::log_1(&e.into());
panic!();
}
console::log_1(&"Transport is ready.".into());
let stream: WebTransportBidirectionalStream =
wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream())
.await?
.into();
let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());
let read_codec = ClientControlCodec::new();
let write_codec = ClientControlCodec::new();
let mut reader =
asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
let mut writer =
asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec);
let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded();
spawn(async move {
while let Some(msg) = writer_recv_chan.next().await {
if let Err(e) = writer.send(msg).await {
console::error_1(&e.to_string().into());
break;
}
}
});
// Get version packet
let version = match reader.next().await {
Some(Ok(v)) => v,
Some(Err(err)) => bail!("bad version packet: {err:?}"),
None => bail!("no version was recieved"),
};
console::log_1(&"Got version packet".into());
console::log_1(&format!("{:#?}", version).into());
// Send version packet
let mut msg = msgs::Version::new();
msg.set_version(0x000010204);
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent version packet".into());
// Send authenticate packet
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent authenticate packet".into());
// Spawn worker to send pings
{
let mut send_chan = send_chan.clone();
spawn(async move {
while let Ok(msg) = writer_recv_chan.recv().await {
if let Err(e) = writer.send(msg).await {
loop {
console::log_1(&"Sending ping".into());
if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
console::log_1(&"could not ping".into());
console::log_1(&e.to_string().into());
break;
}
TimeoutFuture::new(3000).await;
}
});
// Get version packet
let version = reader.next().await.unwrap().unwrap();
console::log_1(&"Got version packet".into());
console::log_1(&format!("{:#?}", version).into());
// Send version packet
let mut msg = msgs::Version::new();
msg.set_version(0x000010204);
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent version packet".into());
// Send authenticate packet
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent authenticate packet".into());
// Spawn worker to send pings
{
let send_chan = send_chan.clone();
spawn(async move {
loop {
console::log_1(&"Sending ping".into());
if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
console::log_1(&"could not ping".into());
console::log_1(&e.to_string().into());
break;
}
async_std::task::sleep(Duration::from_millis(3000)).await;
}
});
}
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
let audio_context_worklet = audio_context.clone();
let packet_sender_worklet = send_chan.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await {
Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
Err(err) => console::error_1(&err),
}
});
*STATE.status.write() = ConnectionState::Connected;
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();
loop {
select! {
packet = reader.next().fuse() => {
match packet {
Some(Ok(msg)) => accept_packet(msg, &audio_context, &mut decoder_map),
Some(Err(err)) => panic!("{err}"),
None => break,
}
}
command = event_rx.next() => {
match command {
Some(Command::Disconnect) => break,
_ => continue,
}
}
}
}
send_chan.close();
*STATE.status.write() = ConnectionState::Disconnected;
}
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
let audio_context_worklet = audio_context.clone();
let packet_sender_worklet = send_chan.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await {
Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
Err(err) => console::error_1(&err),
}
});
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();
loop {
select! {
packet = reader.next().fuse() => {
match packet {
Some(Ok(msg)) => {
let res =accept_packet(msg, &audio_context, &mut decoder_map);
if let Err(err) = res {
console::error_1(&err.into());
}
},
Some(Err(err)) => console::error_1(&err.to_string().into()),
None => break,
}
}
command = event_rx.next() => {
match command {
Some(Command::Disconnect) => break,
Some(Command::SendChat { markdown, channels }) => {
let html_text = markdown::to_html(&markdown);
let mut u = msgs::TextMessage::new();
u.set_message(html_text);
u.set_channel_id(channels);
let _ = send_chan.send(u.into());
},
_ => continue,
}
}
}
}
send_chan.close();
*STATE.status.write() = ConnectionState::Disconnected;
Ok(())
}
fn accept_packet(
msg: ControlPacket<mumble_protocol::Clientbound>,
audio_context: &AudioContext,
decoder_map: &mut HashMap<u32, AudioDecoder>,
) {
if !matches!(msg, ControlPacket::UDPTunnel(_)) {
) -> Result<(), JsValue> {
if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) {
console::log_1(&format!("{:#?}", msg).into());
}
match msg {
@@ -558,9 +556,12 @@ fn accept_packet(
position_info,
} => {
// Get or create audio decoder for this user
let audio_decoder = decoder_map
.entry(session_id)
.or_insert_with(|| create_decoder(&audio_context));
let audio_decoder = match decoder_map.entry(session_id) {
Entry::Occupied(occupied_entry) => occupied_entry.into_mut(),
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(create_decoder(&audio_context)?)
}
};
// This will over time (as users join and leave) leak
// AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes.
// A better way to handle this would be to delete and create all the audio
@@ -568,7 +569,7 @@ fn accept_packet(
// any audio packets that come in the meantime.
if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload {
let js_audio_payload = Uint8Array::from(audio_payload.as_ref());
audio_decoder.decode(
let _ = audio_decoder.decode(
&EncodedAudioChunk::new(&EncodedAudioChunkInit::new(
&js_audio_payload.into(),
0.0,
@@ -668,10 +669,27 @@ fn accept_packet(
None
},
dangerous_html: html_purifier::purifier(&text, Default::default()),
text: text,
raw: text,
});
}
}
ControlPacket::ServerSync(u) => {
*STATE.status.write() = ConnectionState::Connected;
let mut server = STATE.server.write();
if u.has_welcome_text() {
let text = u.get_welcome_text().to_string();
server.chat.push(Chat {
sender: None,
dangerous_html: html_purifier::purifier(&text, Default::default()),
raw: text,
});
}
if u.has_session() {
server.session = Some(u.get_session());
}
}
_ => {}
}
Ok(())
}