make reactivity system pluggable

This commit is contained in:
2026-05-04 23:07:32 -06:00
parent e72bb6d4c4
commit 4dca40ef61
11 changed files with 78 additions and 45 deletions
Generated
-1
View File
@@ -4235,7 +4235,6 @@ dependencies = [
"dasp_ring_buffer", "dasp_ring_buffer",
"deep_filter", "deep_filter",
"dioxus-asset-resolver", "dioxus-asset-resolver",
"dioxus-signals",
"etcetera", "etcetera",
"futures", "futures",
"futures-channel", "futures-channel",
-1
View File
@@ -66,7 +66,6 @@ etcetera = { version = "0.10.0", optional = true }
# Base Dependencies # Base Dependencies
# ================ # ================
dioxus-signals = "0.7.2"
manganis = "0.7.2" manganis = "0.7.2"
once_cell = "1.19.0" once_cell = "1.19.0"
asynchronous-codec = { workspace = true } asynchronous-codec = { workspace = true }
+17 -9
View File
@@ -1,8 +1,8 @@
use dioxus_signals::{ReadableExt as _, Signal};
use mime_guess::Mime; use mime_guess::Mime;
use mumble_web2_common::ProxyOverrides; use mumble_web2_common::ProxyOverrides;
use ordermap::OrderSet; use ordermap::OrderSet;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::{fmt, sync::Arc}; use std::{fmt, sync::Arc};
pub type ChannelId = u32; pub type ChannelId = u32;
@@ -197,19 +197,27 @@ impl ServerState {
} }
} }
pub struct State { pub trait Reactivity {
pub status: Signal<ConnectionState>, type Signal<T>;
pub server: Signal<ServerState>,
pub audio: Signal<AudioSettings>, fn new<T: 'static>(value: T) -> Self::Signal<T>;
fn read<T: 'static>(signal: &Self::Signal<T>) -> impl Deref<Target = T>;
fn write<T: 'static>(signal: &Self::Signal<T>) -> impl DerefMut<Target = T>;
} }
impl fmt::Debug for State { pub struct State<R: Reactivity> {
pub status: R::Signal<ConnectionState>,
pub server: R::Signal<ServerState>,
pub audio: R::Signal<AudioSettings>,
}
impl<R: Reactivity> fmt::Debug for State<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("State") f.debug_struct("State")
.field("status", &self.status.read()) .field("status", &*R::read(&self.status))
.field("server", &self.server.read()) .field("server", &*R::read(&self.server))
.finish() .finish()
} }
} }
pub type SharedState = Arc<State>; pub type SharedState<R> = Arc<State<R>>;
+2 -1
View File
@@ -1,4 +1,5 @@
use crate::app::{Command, SharedState}; use crate::app::{Command, SharedState};
use crate::Reactivity;
use color_eyre::eyre::Error; use color_eyre::eyre::Error;
use futures_channel::mpsc::UnboundedReceiver; use futures_channel::mpsc::UnboundedReceiver;
use mumble_protocol::control::ClientControlCodec; use mumble_protocol::control::ClientControlCodec;
@@ -74,7 +75,7 @@ pub async fn network_connect(
username: String, username: String,
event_rx: &mut UnboundedReceiver<Command>, event_rx: &mut UnboundedReceiver<Command>,
overrides: &ProxyOverrides, overrides: &ProxyOverrides,
state: SharedState, state: SharedState<impl Reactivity>,
) -> Result<(), Error> { ) -> Result<(), Error> {
info!("connecting"); info!("connecting");
+2 -1
View File
@@ -1,4 +1,5 @@
use crate::app::{Command, SharedState}; use crate::app::{Command, SharedState};
use crate::Reactivity;
use color_eyre::eyre::Error; use color_eyre::eyre::Error;
use futures_channel::mpsc::UnboundedReceiver; use futures_channel::mpsc::UnboundedReceiver;
use mumble_web2_common::{ProxyOverrides, ServerStatus}; use mumble_web2_common::{ProxyOverrides, ServerStatus};
@@ -28,7 +29,7 @@ impl super::PlatformInterface for DesktopPlatform {
username: String, username: String,
event_rx: &mut UnboundedReceiver<Command>, event_rx: &mut UnboundedReceiver<Command>,
overrides: &ProxyOverrides, overrides: &ProxyOverrides,
state: SharedState, state: SharedState<impl Reactivity>,
) -> Result<(), Error> { ) -> Result<(), Error> {
super::connect::network_connect(address, username, event_rx, overrides, state).await super::connect::network_connect(address, username, event_rx, overrides, state).await
} }
+2 -1
View File
@@ -1,4 +1,5 @@
use crate::app::{Command, SharedState}; use crate::app::{Command, SharedState};
use crate::Reactivity;
use color_eyre::eyre::Error; use color_eyre::eyre::Error;
use futures_channel::mpsc::UnboundedReceiver; use futures_channel::mpsc::UnboundedReceiver;
use mumble_web2_common::{ProxyOverrides, ServerStatus}; use mumble_web2_common::{ProxyOverrides, ServerStatus};
@@ -24,7 +25,7 @@ impl super::PlatformInterface for MobilePlatform {
username: String, username: String,
event_rx: &mut UnboundedReceiver<Command>, event_rx: &mut UnboundedReceiver<Command>,
overrides: &ProxyOverrides, overrides: &ProxyOverrides,
state: SharedState, state: SharedState<impl Reactivity>,
) -> Result<(), Error> { ) -> Result<(), Error> {
super::connect::network_connect(address, username, event_rx, overrides, state).await super::connect::network_connect(address, username, event_rx, overrides, state).await
} }
+2 -1
View File
@@ -6,6 +6,7 @@
use crate::app::{Command, SharedState}; use crate::app::{Command, SharedState};
use crate::effects::AudioProcessor; use crate::effects::AudioProcessor;
use crate::Reactivity;
use color_eyre::eyre::Error; use color_eyre::eyre::Error;
use futures_channel::mpsc::UnboundedReceiver; use futures_channel::mpsc::UnboundedReceiver;
use mumble_web2_common::{ProxyOverrides, ServerStatus}; use mumble_web2_common::{ProxyOverrides, ServerStatus};
@@ -83,7 +84,7 @@ pub trait PlatformInterface {
username: String, username: String,
event_rx: &mut UnboundedReceiver<Command>, event_rx: &mut UnboundedReceiver<Command>,
proxy_overrides: &ProxyOverrides, proxy_overrides: &ProxyOverrides,
state: SharedState, state: SharedState<impl Reactivity>,
) -> impl Future<Output = Result<(), Error>>; ) -> impl Future<Output = Result<(), Error>>;
/// Get server status (user count, version, etc.) for the given address. /// Get server status (user count, version, etc.) for the given address.
+2 -2
View File
@@ -1,6 +1,6 @@
/// Stub implementation of the platform interface, so that we can /// Stub implementation of the platform interface, so that we can
/// `cargo check` without any --feature flags. /// `cargo check` without any --feature flags.
use crate::{app::SharedState, effects::AudioProcessor}; use crate::{app::SharedState, effects::AudioProcessor, Reactivity};
use color_eyre::eyre::Error; use color_eyre::eyre::Error;
use futures_channel::mpsc::UnboundedReceiver; use futures_channel::mpsc::UnboundedReceiver;
use mumble_web2_common::{ProxyOverrides, ServerStatus}; use mumble_web2_common::{ProxyOverrides, ServerStatus};
@@ -25,7 +25,7 @@ impl super::PlatformInterface for StubPlatform {
_username: String, _username: String,
_event_rx: &mut UnboundedReceiver<crate::app::Command>, _event_rx: &mut UnboundedReceiver<crate::app::Command>,
_overrides: &ProxyOverrides, _overrides: &ProxyOverrides,
_state: SharedState, _state: SharedState<impl Reactivity>,
) -> impl Future<Output = Result<(), Error>> { ) -> impl Future<Output = Result<(), Error>> {
async { panic!("stubbed platform") } async { panic!("stubbed platform") }
} }
+2 -1
View File
@@ -1,5 +1,6 @@
use crate::app::{Command, SharedState}; use crate::app::{Command, SharedState};
use crate::effects::{AudioProcessor, AudioProcessorSender, TransmitState}; use crate::effects::{AudioProcessor, AudioProcessorSender, TransmitState};
use crate::Reactivity;
use color_eyre::eyre::{bail, eyre, Error}; use color_eyre::eyre::{bail, eyre, Error};
use crossbeam::atomic::AtomicCell; use crossbeam::atomic::AtomicCell;
use futures_channel::mpsc::UnboundedReceiver; use futures_channel::mpsc::UnboundedReceiver;
@@ -112,7 +113,7 @@ impl super::PlatformInterface for WebPlatform {
username: String, username: String,
event_rx: &mut UnboundedReceiver<Command>, event_rx: &mut UnboundedReceiver<Command>,
overrides: &ProxyOverrides, overrides: &ProxyOverrides,
state: SharedState, state: SharedState<impl Reactivity>,
) -> Result<(), Error> { ) -> Result<(), Error> {
network_connect(address, username, event_rx, overrides, state).await network_connect(address, username, event_rx, overrides, state).await
} }
+26 -24
View File
@@ -3,11 +3,10 @@ use crate::AudioSettings;
use crate::Chat; use crate::Chat;
use crate::Command; use crate::Command;
use crate::ConnectionState; use crate::ConnectionState;
use crate::Reactivity;
use asynchronous_codec::FramedRead; use asynchronous_codec::FramedRead;
use asynchronous_codec::FramedWrite; use asynchronous_codec::FramedWrite;
use color_eyre::eyre::{bail, Error}; use color_eyre::eyre::{bail, Error};
use dioxus_signals::ReadableExt as _;
use dioxus_signals::WritableExt as _;
use futures::select; use futures::select;
use futures::AsyncRead; use futures::AsyncRead;
use futures::AsyncWrite; use futures::AsyncWrite;
@@ -36,7 +35,10 @@ use crate::imp::{
Platform, PlatformInterface as _, Platform, PlatformInterface as _,
}; };
pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>, state: SharedState) { pub async fn network_entrypoint<X: Reactivity>(
mut event_rx: UnboundedReceiver<Command>,
state: SharedState<X>,
) {
loop { loop {
let Some(Command::Connect { let Some(Command::Connect {
address, address,
@@ -47,16 +49,16 @@ pub async fn network_entrypoint(mut event_rx: UnboundedReceiver<Command>, state:
panic!("did not receive connect command") panic!("did not receive connect command")
}; };
*state.server.write_unchecked() = Default::default(); *X::write(&state.server) = Default::default();
*state.status.write_unchecked() = ConnectionState::Connecting; *X::write(&state.status) = ConnectionState::Connecting;
if let Err(error) = if let Err(error) =
Platform::network_connect(address, username, &mut event_rx, &config, state.clone()) Platform::network_connect(address, username, &mut event_rx, &config, state.clone())
.await .await
{ {
error!("could not connect {:?}", error); error!("could not connect {:?}", error);
*state.status.write_unchecked() = ConnectionState::Failed(error.to_string()); *X::write(&state.status) = ConnectionState::Failed(error.to_string());
} else { } else {
*state.status.write_unchecked() = ConnectionState::Disconnected; *X::write(&state.status) = ConnectionState::Disconnected;
} }
} }
} }
@@ -76,14 +78,14 @@ pub(crate) async fn sender_loop<W: AsyncWrite + Unpin + 'static>(
} }
} }
pub(crate) async fn network_loop<R: AsyncRead + Unpin + 'static>( pub(crate) async fn network_loop<R: AsyncRead + Unpin + 'static, X: Reactivity>(
username: String, username: String,
state: SharedState, state: SharedState<X>,
event_rx: &mut UnboundedReceiver<Command>, event_rx: &mut UnboundedReceiver<Command>,
mut outgoing: UnboundedSender<ControlPacket<Serverbound>>, mut outgoing: UnboundedSender<ControlPacket<Serverbound>>,
mut reader: FramedRead<R, ControlCodec<Serverbound, Clientbound>>, mut reader: FramedRead<R, ControlCodec<Serverbound, Clientbound>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let audio_settings = state.audio.read().clone(); let audio_settings = X::read(&state.audio).clone();
// Get version packet // Get version packet
let version = match reader.next().await { let version = match reader.next().await {
@@ -190,14 +192,14 @@ pub(crate) async fn network_loop<R: AsyncRead + Unpin + 'static>(
Ok(()) Ok(())
} }
fn accept_command( fn accept_command<X: Reactivity>(
command: Command, command: Command,
send_chan: &mut UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>, send_chan: &mut UnboundedSender<ControlPacket<mumble_protocol::Serverbound>>,
audio: &mut AudioSystem, audio: &mut AudioSystem,
state: &State, state: &State<X>,
) -> Result<(), Error> { ) -> Result<(), Error> {
use Command::*; use Command::*;
let Some(session) = state.server.read().session else { let Some(session) = X::read(&state.server).session else {
bail!("no session id") bail!("no session id")
}; };
@@ -220,7 +222,7 @@ fn accept_command(
}; };
{ {
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
let Some(me) = server.session else { let Some(me) = server.session else {
bail!("not signed in with a session id") bail!("not signed in with a session id")
}; };
@@ -261,7 +263,7 @@ fn accept_command(
}; };
{ {
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
let Some(me) = server.session else { let Some(me) = server.session else {
bail!("not signed in with a session id") bail!("not signed in with a session id")
}; };
@@ -304,11 +306,11 @@ fn accept_command(
Ok(()) Ok(())
} }
fn accept_packet( fn accept_packet<X: Reactivity>(
msg: ControlPacket<mumble_protocol::Clientbound>, msg: ControlPacket<mumble_protocol::Clientbound>,
audio_context: &mut AudioSystem, audio_context: &mut AudioSystem,
player_map: &mut HashMap<u32, AudioPlayer>, player_map: &mut HashMap<u32, AudioPlayer>,
state: &State, state: &State<X>,
) -> Result<(), Error> { ) -> Result<(), Error> {
match msg { match msg {
ControlPacket::UDPTunnel(u) => { ControlPacket::UDPTunnel(u) => {
@@ -345,15 +347,15 @@ fn accept_packet(
} }
} }
ControlPacket::ChannelState(u) => { ControlPacket::ChannelState(u) => {
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
server.channels_state.update_from_channel_state(&u); server.channels_state.update_from_channel_state(&u);
} }
ControlPacket::ChannelRemove(u) => { ControlPacket::ChannelRemove(u) => {
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
server.channels_state.update_from_channel_remove(&u); server.channels_state.update_from_channel_remove(&u);
} }
ControlPacket::UserState(u) => { ControlPacket::UserState(u) => {
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
let server = &mut *server; let server = &mut *server;
let id = u.get_session(); let id = u.get_session();
@@ -397,7 +399,7 @@ fn accept_packet(
} }
} }
ControlPacket::UserRemove(u) => { ControlPacket::UserRemove(u) => {
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
let id = u.get_session(); let id = u.get_session();
if let Some(state) = server.users.remove(&id) { if let Some(state) = server.users.remove(&id) {
if let Some(parent) = server.channels_state.channels.get_mut(&state.channel) { if let Some(parent) = server.channels_state.channels.get_mut(&state.channel) {
@@ -406,7 +408,7 @@ fn accept_packet(
} }
} }
ControlPacket::TextMessage(u) => { ControlPacket::TextMessage(u) => {
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
if u.has_message() { if u.has_message() {
let text = u.get_message().to_string(); let text = u.get_message().to_string();
server.chat.push(Chat { server.chat.push(Chat {
@@ -421,8 +423,8 @@ fn accept_packet(
} }
} }
ControlPacket::ServerSync(u) => { ControlPacket::ServerSync(u) => {
*state.status.write_unchecked() = ConnectionState::Connected; *X::write(&state.status) = ConnectionState::Connected;
let mut server = state.server.write_unchecked(); let mut server = X::write(&state.server);
if u.has_welcome_text() { if u.has_welcome_text() {
let text = u.get_welcome_text().to_string(); let text = u.get_welcome_text().to_string();
server.chat.push(Chat { server.chat.push(Chat {
+23 -3
View File
@@ -4,7 +4,7 @@ use dioxus::prelude::*;
use mumble_web2_client::{ use mumble_web2_client::{
network_entrypoint, reqwest, AudioSettings, ChannelId, Command, ConfigSystem, network_entrypoint, reqwest, AudioSettings, ChannelId, Command, ConfigSystem,
ConfigSystemInterface as _, ConnectionState, Platform, PlatformInterface as _, ServerState, ConfigSystemInterface as _, ConnectionState, Platform, PlatformInterface as _, ServerState,
SharedState, State, UserId, UserState, UserId, UserState,
}; };
use mumble_web2_common::{ProxyOverrides, ServerStatus}; use mumble_web2_common::{ProxyOverrides, ServerStatus};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@@ -12,6 +12,27 @@ use std::{fmt, sync::Arc};
use Command::*; use Command::*;
use ConnectionState::*; use ConnectionState::*;
pub struct DioxusReactivity;
impl mumble_web2_client::Reactivity for DioxusReactivity {
type Signal<T> = Signal<T>;
fn new<T: 'static>(value: T) -> Signal<T> {
Signal::new(value)
}
fn read<T: 'static>(signal: &Signal<T>) -> impl std::ops::Deref<Target = T> {
signal.read_unchecked()
}
fn write<T: 'static>(signal: &Signal<T>) -> impl std::ops::DerefMut<Target = T> {
signal.write_unchecked()
}
}
pub type SharedState = mumble_web2_client::SharedState<DioxusReactivity>;
pub type State = mumble_web2_client::State<DioxusReactivity>;
#[derive(Clone, Copy, PartialEq, Eq)] #[derive(Clone, Copy, PartialEq, Eq)]
pub enum UserIcon { pub enum UserIcon {
Normal, Normal,
@@ -519,8 +540,7 @@ pub fn LoginView(overrides: Resource<ProxyOverrides>) -> Element {
async move { async move {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
loop { loop {
*last_status.write_unchecked() = *last_status.write_unchecked() = Some(Platform::get_status(&client, &addr).await);
Some(Platform::get_status(&client, &addr).await);
Platform::sleep(std::time::Duration::from_secs_f32(1.0)).await; Platform::sleep(std::time::Duration::from_secs_f32(1.0)).await;
} }
} }