attempt desktop audio playback

This commit is contained in:
2024-11-12 20:03:58 -07:00
parent b65ec274d8
commit b2ee911c66
5 changed files with 354 additions and 59 deletions
+5 -2
View File
@@ -62,7 +62,9 @@ tracing-web = { version = "0.1.3", optional = true }
dioxus-desktop = { version = "0.6.0-alpha.4", optional = true}
tokio = { version = "1.41.1", features = ["net", "rt"], optional = true }
tokio-rustls = { version = "0.26.0", optional = true }
opus = { version = "0.3.0", optional = true }
cpal = { version = "0.15.3", optional = true }
dasp_ring_buffer = { version = "0.11.0", optional = true }
# Base Dependencies
# ================
@@ -86,6 +88,7 @@ serde = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["ansi"] }
tracing = "0.1.40"
color-eyre = "0.6.3"
crossbeam-queue = "0.3.11"
[features]
web = [
@@ -100,4 +103,4 @@ web = [
"gloo-timers",
"tracing-web",
]
desktop = ["dioxus/desktop", "tokio", "tokio-rustls", "tracing-subscriber/env-filter"]
desktop = ["dioxus/desktop", "tokio", "tokio-rustls", "tracing-subscriber/env-filter", "opus", "cpal", "dasp_ring_buffer"]
+80 -8
View File
@@ -1,11 +1,15 @@
use crate::app::Command;
use color_eyre::eyre::Error;
use color_eyre::eyre::{eyre, Error};
use cpal::traits::{DeviceTrait, HostTrait};
use crossbeam_queue::ArrayQueue;
use dioxus::hooks::{UnboundedReceiver, UnboundedSender};
use futures::io::{AsyncRead, AsyncWrite};
use mumble_protocol::control::{ClientControlCodec, ControlPacket};
use mumble_protocol::Serverbound;
use mumble_web2_common::GuiConfig;
use std::collections::VecDeque;
use std::net::ToSocketAddrs;
use std::sync::Mutex;
use std::{fmt, io, sync::Arc};
use tokio::net::TcpStream;
use tokio_rustls::rustls;
@@ -15,6 +19,7 @@ use tokio_rustls::rustls::ClientConfig;
use tokio_rustls::rustls::DigitallySignedStruct;
use tokio_rustls::TlsConnector;
use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};
use tracing::{error, warn};
pub use tokio::task::spawn;
pub use tokio::time::sleep;
@@ -25,25 +30,92 @@ impl<T: AsyncRead + Unpin + Send + 'static> ImpRead for T {}
pub trait ImpWrite: AsyncWrite + Unpin + Send + 'static {}
impl<T: AsyncWrite + Unpin + Send + 'static> ImpWrite for T {}
pub struct AudioSystem();
pub struct AudioSystem {
output: cpal::Device,
input: cpal::Device,
}
const BUF_LEN: usize = 480; // 20 ms
impl AudioSystem {
pub fn new(sender: UnboundedSender<ControlPacket<Serverbound>>) -> Result<Self, Error> {
pub fn new() -> Result<Self, Error> {
// TODO
Ok(AudioSystem())
let host = cpal::default_host();
let name = host.id();
Ok(AudioSystem {
output: host
.default_output_device()
.ok_or(eyre!("no output devices from {name:?}"))?,
input: host
.default_input_device()
.ok_or(eyre!("no input devices from {name:?}"))?,
})
}
pub fn start_recording(&mut self, each: impl FnMut(Vec<u8>) + 'static) -> Result<(), Error> {
// TODO
Ok(())
}
pub fn create_player(&mut self) -> Result<AudioPlayer, Error> {
// TODO
Ok(AudioPlayer())
let queue = Arc::new(ArrayQueue::<[i16; BUF_LEN]>::new(10));
let decoder = opus::Decoder::new(48_000, opus::Channels::Mono)?;
let stream = {
let queue = queue.clone();
self.output.build_output_stream(
&cpal::StreamConfig {
channels: 1,
sample_rate: cpal::SampleRate(48_000),
buffer_size: cpal::BufferSize::Fixed(BUF_LEN as u32), // 20ms playback delay
},
move |out, info| match queue.pop() {
Some(buf) => out.copy_from_slice(&buf[..]),
None => out.fill(0),
},
move |err| error!("could not create output stream {err:?}"),
None,
)?
};
Ok(AudioPlayer {
decoder,
stream,
queue,
tmp: vec![0; 2400],
pos: 0,
})
}
}
pub struct AudioPlayer();
pub struct AudioPlayer {
decoder: opus::Decoder,
stream: cpal::Stream,
queue: Arc<ArrayQueue<[i16; BUF_LEN]>>,
tmp: Vec<i16>,
pos: usize,
}
impl AudioPlayer {
pub fn play_opus(&mut self, payload: &[u8]) {
// TODO
match self
.decoder
.decode(payload, &mut self.tmp[self.pos..], false)
{
Ok(l) => {
self.pos += l;
}
Err(e) => {
error!("opus decode error {e:?}");
}
};
while self.pos >= BUF_LEN {
let mut chunk = [0; BUF_LEN];
chunk.copy_from_slice(&self.tmp[..BUF_LEN]);
dbg!(&chunk);
let _ = self.queue.push(chunk);
let i = std::cell::Cell::new(0usize);
self.tmp.retain(|_| i.replace(i.get() + 1) >= BUF_LEN);
self.pos -= BUF_LEN;
}
}
}
+10 -31
View File
@@ -74,20 +74,22 @@ impl<T> ResultExt<T> for Result<T, JsError> {
pub struct AudioSystem(AudioContext);
impl AudioSystem {
pub fn new(sender: UnboundedSender<ControlPacket<Serverbound>>) -> Result<Self, Error> {
pub fn new() -> Result<Self, Error> {
// Create MediaStreams to playback decoded audio
// The audio context is used to reproduce audio.
let audio_context = configure_audio_context();
Ok(AudioSystem(audio_context))
}
let audio_context_worklet = audio_context.clone();
pub fn start_recording(&mut self, each: impl FnMut(Vec<u8>) + 'static) -> Result<(), Error> {
let audio_context_worklet = self.0.clone();
spawn(async move {
match create_encoder_worklet(&audio_context_worklet, sender).await {
match run_encoder_worklet(&audio_context_worklet, each).await {
Ok(node) => info!("created encoder worklet: {:?}", &node),
Err(err) => error!("could not create encoder worklet: {err}"),
}
});
Ok(AudioSystem(audio_context))
Ok(())
}
pub fn create_player(&mut self) -> Result<AudioPlayer, Error> {
@@ -191,9 +193,9 @@ impl PromiseExt for Promise {
}
}
async fn create_encoder_worklet(
async fn run_encoder_worklet(
audio_context: &AudioContext,
packets: UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
mut each: impl FnMut(Vec<u8>) + 'static,
) -> Result<AudioWorkletNode, Error> {
let stream = window()
.unwrap()
@@ -234,35 +236,12 @@ async fn create_encoder_worklet(
let encoder_error: Closure<dyn FnMut(JsValue)> =
Closure::new(|e| error!("error encoding audio {:?}", e));
let download_buffer = std::cell::RefCell::new(Vec::new());
// This knows what MediaStreamTrackGenerator to use as it closes around it
let mut sequence_num = 0;
let output: Closure<dyn FnMut(EncodedAudioChunk)> =
Closure::new(move |audio_data: EncodedAudioChunk| {
let mut array = vec![0u8; audio_data.byte_length() as usize];
audio_data.copy_to_with_u8_slice(&mut array);
download_buffer.borrow_mut().push(array.clone());
if download_buffer.borrow().len() > 200 {
//download_data(download_buffer.borrow().to_vec(), "download_buffer.opus");
//download_data(
// ass::encode(download_buffer.borrow().to_vec(), 960, 0),
// "download_buffer.opus",
//);
download_buffer.borrow_mut().clear();
}
let _ =
packets.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio {
_dst: std::marker::PhantomData,
target: 0,
session_id: (),
seq_num: sequence_num,
payload: VoicePacketPayload::Opus(array.into(), false),
position_info: None,
})));
sequence_num = sequence_num.wrapping_add(2);
each(array);
});
let audio_encoder = AudioEncoder::new(&AudioEncoderInit::new(
+18 -1
View File
@@ -15,6 +15,7 @@ pub use imp::spawn;
use mumble_protocol::control::msgs;
use mumble_protocol::control::ControlCodec;
use mumble_protocol::control::ControlPacket;
use mumble_protocol::voice::VoicePacket;
use mumble_protocol::voice::VoicePacketPayload;
use mumble_protocol::Clientbound;
use mumble_protocol::Serverbound;
@@ -103,7 +104,23 @@ pub async fn network_loop<R: imp::ImpRead, W: imp::ImpWrite>(
});
}
let mut audio = imp::AudioSystem::new(send_chan.clone())?;
let mut audio = imp::AudioSystem::new()?;
{
let send_chan = send_chan.clone();
let mut sequence_num = 0;
audio.start_recording(move |opus_frame| {
let _ =
send_chan.unbounded_send(ControlPacket::UDPTunnel(Box::new(VoicePacket::Audio {
_dst: std::marker::PhantomData,
target: 0,
session_id: (),
seq_num: sequence_num,
payload: VoicePacketPayload::Opus(opus_frame.into(), false),
position_info: None,
})));
sequence_num = sequence_num.wrapping_add(2);
});
}
// Create map of session_id -> AudioDecoder
let mut decoder_map = HashMap::new();