From 58a7ecd88b64a44d73c996bdfe31e223369ff099 Mon Sep 17 00:00:00 2001 From: Sam Sartor Date: Sat, 28 Sep 2024 15:54:11 -0600 Subject: [PATCH] working chat! --- src/app.rs | 7 ++++--- src/lib.rs | 61 ++++++++++++++++++++++++++++++++++++++---------------- 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/src/app.rs b/src/app.rs index 6c0da58..9956d8f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -14,6 +14,7 @@ pub enum ConnectionState { Connected, } +#[derive(Debug)] pub enum Command { Connect { address: String, @@ -106,9 +107,9 @@ pub fn Channel(id: ChannelId) -> Element { #[component] pub fn ChatView() -> Element { + let net: Coroutine = use_coroutine_handle(); let server = STATE.server.read(); let mut draft = use_signal(|| "".to_string()); - //let net: Coroutine = use_coroutine_handle(); rsx!( div { style: "margin: 16px; padding: 16px; border: solid black 1px;", @@ -132,12 +133,12 @@ pub fn ChatView() -> Element { } button { onclick: move |_| { - /*if let Some(user) = server.this_user() { + if let Some(user) = STATE.server.read().this_user() { net.send(SendChat { markdown: draft.write().split_off(0), channels: vec![user.channel], }); - }*/ + } }, "Send" } diff --git a/src/lib.rs b/src/lib.rs index a01a8f1..04ec652 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,8 +94,8 @@ mod ass { opus_tags.extend(vendor_str.bytes()); opus_tags.extend(&[0u8; 4]); - packet_writer.write_packet(&head, serial, ogg::PacketWriteEndInfo::EndPage, 0); - packet_writer.write_packet(&opus_tags, serial, ogg::PacketWriteEndInfo::EndPage, 0); + let _ = packet_writer.write_packet(&head, serial, ogg::PacketWriteEndInfo::EndPage, 0); + let _ = packet_writer.write_packet(&opus_tags, serial, ogg::PacketWriteEndInfo::EndPage, 0); for (i, frame) in pre_encoded_frames.iter().enumerate() { let is_last = i == pre_encoded_frames.len() - 1; @@ -103,7 +103,7 @@ mod ass { let granule_pos = calc_sr_u64((skip as usize + (i + 1) * frame_size) as u64, 48000, 48000); - packet_writer.write_packet( + let _ = packet_writer.write_packet( frame.clone(), serial, if is_last { @@ -436,8 +436,12 @@ async fn network_connect( let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded(); + // Spawn worker to send packets. spawn(async move { while let Some(msg) = writer_recv_chan.next().await { + if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) { + console::log_1(&format!("sending {:#?}", msg).into()); + } if let Err(e) = writer.send(msg).await { console::error_1(&e.to_string().into()); break; @@ -460,21 +464,18 @@ async fn network_connect( msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0")); //msg.set_os("Chrome".to_string()); send_chan.send(msg.into()).await.unwrap(); - console::log_1(&"Sent version packet".into()); // Send authenticate packet let mut msg = msgs::Authenticate::new(); msg.set_username(username); msg.set_opus(true); send_chan.send(msg.into()).await.unwrap(); - console::log_1(&"Sent authenticate packet".into()); // Spawn worker to send pings { let mut send_chan = send_chan.clone(); spawn(async move { loop { - console::log_1(&"Sending ping".into()); if let Err(e) = send_chan.send(msgs::Ping::new().into()).await { console::log_1(&"could not ping".into()); console::log_1(&e.to_string().into()); @@ -502,9 +503,12 @@ async fn network_connect( // Create map of session_id -> AudioDecoder let mut decoder_map = HashMap::new(); + let mut reader_future = reader.next().fuse(); + let mut command_future = event_rx.next(); loop { select! { - packet = reader.next().fuse() => { + packet = reader_future => { + reader_future = reader.next().fuse(); match packet { Some(Ok(msg)) => { let res =accept_packet(msg, &audio_context, &mut decoder_map); @@ -516,33 +520,54 @@ async fn network_connect( None => break, } } - command = event_rx.next() => { + command = command_future => { + command_future = event_rx.next(); + if let Some(command) = &command { + console::log_1(&format!("commanding {:#?}", command).into()); + } match command { Some(Command::Disconnect) => break, - Some(Command::SendChat { markdown, channels }) => { - let html_text = markdown::to_html(&markdown); - let mut u = msgs::TextMessage::new(); - u.set_message(html_text); - u.set_channel_id(channels); - let _ = send_chan.send(u.into()); - }, - _ => continue, + Some(command) => { + let res = accept_command(command, &mut send_chan); + if let Err(err) = res { + console::error_1(&err.into()); + } + } + None => continue, } } } } - send_chan.close(); + let _ = send_chan.close(); *STATE.status.write() = ConnectionState::Disconnected; Ok(()) } +fn accept_command( + command: Command, + send_chan: &mut UnboundedSender>, +) -> Result<(), JsValue> { + match command { + Command::SendChat { markdown, channels } => { + let html_text = markdown::to_html(&markdown); + let mut u = msgs::TextMessage::new(); + u.set_message(html_text); + u.set_channel_id(channels); + let _ = send_chan.unbounded_send(u.into()); + } + _ => (), + } + + Ok(()) +} + fn accept_packet( msg: ControlPacket, audio_context: &AudioContext, decoder_map: &mut HashMap, ) -> Result<(), JsValue> { if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) { - console::log_1(&format!("{:#?}", msg).into()); + console::log_1(&format!("receiving {:#?}", msg).into()); } match msg { ControlPacket::UDPTunnel(u) => {