use crate::effects::{AudioProcessor, AudioProcessorSender, TransmitState}; use color_eyre::eyre::{eyre, Error}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _}; use std::mem::replace; use std::sync::Arc; use std::sync::Mutex; use tracing::{error, info, warn}; pub struct NativeAudioSystem { output: cpal::Device, input: cpal::Device, processors: AudioProcessorSender, recording_stream: Option, } const SAMPLE_RATE: u32 = 48_000; const PACKET_SAMPLES: u32 = 960; // Divide by 1000 to get samples per ms, then multiply by 60ms for max Opus frame size. const MAX_DECODE_SAMPLES: usize = SAMPLE_RATE as usize / 1000 * 60; fn encode_and_send( state: TransmitState, output_buffer: &mut Vec, encoder: &mut opus::Encoder, each: &mut impl FnMut(Vec, bool), ) { let (is_terminator, should_encode) = match state { TransmitState::Silent => return, TransmitState::Transmitting => (false, output_buffer.len() >= PACKET_SAMPLES as usize), TransmitState::Terminator => { output_buffer.resize(PACKET_SAMPLES as usize, 0.0); (true, true) } }; if should_encode { let remainder = output_buffer.split_off(PACKET_SAMPLES as usize); let frame = replace(output_buffer, remainder); match encoder.encode_vec_float(&frame, frame.len() * 2) { Ok(encoded) => each(encoded, is_terminator), Err(e) => error!("error encoding {} samples: {e:?}", frame.len()), } } } type Buffer = Arc>>>; impl NativeAudioSystem { fn choose_config( &self, configs: impl Iterator, ) -> Result { let mut supported_configs: Vec<_> = configs .filter_map(|cfg| cfg.try_with_sample_rate(cpal::SampleRate(SAMPLE_RATE))) .filter(|cfg| cfg.sample_format() == cpal::SampleFormat::I16) .map(|cfg| cpal::StreamConfig { buffer_size: cpal::BufferSize::Fixed(match *cfg.buffer_size() { cpal::SupportedBufferSize::Range { min, max } => 480.clamp(min, max), cpal::SupportedBufferSize::Unknown => 480, }), ..cfg.config() }) .collect(); supported_configs.sort_by(|a, b| { let cpal::BufferSize::Fixed(a_buf) = a.buffer_size else { unreachable!() }; let cpal::BufferSize::Fixed(b_buf) = b.buffer_size else { unreachable!() }; Ord::cmp(&a.channels, &b.channels).then(Ord::cmp(&a_buf, &b_buf)) }); supported_configs .get(0) .cloned() .ok_or(eyre!("no supported stream configs")) } } impl super::AudioSystemInterface for NativeAudioSystem { type AudioPlayer = NativeAudioPlayer; async fn new() -> Result { let host = cpal::default_host(); let name = host.id(); let processors = AudioProcessorSender::default(); Ok(NativeAudioSystem { output: host .default_output_device() .ok_or(eyre!("no output devices from {name:?}"))?, input: host .default_input_device() .ok_or(eyre!("no input devices from {name:?}"))?, processors, recording_stream: None, }) } fn set_processor(&self, processor: AudioProcessor) { self.processors.store(Some(processor)) } fn start_recording( &mut self, mut each: impl FnMut(Vec, bool) + Send + 'static, ) -> Result<(), Error> { let config = self.choose_config(self.input.supported_input_configs()?)?; info!( "creating recording on {:?} with {:#?}", self.input.name()?, config ); let mut encoder = opus::Encoder::new(SAMPLE_RATE, opus::Channels::Mono, opus::Application::Voip)?; let mut current_processor = AudioProcessor::new_plain(); let mut output_buffer = Vec::new(); let processors = self.processors.clone(); let error_callback = move |e: cpal::StreamError| error!("error recording: {e:?}"); let data_callback = move |frame: &[f32], _: &cpal::InputCallbackInfo| { if let Some(new_processor) = processors.take() { current_processor = new_processor; } let state = current_processor.process(frame, config.channels as usize, &mut output_buffer); encode_and_send(state, &mut output_buffer, &mut encoder, &mut each); }; match self .input .build_input_stream(&config, data_callback, error_callback, None) { Ok(stream) => { stream.play()?; self.recording_stream = Some(stream); Ok(()) } Err(err) => { self.recording_stream = None; Err(err.into()) } } } fn create_player(&mut self) -> Result { let config = self.choose_config(self.output.supported_output_configs()?)?; info!( "creating player on {:?} with {:#?}", self.output.name().ok(), &config ); let buffer = Arc::new(Mutex::new(dasp_ring_buffer::Bounded::from_raw_parts( 0, 0, vec![ 0; SAMPLE_RATE as usize/4 // 250ms of buffer ], ))); let decoder = opus::Decoder::new(SAMPLE_RATE, opus::Channels::Mono)?; let stream = { let buffer = buffer.clone(); self.output.build_output_stream( &config, move |frame, _info| { let mut buffer = buffer.lock().unwrap(); for x in frame.chunks_mut(config.channels as usize) { match buffer.pop() { Some(y) => { x.fill(y); } None => { x.fill(0); } } } }, move |err| error!("could not create output stream {err:?}"), None, )? }; stream.play()?; Ok(NativeAudioPlayer { decoder, stream, buffer, tmp: vec![0; MAX_DECODE_SAMPLES], }) } } pub struct NativeAudioPlayer { decoder: opus::Decoder, stream: cpal::Stream, buffer: Buffer, tmp: Vec, } impl super::AudioPlayerInterface for NativeAudioPlayer { fn play_opus(&mut self, payload: &[u8]) { let len = match self.decoder.decode(payload, &mut self.tmp, false) { Ok(l) => l, Err(e) => { error!("opus decode error {e:?}"); return; } }; let mut buffer = self.buffer.lock().unwrap(); let mut overrun = 0; for x in &self.tmp[..len] { if let Some(_) = buffer.push(*x) { overrun += 1; } } if overrun > 0 { warn!("playback overrun by {overrun} samples"); } } }