From fae7971f2bee34fb1fac163b902f245703c43e71 Mon Sep 17 00:00:00 2001 From: restitux Date: Sun, 20 Jul 2025 16:44:23 -0600 Subject: [PATCH] backend: refactor proxy code --- .../src/gamestream/mod.rs | 2 +- gamestream-webtransport-proxy/src/proxy.rs | 143 ------------------ .../src/proxy/handler.rs | 106 +++++++++++++ .../src/proxy/mod.rs | 87 +++++++++++ gamestream-webtransport-proxy/src/stream.rs | 11 +- 5 files changed, 201 insertions(+), 148 deletions(-) delete mode 100644 gamestream-webtransport-proxy/src/proxy.rs create mode 100644 gamestream-webtransport-proxy/src/proxy/handler.rs create mode 100644 gamestream-webtransport-proxy/src/proxy/mod.rs diff --git a/gamestream-webtransport-proxy/src/gamestream/mod.rs b/gamestream-webtransport-proxy/src/gamestream/mod.rs index 5d7affd..d83af7f 100644 --- a/gamestream-webtransport-proxy/src/gamestream/mod.rs +++ b/gamestream-webtransport-proxy/src/gamestream/mod.rs @@ -10,7 +10,7 @@ pub struct GamestreamChannels { } pub fn start_connection( - stream: crate::backend::Stream, + stream: &crate::backend::Stream, address: &str, ) -> Result { let mut server_info = config::server_info( diff --git a/gamestream-webtransport-proxy/src/proxy.rs b/gamestream-webtransport-proxy/src/proxy.rs deleted file mode 100644 index c2140d3..0000000 --- a/gamestream-webtransport-proxy/src/proxy.rs +++ /dev/null @@ -1,143 +0,0 @@ -use salvo::prelude::*; -use serde::{Deserialize, Serialize}; -use tokio::{select, sync::RwLock}; -use tracing::{error, info}; - -use crate::common::{AppError, AppResult}; - -pub struct Proxy { - pub cert_hash: [u8; 32], - //pub cert_hash: String, - pub stream: RwLock>, -} - -impl Proxy { - pub fn new(cert_hash: [u8; 32]) -> Self { - //pub fn new(cert_hash: String) -> Self { - Proxy { - stream: RwLock::new(None), - cert_hash, - } - } -} - -#[derive(Serialize, Deserialize)] -pub struct ProxySetupParams { - pub stream: crate::backend::Stream, -} - -#[derive(Serialize, Deserialize)] -pub struct ProxySetupResponse { - pub cert_hash: [u8; 32], - //pub cert_hash: String, -} - -#[craft] -impl crate::proxy::Proxy { - #[craft(handler)] - pub async fn stream_setup( - self: ::std::sync::Arc, - body: salvo::oapi::extract::JsonBody, - ) -> AppResult> { - let mut writer = self.stream.write().await; - *writer = Some(body.stream.clone()); - - info!("Configured proxy with config: {:?}", body.stream); - - Ok(Json(ProxySetupResponse { - cert_hash: self.cert_hash, - })) - } - - #[craft(handler)] - pub async fn stream_connect(self: ::std::sync::Arc, req: &mut Request) -> AppResult<()> { - let standard_error = Err(crate::common::AppError { - status_code: StatusCode::INTERNAL_SERVER_ERROR, - description: "Could not start stream".to_string(), - }); - - info!("WebTransport connection initiated"); - - let session = match req.web_transport_mut().await { - Ok(w) => w, - Err(e) => { - error!("Could not initalize WebTransport connection: {e}"); - return Err(AppError { - status_code: StatusCode::BAD_REQUEST, - description: "User did not connect with a WebTransport connection".to_string(), - }); - } - }; - - let stream = match self.stream.read().await.clone() { - Some(s) => s, - None => { - error!("Stream has not been configured, cannot connect to server"); - return Err(AppError { - status_code: StatusCode::BAD_REQUEST, - description: "Proxy has not been configured yet: THIS IS A BUG".to_string(), - }); - } - }; - - tracing::debug!( - "Connecting to stream at address {} with stream config {:?}", - stream.server_address, - stream - ); - - let (tx, rx) = tokio::sync::oneshot::channel(); - let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>(); - - let host = stream.server_address.clone(); - std::thread::spawn(move || { - let result = crate::gamestream::start_connection(stream, &host); - - let _ = tx.send(result); - - let _ = stop_rx.blocking_recv(); - - crate::gamestream::stop_connection(); - }); - - let mut channels = match rx.await { - Ok(r) => match r { - Ok(r) => r, - Err(e) => { - error!("Could not get gamestream communication channels: {e}"); - return standard_error; - } - }, - Err(e) => { - error!("Could not start connection: {e}"); - return standard_error; - } - }; - - loop { - select! { - recv = channels.decoder_rx.recv() => { - match recv { - Some(frame) => { - info!("Got decoder packet") - } - None => { - error!("Decoder channel is None"); - break; - } - } - } - } - } - - //// Handle bidirectional streams - //if let Ok(Some(webtransport::server::AcceptedBi::BidiStream(_, stream))) = - // session.accept_bi().await - //{ - // let (send, recv) = salvo::proto::quic::BidiStream::split(stream); - // // Process bidirectional stream data - //} - - Ok(()) - } -} diff --git a/gamestream-webtransport-proxy/src/proxy/handler.rs b/gamestream-webtransport-proxy/src/proxy/handler.rs new file mode 100644 index 0000000..2229b8c --- /dev/null +++ b/gamestream-webtransport-proxy/src/proxy/handler.rs @@ -0,0 +1,106 @@ +use anyhow::{Result, anyhow}; + +use salvo::{ + prelude::*, + proto::quic::BidiStream, + webtransport::{self}, +}; +use serde::{Deserialize, Serialize}; +use tracing::{error, info}; + +use crate::{ + backend, + common::{AppError, AppResult}, +}; + +#[derive(Serialize, Deserialize)] +pub struct ProxySetupParams { + pub stream: backend::Stream, +} + +#[derive(Serialize, Deserialize)] +pub struct ProxySetupResponse { + pub cert_hash: [u8; 32], + //pub cert_hash: String, +} + +async fn setup_webtransport( + req: &mut Request, +) -> Result<( + impl tokio::io::AsyncWrite + Send + Sync + 'static, + impl tokio::io::AsyncRead + Send + Sync + 'static, + //salvo::webtransport::stream::SendStream< + // impl salvo::proto::quic::SendStream, + // salvo::hyper::body::Bytes, + //>, + //salvo::webtransport::stream::RecvStream< + // impl salvo::proto::quic::RecvStream, + // salvo::hyper::body::Bytes, + //>, +)> { + let session = req.web_transport_mut().await?; + let bidirectional_stream = session + .accept_bi() + .await? + .ok_or(anyhow!("No bidirectional stream"))?; + + if let webtransport::server::AcceptedBi::BidiStream(_, stream) = bidirectional_stream { + Ok(stream.split()) + } else { + Err(anyhow!("bidirectional stream was of the wrong type")) + } +} + +#[craft] +impl crate::proxy::Proxy { + #[craft(handler)] + pub async fn stream_setup( + self: ::std::sync::Arc, + body: salvo::oapi::extract::JsonBody, + ) -> AppResult> { + let mut writer = self.stream.write().await; + *writer = Some(body.stream.clone()); + + info!("Configured proxy with config: {:?}", body.stream); + + Ok(Json(ProxySetupResponse { + cert_hash: self.cert_hash, + })) + } + + #[craft(handler)] + pub async fn stream_connect(self: ::std::sync::Arc, req: &mut Request) -> AppResult<()> { + let standard_error = Err(crate::common::AppError { + status_code: StatusCode::INTERNAL_SERVER_ERROR, + description: "Could not start stream".to_string(), + }); + + info!("WebTransport connection initiated"); + let (wt_send, wt_recv) = match setup_webtransport(req).await { + Ok(w) => w, + Err(e) => { + error!("Could not upgrade connection to WebTransport: {e}"); + return standard_error; + } + }; + + let stream = match self.stream.read().await.clone() { + Some(s) => s, + None => { + error!("Stream has not been configured, cannot connect to server"); + return Err(AppError { + status_code: StatusCode::BAD_REQUEST, + description: "Proxy has not been configured yet: THIS IS A BUG".to_string(), + }); + } + }; + + match super::proxy_main(stream, wt_send, wt_recv).await { + Ok(()) => Ok(()), + Err(e) => { + error!("Proxy main loop failed: {e}"); + standard_error + } + } + } +} diff --git a/gamestream-webtransport-proxy/src/proxy/mod.rs b/gamestream-webtransport-proxy/src/proxy/mod.rs new file mode 100644 index 0000000..f613b99 --- /dev/null +++ b/gamestream-webtransport-proxy/src/proxy/mod.rs @@ -0,0 +1,87 @@ +use anyhow::{Context, Result}; +use salvo::{conn::http1::Connection, hyper::body::Bytes, proto::WebTransportSession}; +use tokio::{io::AsyncReadExt, io::AsyncWriteExt, select, sync::RwLock}; +use tracing::{debug, error, info}; + +use crate::{backend, gamestream}; + +pub mod handler; + +pub struct Proxy { + pub cert_hash: [u8; 32], + //pub cert_hash: String, + pub stream: RwLock>, +} + +impl Proxy { + pub fn new(cert_hash: [u8; 32]) -> Self { + //pub fn new(cert_hash: String) -> Self { + Proxy { + stream: RwLock::new(None), + cert_hash, + } + } +} + +pub struct Channels { + gamestream_channels: gamestream::GamestreamChannels, + stop_tx: tokio::sync::oneshot::Sender<()>, +} + +async fn proxy_main( + stream: backend::Stream, + mut wt_send: impl tokio::io::AsyncWrite + Send + Sync + 'static + std::marker::Unpin, + mut wt_recv: impl tokio::io::AsyncRead + Send + Sync + 'static + std::marker::Unpin, +) -> Result<()> { + debug!( + "Connecting to stream at address {} with stream config {:?}", + stream.server_address, stream + ); + + let mut channels = spawn_gamestream(stream).await?; + + let mut buffer = vec![0; 65536].into_boxed_slice(); + + loop { + select! { + gamestream_packet = channels.gamestream_channels.decoder_rx.recv() => { + match gamestream_packet { + Some(frame) => { + info!("Got decoder packet"); + wt_send.write_all(&[0;32]).await?; + } + None => { + error!("Decoder channel is None"); + break; + } + } + }, + webtransport_packet = wt_recv.read(&mut buffer) => { + info!("Got packet from client"); + } + } + } + Ok(()) +} + +async fn spawn_gamestream(stream: backend::Stream) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>(); + + std::thread::spawn(move || { + let result = gamestream::start_connection(&stream, &stream.server_address); + + let _ = tx.send(result); + + let _ = stop_rx.blocking_recv(); + + gamestream::stop_connection(); + }); + + Ok(Channels { + stop_tx, + gamestream_channels: rx + .await? + .context("Could not get gamestream communication channels")?, + }) +} diff --git a/gamestream-webtransport-proxy/src/stream.rs b/gamestream-webtransport-proxy/src/stream.rs index b50e941..9227e4d 100644 --- a/gamestream-webtransport-proxy/src/stream.rs +++ b/gamestream-webtransport-proxy/src/stream.rs @@ -6,7 +6,7 @@ use tracing::{debug, error, info}; use crate::{ common::{AppError, AppResult, get_url}, - proxy::ProxySetupResponse, + proxy, state::{GamestreamServer, StateReadAccess, StateReader}, }; @@ -59,7 +59,10 @@ fn get_server(reader: &StateReadAccess, server: &String) -> Result Result { +async fn setup_proxy( + port: u16, + stream: crate::backend::Stream, +) -> Result { let url = url_constructor::UrlConstructor::new() .scheme("https") .host("localhost") @@ -79,10 +82,10 @@ async fn setup_proxy(port: u16, stream: crate::backend::Stream) -> Result() + .json::() .await?) }