// Split into gui and client crates (#30)
// Reviewed-on: #30  Reviewed-by: restitux <restitux@ohea.xyz>
// Co-authored-by: Sam Sartor <me@samsartor.com>
// Co-committed-by: Sam Sartor <me@samsartor.com>
use crate::app::{Command, SharedState};
|
||||
use crate::effects::{AudioProcessor, AudioProcessorSender, TransmitState};
|
||||
use color_eyre::eyre::{bail, eyre, Error};
|
||||
use crossbeam::atomic::AtomicCell;
|
||||
use futures_channel::mpsc::UnboundedReceiver;
|
||||
use gloo_timers::future::TimeoutFuture;
|
||||
use js_sys::Float32Array;
|
||||
use manganis::asset;
|
||||
use mumble_protocol::control::ClientControlCodec;
|
||||
use mumble_web2_common::{ProxyOverrides, ServerStatus};
|
||||
use reqwest::Url;
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing::{debug, error, info, instrument};
|
||||
use wasm_bindgen::prelude::*;
|
||||
use wasm_bindgen_futures::JsFuture;
|
||||
use web_sys::js_sys::{Promise, Reflect, Uint8Array};
|
||||
use web_sys::AudioContextOptions;
|
||||
use web_sys::AudioData;
|
||||
use web_sys::AudioDecoder;
|
||||
use web_sys::AudioDecoderConfig;
|
||||
use web_sys::AudioDecoderInit;
|
||||
use web_sys::AudioEncoder;
|
||||
use web_sys::AudioEncoderConfig;
|
||||
use web_sys::AudioEncoderInit;
|
||||
use web_sys::AudioWorkletNode;
|
||||
use web_sys::EncodedAudioChunk;
|
||||
use web_sys::EncodedAudioChunkInit;
|
||||
use web_sys::EncodedAudioChunkType;
|
||||
use web_sys::MediaStreamConstraints;
|
||||
use web_sys::MessageEvent;
|
||||
use web_sys::WebTransport;
|
||||
use web_sys::WebTransportBidirectionalStream;
|
||||
use web_sys::WebTransportOptions;
|
||||
use web_sys::WorkletOptions;
|
||||
use web_sys::{console, window};
|
||||
use web_sys::{AudioContext, AudioDataCopyToOptions};
|
||||
|
||||
#[allow(unused)]
|
||||
pub use wasm_bindgen_futures::spawn_local as spawn;
|
||||
|
||||
/// Zero-sized stand-in for an executor handle: on the web target there is
/// no multi-threaded runtime, so spawning always goes through
/// `wasm_bindgen_futures::spawn_local` regardless of which "handle" is used.
#[allow(unused)]
#[derive(Clone)]
pub struct SpawnHandle;
|
||||
|
||||
impl SpawnHandle {
|
||||
pub fn spawn<F>(&self, future: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
wasm_bindgen_futures::spawn_local(future);
|
||||
}
|
||||
|
||||
pub fn current() -> Self {
|
||||
SpawnHandle
|
||||
}
|
||||
}
|
||||
|
||||
/// Web platform implementation using WebTransport and Web Audio API.
///
/// Unit struct: all behavior lives in the `PlatformInterface` impl.
pub struct WebPlatform;
|
||||
|
||||
impl super::PlatformInterface for WebPlatform {
    type AudioSystem = WebAudioSystem;
    type ConfigSystem = WebConfigSystem;

    /// Install a tracing subscriber that writes to the browser console,
    /// with a performance layer, at DEBUG level.
    fn init_logging() {
        // copied from tracing_web example usage

        use tracing_subscriber::fmt::format::Pretty;
        use tracing_subscriber::prelude::*;
        use tracing_web::{performance_layer, MakeWebConsoleWriter};

        let fmt_layer = tracing_subscriber::fmt::layer()
            .with_ansi(false) // Only partially supported across browsers
            .without_time() // std::time is not available in browsers
            .with_writer(MakeWebConsoleWriter::new()) // write events to the console
            .with_filter(LevelFilter::DEBUG);
        let perf_layer = performance_layer().with_details_from_fields(Pretty::default());

        tracing_subscriber::registry()
            .with(fmt_layer)
            .with(perf_layer)
            .init();

        info!("logging initiated");
    }

    /// Browsers prompt for media permissions lazily (at `getUserMedia`
    /// time, in `run_encoder_worklet`), so there is nothing to do here.
    fn request_permissions() {
        // No-op on web
    }

    /// Fetch `ProxyOverrides` as JSON — either from a URL baked in at
    /// compile time via the `MUMBLE_WEB2_PROXY_OVERRIDES_URL` env var, or
    /// from the `overrides` path resolved against the current page URL.
    async fn load_proxy_overrides() -> color_eyre::Result<ProxyOverrides> {
        let overrides = match option_env!("MUMBLE_WEB2_PROXY_OVERRIDES_URL") {
            Some(url) => Url::parse(url)?,
            None => absolute_url("overrides")?,
        };
        info!("loading config from {}", overrides);

        let config = reqwest::get(overrides)
            .await?
            .json::<ProxyOverrides>()
            .await?;

        Ok(config)
    }

    /// Delegates to the module-level `network_connect` free function.
    /// NOTE: the bare call below resolves to the free function, not to
    /// this trait method (trait methods are not in scope as bare names),
    /// so this is NOT recursive.
    async fn network_connect(
        address: String,
        username: String,
        event_rx: &mut UnboundedReceiver<Command>,
        overrides: &ProxyOverrides,
        state: SharedState,
    ) -> Result<(), Error> {
        network_connect(address, username, event_rx, overrides, state).await
    }

    /// Query the proxy's `status` endpoint (resolved against the current
    /// page URL) and deserialize the JSON response.
    async fn get_status(client: &reqwest::Client) -> color_eyre::Result<ServerStatus> {
        Ok(client
            .get(absolute_url("status")?)
            .send()
            .await?
            .json::<ServerStatus>()
            .await?)
    }

    /// Async sleep backed by a JS timeout. Millisecond resolution; the
    /// duration's millisecond count is truncated to `u32`.
    async fn sleep(duration: Duration) {
        TimeoutFuture::new(duration.as_millis() as u32).await;
    }
}
|
||||
|
||||
/// Extension trait converting JS-side error types into `color_eyre`
/// reports so they can flow through this module's `Result<_, Error>`s.
trait ResultExt<T> {
    /// Convert the error side into an eyre report ("ey" = eyre).
    fn ey(self) -> Result<T, Error>;
}
|
||||
|
||||
impl<T> ResultExt<T> for Result<T, JsValue> {
|
||||
fn ey(self) -> Result<T, Error> {
|
||||
match self {
|
||||
Ok(x) => Ok(x),
|
||||
Err(e) => match e.dyn_into::<js_sys::Error>() {
|
||||
Ok(e) => Err(eyre!("{}: {}", e.name(), e.message())),
|
||||
Err(e) => Err(eyre!("{:?}", e)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ResultExt<T> for Result<T, JsError> {
|
||||
fn ey(self) -> Result<T, Error> {
|
||||
self.map_err(|e| JsValue::from(e)).ey()
|
||||
}
|
||||
}
|
||||
|
||||
/// Web Audio implementation of the audio system: owns the shared
/// `AudioContext` and the slot used to hand `AudioProcessor`s to the
/// microphone/encoder pipeline.
pub struct WebAudioSystem {
    // The browser AudioContext all worklet nodes are created in (48 kHz,
    // see `configure_audio_context`).
    webctx: AudioContext,
    // Publishing side for swapping in a new AudioProcessor at runtime.
    processors: AudioProcessorSender,
}
|
||||
|
||||
/// Load the Rust audio-worklet JS module into `audio_context`'s worklet
/// scope, passing the compiled wasm module via `processorOptions` so the
/// worklet side can run the same Rust code on the audio thread.
///
/// Resolves once the browser reports the module as loaded.
async fn attach_worklet(audio_context: &AudioContext, worklet_url: &str) -> Result<(), Error> {
    // Create worklets to process mic and speaker audio
    // Speaker audio processing worklet only required on
    // browsers that don't support MediaStreamTrackGenerator

    let options = WorkletOptions::new();
    // Hand the current wasm module handle to the worklet script.
    Reflect::set(
        &options,
        &"processorOptions".into(),
        &wasm_bindgen::module(),
    )
    .ey()?;

    info!("loading mic worklet from {worklet_url:?}");
    audio_context
        .audio_worklet()
        .ey()?
        .add_module_with_options(worklet_url, &options)
        .ey()?
        .into_future()
        .await
        .ey()?;
    Ok(())
}
|
||||
|
||||
impl super::AudioSystemInterface for WebAudioSystem {
    type AudioPlayer = WebAudioPlayer;

    /// Create the audio system: a 48 kHz `AudioContext` with the Rust
    /// audio worklet module attached, plus an (initially empty) processor
    /// slot.
    async fn new() -> Result<Self, Error> {
        // Create MediaStreams to playback decoded audio
        // The audio context is used to reproduce audio.
        let webctx = configure_audio_context();
        attach_worklet(
            &webctx,
            &asset!("/assets/rust_audio_worklet.js").to_string(),
        )
        .await?;

        let processors = AudioProcessorSender::default();

        Ok(WebAudioSystem { webctx, processors })
    }

    /// Publish a new processor; the encoder loop swaps it in on the next
    /// mic frame (see `run_encoder_worklet`'s onmessage handler).
    fn set_processor(&self, processor: AudioProcessor) {
        self.processors.store(Some(processor))
    }

    /// Start capturing microphone audio. `each` is invoked with every
    /// encoded packet plus a flag marking end-of-transmission frames.
    /// Setup runs in a spawned task, so failures are only logged — they
    /// cannot be returned to the caller.
    fn start_recording(&mut self, each: impl FnMut(Vec<u8>, bool) + 'static) -> Result<(), Error> {
        let audio_context_worklet = self.webctx.clone();
        let processors = self.processors.clone();
        spawn(async move {
            match run_encoder_worklet(&audio_context_worklet, each, processors).await {
                Ok(node) => info!("created encoder worklet: {:?}", &node),
                Err(err) => error!("could not create encoder worklet: {err}"),
            }
        });
        Ok(())
    }

    /// Build a playback pipeline: an `AudioDecoder` (opus, 1 channel,
    /// 48 kHz) whose decoded PCM is posted to the speaker worklet node,
    /// which is connected to the context's destination (the speakers).
    fn create_player(&mut self) -> Result<WebAudioPlayer, Error> {
        let sink_node = AudioWorkletNode::new(&self.webctx, "rust_speaker_worklet").ey()?;

        // Connect worklet to destination
        sink_node
            .connect_with_audio_node(&self.webctx.destination())
            .ey()?;

        // Create callback functions for AudioDecoder
        let decoder_error = Closure::wrap(Box::new(move |e: JsValue| {
            error!("error decoding audio {:?}", e);
        }) as Box<dyn FnMut(JsValue)>);

        let sink_port = sink_node.port().ey()?;

        // Runs once per decoded frame: copies the PCM out of the
        // browser-owned AudioData and posts it to the speaker worklet.
        let output = Closure::wrap(Box::new(move |audio_data: AudioData| {
            // Extract planar PCM from AudioData into an ArrayBuffer or Float32Array
            // Here we assume f32 samples, 1 channel for brevity.
            let number_of_frames = audio_data.number_of_frames();

            let js_buffer = Float32Array::new_with_length(number_of_frames);

            let audio_data_copy_to_options = &AudioDataCopyToOptions::new(0);
            audio_data_copy_to_options.set_format(web_sys::AudioSampleFormat::F32);

            if let Err(e) = audio_data
                .copy_to_with_buffer_source(&js_buffer.buffer(), &audio_data_copy_to_options)
            {
                error!("could not copy audio data to array {:?}", e);
            }

            // Post to the worklet; include sampleRate and channel count if needed.
            let msg = js_sys::Object::new();
            js_sys::Reflect::set(&msg, &"samples".into(), &js_buffer).unwrap();

            sink_port.post_message(&msg).unwrap();

            // Release the browser-owned frame buffer promptly.
            audio_data.close();
        }) as Box<dyn FnMut(AudioData)>);

        let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new(
            decoder_error.as_ref().unchecked_ref(),
            output.as_ref().unchecked_ref(),
        ))
        .ey()?;

        audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000));
        info!("created audio decoder");

        // This is required to prevent these from being deallocated
        // (forget leaks the closures intentionally; they live as long as
        // the decoder does).
        decoder_error.forget();
        output.forget();

        Ok(WebAudioPlayer(audio_decoder))
    }
}
|
||||
|
||||
/// Per-stream playback handle wrapping the browser `AudioDecoder` built
/// by `WebAudioSystem::create_player`.
pub struct WebAudioPlayer(AudioDecoder);
|
||||
|
||||
impl super::AudioPlayerInterface for WebAudioPlayer {
    /// Feed one Opus packet to the browser decoder. Decoded PCM is
    /// delivered asynchronously to the output callback installed in
    /// `create_player`; failures here are discarded (`let _`) and surface
    /// through the decoder's error callback instead.
    fn play_opus(&mut self, payload: &[u8]) {
        let js_audio_payload = Uint8Array::from(payload);
        // NOTE(review): every chunk is submitted with timestamp 0 and
        // marked EncodedAudioChunkType::Key — presumably fine for Opus
        // where packets are independently decodable; confirm the decoder
        // does not rely on monotonically increasing timestamps.
        let _ = self.0.decode(
            &EncodedAudioChunk::new(&EncodedAudioChunkInit::new(
                &js_audio_payload.into(),
                0.0,
                EncodedAudioChunkType::Key,
            ))
            .unwrap(),
        );
    }
}
|
||||
|
||||
// Borrowed from
|
||||
// https://github.com/security-union/videocall-rs/blob/main/videocall-client/src/decode/config.rs#L6
|
||||
fn configure_audio_context() -> AudioContext {
|
||||
let audio_context_options = AudioContextOptions::new();
|
||||
audio_context_options.set_sample_rate(48000 as f32);
|
||||
let audio_context = AudioContext::new_with_context_options(&audio_context_options).unwrap();
|
||||
audio_context
|
||||
}
|
||||
|
||||
/// Convenience trait so a JS `Promise` can be turned into an awaitable
/// `JsFuture` postfix, at the end of a call chain.
trait PromiseExt {
    fn into_future(self) -> JsFuture;
}
|
||||
|
||||
impl PromiseExt for Promise {
|
||||
fn into_future(self) -> JsFuture {
|
||||
self.into()
|
||||
}
|
||||
}
|
||||
|
||||
/// Run one mic frame (a message posted by the mic worklet) through the
/// `AudioProcessor`, writing the processed samples back into the frame's
/// `Float32Array` in place, and return the resulting transmit state.
///
/// Frames without a readable `data` field that is a `Float32Array` are
/// reported as `Silent` and left untouched.
fn process_audio(frame: &JsValue, processor: &mut AudioProcessor) -> TransmitState {
    let Ok(samples) = Reflect::get(&frame, &"data".into()) else {
        return TransmitState::Silent;
    };
    let Ok(samples) = samples.dyn_into::<Float32Array>() else {
        return TransmitState::Silent;
    };
    let input = samples.to_vec();
    let mut output = Vec::with_capacity(input.len());
    // NOTE(review): assumes `process` (1 channel) fills `output` with the
    // same number of samples as `input` when it produces any — TODO
    // confirm against AudioProcessor.
    let state = processor.process(&input, 1, &mut output);
    if !output.is_empty() {
        // Write processed samples back into the JS-side buffer.
        // NOTE(review): `Float32Array::copy_from` requires matching
        // lengths; a shorter/longer `output` would panic — verify.
        samples.copy_from(&output);
    }

    state
}
|
||||
|
||||
/// Set up microphone capture and Opus encoding.
///
/// Pipeline: `getUserMedia` stream → media-stream source node → mic
/// worklet (posts raw frames back on its message port) →
/// `AudioProcessor` → `AudioEncoder` (opus, 1 channel, 48 kHz,
/// 72 kbit/s) → `each(packet_bytes, is_terminator)`.
///
/// The JS closures are `forget`ted, so the pipeline stays alive for the
/// lifetime of the page even after the returned node is dropped.
async fn run_encoder_worklet(
    audio_context: &AudioContext,
    mut each: impl FnMut(Vec<u8>, bool) + 'static,
    processors: AudioProcessorSender,
) -> Result<AudioWorkletNode, Error> {
    // Request an audio-only media stream; this is the call that triggers
    // the browser's microphone permission prompt.
    let constraints = MediaStreamConstraints::new();
    constraints.set_audio(&JsValue::TRUE);
    let stream = window()
        .unwrap()
        .navigator()
        .media_devices()
        .ey()?
        .get_user_media_with_constraints(&constraints)
        .ey()?
        .into_future()
        .await
        .ey()?
        .dyn_into()
        .map_err(|e| JsError::new(&format!("not a stream: {e:?}")))
        .ey()?;

    let source = audio_context.create_media_stream_source(&stream).ey()?;
    let worklet_node = AudioWorkletNode::new(audio_context, "rust_mic_worklet").ey()?;

    let encoder_error: Closure<dyn FnMut(JsValue)> =
        Closure::new(|e| error!("error encoding audio {:?}", e));

    // Shared state to signal terminator between onmessage and output closures
    // The output closure runs asynchronously after encoding completes
    let pending_terminator = Arc::new(AtomicCell::new(false));
    let pending_terminator_output = pending_terminator.clone();

    // This knows what MediaStreamTrackGenerator to use as it closes around it
    // Encoder output callback: copy the encoded bytes out and forward
    // them, consuming (swap→false) any pending terminator mark.
    let output: Closure<dyn FnMut(EncodedAudioChunk)> =
        Closure::new(move |audio_data: EncodedAudioChunk| {
            let mut array = vec![0u8; audio_data.byte_length() as usize];
            audio_data.copy_to_with_u8_slice(&mut array);
            // Check if this frame was marked as a terminator
            let is_terminator = pending_terminator_output.swap(false);
            each(array, is_terminator);
        });

    let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new(
        encoder_error.as_ref().unchecked_ref(),
        output.as_ref().unchecked_ref(),
    ))
    .unwrap();

    // This is required to prevent these from being deallocated
    encoder_error.forget();
    output.forget();
    let encoder_config = AudioEncoderConfig::new("opus");
    encoder_config.set_number_of_channels(1);
    encoder_config.set_sample_rate(48000);
    encoder_config.set_bitrate(72_000.0);

    audio_encoder.configure(&encoder_config);
    info!("created audio encoder");

    // Per-frame handler for messages from the mic worklet.
    let mut current_processor = AudioProcessor::new(false);
    let onmessage: Closure<dyn FnMut(MessageEvent)> = Closure::new(move |event: MessageEvent| {
        // Hot-swap in a newly published processor, if one is waiting.
        if let Some(new_processor) = processors.take() {
            current_processor = new_processor;
        }

        let frame = event.data();
        let state = process_audio(&frame, &mut current_processor);

        match state {
            TransmitState::Silent => {
                // Don't encode or send anything
                return;
            }
            TransmitState::Transmitting => (), // Normal transmission
            TransmitState::Terminator => {
                // Mark this as a terminator before encoding
                pending_terminator.store(true);
            }
        }
        match AudioData::new(frame.unchecked_ref()) {
            Ok(data) => {
                let _ = audio_encoder.encode(&data);
            }
            Err(err) => {
                error!(
                    "error creating AudioData object {:?} during event {:?}",
                    err, event,
                );
            }
        }
    });
    // Install the handler as `port.onmessage` via Reflect on the worklet
    // node's message port.
    Reflect::set(
        &Reflect::get(&worklet_node, &"port".into()).ey()?,
        &"onmessage".into(),
        onmessage.as_ref(),
    )
    .ey()?;
    onmessage.forget();

    source.connect_with_audio_node(&worklet_node).ey()?;
    worklet_node
        .connect_with_audio_node(&audio_context.destination())
        .ey()?;

    Ok(worklet_node)
}
|
||||
|
||||
#[instrument]
|
||||
pub async fn network_connect(
|
||||
address: String,
|
||||
username: String,
|
||||
event_rx: &mut UnboundedReceiver<Command>,
|
||||
overrides: &ProxyOverrides,
|
||||
state: SharedState,
|
||||
) -> Result<(), Error> {
|
||||
info!("connecting");
|
||||
|
||||
let object = web_sys::js_sys::Object::new();
|
||||
|
||||
Reflect::set(
|
||||
&object,
|
||||
&JsValue::from_str("algorithm"),
|
||||
&JsValue::from_str("sha-256"),
|
||||
)
|
||||
.ey()?;
|
||||
|
||||
if let Some(server_hash) = &overrides.cert_hash {
|
||||
let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
|
||||
web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).ey()?;
|
||||
}
|
||||
|
||||
let array = web_sys::js_sys::Array::new();
|
||||
array.push(&object);
|
||||
|
||||
debug!("created option object: {:?}", &object);
|
||||
|
||||
let mut options = WebTransportOptions::new();
|
||||
options.set_server_certificate_hashes(&array);
|
||||
|
||||
debug!("created WebTransportOptions");
|
||||
console::log_1(&options.clone().into());
|
||||
|
||||
let transport = WebTransport::new_with_options(&address, &options).ey()?;
|
||||
debug!("created WebTransport connection object");
|
||||
console::log_1(&transport.clone().into());
|
||||
|
||||
if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready())
|
||||
.await
|
||||
.ey()
|
||||
{
|
||||
bail!("could not connect to transport: {e}");
|
||||
}
|
||||
|
||||
info!("transport is ready");
|
||||
|
||||
let stream: WebTransportBidirectionalStream =
|
||||
wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream())
|
||||
.await
|
||||
.ey()?
|
||||
.into();
|
||||
|
||||
let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
|
||||
let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());
|
||||
|
||||
let read_codec = ClientControlCodec::new();
|
||||
let write_codec = ClientControlCodec::new();
|
||||
|
||||
let reader =
|
||||
asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
|
||||
let writer =
|
||||
asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec);
|
||||
|
||||
let (outgoing_send, outgoing_recv) = futures_channel::mpsc::unbounded();
|
||||
spawn(crate::sender_loop(outgoing_recv, writer));
|
||||
crate::network_loop(username, state, event_rx, outgoing_send, reader).await
|
||||
}
|
||||
|
||||
pub fn absolute_url(path: &str) -> Result<Url, Error> {
|
||||
let window: web_sys::Window = web_sys::window().expect("no global `window` exists");
|
||||
let location = window.location();
|
||||
Ok(Url::parse(&location.href().ey()?)?.join(path)?)
|
||||
}
|
||||
|
||||
/// Stateless config backend that stores values as JSON strings in the
/// browser's `localStorage`.
#[derive(Clone, PartialEq)]
pub struct WebConfigSystem {}
|
||||
|
||||
impl super::ConfigSystemInterface for WebConfigSystem {
|
||||
fn new() -> Result<Self, Error> {
|
||||
return Ok(WebConfigSystem {});
|
||||
}
|
||||
|
||||
fn config_get<T>(&self, key: &str) -> Option<T>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
// Get Storage
|
||||
let storage = web_sys::window()?.local_storage().ok()??;
|
||||
|
||||
// Try localStorage first
|
||||
if let Ok(Some(raw)) = storage.get_item(key) {
|
||||
if let Ok(parsed) = serde_json::from_str::<T>(&raw) {
|
||||
return Some(parsed);
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback to default if deserialization fails or key missing
|
||||
let default_value = config_get_default(key)?;
|
||||
serde_json::from_value::<T>(default_value).ok()
|
||||
}
|
||||
|
||||
fn config_set<T>(&self, key: &str, value: &T)
|
||||
where
|
||||
T: serde::Serialize,
|
||||
{
|
||||
let storage = window()
|
||||
.and_then(|w| w.local_storage().ok().flatten())
|
||||
.expect("localStorage not available");
|
||||
|
||||
let json_value =
|
||||
serde_json::to_string(value).expect("failed to serialize config value to JSON string");
|
||||
|
||||
storage
|
||||
.set_item(key, &json_value)
|
||||
.expect("failed to write to localStorage");
|
||||
}
|
||||
}
|
||||
|
||||
fn config_get_default(key: &str) -> Option<serde_json::Value> {
|
||||
let default_config = platform_default_config();
|
||||
default_config
|
||||
.get(key)
|
||||
.cloned()
|
||||
.or(super::global_default_config().get(key).cloned())
|
||||
}
|
||||
|
||||
fn platform_default_config() -> HashMap<String, serde_json::Value> {
|
||||
serde_json::json!({})
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.clone()
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
// (end of file — gitea page footer removed)