This commit is contained in:
2024-08-31 00:58:39 -06:00
parent 5a48d0bcf7
commit e19333d686
5 changed files with 302 additions and 214 deletions
+41 -5
View File
@@ -2,23 +2,59 @@
use dioxus::prelude::*;
use super::Command;
use super::Command::*;
use super::ConnectionState::{self, *};
pub struct State {
pub status: GlobalSignal<String>,
pub status: GlobalSignal<ConnectionState>,
}
pub static STATE: State = State {
status: Signal::global(|| "Starting...".to_owned()),
status: Signal::global(|| Disconnected),
};
pub fn app() -> Element {
use_coroutine(|_: UnboundedReceiver<()>| super::network_entrypoint());
let net = use_coroutine(|rx: UnboundedReceiver<Command>| super::network_entrypoint(rx));
let mut username = use_signal(|| "".to_string());
let default_address = option_env!("MUMBLEWEB2_WEBTRANSPORT_SERVER_ADDRESS").unwrap_or("");
let mut address = use_signal(|| default_address.to_string());
let status = &STATE.status;
rsx!(
div {
"Hello, World!"
input {
placeholder: "username",
value: "{username.read()}",
autofocus: "true",
oninput: move |evt| username.set(evt.value().clone()),
}
br {}
input {
placeholder: "server address",
value: "{address.read()}",
autofocus: "true",
oninput: move |evt| address.set(evt.value().clone()),
}
br {}
match *status.read() {
Disconnected => rsx!{
button {
onclick: move |event| net.send(Connect{address: address.read().clone(), username: username.read().clone(), hash: "[39, 96, 204, 127, 26, 59, 35, 209, 197, 103, 192, 6, 3, 98, 203, 228, 124, 46, 247, 72, 44, 224, 123, 238, 218, 140, 128, 100, 115, 14, 23, 233]".to_string()}),
"Connect!"
}
},
Connecting => rsx!{
"Connecting..."
},
Connected => rsx!{
button {
onclick: move |event| net.send(Disconnect),
"Disconnect"
}
}
}
br {}
"{status}"
}
)
}
+207 -167
View File
@@ -1,8 +1,12 @@
pub mod app;
use dioxus::prelude::*;
use anyhow::Error;
use app::STATE;
use async_std::channel::Sender;
use futures::select;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use manganis::{file, mg};
@@ -206,10 +210,10 @@ async fn create_encoder_worklet(
download_buffer.borrow_mut().push(array.clone());
if download_buffer.borrow().len() > 200 {
//download_data(download_buffer.borrow().to_vec(), "download_buffer.opus");
download_data(
ass::encode(download_buffer.borrow().to_vec(), 960, 0),
"download_buffer.opus",
);
//download_data(
// ass::encode(download_buffer.borrow().to_vec(), 960, 0),
// "download_buffer.opus",
//);
download_buffer.borrow_mut().clear();
}
@@ -221,7 +225,7 @@ async fn create_encoder_worklet(
payload: VoicePacketPayload::Opus(array.into(), false),
position_info: None,
})));
sequence_num = sequence_num.wrapping_add(1);
sequence_num = sequence_num.wrapping_add(2);
});
let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new(
@@ -338,209 +342,245 @@ fn create_decoder(audio_context: &AudioContext) -> AudioDecoder {
audio_decoder
}
pub async fn network_entrypoint() {
// This sleep is to allow the user to interact with the window so the MediaStream
// can be created. This works around Chrome's autoplay policy rules. This will
// eventually be unnecessary when we have a proper GUI.
async_std::task::sleep(Duration::from_millis(3000)).await;
pub enum ConnectionState {
Disconnected,
Connecting,
Connected,
}
console::log_1(&"Rust via WASM!".into());
pub enum Command {
Connect {
address: String,
username: String,
hash: String,
},
Disconnect,
}
let Ok(server_hash): Result<Vec<u8>, _> = env!("WEBTRANSPORT_SERVER_HASH")
.trim_matches(&['[', ']'])
.split(',')
.map(|x| x.trim().parse())
.collect()
else {
panic!("could not parse server hash")
};
let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>) {
loop {
let Some(Command::Connect {
address,
username,
hash,
}) = event_rx.next().await
else {
panic!("Did not receive connect command")
};
let object = web_sys::js_sys::Object::new();
*STATE.status.write() = ConnectionState::Connecting;
Reflect::set(
&object,
&JsValue::from_str("algorithm"),
&JsValue::from_str("sha-256"),
)
.unwrap();
console::log_1(&"Rust via WASM!".into());
web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).unwrap();
let Ok(server_hash): Result<Vec<u8>, _> = env!("WEBTRANSPORT_SERVER_HASH")
.trim_matches(&['[', ']'])
.split(',')
.map(|x| x.trim().parse())
.collect()
else {
panic!("could not parse server hash")
};
let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
let array = web_sys::js_sys::Array::new();
array.push(&object);
let object = web_sys::js_sys::Object::new();
console::log_1(&object.clone().into());
console::log_1(&"Created option object!".into());
Reflect::set(
&object,
&JsValue::from_str("algorithm"),
&JsValue::from_str("sha-256"),
)
.unwrap();
let mut options = WebTransportOptions::new();
options.server_certificate_hashes(&array);
web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).unwrap();
console::log_1(&"Created WebTransportOptions!".into());
let array = web_sys::js_sys::Array::new();
array.push(&object);
*STATE.status.write() = "Connecting to WebTransport...".into();
console::log_1(&object.clone().into());
console::log_1(&"Created option object!".into());
let transport = match WebTransport::new_with_options(
"https://localhost:4433/?hostname=ohea.xyz&port=64738&username=test",
&options,
) {
Ok(x) => x,
Err(e) => {
console::log_1(&e.into());
panic!();
}
};
let mut options = WebTransportOptions::new();
options.server_certificate_hashes(&array);
console::log_1(&"Created WebTransport connection object.".into());
console::log_1(&"Created WebTransportOptions!".into());
console::log_1(&transport.clone().into());
if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await {
console::log_1(&e.into());
panic!();
}
console::log_1(&"Transport is ready.".into());
*STATE.status.write() = "Creating stream...".into();
let stream: web_sys::WebTransportBidirectionalStream =
match wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream()).await {
Ok(x) => x.into(),
let transport = match WebTransport::new_with_options(&address, &options) {
Ok(x) => x,
Err(e) => {
console::log_1(&e.into());
panic!();
}
};
let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());
console::log_1(&"Created WebTransport connection object.".into());
let read_codec = ClientControlCodec::new();
let write_codec = ClientControlCodec::new();
console::log_1(&transport.clone().into());
let mut reader =
asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
let mut writer =
asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec);
let (send_chan, writer_recv_chan) = async_std::channel::unbounded();
spawn(async move {
while let Ok(msg) = writer_recv_chan.recv().await {
if let Err(e) = writer.send(msg).await {
console::log_1(&e.to_string().into());
break;
}
if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await {
console::log_1(&e.into());
panic!();
}
});
// Get version packet
let version = reader.next().await.unwrap().unwrap();
console::log_1(&"Got version packet".into());
console::log_1(&format!("{:#?}", version).into());
console::log_1(&"Transport is ready.".into());
// Send version packet
let mut msg = msgs::Version::new();
msg.set_version(0x000010204);
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent version packet".into());
let stream: web_sys::WebTransportBidirectionalStream =
match wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream())
.await
{
Ok(x) => x.into(),
Err(e) => {
console::log_1(&e.into());
panic!();
}
};
// Send authenticate packet
let mut msg = msgs::Authenticate::new();
msg.set_username("mumbleweb2".to_string());
msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent authenticate packet".into());
let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());
let read_codec = ClientControlCodec::new();
let write_codec = ClientControlCodec::new();
let mut reader =
asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
let mut writer = asynchronous_codec::FramedWrite::new(
wasm_stream_writable.into_async_write(),
write_codec,
);
let (send_chan, writer_recv_chan) = async_std::channel::unbounded();
// Spawn worker to send pings
{
let send_chan = send_chan.clone();
spawn(async move {
loop {
console::log_1(&"Sending ping".into());
if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
while let Ok(msg) = writer_recv_chan.recv().await {
if let Err(e) = writer.send(msg).await {
console::log_1(&e.to_string().into());
break;
}
async_std::task::sleep(Duration::from_millis(3000)).await;
}
});
}
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
// Get version packet
let version = reader.next().await.unwrap().unwrap();
console::log_1(&"Got version packet".into());
console::log_1(&format!("{:#?}", version).into());
let audio_context_worklet = audio_context.clone();
let packet_sender_worklet = send_chan.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await {
Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
Err(err) => console::error_1(&err),
}
});
// Send version packet
let mut msg = msgs::Version::new();
msg.set_version(0x000010204);
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent version packet".into());
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();
// Send authenticate packet
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap();
console::log_1(&"Sent authenticate packet".into());
loop {
match reader.next().await {
Some(Ok(msg)) => {
match msg {
ControlPacket::UDPTunnel(u) => {
match *u.clone() {
mumble_protocol::voice::VoicePacket::Audio {
_dst,
target,
session_id,
seq_num,
payload,
position_info,
} => {
// Get or create audio decoder for this user
let audio_decoder = decoder_map
.entry(session_id)
.or_insert_with(|| create_decoder(&audio_context));
// This will over time (as users join and leave) leak
// AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes.
// A better way to handle this would be to delete and create all the audio
// infra on channel join and update it as new users join the channel, dropping
// any audio packets that come in the meantime.
if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload {
let js_audio_payload = Uint8Array::from(audio_payload.as_ref());
audio_decoder.decode(
&EncodedAudioChunk::new(&EncodedAudioChunkInit::new(
&js_audio_payload.into(),
0.0,
EncodedAudioChunkType::Key,
))
.unwrap(),
);
//console::log_1(&"Oueued audio chunk for decoding".into());
}
}
_ => {
unreachable!("TCP tunnels UDP packets should not contain pings");
// I think?
}
}
// Spawn worker to send pings
{
let send_chan = send_chan.clone();
spawn(async move {
loop {
console::log_1(&"Sending ping".into());
if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
console::log_1(&"could not ping".into());
console::log_1(&e.to_string().into());
break;
}
_ => {
console::log_1(&format!("{:#?}", msg).into());
async_std::task::sleep(Duration::from_millis(3000)).await;
}
});
}
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
let audio_context_worklet = audio_context.clone();
let packet_sender_worklet = send_chan.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await {
Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
Err(err) => console::error_1(&err),
}
});
*STATE.status.write() = ConnectionState::Connected;
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();
loop {
select! {
packet = reader.next().fuse() => {
match packet {
Some(Ok(msg)) => accept_packet(msg, &audio_context, &mut decoder_map),
Some(Err(err)) => panic!("{err}"),
None => break,
}
}
command = event_rx.next() => {
match command {
Some(Command::Disconnect) => break,
_ => continue,
}
}
}
None => {
break;
}
Some(Err(e)) => {
console::log_1(&e.to_string().into());
break;
}
send_chan.close();
*STATE.status.write() = ConnectionState::Disconnected;
}
}
fn accept_packet(
msg: ControlPacket<mumble_protocol::Clientbound>,
audio_context: &AudioContext,
decoder_map: &mut HashMap<u32, AudioDecoder>,
) {
match msg {
ControlPacket::UDPTunnel(u) => {
match *u.clone() {
mumble_protocol::voice::VoicePacket::Audio {
_dst,
target,
session_id,
seq_num,
payload,
position_info,
} => {
// Get or create audio decoder for this user
let audio_decoder = decoder_map
.entry(session_id)
.or_insert_with(|| create_decoder(&audio_context));
// This will over time (as users join and leave) leak
// AudioDecoders, MediaStreamTrackGenerators, MediaStreams, and MediaStreamAudioSourceNodes.
// A better way to handle this would be to delete and create all the audio
// infra on channel join and update it as new users join the channel, dropping
// any audio packets that come in the meantime.
if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload {
let js_audio_payload = Uint8Array::from(audio_payload.as_ref());
audio_decoder.decode(
&EncodedAudioChunk::new(&EncodedAudioChunkInit::new(
&js_audio_payload.into(),
0.0,
EncodedAudioChunkType::Key,
))
.unwrap(),
);
//console::log_1(&"Oueued audio chunk for decoding".into());
}
}
_ => {
unreachable!("TCP tunnels UDP packets should not contain pings");
// I think?
}
}
}
_ => {
console::log_1(&format!("{:#?}", msg).into());
}
}
}