use crate::app::Command; use crate::effects::{AudioProcessor, AudioProcessorSender}; use color_eyre::eyre::{bail, eyre, Context, Error}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _}; use dioxus::hooks::UnboundedReceiver; use futures::io::{AsyncRead, AsyncWrite}; use mumble_protocol::control::ClientControlCodec; use mumble_web2_common::{ClientConfig, ServerStatus}; use std::mem::replace; use std::net::ToSocketAddrs; use std::sync::Arc; use std::sync::Mutex; use tokio::net::TcpStream; use tokio_rustls::rustls; use tokio_rustls::rustls::client::danger::{HandshakeSignatureValid, ServerCertVerifier}; use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use tokio_rustls::rustls::ClientConfig as RlsClientConfig; use tokio_rustls::rustls::DigitallySignedStruct; use tokio_rustls::TlsConnector; use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; use tracing::{error, info, instrument, warn}; pub use tokio::runtime::Handle as SpawnHandle; pub use tokio::task::spawn; pub use tokio::time::sleep; pub trait ImpRead: AsyncRead + Unpin + Send + 'static {} impl ImpRead for T {} pub trait ImpWrite: AsyncWrite + Unpin + Send + 'static {} impl ImpWrite for T {} pub struct AudioSystem { output: cpal::Device, input: cpal::Device, processors: AudioProcessorSender, recording_stream: Option, } const SAMPLE_RATE: u32 = 48_000; const PACKET_SAMPLES: u32 = 960; type Buffer = Arc>>>; impl AudioSystem { pub async fn new() -> Result { // TODO let host = cpal::default_host(); let name = host.id(); let processors = AudioProcessorSender::default(); Ok(AudioSystem { output: host .default_output_device() .ok_or(eyre!("no output devices from {name:?}"))?, input: host .default_input_device() .ok_or(eyre!("no input devices from {name:?}"))?, processors, recording_stream: None, }) } pub fn set_processor(&self, processor: AudioProcessor) { self.processors.store(Some(processor)) } fn choose_config( &self, configs: impl Iterator, ) -> Result { let mut supported_configs: Vec<_> = configs .filter_map(|cfg| cfg.try_with_sample_rate(cpal::SampleRate(SAMPLE_RATE))) .filter(|cfg| cfg.sample_format() == cpal::SampleFormat::I16) .map(|cfg| cpal::StreamConfig { buffer_size: cpal::BufferSize::Fixed(match *cfg.buffer_size() { cpal::SupportedBufferSize::Range { min, max } => 480.clamp(min, max), cpal::SupportedBufferSize::Unknown => 480, }), ..cfg.config() }) .collect(); supported_configs.sort_by(|a, b| { let cpal::BufferSize::Fixed(a_buf) = a.buffer_size else { unreachable!() }; let cpal::BufferSize::Fixed(b_buf) = b.buffer_size else { unreachable!() }; Ord::cmp(&a.channels, &b.channels).then(Ord::cmp(&a_buf, &b_buf)) }); supported_configs .get(0) .cloned() .ok_or(eyre!("no supported stream configs")) } pub fn start_recording( &mut self, mut each: impl FnMut(Vec) + Send + 'static, ) -> Result<(), Error> { let config = self.choose_config(self.input.supported_input_configs()?)?; info!( "creating recording on {:?} with {:#?}", self.input.name()?, config ); let mut encoder = opus::Encoder::new(SAMPLE_RATE, opus::Channels::Mono, opus::Application::Voip)?; let mut current_processor = AudioProcessor::new_plain(); let mut output_buffer = Vec::new(); let processors = self.processors.clone(); let error_callback = move |e: cpal::StreamError| error!("error recording: {e:?}"); let data_callback = move |frame: &[f32], _: &cpal::InputCallbackInfo| { if let Some(new_processor) = processors.take() { current_processor = new_processor; } current_processor.process(frame, config.channels as usize, &mut output_buffer); if output_buffer.len() < PACKET_SAMPLES as usize { return; } let remainder = output_buffer.split_off(PACKET_SAMPLES as usize); let frame = replace(&mut output_buffer, remainder); match encoder.encode_vec_float(&frame, frame.len() * 2) { Ok(buf) => { each(buf); } Err(e) => { error!("error encoding {} samples: {e:?}", frame.len()); } } }; match self .input .build_input_stream(&config, data_callback, error_callback, None) { Ok(stream) => { stream.play()?; self.recording_stream = Some(stream); Ok(()) } Err(err) => { self.recording_stream = None; Err(err.into()) } } } pub fn create_player(&mut self) -> Result { let config = self.choose_config(self.input.supported_input_configs()?)?; info!( "creating player on {:?} with {:#?}", self.output.name().ok(), &config ); let buffer = Arc::new(Mutex::new(dasp_ring_buffer::Bounded::from_raw_parts( 0, 0, vec![ 0; SAMPLE_RATE as usize/4 // 250ms of buffer ], ))); let decoder = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Mono)?; let stream = { let buffer = buffer.clone(); self.output.build_output_stream( &config, move |frame, _info| { let mut buffer = buffer.lock().unwrap(); for x in frame.chunks_mut(config.channels as usize) { match buffer.pop() { Some(y) => { x.fill(y); } None => { x.fill(0); } } } }, move |err| error!("could not create output stream {err:?}"), None, )? }; stream.play()?; Ok(AudioPlayer { decoder, stream, buffer, tmp: vec![0; 2400], }) } } pub struct AudioPlayer { decoder: opus::Decoder, stream: cpal::Stream, buffer: Buffer, tmp: Vec, } impl AudioPlayer { pub fn play_opus(&mut self, payload: &[u8]) { let len = loop { match self.decoder.decode(payload, &mut self.tmp, false) { Ok(l) => break l, Err(e) => { error!("opus decode error {e:?}"); return; } } }; let mut buffer = self.buffer.lock().unwrap(); let mut overrun = 0; for x in &self.tmp[..len] { if let Some(_) = buffer.push(*x) { overrun += 1; } } if overrun > 0 { warn!("playback overrun by {overrun} samples"); } } } #[derive(Debug)] struct NoCertificateVerification; impl ServerCertVerifier for NoCertificateVerification { fn verify_server_cert( &self, _end_entity: &CertificateDer<'_>, _intermediates: &[CertificateDer<'_>], _server_name: &ServerName<'_>, _ocsp: &[u8], _now: UnixTime, ) -> Result { Ok(rustls::client::danger::ServerCertVerified::assertion()) } fn verify_tls12_signature( &self, _message: &[u8], _cert: &CertificateDer<'_>, _dss: &DigitallySignedStruct, ) -> Result { Ok(HandshakeSignatureValid::assertion()) } fn verify_tls13_signature( &self, _message: &[u8], _cert: &CertificateDer<'_>, _dss: &DigitallySignedStruct, ) -> Result { Ok(HandshakeSignatureValid::assertion()) } fn supported_verify_schemes(&self) -> Vec { vec![ rustls::SignatureScheme::RSA_PKCS1_SHA1, rustls::SignatureScheme::ECDSA_SHA1_Legacy, rustls::SignatureScheme::RSA_PKCS1_SHA256, rustls::SignatureScheme::ECDSA_NISTP256_SHA256, rustls::SignatureScheme::RSA_PKCS1_SHA384, rustls::SignatureScheme::ECDSA_NISTP384_SHA384, rustls::SignatureScheme::RSA_PKCS1_SHA512, rustls::SignatureScheme::ECDSA_NISTP521_SHA512, rustls::SignatureScheme::RSA_PSS_SHA256, rustls::SignatureScheme::RSA_PSS_SHA384, rustls::SignatureScheme::RSA_PSS_SHA512, rustls::SignatureScheme::ED25519, rustls::SignatureScheme::ED448, ] } } #[instrument] pub async fn network_connect( address: String, username: String, event_rx: &mut UnboundedReceiver, gui_config: &ClientConfig, ) -> Result<(), Error> { info!("connecting"); let config = RlsClientConfig::builder() .dangerous() .with_custom_certificate_verifier(Arc::new(NoCertificateVerification)) .with_no_client_auth(); let connector = TlsConnector::from(Arc::new(config)); let addr = format!("{}:{}", address, 64738) .to_socket_addrs()? .next() .unwrap(); let server_tcp = TcpStream::connect(addr).await?; let server_stream = connector //.connect("127.0.0.1".try_into()?, server_tcp) .connect(address.try_into()?, server_tcp) .await?; let (read_server, write_server) = tokio::io::split(server_stream); let read_codec = ClientControlCodec::new(); let write_codec = ClientControlCodec::new(); let reader = asynchronous_codec::FramedRead::new(read_server.compat(), read_codec); let writer = asynchronous_codec::FramedWrite::new(write_server.compat_write(), write_codec); crate::network_loop(username, event_rx, reader, writer).await } pub fn set_default_username(username: &str) -> Option<()> { None } pub fn load_username() -> Option { return None; } pub async fn load_config() -> color_eyre::Result { Ok(ClientConfig { proxy_url: None, cert_hash: None, any_server: true, }) } pub async fn get_status(client: &reqwest::Client) -> color_eyre::Result { bail!("status not supported on desktop yet") } pub fn init_logging() { use tracing::level_filters::LevelFilter; use tracing_subscriber::filter::EnvFilter; let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); tracing_subscriber::fmt() .with_target(true) .with_level(true) .with_env_filter(env_filter) .init(); }