From 597ebe376098924bc45a12c6f49f9d72de7cdcb8 Mon Sep 17 00:00:00 2001 From: restitux Date: Sat, 17 Jan 2026 02:49:29 -0700 Subject: [PATCH] split imp between desktop and android --- gui/src/imp/connect.rs | 110 +++++++++++++ gui/src/imp/desktop.rs | 318 +----------------------------------- gui/src/imp/mobile.rs | 48 ++++++ gui/src/imp/mod.rs | 13 +- gui/src/imp/native_audio.rs | 209 ++++++++++++++++++++++++ gui/src/lib.rs | 3 - 6 files changed, 381 insertions(+), 320 deletions(-) create mode 100644 gui/src/imp/connect.rs create mode 100644 gui/src/imp/native_audio.rs diff --git a/gui/src/imp/connect.rs b/gui/src/imp/connect.rs new file mode 100644 index 0000000..8136e2f --- /dev/null +++ b/gui/src/imp/connect.rs @@ -0,0 +1,110 @@ +use crate::app::Command; +use color_eyre::eyre::{bail, Error}; +use dioxus::hooks::UnboundedReceiver; +use mumble_protocol::control::ClientControlCodec; +use std::net::ToSocketAddrs; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_rustls::rustls; +use tokio_rustls::rustls::client::danger::{HandshakeSignatureValid, ServerCertVerifier}; +use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName, UnixTime}; +use tokio_rustls::rustls::ClientConfig as RlsClientConfig; +use tokio_rustls::rustls::DigitallySignedStruct; +use tokio_rustls::TlsConnector; +use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; +use tracing::{info, instrument}; + +use mumble_web2_common::{ClientConfig, ServerStatus}; + +#[derive(Debug)] +struct NoCertificateVerification; + +impl ServerCertVerifier for NoCertificateVerification { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp: &[u8], + _now: UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + rustls::SignatureScheme::RSA_PKCS1_SHA1, + rustls::SignatureScheme::ECDSA_SHA1_Legacy, + rustls::SignatureScheme::RSA_PKCS1_SHA256, + rustls::SignatureScheme::ECDSA_NISTP256_SHA256, + rustls::SignatureScheme::RSA_PKCS1_SHA384, + rustls::SignatureScheme::ECDSA_NISTP384_SHA384, + rustls::SignatureScheme::RSA_PKCS1_SHA512, + rustls::SignatureScheme::ECDSA_NISTP521_SHA512, + rustls::SignatureScheme::RSA_PSS_SHA256, + rustls::SignatureScheme::RSA_PSS_SHA384, + rustls::SignatureScheme::RSA_PSS_SHA512, + rustls::SignatureScheme::ED25519, + rustls::SignatureScheme::ED448, + ] + } +} + +#[instrument] +pub async fn network_connect( + address: String, + username: String, + event_rx: &mut UnboundedReceiver, + gui_config: &ClientConfig, +) -> Result<(), Error> { + info!("connecting"); + + let config = RlsClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoCertificateVerification)) + .with_no_client_auth(); + + let connector = TlsConnector::from(Arc::new(config)); + + let addr = format!("{}:{}", address, 64738) + .to_socket_addrs()? + .next() + .unwrap(); + + let server_tcp = TcpStream::connect(addr).await?; + let server_stream = connector + //.connect("127.0.0.1".try_into()?, server_tcp) + .connect(address.try_into()?, server_tcp) + .await?; + let (read_server, write_server) = tokio::io::split(server_stream); + + let read_codec = ClientControlCodec::new(); + let write_codec = ClientControlCodec::new(); + + let reader = asynchronous_codec::FramedRead::new(read_server.compat(), read_codec); + let writer = asynchronous_codec::FramedWrite::new(write_server.compat_write(), write_codec); + + crate::network_loop(username, event_rx, reader, writer).await +} + +pub async fn get_status(client: &reqwest::Client) -> color_eyre::Result { + bail!("status not supported on desktop yet") +} diff --git a/gui/src/imp/desktop.rs b/gui/src/imp/desktop.rs index d392ddf..8a41f31 100644 --- a/gui/src/imp/desktop.rs +++ b/gui/src/imp/desktop.rs @@ -1,320 +1,12 @@ -use crate::app::Command; -use crate::effects::{AudioProcessor, AudioProcessorSender}; -use color_eyre::eyre::{bail, eyre, Error}; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _}; -use dioxus::hooks::UnboundedReceiver; use etcetera::{choose_app_strategy, AppStrategy, AppStrategyArgs}; -use futures::io::{AsyncRead, AsyncWrite}; -use mumble_protocol::control::ClientControlCodec; -use mumble_web2_common::{ClientConfig, ServerStatus}; +use mumble_web2_common::ClientConfig; use std::collections::HashMap; -use std::mem::replace; -use std::net::ToSocketAddrs; -use std::sync::Arc; -use std::sync::Mutex; -use tokio::net::TcpStream; -use tokio_rustls::rustls; -use tokio_rustls::rustls::client::danger::{HandshakeSignatureValid, ServerCertVerifier}; -use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName, UnixTime}; -use tokio_rustls::rustls::ClientConfig as RlsClientConfig; -use tokio_rustls::rustls::DigitallySignedStruct; -use tokio_rustls::TlsConnector; -use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; -use tracing::{error, info, instrument, warn}; - pub use tokio::runtime::Handle as SpawnHandle; pub use tokio::task::spawn; pub use tokio::time::sleep; -pub trait ImpRead: AsyncRead + Unpin + Send + 'static {} -impl ImpRead for T {} - -pub trait ImpWrite: AsyncWrite + Unpin + Send + 'static {} -impl ImpWrite for T {} - -pub struct AudioSystem { - output: cpal::Device, - input: cpal::Device, - processors: AudioProcessorSender, - recording_stream: Option, -} - -const SAMPLE_RATE: u32 = 48_000; -const PACKET_SAMPLES: u32 = 960; - -type Buffer = Arc>>>; - -impl AudioSystem { - pub async fn new() -> Result { - // TODO - let host = cpal::default_host(); - let name = host.id(); - let processors = AudioProcessorSender::default(); - Ok(AudioSystem { - output: host - .default_output_device() - .ok_or(eyre!("no output devices from {name:?}"))?, - input: host - .default_input_device() - .ok_or(eyre!("no input devices from {name:?}"))?, - processors, - recording_stream: None, - }) - } - - pub fn set_processor(&self, processor: AudioProcessor) { - self.processors.store(Some(processor)) - } - - fn choose_config( - &self, - configs: impl Iterator, - ) -> Result { - let mut supported_configs: Vec<_> = configs - .filter_map(|cfg| cfg.try_with_sample_rate(cpal::SampleRate(SAMPLE_RATE))) - .filter(|cfg| cfg.sample_format() == cpal::SampleFormat::I16) - .map(|cfg| cpal::StreamConfig { - buffer_size: cpal::BufferSize::Fixed(match *cfg.buffer_size() { - cpal::SupportedBufferSize::Range { min, max } => 480.clamp(min, max), - cpal::SupportedBufferSize::Unknown => 480, - }), - ..cfg.config() - }) - .collect(); - supported_configs.sort_by(|a, b| { - let cpal::BufferSize::Fixed(a_buf) = a.buffer_size else { - unreachable!() - }; - let cpal::BufferSize::Fixed(b_buf) = b.buffer_size else { - unreachable!() - }; - Ord::cmp(&a.channels, &b.channels).then(Ord::cmp(&a_buf, &b_buf)) - }); - supported_configs - .get(0) - .cloned() - .ok_or(eyre!("no supported stream configs")) - } - - pub fn start_recording( - &mut self, - mut each: impl FnMut(Vec) + Send + 'static, - ) -> Result<(), Error> { - let config = self.choose_config(self.input.supported_input_configs()?)?; - info!( - "creating recording on {:?} with {:#?}", - self.input.name()?, - config - ); - let mut encoder = - opus::Encoder::new(SAMPLE_RATE, opus::Channels::Mono, opus::Application::Voip)?; - let mut current_processor = AudioProcessor::new_plain(); - let mut output_buffer = Vec::new(); - let processors = self.processors.clone(); - let error_callback = move |e: cpal::StreamError| error!("error recording: {e:?}"); - let data_callback = move |frame: &[f32], _: &cpal::InputCallbackInfo| { - if let Some(new_processor) = processors.take() { - current_processor = new_processor; - } - current_processor.process(frame, config.channels as usize, &mut output_buffer); - if output_buffer.len() < PACKET_SAMPLES as usize { - return; - } - let remainder = output_buffer.split_off(PACKET_SAMPLES as usize); - let frame = replace(&mut output_buffer, remainder); - match encoder.encode_vec_float(&frame, frame.len() * 2) { - Ok(buf) => { - each(buf); - } - Err(e) => { - error!("error encoding {} samples: {e:?}", frame.len()); - } - } - }; - - match self - .input - .build_input_stream(&config, data_callback, error_callback, None) - { - Ok(stream) => { - stream.play()?; - self.recording_stream = Some(stream); - Ok(()) - } - Err(err) => { - self.recording_stream = None; - Err(err.into()) - } - } - } - - pub fn create_player(&mut self) -> Result { - let config = self.choose_config(self.output.supported_output_configs()?)?; - info!( - "creating player on {:?} with {:#?}", - self.output.name().ok(), - &config - ); - let buffer = Arc::new(Mutex::new(dasp_ring_buffer::Bounded::from_raw_parts( - 0, - 0, - vec![ - 0; - SAMPLE_RATE as usize/4 // 250ms of buffer - ], - ))); - let decoder = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Mono)?; - let stream = { - let buffer = buffer.clone(); - self.output.build_output_stream( - &config, - move |frame, _info| { - let mut buffer = buffer.lock().unwrap(); - for x in frame.chunks_mut(config.channels as usize) { - match buffer.pop() { - Some(y) => { - x.fill(y); - } - None => { - x.fill(0); - } - } - } - }, - move |err| error!("could not create output stream {err:?}"), - None, - )? - }; - stream.play()?; - Ok(AudioPlayer { - decoder, - stream, - buffer, - tmp: vec![0; 2400], - }) - } -} - -pub struct AudioPlayer { - decoder: opus::Decoder, - stream: cpal::Stream, - buffer: Buffer, - tmp: Vec, -} - -impl AudioPlayer { - pub fn play_opus(&mut self, payload: &[u8]) { - let len = loop { - match self.decoder.decode(payload, &mut self.tmp, false) { - Ok(l) => break l, - Err(e) => { - error!("opus decode error {e:?}"); - return; - } - } - }; - - let mut buffer = self.buffer.lock().unwrap(); - let mut overrun = 0; - for x in &self.tmp[..len] { - if let Some(_) = buffer.push(*x) { - overrun += 1; - } - } - if overrun > 0 { - warn!("playback overrun by {overrun} samples"); - } - } -} - -#[derive(Debug)] -struct NoCertificateVerification; - -impl ServerCertVerifier for NoCertificateVerification { - fn verify_server_cert( - &self, - _end_entity: &CertificateDer<'_>, - _intermediates: &[CertificateDer<'_>], - _server_name: &ServerName<'_>, - _ocsp: &[u8], - _now: UnixTime, - ) -> Result { - Ok(rustls::client::danger::ServerCertVerified::assertion()) - } - - fn verify_tls12_signature( - &self, - _message: &[u8], - _cert: &CertificateDer<'_>, - _dss: &DigitallySignedStruct, - ) -> Result { - Ok(HandshakeSignatureValid::assertion()) - } - - fn verify_tls13_signature( - &self, - _message: &[u8], - _cert: &CertificateDer<'_>, - _dss: &DigitallySignedStruct, - ) -> Result { - Ok(HandshakeSignatureValid::assertion()) - } - - fn supported_verify_schemes(&self) -> Vec { - vec![ - rustls::SignatureScheme::RSA_PKCS1_SHA1, - rustls::SignatureScheme::ECDSA_SHA1_Legacy, - rustls::SignatureScheme::RSA_PKCS1_SHA256, - rustls::SignatureScheme::ECDSA_NISTP256_SHA256, - rustls::SignatureScheme::RSA_PKCS1_SHA384, - rustls::SignatureScheme::ECDSA_NISTP384_SHA384, - rustls::SignatureScheme::RSA_PKCS1_SHA512, - rustls::SignatureScheme::ECDSA_NISTP521_SHA512, - rustls::SignatureScheme::RSA_PSS_SHA256, - rustls::SignatureScheme::RSA_PSS_SHA384, - rustls::SignatureScheme::RSA_PSS_SHA512, - rustls::SignatureScheme::ED25519, - rustls::SignatureScheme::ED448, - ] - } -} - -#[instrument] -pub async fn network_connect( - address: String, - username: String, - event_rx: &mut UnboundedReceiver, - gui_config: &ClientConfig, -) -> Result<(), Error> { - info!("connecting"); - - let config = RlsClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(Arc::new(NoCertificateVerification)) - .with_no_client_auth(); - - let connector = TlsConnector::from(Arc::new(config)); - - let addr = format!("{}:{}", address, 64738) - .to_socket_addrs()? - .next() - .unwrap(); - - let server_tcp = TcpStream::connect(addr).await?; - let server_stream = connector - //.connect("127.0.0.1".try_into()?, server_tcp) - .connect(address.try_into()?, server_tcp) - .await?; - let (read_server, write_server) = tokio::io::split(server_stream); - - let read_codec = ClientControlCodec::new(); - let write_codec = ClientControlCodec::new(); - - let reader = asynchronous_codec::FramedRead::new(read_server.compat(), read_codec); - let writer = asynchronous_codec::FramedWrite::new(write_server.compat_write(), write_codec); - - crate::network_loop(username, event_rx, reader, writer).await -} +pub use super::connect::*; +pub use super::native_audio::*; fn get_config_path() -> std::path::PathBuf { let strategy = choose_app_strategy(AppStrategyArgs { @@ -374,10 +66,6 @@ pub async fn load_config() -> color_eyre::Result { }) } -pub async fn get_status(client: &reqwest::Client) -> color_eyre::Result { - bail!("status not supported on desktop yet") -} - pub fn init_logging() { use tracing::level_filters::LevelFilter; use tracing_subscriber::filter::EnvFilter; diff --git a/gui/src/imp/mobile.rs b/gui/src/imp/mobile.rs index 97d75c5..b632934 100644 --- a/gui/src/imp/mobile.rs +++ b/gui/src/imp/mobile.rs @@ -1,5 +1,53 @@ use android_permissions::{PermissionManager, RECORD_AUDIO}; use jni::{objects::JObject, JavaVM}; +use mumble_web2_common::ClientConfig; + +use std::collections::HashMap; +pub use tokio::runtime::Handle as SpawnHandle; +pub use tokio::task::spawn; +pub use tokio::time::sleep; + +pub use super::connect::*; +pub use super::native_audio::*; + +pub fn set_default_username(username: &str) -> Option<()> { + None +} + +pub fn set_default_server(server: &str) -> Option<()> { + None +} + +pub fn load_username() -> Option { + None +} + +pub fn load_server_url() -> Option { + None +} + +pub async fn load_config() -> color_eyre::Result { + Ok(ClientConfig { + proxy_url: None, + cert_hash: None, + any_server: true, + }) +} + +pub fn init_logging() { + use tracing::level_filters::LevelFilter; + use tracing_subscriber::filter::EnvFilter; + + let env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + + tracing_subscriber::fmt() + .with_target(true) + .with_level(true) + .with_env_filter(env_filter) + .init(); +} #[cfg(feature = "mobile")] pub fn request_permissions() { diff --git a/gui/src/imp/mod.rs b/gui/src/imp/mod.rs index b16228f..106e281 100644 --- a/gui/src/imp/mod.rs +++ b/gui/src/imp/mod.rs @@ -2,11 +2,20 @@ mod web; #[cfg(any(feature = "desktop", feature = "mobile"))] -mod desktop; +mod connect; +#[cfg(any(feature = "desktop", feature = "mobile"))] +mod native_audio; +#[cfg(feature = "desktop")] +mod desktop; #[cfg(feature = "mobile")] mod mobile; +#[cfg(feature = "desktop")] +pub use desktop::*; +#[cfg(feature = "mobile")] +pub use mobile::*; + #[cfg(feature = "mobile")] pub use mobile::request_permissions; @@ -16,5 +25,5 @@ pub fn request_permissions() {} #[cfg(all(feature = "web", not(any(feature = "desktop", feature = "mobile"))))] pub use web::*; -#[cfg(any(feature = "desktop", feature = "mobile"))] +#[cfg(any(feature = "desktop"))] pub use desktop::*; diff --git a/gui/src/imp/native_audio.rs b/gui/src/imp/native_audio.rs new file mode 100644 index 0000000..dda5c3a --- /dev/null +++ b/gui/src/imp/native_audio.rs @@ -0,0 +1,209 @@ +use crate::effects::{AudioProcessor, AudioProcessorSender}; +use color_eyre::eyre::{eyre, Error}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _}; +use futures::io::{AsyncRead, AsyncWrite}; +use std::mem::replace; +use std::sync::Arc; +use std::sync::Mutex; +use tracing::{error, info, warn}; + +pub trait ImpRead: AsyncRead + Unpin + Send + 'static {} +impl ImpRead for T {} + +pub trait ImpWrite: AsyncWrite + Unpin + Send + 'static {} +impl ImpWrite for T {} + +pub struct AudioSystem { + output: cpal::Device, + input: cpal::Device, + processors: AudioProcessorSender, + recording_stream: Option, +} + +const SAMPLE_RATE: u32 = 48_000; +const PACKET_SAMPLES: u32 = 960; + +type Buffer = Arc>>>; + +impl AudioSystem { + pub async fn new() -> Result { + // TODO + let host = cpal::default_host(); + let name = host.id(); + let processors = AudioProcessorSender::default(); + Ok(AudioSystem { + output: host + .default_output_device() + .ok_or(eyre!("no output devices from {name:?}"))?, + input: host + .default_input_device() + .ok_or(eyre!("no input devices from {name:?}"))?, + processors, + recording_stream: None, + }) + } + + pub fn set_processor(&self, processor: AudioProcessor) { + self.processors.store(Some(processor)) + } + + fn choose_config( + &self, + configs: impl Iterator, + ) -> Result { + let mut supported_configs: Vec<_> = configs + .filter_map(|cfg| cfg.try_with_sample_rate(cpal::SampleRate(SAMPLE_RATE))) + .filter(|cfg| cfg.sample_format() == cpal::SampleFormat::I16) + .map(|cfg| cpal::StreamConfig { + buffer_size: cpal::BufferSize::Fixed(match *cfg.buffer_size() { + cpal::SupportedBufferSize::Range { min, max } => 480.clamp(min, max), + cpal::SupportedBufferSize::Unknown => 480, + }), + ..cfg.config() + }) + .collect(); + supported_configs.sort_by(|a, b| { + let cpal::BufferSize::Fixed(a_buf) = a.buffer_size else { + unreachable!() + }; + let cpal::BufferSize::Fixed(b_buf) = b.buffer_size else { + unreachable!() + }; + Ord::cmp(&a.channels, &b.channels).then(Ord::cmp(&a_buf, &b_buf)) + }); + supported_configs + .get(0) + .cloned() + .ok_or(eyre!("no supported stream configs")) + } + + pub fn start_recording( + &mut self, + mut each: impl FnMut(Vec) + Send + 'static, + ) -> Result<(), Error> { + let config = self.choose_config(self.input.supported_input_configs()?)?; + info!( + "creating recording on {:?} with {:#?}", + self.input.name()?, + config + ); + let mut encoder = + opus::Encoder::new(SAMPLE_RATE, opus::Channels::Mono, opus::Application::Voip)?; + let mut current_processor = AudioProcessor::new_plain(); + let mut output_buffer = Vec::new(); + let processors = self.processors.clone(); + let error_callback = move |e: cpal::StreamError| error!("error recording: {e:?}"); + let data_callback = move |frame: &[f32], _: &cpal::InputCallbackInfo| { + if let Some(new_processor) = processors.take() { + current_processor = new_processor; + } + current_processor.process(frame, config.channels as usize, &mut output_buffer); + if output_buffer.len() < PACKET_SAMPLES as usize { + return; + } + let remainder = output_buffer.split_off(PACKET_SAMPLES as usize); + let frame = replace(&mut output_buffer, remainder); + match encoder.encode_vec_float(&frame, frame.len() * 2) { + Ok(buf) => { + each(buf); + } + Err(e) => { + error!("error encoding {} samples: {e:?}", frame.len()); + } + } + }; + + match self + .input + .build_input_stream(&config, data_callback, error_callback, None) + { + Ok(stream) => { + stream.play()?; + self.recording_stream = Some(stream); + Ok(()) + } + Err(err) => { + self.recording_stream = None; + Err(err.into()) + } + } + } + + pub fn create_player(&mut self) -> Result { + let config = self.choose_config(self.output.supported_output_configs()?)?; + info!( + "creating player on {:?} with {:#?}", + self.output.name().ok(), + &config + ); + let buffer = Arc::new(Mutex::new(dasp_ring_buffer::Bounded::from_raw_parts( + 0, + 0, + vec![ + 0; + SAMPLE_RATE as usize/4 // 250ms of buffer + ], + ))); + let decoder = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Mono)?; + let stream = { + let buffer = buffer.clone(); + self.output.build_output_stream( + &config, + move |frame, _info| { + let mut buffer = buffer.lock().unwrap(); + for x in frame.chunks_mut(config.channels as usize) { + match buffer.pop() { + Some(y) => { + x.fill(y); + } + None => { + x.fill(0); + } + } + } + }, + move |err| error!("could not create output stream {err:?}"), + None, + )? + }; + stream.play()?; + Ok(AudioPlayer { + decoder, + stream, + buffer, + tmp: vec![0; 2400], + }) + } +} + +pub struct AudioPlayer { + decoder: opus::Decoder, + stream: cpal::Stream, + buffer: Buffer, + tmp: Vec, +} + +impl AudioPlayer { + pub fn play_opus(&mut self, payload: &[u8]) { + let len = loop { + match self.decoder.decode(payload, &mut self.tmp, false) { + Ok(l) => break l, + Err(e) => { + error!("opus decode error {e:?}"); + return; + } + } + }; + + let mut buffer = self.buffer.lock().unwrap(); + let mut overrun = 0; + for x in &self.tmp[..len] { + if let Some(_) = buffer.push(*x) { + overrun += 1; + } + } + if overrun > 0 { + warn!("playback overrun by {overrun} samples"); + } + } +} diff --git a/gui/src/lib.rs b/gui/src/lib.rs index a8a163d..b5c07a0 100644 --- a/gui/src/lib.rs +++ b/gui/src/lib.rs @@ -20,12 +20,9 @@ use mumble_protocol::voice::VoicePacket; use mumble_protocol::voice::VoicePacketPayload; use mumble_protocol::Clientbound; use mumble_protocol::Serverbound; -use mumble_web2_common::ClientConfig; -use once_cell::sync::Lazy; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::time::Duration; -use tracing::debug; use tracing::error; use tracing::info;