imp module

This commit is contained in:
2024-11-08 16:10:07 -07:00
parent 542d51c42f
commit 0f5b6249ae
5 changed files with 2636 additions and 551 deletions
Generated
+2103 -20
View File
File diff suppressed because it is too large Load Diff
+13 -8
View File
@@ -4,22 +4,22 @@ version = "0.1.0"
edition = "2021"
[dependencies]
dioxus = { version = "0.5.6", features = ["web"] }
dioxus-web = "0.5.6"
dioxus = { version = "0.5.6" }
dioxus-web = { version="0.5.6", optional = true }
manganis = "0.2.2"
once_cell = "1.19.0"
asynchronous-codec = "0.6.2"
futures = "0.3.30"
merge-io = "0.3.0"
mumble-protocol = { version = "0.5.0", package = "mumble-protocol-2x", default-features = false, features = ["asynchronous-codec"]}
serde-wasm-bindgen = "0.6.5"
serde_json = "1.0.117"
tokio-util = { version = "0.7.11", features = ["codec"]}
wasm-bindgen = "0.2.92"
wasm-bindgen-futures = "0.4.42"
wasm-streams = "0.4.0"
js-sys = "0.3.70"
web-sys = { version = "0.3.70", features = ["WebTransport", "console", "WebTransportOptions", "WebTransportBidirectionalStream", "WebTransportSendStream", "WebTransportReceiveStream", "Navigator", "MediaDevices", "AudioDecoder", "AudioDecoderInit", "AudioData", "AudioEncoderConfig", "AudioDecoderConfig", "EncodedAudioChunk", "EncodedAudioChunkInit", "EncodedAudioChunkType", "CodecState", "MediaStreamTrackGenerator", "MediaStreamTrackGeneratorInit", "AudioContext", "AudioContextOptions", "MediaStream", "GainNode", "MediaStreamAudioSourceNode", "BaseAudioContext", "AudioDestinationNode", "AudioWorkletNode", "AudioWorklet", "AudioWorkletProcessor", "MediaStreamConstraints", "WorkletOptions", "AudioEncoder", "AudioEncoderInit", "AudioDataInit", "HtmlAnchorElement", "Url", "Blob", "AudioDataCopyToOptions", "AudioSampleFormat"] }
wasm-bindgen = { version = "0.2.92", optional = true }
wasm-bindgen-futures = { version = "0.4.42", optional = true }
wasm-streams = { version = "0.4.0", optional = true }
serde-wasm-bindgen = { version = "0.6.5", optional = true }
js-sys = { version = "0.3.70", optional = true }
web-sys = { version = "0.3.70", features = ["WebTransport", "console", "WebTransportOptions", "WebTransportBidirectionalStream", "WebTransportSendStream", "WebTransportReceiveStream", "Navigator", "MediaDevices", "AudioDecoder", "AudioDecoderInit", "AudioData", "AudioEncoderConfig", "AudioDecoderConfig", "EncodedAudioChunk", "EncodedAudioChunkInit", "EncodedAudioChunkType", "CodecState", "MediaStreamTrackGenerator", "MediaStreamTrackGeneratorInit", "AudioContext", "AudioContextOptions", "MediaStream", "GainNode", "MediaStreamAudioSourceNode", "BaseAudioContext", "AudioDestinationNode", "AudioWorkletNode", "AudioWorklet", "AudioWorkletProcessor", "MediaStreamConstraints", "WorkletOptions", "AudioEncoder", "AudioEncoderInit", "AudioDataInit", "HtmlAnchorElement", "Url", "Blob", "AudioDataCopyToOptions", "AudioSampleFormat"], optional = true }
anyhow = "1.0.86"
byteorder = "1.5.0"
ogg = "0.9.1"
@@ -29,3 +29,8 @@ markdown = "0.3.0"
gloo-timers = { version = "0.3.0", features = ["futures"] }
futures-channel = "0.3.30"
sir = { version = "0.5.0", features = ["dioxus"] }
[features]
default = ["web"]
web = ["dioxus/web", "dioxus-web", "wasm-bindgen", "wasm-bindgen-futures", "wasm-streams", "serde-wasm-bindgen", "js-sys", "web-sys"]
desktop = ["dioxus/desktop"]
+6
View File
@@ -0,0 +1,6 @@
/// Connect to the Mumble server at `address` as `username`, processing UI
/// commands from `event_rx` until disconnect.
///
/// Platform stub — the real implementation lives in the web backend
/// (`imp/web.rs`). The previous empty body did not compile: a function
/// returning `Result` cannot end with `()`.
pub async fn network_connect(
    address: String,
    username: String,
    event_rx: &mut UnboundedReceiver<Command>,
) -> Result<(), JsValue> {
    // Suppress unused-parameter warnings and make the missing backend loud
    // at runtime rather than silently succeeding.
    let _ = (address, username, event_rx);
    unimplemented!("network_connect is not implemented for this platform")
}
+488
View File
@@ -0,0 +1,488 @@
use crate::app::ChannelId;
use crate::app::Chat;
use crate::app::Command;
use crate::app::ConnectionState;
use crate::app::STATE;
use crate::bail;
use dioxus::prelude::*;
use futures::select;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use gloo_timers::future::TimeoutFuture;
use manganis::{file, mg};
use markdown;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::control::{msgs, ClientControlCodec};
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::voice::VoicePacketPayload;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local as spawn;
use wasm_bindgen_futures::{future_to_promise, JsFuture};
use web_sys::console;
use web_sys::js_sys::Promise;
use web_sys::js_sys::Reflect;
use web_sys::js_sys::Uint8Array;
use web_sys::window;
use web_sys::AudioContextOptions;
use web_sys::AudioData;
use web_sys::AudioDecoder;
use web_sys::AudioDecoderConfig;
use web_sys::AudioDecoderInit;
use web_sys::AudioEncoder;
use web_sys::AudioEncoderConfig;
use web_sys::AudioEncoderInit;
use web_sys::AudioWorkletNode;
use web_sys::EncodedAudioChunk;
use web_sys::EncodedAudioChunkInit;
use web_sys::EncodedAudioChunkType;
use web_sys::MediaStream;
use web_sys::MediaStreamConstraints;
use web_sys::MediaStreamTrackGenerator;
use web_sys::MediaStreamTrackGeneratorInit;
use web_sys::MessageEvent;
use web_sys::WebTransport;
use web_sys::WebTransportBidirectionalStream;
use web_sys::WebTransportOptions;
use web_sys::WorkletOptions;
pub use web_sys::AudioContext;
pub struct AudioPlayer(AudioDecoder);
impl AudioPlayer {
pub fn play_opus(&mut self, payload: &[u8]) {
let js_audio_payload = Uint8Array::from(payload);
let _ = self.0.decode(
&EncodedAudioChunk::new(&EncodedAudioChunkInit::new(
&js_audio_payload.into(),
0.0,
EncodedAudioChunkType::Key,
))
.unwrap(),
);
}
}
pub struct Error(JsValue);
impl From<anyhow::Error> for Error {
fn from(value: anyhow::Error) -> Self {
Error(JsError::new(&value.to_string()).into())
}
}
impl From<JsValue> for Error {
fn from(value: JsValue) -> Self {
Error(value)
}
}
impl From<JsError> for Error {
fn from(value: JsError) -> Self {
Error(JsError::from(value).into())
}
}
impl Error {
pub fn new(text: String) -> Self {
wasm_bindgen::JsError::new(&text).into()
}
pub fn log(&self) {
console::error_1(&self.0);
}
}
impl std::error::Error for Error {}
impl fmt::Display for Error {
    /// Render by stringifying the wrapped `JsValue` through
    /// `js_sys::Object::to_string` (the JS `toString()` semantics).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let object = js_sys::Object::from(self.0.clone());
        let text = String::from(object.to_string());
        f.write_str(&text)
    }
}
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let text: String = js_sys::Object::from(self.0.clone()).to_string().into();
f.write_str(&text)
}
}
// Borrowed from
// https://github.com/security-union/videocall-rs/blob/main/videocall-client/src/decode/config.rs#L6
/// Build an `AudioContext` pinned to 48 kHz — the sample rate that the Opus
/// encoder and decoder configs in this module all assume.
fn configure_audio_context() -> AudioContext {
    let mut audio_context_options = AudioContextOptions::new();
    // Use a float literal instead of the useless `48000 as f32` cast.
    audio_context_options.sample_rate(48_000.0);
    AudioContext::new_with_context_options(&audio_context_options)
        .expect("browser refused to create a 48 kHz AudioContext")
}
trait PromiseExt {
fn into_future(self) -> JsFuture;
}
impl PromiseExt for Promise {
fn into_future(self) -> JsFuture {
self.into()
}
}
/// Capture microphone audio, route it through the `rust_mic_worklet`
/// AudioWorklet, encode the PCM to Opus with WebCodecs, and send each
/// encoded frame to the server over `packets` as a UDP-tunnelled voice
/// packet. Returns the worklet node so the caller can keep it alive.
async fn create_encoder_worklet(
    audio_context: &AudioContext,
    packets: UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
) -> Result<AudioWorkletNode, Error> {
    // Ask the browser for an audio-only capture stream (prompts the user).
    let stream = window()
        .unwrap()
        .navigator()
        .media_devices()?
        .get_user_media_with_constraints(MediaStreamConstraints::new().audio(&JsValue::TRUE))?
        .into_future()
        .await?
        .dyn_into()
        .map_err(|e| JsError::new(&format!("not a stream: {e:?}")))?;
    let options = WorkletOptions::new();
    // Hand the wasm module to the worklet through `processorOptions` so the
    // JS shim can re-instantiate it inside the AudioWorklet scope.
    Reflect::set(
        &options,
        &"processorOptions".into(),
        &wasm_bindgen::module(),
    )?;
    let module = "rust_mic_worklet.js";
    console::log_1(&format!("Loading mic worklet from {module:?}").into());
    audio_context
        .audio_worklet()?
        .add_module_with_options(module, &options)?
        .into_future()
        .await?;
    let source = audio_context.create_media_stream_source(&stream)?;
    let worklet_node = AudioWorkletNode::new(audio_context, "rust_mic_worklet")?;
    // Encoder error callback: just surface the error on the console.
    let error: Closure<dyn FnMut(JsValue)> = Closure::new(|e| console::error_1(&e));
    // Debug capture buffer; currently only ever cleared — the download
    // calls below are commented out.
    let mut download_buffer = std::cell::RefCell::new(Vec::new());
    // This knows what MediaStreamTrackGenerator to use as it closes around it
    let mut sequence_num = 0;
    // Encoder output callback: copy each encoded Opus chunk out of JS
    // memory and tunnel it to the server as a voice packet.
    let output: Closure<dyn FnMut(EncodedAudioChunk)> =
        Closure::new(move |audio_data: EncodedAudioChunk| {
            let mut array = vec![0u8; audio_data.byte_length() as usize];
            audio_data.copy_to_with_u8_array(&mut array);
            download_buffer.borrow_mut().push(array.clone());
            if download_buffer.borrow().len() > 200 {
                //download_data(download_buffer.borrow().to_vec(), "download_buffer.opus");
                //download_data(
                //    ass::encode(download_buffer.borrow().to_vec(), 960, 0),
                //    "download_buffer.opus",
                //);
                download_buffer.borrow_mut().clear();
            }
            let _ =
                packets.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio {
                    _dst: std::marker::PhantomData,
                    target: 0,
                    session_id: (),
                    seq_num: sequence_num,
                    payload: VoicePacketPayload::Opus(array.into(), false),
                    position_info: None,
                })));
            // NOTE(review): sequence number advances by 2 per frame —
            // presumably what the server expects; confirm against protocol.
            sequence_num = sequence_num.wrapping_add(2);
        });
    let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new(
        error.as_ref().unchecked_ref(),
        output.as_ref().unchecked_ref(),
    ))
    .unwrap();
    // This is required to prevent these from being deallocated
    error.forget();
    output.forget();
    // Mono 48 kHz Opus at 72 kbit/s — mirrors the decoder config used in
    // `create_player`.
    let encoder_config = AudioEncoderConfig::new("opus");
    encoder_config.set_number_of_channels(1);
    encoder_config.set_sample_rate(48000);
    encoder_config.set_bitrate(72_000.0);
    audio_encoder.configure(&encoder_config);
    console::log_1(&"Created Audio Encoder".into());
    // Second debug buffer, for raw f32 PCM (download also commented out).
    let mut download_buffer = std::cell::RefCell::new(Vec::new());
    // Worklet -> main-thread messages carry raw AudioData frames; feed each
    // one into the Opus encoder.
    let onmessage: Closure<dyn FnMut(MessageEvent)> = Closure::new(move |event: MessageEvent| {
        match AudioData::new(event.data().unchecked_ref()) {
            Ok(data) => {
                let x = web_sys::AudioDataCopyToOptions::new(0);
                x.set_format(web_sys::AudioSampleFormat::F32);
                let mut sub_buffer = vec![0; data.allocation_size(&x).unwrap() as usize];
                data.copy_to_with_u8_array(&mut sub_buffer, &x);
                download_buffer.borrow_mut().append(&mut sub_buffer);
                // ~10 s of mono f32 samples (4 bytes each) at 48 kHz.
                if download_buffer.borrow().len() > 48000 * 10 * 4 {
                    //pub fn download_data(data: Vec<u8>, filename: &str) -> Result<(), JsValue> {
                    //download_data(download_buffer.borrow().to_vec(), "download_buffer.pcm32");
                    download_buffer.borrow_mut().clear();
                }
                audio_encoder.encode(&data);
            }
            Err(err) => {
                console::error_1(&err);
                console::debug_1(&event);
            }
        }
    });
    // Install the handler on the worklet's MessagePort via reflection.
    Reflect::set(
        &Reflect::get(&worklet_node, &"port".into())?,
        &"onmessage".into(),
        onmessage.as_ref(),
    )?;
    onmessage.forget();
    // Wire mic source -> worklet -> destination to keep the graph running.
    source.connect_with_audio_node(&worklet_node)?;
    worklet_node.connect_with_audio_node(&audio_context.destination())?;
    Ok(worklet_node)
}
/// Build the playback path for one remote speaker: a WebCodecs
/// `AudioDecoder` whose decoded frames are written into a
/// `MediaStreamTrackGenerator` wired to `audio_context`'s output.
/// Returns the decoder wrapped as an `AudioPlayer`.
pub fn create_player(audio_context: &AudioContext) -> Result<AudioPlayer, Error> {
    let audio_stream_generator =
        MediaStreamTrackGenerator::new(&MediaStreamTrackGeneratorInit::new("audio"))?;
    // Create MediaStream from MediaStreamTrackGenerator
    let js_tracks = web_sys::js_sys::Array::new();
    js_tracks.push(&audio_stream_generator);
    let media_stream = MediaStream::new_with_tracks(&js_tracks)?;
    // Create MediaStreamAudioSourceNode
    let audio_source = audio_context.create_media_stream_source(&media_stream)?;
    // Connect output of audio_source to audio_context (browser audio)
    audio_source.connect_with_audio_node(&audio_context.destination())?;
    // Create callback functions for AudioDecoder
    let error = Closure::wrap(Box::new(move |e: JsValue| {
        console::error_1(&e);
    }) as Box<dyn FnMut(JsValue)>);
    // This knows what MediaStreamTrackGenerator to use as it closes around it
    let output = Closure::wrap(Box::new(move |audio_data: AudioData| {
        let writable = audio_stream_generator.writable();
        // Drop the frame if a previous write still holds the stream lock;
        // a glitch is preferable to blocking the decoder callback.
        if writable.locked() {
            return;
        }
        if let Err(e) = writable.get_writer().map(|writer| {
            spawn(async move {
                if let Err(e) = JsFuture::from(writer.ready()).await {
                    console::error_1(&format!("write chunk ready error {:?}", e).into());
                }
                if let Err(e) = JsFuture::from(writer.write_with_chunk(&audio_data)).await {
                    console::error_1(&format!("write chunk error {:?}", e).into());
                };
                writer.release_lock();
            });
        }) {
            console::error_1(&e);
        }
    }) as Box<dyn FnMut(AudioData)>);
    let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new(
        error.as_ref().unchecked_ref(),
        output.as_ref().unchecked_ref(),
    ))?;
    // Mono 48 kHz Opus — must match the encoder config on the send side.
    audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000));
    console::log_1(&"Created Audio Decoder".into());
    // This is required to prevent these from being deallocated
    error.forget();
    output.forget();
    Ok(AudioPlayer(audio_decoder))
}
/// Connect to the Mumble server over WebTransport, perform the
/// version/authenticate handshake, start the mic encoder and the ping
/// loop, then pump server packets and UI commands until disconnect.
///
/// Global UI state in `STATE` is updated as the connection progresses
/// (Connecting on entry, Disconnected on exit).
pub async fn network_connect(
    address: String,
    username: String,
    event_rx: &mut UnboundedReceiver<Command>,
) -> Result<(), Error> {
    *STATE.server.write() = Default::default();
    *STATE.status.write() = ConnectionState::Connecting;
    console::log_1(&"Rust via WASM!".into());
    // The server certificate hash is baked in at build time as a
    // comma-separated byte list, e.g. "[12, 34, ...]".
    let Ok(server_hash): Result<Vec<u8>, _> = include_str!("../../server_hash.txt")
        .trim()
        .trim_matches(&['[', ']'])
        .split(',')
        .map(|x| x.trim().parse())
        .collect()
    else {
        bail!("could not parse server hash");
    };
    // Build { algorithm: "sha-256", value: <hash> } for
    // WebTransportOptions.serverCertificateHashes.
    let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
    let object = web_sys::js_sys::Object::new();
    Reflect::set(
        &object,
        &JsValue::from_str("algorithm"),
        &JsValue::from_str("sha-256"),
    )?;
    web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash)?;
    let array = web_sys::js_sys::Array::new();
    array.push(&object);
    console::log_1(&object.clone().into());
    console::log_1(&"Created option object!".into());
    let mut options = WebTransportOptions::new();
    options.server_certificate_hashes(&array);
    console::log_1(&"Created WebTransportOptions!".into());
    let transport = WebTransport::new_with_options(&address, &options)?;
    console::log_1(&"Created WebTransport connection object.".into());
    console::log_1(&transport.clone().into());
    if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await {
        bail!("could not connect to transport: {e:?}");
    }
    console::log_1(&"Transport is ready.".into());
    // A single bidirectional stream carries the whole Mumble control
    // protocol, framed by ClientControlCodec on both directions.
    let stream: WebTransportBidirectionalStream =
        wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream())
            .await?
            .into();
    let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
    let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());
    let read_codec = ClientControlCodec::new();
    let write_codec = ClientControlCodec::new();
    let mut reader =
        asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
    let mut writer =
        asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec);
    let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded();
    // Spawn worker to send packets.
    spawn(async move {
        while let Some(msg) = writer_recv_chan.next().await {
            // Pings and tunnelled voice data are too chatty to log.
            if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) {
                console::log_1(&format!("sending {:#?}", msg).into());
            }
            if let Err(e) = writer.send(msg).await {
                console::error_1(&e.to_string().into());
                break;
            }
        }
    });
    // Get version packet
    let version = match reader.next().await {
        Some(Ok(v)) => v,
        Some(Err(err)) => bail!("bad version packet: {err:?}"),
        None => bail!("no version was recieved"),
    };
    console::log_1(&"Got version packet".into());
    console::log_1(&format!("{:#?}", version).into());
    // Send version packet
    let mut msg = msgs::Version::new();
    msg.set_version(0x000010204);
    msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
    //msg.set_os("Chrome".to_string());
    send_chan.send(msg.into()).await.unwrap();
    // Send authenticate packet
    let mut msg = msgs::Authenticate::new();
    msg.set_username(username);
    msg.set_opus(true);
    send_chan.send(msg.into()).await.unwrap();
    // Spawn worker to send pings
    {
        let mut send_chan = send_chan.clone();
        spawn(async move {
            loop {
                if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
                    console::log_1(&"could not ping".into());
                    console::log_1(&e.to_string().into());
                    break;
                }
                // Keep-alive interval: one ping every 3 seconds.
                TimeoutFuture::new(3000).await;
            }
        });
    }
    // Create MediaStreams to playback decoded audio
    // The audio context is used to reproduce audio.
    let audio_context = configure_audio_context();
    let audio_context_worklet = audio_context.clone();
    let packet_sender_worklet = send_chan.clone();
    // Start microphone capture/encode in the background; failure is logged
    // but does not tear down the connection.
    spawn(async move {
        match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await {
            Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
            Err(err) => err.log(),
        }
    });
    // Create map of session_id -> AudioDecoder
    let mut decoder_map = HashMap::new();
    let mut reader_future = reader.next().fuse();
    let mut command_future = event_rx.next();
    // Main loop: multiplex incoming server packets and UI commands.
    loop {
        select! {
            packet = reader_future => {
                // Re-arm the future before handling the current packet.
                reader_future = reader.next().fuse();
                match packet {
                    Some(Ok(msg)) => {
                        if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) {
                            console::log_1(&format!("receiving {:#?}", msg).into());
                        }
                        let res = super::accept_packet(msg, &audio_context, &mut decoder_map);
                        if let Err(err) = res {
                            err.log();
                        }
                    },
                    Some(Err(err)) => console::error_1(&err.to_string().into()),
                    // Stream closed by the server.
                    None => break,
                }
            }
            command = command_future => {
                command_future = event_rx.next();
                if let Some(command) = &command {
                    console::log_1(&format!("commanding {:#?}", command).into());
                }
                match command {
                    Some(Command::Disconnect) => break,
                    Some(command) => {
                        let res = super::accept_command(command, &mut send_chan);
                        if let Err(err) = res {
                            err.log();
                        }
                    }
                    None => continue,
                }
            }
        }
    }
    // Closing the send channel ends the writer task; report disconnect.
    let _ = send_chan.close();
    *STATE.status.write() = ConnectionState::Disconnected;
    Ok(())
}
+25 -522
View File
@@ -1,18 +1,16 @@
pub mod app;
use app::ChannelId;
use app::Chat;
use app::Command;
use app::ConnectionState;
use dioxus::prelude::*;
use app::STATE;
use dioxus::prelude::*;
use futures::select;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use futures_channel::mpsc::UnboundedSender;
use gloo_timers::future::TimeoutFuture;
pub use imp::Error;
use manganis::{file, mg};
use markdown;
use mumble_protocol::control::ControlPacket;
@@ -22,324 +20,24 @@ use mumble_protocol::voice::VoicePacketPayload;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local as spawn;
use wasm_bindgen_futures::{future_to_promise, JsFuture};
use web_sys::console;
use web_sys::js_sys::Promise;
use web_sys::js_sys::Reflect;
use web_sys::js_sys::Uint8Array;
use web_sys::window;
use web_sys::AudioContext;
use web_sys::AudioContextOptions;
use web_sys::AudioData;
use web_sys::AudioDecoder;
use web_sys::AudioDecoderConfig;
use web_sys::AudioDecoderInit;
use web_sys::AudioEncoder;
use web_sys::AudioEncoderConfig;
use web_sys::AudioEncoderInit;
use web_sys::AudioWorkletNode;
use web_sys::EncodedAudioChunk;
use web_sys::EncodedAudioChunkInit;
use web_sys::EncodedAudioChunkType;
use web_sys::MediaStream;
use web_sys::MediaStreamConstraints;
use web_sys::MediaStreamTrackGenerator;
use web_sys::MediaStreamTrackGeneratorInit;
use web_sys::MessageEvent;
use web_sys::WebTransport;
use web_sys::WebTransportBidirectionalStream;
use web_sys::WebTransportOptions;
use web_sys::WorkletOptions;
mod ass {
use byteorder::{ByteOrder, LittleEndian};
use ogg::PacketWriter;
pub mod app;
const VER: &str = "ballz";
#[cfg(feature = "web")]
#[path = "imp/web.rs"]
pub mod imp;
const fn to_samples<const S_PS: u32>(ms: u32) -> usize {
((S_PS * ms) / 1000) as usize
}
/*
#[cfg(feature = "desktop")]
#[path = "imp/desktop.rs"]
pub mod imp;
*/
const fn calc_sr_u64(val: u64, from: u32, to: u32) -> u64 {
(val * to as u64) / from as u64
}
pub fn encode(pre_encoded_frames: Vec<Vec<u8>>, frame_size: usize, skip: u16) -> Vec<u8> {
let mut buffer: Vec<u8> = Vec::new();
let mut packet_writer = PacketWriter::new(&mut buffer);
// Hardcoded serial number
let serial = 12345;
let skip_48 = calc_sr_u64(skip.into(), 48000, 48000);
let opus_head: [u8; 19] = [
b'O', b'p', b'u', b's', b'H', b'e', b'a', b'd', 1, 1, // NUM_CHANNELS = 1
0, 0, 0, 0, 0, 0, 0, 0, 0,
];
let mut head = opus_head;
LittleEndian::write_u16(&mut head[10..12], skip_48 as u16);
LittleEndian::write_u32(&mut head[12..16], 48000);
let mut opus_tags: Vec<u8> = Vec::with_capacity(60);
let vendor_str = format!("ogg-opus {}", VER);
opus_tags.extend(b"OpusTags");
let mut len_bf = [0u8; 4];
LittleEndian::write_u32(&mut len_bf, vendor_str.len() as u32);
opus_tags.extend(&len_bf);
opus_tags.extend(vendor_str.bytes());
opus_tags.extend(&[0u8; 4]);
let _ = packet_writer.write_packet(&head, serial, ogg::PacketWriteEndInfo::EndPage, 0);
let _ = packet_writer.write_packet(&opus_tags, serial, ogg::PacketWriteEndInfo::EndPage, 0);
for (i, frame) in pre_encoded_frames.iter().enumerate() {
let is_last = i == pre_encoded_frames.len() - 1;
//let granule_pos = 0;
let granule_pos =
calc_sr_u64((skip as usize + (i + 1) * frame_size) as u64, 48000, 48000);
let _ = packet_writer.write_packet(
frame.clone(),
serial,
if is_last {
ogg::PacketWriteEndInfo::EndStream
} else {
ogg::PacketWriteEndInfo::NormalPacket
},
granule_pos,
);
}
buffer
}
}
// Function to download data as a file
pub fn download_data(data: Vec<u8>, filename: &str) -> Result<(), JsValue> {
use wasm_bindgen::prelude::*;
use web_sys::{window, Blob, HtmlAnchorElement, Url};
// Create a new Blob from the data
let array = web_sys::js_sys::Uint8Array::from(&data[..]);
let blob = Blob::new_with_u8_array_sequence(&vec![array].into())?;
// Create a URL for the Blob
let url = Url::create_object_url_with_blob(&blob)?;
// Create an anchor element and set its href to the Blob URL
let document = window().unwrap().document().unwrap();
let a = document
.create_element("a")?
.dyn_into::<HtmlAnchorElement>()?;
a.set_href(&url);
a.set_download(filename);
// Append the anchor to the document body, click it, and remove it
document.body().unwrap().append_child(&a)?;
a.click();
document.body().unwrap().remove_child(&a)?;
// Revoke the object URL to free resources
Url::revoke_object_url(&url)?;
Ok(())
}
// Borrowed from
// https://github.com/security-union/videocall-rs/blob/main/videocall-client/src/decode/config.rs#L6
fn configure_audio_context() -> AudioContext {
let mut audio_context_options = AudioContextOptions::new();
audio_context_options.sample_rate(48000 as f32);
let audio_context = AudioContext::new_with_context_options(&audio_context_options).unwrap();
audio_context
}
trait PromiseExt {
fn into_future(self) -> JsFuture;
}
impl PromiseExt for Promise {
fn into_future(self) -> JsFuture {
self.into()
}
}
async fn create_encoder_worklet(
audio_context: &AudioContext,
packets: UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
) -> Result<AudioWorkletNode, JsValue> {
let stream = window()
.unwrap()
.navigator()
.media_devices()?
.get_user_media_with_constraints(MediaStreamConstraints::new().audio(&JsValue::TRUE))?
.into_future()
.await?
.dyn_into()
.map_err(|e| JsError::new(&format!("not a stream: {e:?}")))?;
let options = WorkletOptions::new();
Reflect::set(
&options,
&"processorOptions".into(),
&wasm_bindgen::module(),
)?;
let module = "rust_mic_worklet.js";
console::log_1(&format!("Loading mic worklet from {module:?}").into());
audio_context
.audio_worklet()?
.add_module_with_options(module, &options)?
.into_future()
.await?;
let source = audio_context.create_media_stream_source(&stream)?;
let worklet_node = AudioWorkletNode::new(audio_context, "rust_mic_worklet")?;
let error: Closure<dyn FnMut(JsValue)> = Closure::new(|e| console::error_1(&e));
let mut download_buffer = std::cell::RefCell::new(Vec::new());
// This knows what MediaStreamTrackGenerator to use as it closes around it
let mut sequence_num = 0;
let output: Closure<dyn FnMut(EncodedAudioChunk)> =
Closure::new(move |audio_data: EncodedAudioChunk| {
let mut array = vec![0u8; audio_data.byte_length() as usize];
audio_data.copy_to_with_u8_array(&mut array);
download_buffer.borrow_mut().push(array.clone());
if download_buffer.borrow().len() > 200 {
//download_data(download_buffer.borrow().to_vec(), "download_buffer.opus");
//download_data(
// ass::encode(download_buffer.borrow().to_vec(), 960, 0),
// "download_buffer.opus",
//);
download_buffer.borrow_mut().clear();
}
let _ =
packets.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio {
_dst: std::marker::PhantomData,
target: 0,
session_id: (),
seq_num: sequence_num,
payload: VoicePacketPayload::Opus(array.into(), false),
position_info: None,
})));
sequence_num = sequence_num.wrapping_add(2);
});
let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new(
error.as_ref().unchecked_ref(),
output.as_ref().unchecked_ref(),
))
.unwrap();
// This is required to prevent these from being deallocated
error.forget();
output.forget();
let encoder_config = AudioEncoderConfig::new("opus");
encoder_config.set_number_of_channels(1);
encoder_config.set_sample_rate(48000);
encoder_config.set_bitrate(72_000.0);
audio_encoder.configure(&encoder_config);
console::log_1(&"Created Audio Encoder".into());
let mut download_buffer = std::cell::RefCell::new(Vec::new());
let onmessage: Closure<dyn FnMut(MessageEvent)> = Closure::new(move |event: MessageEvent| {
match AudioData::new(event.data().unchecked_ref()) {
Ok(data) => {
let x = web_sys::AudioDataCopyToOptions::new(0);
x.set_format(web_sys::AudioSampleFormat::F32);
let mut sub_buffer = vec![0; data.allocation_size(&x).unwrap() as usize];
data.copy_to_with_u8_array(&mut sub_buffer, &x);
download_buffer.borrow_mut().append(&mut sub_buffer);
if download_buffer.borrow().len() > 48000 * 10 * 4 {
//pub fn download_data(data: Vec<u8>, filename: &str) -> Result<(), JsValue> {
//download_data(download_buffer.borrow().to_vec(), "download_buffer.pcm32");
download_buffer.borrow_mut().clear();
}
audio_encoder.encode(&data);
}
Err(err) => {
console::error_1(&err);
console::debug_1(&event);
}
}
});
Reflect::set(
&Reflect::get(&worklet_node, &"port".into())?,
&"onmessage".into(),
onmessage.as_ref(),
)?;
onmessage.forget();
source.connect_with_audio_node(&worklet_node)?;
worklet_node.connect_with_audio_node(&audio_context.destination())?;
Ok(worklet_node)
}
fn create_decoder(audio_context: &AudioContext) -> Result<AudioDecoder, JsValue> {
let audio_stream_generator =
MediaStreamTrackGenerator::new(&MediaStreamTrackGeneratorInit::new("audio"))?;
// Create MediaStream from MediaStreamTrackGenerator
let js_tracks = web_sys::js_sys::Array::new();
js_tracks.push(&audio_stream_generator);
let media_stream = MediaStream::new_with_tracks(&js_tracks)?;
// Create MediaStreamAudioSourceNode
let audio_source = audio_context.create_media_stream_source(&media_stream)?;
// Connect output of audio_source to audio_context (browser audio)
audio_source.connect_with_audio_node(&audio_context.destination())?;
// Create callback functions for AudioDecoder
let error = Closure::wrap(Box::new(move |e: JsValue| {
console::error_1(&e);
}) as Box<dyn FnMut(JsValue)>);
// This knows what MediaStreamTrackGenerator to use as it closes around it
let output = Closure::wrap(Box::new(move |audio_data: AudioData| {
let writable = audio_stream_generator.writable();
if writable.locked() {
return;
}
if let Err(e) = writable.get_writer().map(|writer| {
spawn(async move {
if let Err(e) = JsFuture::from(writer.ready()).await {
console::error_1(&format!("write chunk ready error {:?}", e).into());
}
if let Err(e) = JsFuture::from(writer.write_with_chunk(&audio_data)).await {
console::error_1(&format!("write chunk error {:?}", e).into());
#[macro_export]
macro_rules! bail {
($($x:tt)*) => {
return Err(crate::Error::new(format!($($x)*)))
};
writer.release_lock();
});
}) {
console::error_1(&e);
}
}) as Box<dyn FnMut(AudioData)>);
let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new(
error.as_ref().unchecked_ref(),
output.as_ref().unchecked_ref(),
))?;
audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000));
console::log_1(&"Created Audio Decoder".into());
// This is required to prevent these from being deallocated
error.forget();
output.forget();
Ok(audio_decoder)
}
pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>) {
@@ -353,201 +51,17 @@ pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>) {
panic!("Did not receive connect command")
};
if let Err(error) = network_connect(address, username, &mut event_rx).await {
console::error_1(&error);
let text = js_sys::Object::from(error).to_string().into();
*STATE.status.write() = ConnectionState::Failed(text);
if let Err(error) = imp::network_connect(address, username, &mut event_rx).await {
error.log();
*STATE.status.write() = ConnectionState::Failed(error.to_string());
}
}
}
macro_rules! bail {
($($x:tt)*) => {
return Err(wasm_bindgen::JsError::new(&format!($($x)*)).into())
};
}
async fn network_connect(
address: String,
username: String,
event_rx: &mut UnboundedReceiver<Command>,
) -> Result<(), JsValue> {
*STATE.server.write() = Default::default();
*STATE.status.write() = ConnectionState::Connecting;
console::log_1(&"Rust via WASM!".into());
let Ok(server_hash): Result<Vec<u8>, _> = include_str!("../server_hash.txt")
.trim()
.trim_matches(&['[', ']'])
.split(',')
.map(|x| x.trim().parse())
.collect()
else {
bail!("could not parse server hash");
};
let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
let object = web_sys::js_sys::Object::new();
Reflect::set(
&object,
&JsValue::from_str("algorithm"),
&JsValue::from_str("sha-256"),
)?;
web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash)?;
let array = web_sys::js_sys::Array::new();
array.push(&object);
console::log_1(&object.clone().into());
console::log_1(&"Created option object!".into());
let mut options = WebTransportOptions::new();
options.server_certificate_hashes(&array);
console::log_1(&"Created WebTransportOptions!".into());
let transport = WebTransport::new_with_options(&address, &options)?;
console::log_1(&"Created WebTransport connection object.".into());
console::log_1(&transport.clone().into());
if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready()).await {
bail!("could not connect to transport: {e:?}");
}
console::log_1(&"Transport is ready.".into());
let stream: WebTransportBidirectionalStream =
wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream())
.await?
.into();
let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());
let read_codec = ClientControlCodec::new();
let write_codec = ClientControlCodec::new();
let mut reader =
asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
let mut writer =
asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec);
let (mut send_chan, mut writer_recv_chan) = futures_channel::mpsc::unbounded();
// Spawn worker to send packets.
spawn(async move {
while let Some(msg) = writer_recv_chan.next().await {
if !matches!(msg, ControlPacket::Ping(_) | ControlPacket::UDPTunnel(_)) {
console::log_1(&format!("sending {:#?}", msg).into());
}
if let Err(e) = writer.send(msg).await {
console::error_1(&e.to_string().into());
break;
}
}
});
// Get version packet
let version = match reader.next().await {
Some(Ok(v)) => v,
Some(Err(err)) => bail!("bad version packet: {err:?}"),
None => bail!("no version was recieved"),
};
console::log_1(&"Got version packet".into());
console::log_1(&format!("{:#?}", version).into());
// Send version packet
let mut msg = msgs::Version::new();
msg.set_version(0x000010204);
msg.set_release(format!("{} {}", "mumbleweb2", "6.9.0"));
//msg.set_os("Chrome".to_string());
send_chan.send(msg.into()).await.unwrap();
// Send authenticate packet
let mut msg = msgs::Authenticate::new();
msg.set_username(username);
msg.set_opus(true);
send_chan.send(msg.into()).await.unwrap();
// Spawn worker to send pings
{
let mut send_chan = send_chan.clone();
spawn(async move {
loop {
if let Err(e) = send_chan.send(msgs::Ping::new().into()).await {
console::log_1(&"could not ping".into());
console::log_1(&e.to_string().into());
break;
}
TimeoutFuture::new(3000).await;
}
});
}
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
let audio_context_worklet = audio_context.clone();
let packet_sender_worklet = send_chan.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, packet_sender_worklet).await {
Ok(node) => console::log_2(&"Created audio worklet:".into(), &node),
Err(err) => console::error_1(&err),
}
});
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();
let mut reader_future = reader.next().fuse();
let mut command_future = event_rx.next();
loop {
select! {
packet = reader_future => {
reader_future = reader.next().fuse();
match packet {
Some(Ok(msg)) => {
let res =accept_packet(msg, &audio_context, &mut decoder_map);
if let Err(err) = res {
console::error_1(&err.into());
}
},
Some(Err(err)) => console::error_1(&err.to_string().into()),
None => break,
}
}
command = command_future => {
command_future = event_rx.next();
if let Some(command) = &command {
console::log_1(&format!("commanding {:#?}", command).into());
}
match command {
Some(Command::Disconnect) => break,
Some(command) => {
let res = accept_command(command, &mut send_chan);
if let Err(err) = res {
console::error_1(&err.into());
}
}
None => continue,
}
}
}
}
let _ = send_chan.close();
*STATE.status.write() = ConnectionState::Disconnected;
Ok(())
}
fn accept_command(
command: Command,
send_chan: &mut UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
) -> Result<(), JsValue> {
) -> Result<(), Error> {
match command {
Command::SendChat { markdown, channels } => {
use markdown::*;
@@ -591,12 +105,9 @@ fn accept_command(
fn accept_packet(
msg: ControlPacket<mumble_protocol::Clientbound>,
audio_context: &AudioContext,
decoder_map: &mut HashMap<u32, AudioDecoder>,
) -> Result<(), JsValue> {
if !matches!(msg, ControlPacket::UDPTunnel(_) | ControlPacket::Ping(_)) {
console::log_1(&format!("receiving {:#?}", msg).into());
}
audio_context: &imp::AudioContext,
player_map: &mut HashMap<u32, imp::AudioPlayer>,
) -> Result<(), Error> {
match msg {
ControlPacket::UDPTunnel(u) => {
match *u.clone() {
@@ -609,10 +120,10 @@ fn accept_packet(
position_info,
} => {
// Get or create audio decoder for this user
let audio_decoder = match decoder_map.entry(session_id) {
let audio_player = match player_map.entry(session_id) {
Entry::Occupied(occupied_entry) => occupied_entry.into_mut(),
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(create_decoder(&audio_context)?)
vacant_entry.insert(imp::create_player(&audio_context)?)
}
};
// This will over time (as users join and leave) leak
@@ -621,15 +132,7 @@ fn accept_packet(
// infra on channel join and update it as new users join the channel, dropping
// any audio packets that come in the meantime.
if let VoicePacketPayload::Opus(audio_payload, end_bit) = payload {
let js_audio_payload = Uint8Array::from(audio_payload.as_ref());
let _ = audio_decoder.decode(
&EncodedAudioChunk::new(&EncodedAudioChunkInit::new(
&js_audio_payload.into(),
0.0,
EncodedAudioChunkType::Key,
))
.unwrap(),
);
audio_player.play_opus(audio_payload.as_ref());
//console::log_1(&"Oueued audio chunk for decoding".into());
}
}