working chat!

This commit is contained in:
2024-09-28 15:54:11 -06:00
parent 8f420e6efa
commit 58a7ecd88b
2 changed files with 47 additions and 21 deletions
+4 -3
View File
@@ -14,6 +14,7 @@ pub enum ConnectionState {
Connected, Connected,
} }
#[derive(Debug)]
pub enum Command { pub enum Command {
Connect { Connect {
address: String, address: String,
@@ -106,9 +107,9 @@ pub fn Channel(id: ChannelId) -> Element {
#[component] #[component]
pub fn ChatView() -> Element { pub fn ChatView() -> Element {
let net: Coroutine<Command> = use_coroutine_handle();
let server = STATE.server.read(); let server = STATE.server.read();
let mut draft = use_signal(|| "".to_string()); let mut draft = use_signal(|| "".to_string());
//let net: Coroutine<Command> = use_coroutine_handle();
rsx!( rsx!(
div { div {
style: "margin: 16px; padding: 16px; border: solid black 1px;", style: "margin: 16px; padding: 16px; border: solid black 1px;",
@@ -132,12 +133,12 @@ pub fn ChatView() -> Element {
} }
button { button {
onclick: move |_| { onclick: move |_| {
/*if let Some(user) = server.this_user() { if let Some(user) = STATE.server.read().this_user() {
net.send(SendChat { net.send(SendChat {
markdown: draft.write().split_off(0), markdown: draft.write().split_off(0),
channels: vec![user.channel], channels: vec![user.channel],
}); });
}*/ }
}, },
"Send" "Send"
} }
+42 -17
View File
@@ -94,8 +94,8 @@ mod ass {
opus_tags.extend(vendor_str.bytes()); opus_tags.extend(vendor_str.bytes());
opus_tags.extend(&[0u8; 4]); opus_tags.extend(&[0u8; 4]);
packet_writer.write_packet(&head, serial, ogg::PacketWriteEndInfo::EndPage, 0); let _ = packet_writer.write_packet(&head, serial, ogg::PacketWriteEndInfo::EndPage, 0);
packet_writer.write_packet(&opus_tags, serial, ogg::PacketWriteEndInfo::EndPage, 0); let _ = packet_writer.write_packet(&opus_tags, serial, ogg::PacketWriteEndInfo::EndPage, 0);
for (i, frame) in pre_encoded_frames.iter().enumerate() { for (i, frame) in pre_encoded_frames.iter().enumerate() {
let is_last = i == pre_encoded_frames.len() - 1; let is_last = i == pre_encoded_frames.len() - 1;
@@ -103,7 +103,7 @@ mod ass {
let granule_pos = let granule_pos =
calc_sr_u64((skip as usize + (i + 1) * frame_size) as u64, 48000, 48000); calc_sr_u64((skip as usize + (i + 1) * frame_size) as u64, 48000, 48000);
packet_writer.write_packet( let _ = packet_writer.write_packet(
frame.clone(), frame.clone(),
serial, serial,
if is_last { if is_last {
@@ -436,8 +436,12 @@ async fn network_connect(
let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded(); let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded();
// Spawn worker to send packets.
spawn(async move { spawn(async move {
while let Some(msg) = writer_recv_chan.next().await { while let Some(msg) = writer_recv_chan.next().await {
if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) {
console::log_1(&format!("sending {:#?}", msg).into());
}
if let Err(e) = writer.send(msg).await { if let Err(e) = writer.send(msg).await {
console::error_1(&e.to_string().into()); console::error_1(&e.to_string().into());
break; break;
@@ -460,21 +464,18 @@ async fn network_connect(
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0")); msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string()); //msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap(); send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent version packet".into());
// Send authenticate packet // Send authenticate packet
let mut msg = msgs::Authenticate::new(); let mut msg = msgs::Authenticate::new();
msg.set_username(username); msg.set_username(username);
msg.set_opus(true); msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap(); send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent authenticate packet".into());
// Spawn worker to send pings // Spawn worker to send pings
{ {
let mut send_chan = send_chan.clone(); let mut send_chan = send_chan.clone();
spawn(async move { spawn(async move {
loop { loop {
console::log_1(&"Sending ping".into());
if let Err(e) = send_chan.send(msgs::Ping::new().into()).await { if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
console::log_1(&"could not ping".into()); console::log_1(&"could not ping".into());
console::log_1(&e.to_string().into()); console::log_1(&e.to_string().into());
@@ -502,9 +503,12 @@ async fn network_connect(
// Create map of session_id -> AudioDecoder // Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new(); let mut decoder_map = HashMap::new();
let mut reader_future = reader.next().fuse();
let mut command_future = event_rx.next();
loop { loop {
select! { select! {
packet = reader.next().fuse() => { packet = reader_future => {
reader_future = reader.next().fuse();
match packet { match packet {
Some(Ok(msg)) => { Some(Ok(msg)) => {
let res =accept_packet(msg, &audio_context, &mut decoder_map); let res =accept_packet(msg, &audio_context, &mut decoder_map);
@@ -516,23 +520,44 @@ async fn network_connect(
None => break, None => break,
} }
} }
command = event_rx.next() => { command = command_future => {
command_future = event_rx.next();
if let Some(command) = &command {
console::log_1(&format!("commanding {:#?}", command).into());
}
match command { match command {
Some(Command::Disconnect) => break, Some(Command::Disconnect) => break,
Some(Command::SendChat { markdown, channels }) => { Some(command) => {
let res = accept_command(command, &mut send_chan);
if let Err(err) = res {
console::error_1(&err.into());
}
}
None => continue,
}
}
}
}
let _ = send_chan.close();
*STATE.status.write() = ConnectionState::Disconnected;
Ok(())
}
fn accept_command(
command: Command,
send_chan: &mut UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
) -> Result<(), JsValue> {
match command {
Command::SendChat { markdown, channels } => {
let html_text = markdown::to_html(&markdown); let html_text = markdown::to_html(&markdown);
let mut u = msgs::TextMessage::new(); let mut u = msgs::TextMessage::new();
u.set_message(html_text); u.set_message(html_text);
u.set_channel_id(channels); u.set_channel_id(channels);
let _ = send_chan.send(u.into()); let _ = send_chan.unbounded_send(u.into());
},
_ => continue,
} }
_ => (),
} }
}
}
send_chan.close();
*STATE.status.write() = ConnectionState::Disconnected;
Ok(()) Ok(())
} }
@@ -542,7 +567,7 @@ fn accept_packet(
decoder_map: &mut HashMap<u32, AudioDecoder>, decoder_map: &mut HashMap<u32, AudioDecoder>,
) -> Result<(), JsValue> { ) -> Result<(), JsValue> {
if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) { if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) {
console::log_1(&format!("{:#?}", msg).into()); console::log_1(&format!("receiving {:#?}", msg).into());
} }
match msg { match msg {
ControlPacket::UDPTunnel(u) => { ControlPacket::UDPTunnel(u) => {