65883917b0
Pretty simple (turns out not): if the average amplitude is under a certain value, clear out the buffer! The threshold I chose (0.001) is an arbitrary value I got from printf debugging; hopefully we'll expose it on the settings page at some point. I was able to show that this works pretty well in a desktop session. Once the audio has stayed under that threshold for more than 200 ms, we stop transmitting and send a terminator packet. Reviewed-on: #13 Reviewed-by: restitux <restitux@ohea.xyz>
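The gating logic described above might look roughly like the sketch below, expressed against the TransmitState enum this file matches on. It is a minimal illustrative sketch: SilenceGate, SILENCE_THRESHOLD, and HOLD_MS are hypothetical names, not the actual effects-module implementation.

// Hypothetical sketch of the silence gate described above; only
// TransmitState is taken from the real code below.
const SILENCE_THRESHOLD: f32 = 0.001; // empirically chosen via printf debugging
const HOLD_MS: f64 = 200.0; // how long we stay quiet before terminating

struct SilenceGate {
    below_since: Option<f64>, // timestamp (ms) when the level first dropped
    terminated: bool,         // terminator already emitted for this pause?
}

impl SilenceGate {
    fn update(&mut self, samples: &[f32], now_ms: f64) -> TransmitState {
        let avg = samples.iter().map(|s| s.abs()).sum::<f32>() / samples.len().max(1) as f32;
        if avg >= SILENCE_THRESHOLD {
            // Audible again: reset the gate and keep transmitting.
            self.below_since = None;
            self.terminated = false;
            return TransmitState::Transmitting;
        }
        let since = *self.below_since.get_or_insert(now_ms);
        if now_ms - since < HOLD_MS {
            TransmitState::Transmitting // still inside the 200 ms grace period
        } else if !self.terminated {
            self.terminated = true;
            TransmitState::Terminator // last packet of this utterance
        } else {
            TransmitState::Silent // gate closed; encode nothing
        }
    }
}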
512 lines · 16 KiB · Rust

use crate::app::Command;
use crate::effects::{AudioProcessor, AudioProcessorSender, TransmitState};
use color_eyre::eyre::{bail, eyre, Error};
use crossbeam::atomic::AtomicCell;
use dioxus::prelude::*;
use futures::{AsyncRead, AsyncWrite};
use gloo_timers::future::TimeoutFuture;
use js_sys::Float32Array;
use mumble_protocol::control::ClientControlCodec;
use mumble_web2_common::{ClientConfig, ServerStatus};
use reqwest::Url;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing::{debug, error, info, instrument};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::js_sys::{Promise, Reflect, Uint8Array};
use web_sys::AudioContextOptions;
use web_sys::AudioData;
use web_sys::AudioDecoder;
use web_sys::AudioDecoderConfig;
use web_sys::AudioDecoderInit;
use web_sys::AudioEncoder;
use web_sys::AudioEncoderConfig;
use web_sys::AudioEncoderInit;
use web_sys::AudioWorkletNode;
use web_sys::EncodedAudioChunk;
use web_sys::EncodedAudioChunkInit;
use web_sys::EncodedAudioChunkType;
use web_sys::MediaStream;
use web_sys::MediaStreamConstraints;
use web_sys::MessageEvent;
use web_sys::WebTransport;
use web_sys::WebTransportBidirectionalStream;
use web_sys::WebTransportOptions;
use web_sys::WorkletOptions;
use web_sys::{console, window};
use web_sys::{AudioContext, AudioDataCopyToOptions};

pub use wasm_bindgen_futures::spawn_local as spawn;

pub trait ImpRead: AsyncRead + Unpin + 'static {}
impl<T: AsyncRead + Unpin + 'static> ImpRead for T {}

pub trait ImpWrite: AsyncWrite + Unpin + 'static {}
impl<T: AsyncWrite + Unpin + 'static> ImpWrite for T {}

pub async fn sleep(d: Duration) {
    TimeoutFuture::new(d.as_millis() as u32).await
}
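
/// Convert JavaScript-side errors (`JsValue` / `JsError`) into
/// `color_eyre::Error` so call sites can propagate them with `?`.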
trait ResultExt<T> {
    fn ey(self) -> Result<T, Error>;
}

impl<T> ResultExt<T> for Result<T, JsValue> {
    fn ey(self) -> Result<T, Error> {
        match self {
            Ok(x) => Ok(x),
            Err(e) => match e.dyn_into::<js_sys::Error>() {
                Ok(e) => Err(eyre!("{}: {}", e.name(), e.message())),
                Err(e) => Err(eyre!("{:?}", e)),
            },
        }
    }
}

impl<T> ResultExt<T> for Result<T, JsError> {
    fn ey(self) -> Result<T, Error> {
        self.map_err(JsValue::from).ey()
    }
}
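
/// Owns the browser `AudioContext` and the sender used to hand newly
/// configured `AudioProcessor` effect chains to the mic worklet's
/// message handler.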
pub struct AudioSystem {
    webctx: AudioContext,
    processors: AudioProcessorSender,
}

async fn attach_worklet(audio_context: &AudioContext) -> Result<(), Error> {
    // Create worklets to process mic and speaker audio.
    // The speaker audio processing worklet is only required on
    // browsers that don't support MediaStreamTrackGenerator.

    let options = WorkletOptions::new();
    Reflect::set(
        &options,
        &"processorOptions".into(),
        &wasm_bindgen::module(),
    )
    .ey()?;

    let module = asset!("assets/rust_audio_worklet.js").to_string();
    info!("loading mic worklet from {module:?}");
    audio_context
        .audio_worklet()
        .ey()?
        .add_module_with_options(&module, &options)
        .ey()?
        .into_future()
        .await
        .ey()?;
    Ok(())
}
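
// Typical usage (illustrative sketch; not called from this file):
//
//     let mut audio = AudioSystem::new().await?;
//     audio.start_recording(|opus_packet, is_terminator| {
//         // forward the encoded packet to the network layer
//     })?;
//     let mut player = audio.create_player()?;
//     player.play_opus(&incoming_payload);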
impl AudioSystem {
    pub async fn new() -> Result<Self, Error> {
        // The audio context is used to play back decoded audio.
        let webctx = configure_audio_context();
        attach_worklet(&webctx).await?;

        let processors = AudioProcessorSender::default();

        Ok(AudioSystem { webctx, processors })
    }

    pub fn set_processor(&self, processor: AudioProcessor) {
        self.processors.store(Some(processor))
    }

    pub fn start_recording(
        &mut self,
        each: impl FnMut(Vec<u8>, bool) + 'static,
    ) -> Result<(), Error> {
        let audio_context_worklet = self.webctx.clone();
        let processors = self.processors.clone();
        spawn(async move {
            match run_encoder_worklet(&audio_context_worklet, each, processors).await {
                Ok(node) => info!("created encoder worklet: {:?}", &node),
                Err(err) => error!("could not create encoder worklet: {err}"),
            }
        });
        Ok(())
    }

    pub fn create_player(&mut self) -> Result<AudioPlayer, Error> {
        let sink_node = AudioWorkletNode::new(&self.webctx, "rust_speaker_worklet").ey()?;

        // Connect the worklet to the destination (speakers)
        sink_node
            .connect_with_audio_node(&self.webctx.destination())
            .ey()?;

        // Create callback functions for the AudioDecoder
        let decoder_error = Closure::wrap(Box::new(move |e: JsValue| {
            error!("error decoding audio {:?}", e);
        }) as Box<dyn FnMut(JsValue)>);

        let sink_port = sink_node.port().ey()?;

        let output = Closure::wrap(Box::new(move |audio_data: AudioData| {
            // Extract planar PCM from the AudioData into a Float32Array.
            // Here we assume f32 samples and one channel for brevity.
            let number_of_frames = audio_data.number_of_frames();

            let js_buffer = Float32Array::new_with_length(number_of_frames);

            let audio_data_copy_to_options = AudioDataCopyToOptions::new(0);
            audio_data_copy_to_options.set_format(web_sys::AudioSampleFormat::F32);

            if let Err(e) = audio_data
                .copy_to_with_buffer_source(&js_buffer.buffer(), &audio_data_copy_to_options)
            {
                error!("could not copy audio data to array {:?}", e);
            }

            // Post to the worklet; include sampleRate and channel count if needed.
            let msg = js_sys::Object::new();
            js_sys::Reflect::set(&msg, &"samples".into(), &js_buffer).unwrap();

            sink_port.post_message(&msg).unwrap();

            audio_data.close();
        }) as Box<dyn FnMut(AudioData)>);

        let audio_decoder = AudioDecoder::new(&AudioDecoderInit::new(
            decoder_error.as_ref().unchecked_ref(),
            output.as_ref().unchecked_ref(),
        ))
        .ey()?;

        audio_decoder.configure(&AudioDecoderConfig::new("opus", 1, 48000));
        info!("created audio decoder");

        // This is required to prevent the closures from being deallocated
        // while the decoder still holds references to them on the JS side.
        decoder_error.forget();
        output.forget();

        Ok(AudioPlayer(audio_decoder))
    }
}
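
/// Wraps the WebCodecs `AudioDecoder` created in `create_player`; decoded
/// frames are forwarded to the speaker worklet by that decoder's output callback.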
pub struct AudioPlayer(AudioDecoder);

impl AudioPlayer {
    pub fn play_opus(&mut self, payload: &[u8]) {
        let js_audio_payload = Uint8Array::from(payload);
        let _ = self.0.decode(
            &EncodedAudioChunk::new(&EncodedAudioChunkInit::new(
                &js_audio_payload.into(),
                0.0,
                EncodedAudioChunkType::Key,
            ))
            .unwrap(),
        );
    }
}

// Borrowed from
// https://github.com/security-union/videocall-rs/blob/main/videocall-client/src/decode/config.rs#L6
fn configure_audio_context() -> AudioContext {
    let audio_context_options = AudioContextOptions::new();
    audio_context_options.set_sample_rate(48_000.0);
    AudioContext::new_with_context_options(&audio_context_options).unwrap()
}

trait PromiseExt {
    fn into_future(self) -> JsFuture;
}

impl PromiseExt for Promise {
    fn into_future(self) -> JsFuture {
        self.into()
    }
}
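
// Pull the raw f32 samples out of a mic-worklet message, run them through the
// active effects processor in place, and report whether the frame should be
// transmitted, dropped as silence, or flagged as the final terminator frame.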
fn process_audio(frame: &JsValue, processor: &mut AudioProcessor) -> TransmitState {
    let Ok(samples) = Reflect::get(frame, &"data".into()) else {
        return TransmitState::Silent;
    };
    let Ok(samples) = samples.dyn_into::<Float32Array>() else {
        return TransmitState::Silent;
    };
    let input = samples.to_vec();
    let mut output = Vec::with_capacity(input.len());
    let state = processor.process(&input, 1, &mut output);
    if !output.is_empty() {
        samples.copy_from(&output);
    }

    state
}
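
// Capture pipeline: getUserMedia -> MediaStreamSource -> rust_mic_worklet ->
// effects/silence gate -> opus AudioEncoder -> `each(packet, is_terminator)`.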
async fn run_encoder_worklet(
    audio_context: &AudioContext,
    mut each: impl FnMut(Vec<u8>, bool) + 'static,
    processors: AudioProcessorSender,
) -> Result<AudioWorkletNode, Error> {
    let constraints = MediaStreamConstraints::new();
    constraints.set_audio(&JsValue::TRUE);
    let stream: MediaStream = window()
        .unwrap()
        .navigator()
        .media_devices()
        .ey()?
        .get_user_media_with_constraints(&constraints)
        .ey()?
        .into_future()
        .await
        .ey()?
        .dyn_into()
        .map_err(|e| JsError::new(&format!("not a stream: {e:?}")))
        .ey()?;

    let source = audio_context.create_media_stream_source(&stream).ey()?;
    let worklet_node = AudioWorkletNode::new(audio_context, "rust_mic_worklet").ey()?;

    let encoder_error: Closure<dyn FnMut(JsValue)> =
        Closure::new(|e| error!("error encoding audio {:?}", e));

    // Shared state to signal a terminator between the onmessage and output
    // closures; the output closure runs asynchronously after encoding completes.
    let pending_terminator = Arc::new(AtomicCell::new(false));
    let pending_terminator_output = pending_terminator.clone();

    // Receives encoded chunks from the AudioEncoder and forwards them to `each`.
    let output: Closure<dyn FnMut(EncodedAudioChunk)> =
        Closure::new(move |audio_data: EncodedAudioChunk| {
            let mut array = vec![0u8; audio_data.byte_length() as usize];
            audio_data.copy_to_with_u8_slice(&mut array);
            // Check if this frame was marked as a terminator
            let is_terminator = pending_terminator_output.swap(false);
            each(array, is_terminator);
        });

    let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new(
        encoder_error.as_ref().unchecked_ref(),
        output.as_ref().unchecked_ref(),
    ))
    .unwrap();

    // This is required to prevent the closures from being deallocated
    // while the encoder still holds references to them on the JS side.
    encoder_error.forget();
    output.forget();

    let encoder_config = AudioEncoderConfig::new("opus");
    encoder_config.set_number_of_channels(1);
    encoder_config.set_sample_rate(48000);
    encoder_config.set_bitrate(72_000.0);

    audio_encoder.configure(&encoder_config);
    info!("created audio encoder");

    let mut current_processor = AudioProcessor::new_plain();
    let onmessage: Closure<dyn FnMut(MessageEvent)> = Closure::new(move |event: MessageEvent| {
        if let Some(new_processor) = processors.take() {
            current_processor = new_processor;
        }

        let frame = event.data();
        let state = process_audio(&frame, &mut current_processor);

        match state {
            TransmitState::Silent => {
                // Don't encode or send anything
                return;
            }
            TransmitState::Transmitting => (), // Normal transmission
            TransmitState::Terminator => {
                // Mark this frame as a terminator before encoding
                pending_terminator.store(true);
            }
        }
        match AudioData::new(frame.unchecked_ref()) {
            Ok(data) => {
                let _ = audio_encoder.encode(&data);
            }
            Err(err) => {
                error!(
                    "error creating AudioData object {:?} during event {:?}",
                    err, event,
                );
            }
        }
    });
    Reflect::set(
        &Reflect::get(&worklet_node, &"port".into()).ey()?,
        &"onmessage".into(),
        onmessage.as_ref(),
    )
    .ey()?;
    onmessage.forget();

    source.connect_with_audio_node(&worklet_node).ey()?;
    worklet_node
        .connect_with_audio_node(&audio_context.destination())
        .ey()?;

    Ok(worklet_node)
}
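
// Open a WebTransport session to the server, optionally pinning its
// certificate via serverCertificateHashes, then frame the bidirectional
// stream with the Mumble control codec and run the shared network loop.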
#[instrument]
pub async fn network_connect(
    address: String,
    username: String,
    event_rx: &mut UnboundedReceiver<Command>,
    gui_config: &ClientConfig,
) -> Result<(), Error> {
    info!("connecting");

    let object = web_sys::js_sys::Object::new();

    Reflect::set(
        &object,
        &JsValue::from_str("algorithm"),
        &JsValue::from_str("sha-256"),
    )
    .ey()?;

    if let Some(server_hash) = &gui_config.cert_hash {
        let hash = web_sys::js_sys::Uint8Array::from(server_hash.as_slice());
        web_sys::js_sys::Reflect::set(&object, &"value".into(), &hash).ey()?;
    }

    let array = web_sys::js_sys::Array::new();
    array.push(&object);

    debug!("created option object: {:?}", &object);

    let mut options = WebTransportOptions::new();
    options.set_server_certificate_hashes(&array);

    debug!("created WebTransportOptions");
    console::log_1(&options.clone().into());

    let transport = WebTransport::new_with_options(&address, &options).ey()?;
    debug!("created WebTransport connection object");
    console::log_1(&transport.clone().into());

    if let Err(e) = wasm_bindgen_futures::JsFuture::from(transport.ready())
        .await
        .ey()
    {
        bail!("could not connect to transport: {e}");
    }

    info!("transport is ready");

    let stream: WebTransportBidirectionalStream =
        wasm_bindgen_futures::JsFuture::from(transport.create_bidirectional_stream())
            .await
            .ey()?
            .into();

    let wasm_stream_readable = wasm_streams::ReadableStream::from_raw(stream.readable().into());
    let wasm_stream_writable = wasm_streams::WritableStream::from_raw(stream.writable().into());

    let read_codec = ClientControlCodec::new();
    let write_codec = ClientControlCodec::new();

    let reader =
        asynchronous_codec::FramedRead::new(wasm_stream_readable.into_async_read(), read_codec);
    let writer =
        asynchronous_codec::FramedWrite::new(wasm_stream_writable.into_async_write(), write_codec);

    crate::network_loop(username, event_rx, reader, writer).await
}
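
// Simple user preferences, persisted in the browser's localStorage.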
pub fn set_default_username(username: &str) -> Option<()> {
    web_sys::window()?
        .local_storage()
        .ok()??
        .set_item("username", username)
        .ok()
}

pub fn set_default_server(_server: &str) -> Option<()> {
    // TODO: persist the server address like set_default_username does.
    None
}

pub fn load_username() -> Option<String> {
    web_sys::window()?
        .local_storage()
        .ok()??
        .get_item("username")
        .ok()?
}

pub fn load_server_url() -> Option<String> {
    // TODO: load a persisted server address; currently always None.
    None
}

pub fn absolute_url(path: &str) -> Result<Url, Error> {
    let window: web_sys::Window = web_sys::window().expect("no global `window` exists");
    let location = window.location();
    Ok(Url::parse(&location.href().ey()?)?.join(path)?)
}
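
// Fetch the client configuration, either from a URL baked in at compile time
// or from the server's relative "config" endpoint.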
pub async fn load_config() -> color_eyre::Result<ClientConfig> {
    let config_url = match option_env!("MUMBLE_WEB2_GUI_CONFIG_URL") {
        Some(url) => Url::parse(url)?,
        None => absolute_url("config")?,
    };
    info!("loading config from {}", config_url);

    let config = reqwest::get(config_url)
        .await?
        .json::<ClientConfig>()
        .await?;

    Ok(config)
}

pub async fn get_status(client: &reqwest::Client) -> color_eyre::Result<ServerStatus> {
    Ok(client
        .get(absolute_url("status")?)
        .send()
        .await?
        .json::<ServerStatus>()
        .await?)
}

pub fn init_logging() {
    // copied from tracing_web example usage

    use tracing_subscriber::fmt::format::Pretty;
    use tracing_subscriber::prelude::*;
    use tracing_web::{performance_layer, MakeWebConsoleWriter};

    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_ansi(false) // Only partially supported across browsers
        .without_time() // std::time is not available in browsers
        .with_writer(MakeWebConsoleWriter::new()) // write events to the console
        .with_filter(LevelFilter::DEBUG);
    let perf_layer = performance_layer().with_details_from_fields(Pretty::default());

    tracing_subscriber::registry()
        .with(fmt_layer)
        .with(perf_layer)
        .init();

    info!("logging initialized");
}
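
// Minimal stand-in for a task-spawn handle on wasm (no multi-threaded
// runtime here); presumably mirrors the interface of the native build.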
pub struct SpawnHandle;

impl SpawnHandle {
    pub fn current() -> Self {
        SpawnHandle
    }

    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + 'static,
    {
        spawn(future);
    }
}