backend: refactor proxy code

This commit is contained in:
2025-07-20 16:44:23 -06:00
parent a11b4deb31
commit fae7971f2b
5 changed files with 201 additions and 148 deletions
@@ -10,7 +10,7 @@ pub struct GamestreamChannels {
}
pub fn start_connection(
stream: crate::backend::Stream,
stream: &crate::backend::Stream,
address: &str,
) -> Result<GamestreamChannels> {
let mut server_info = config::server_info(
-143
View File
@@ -1,143 +0,0 @@
use salvo::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::{select, sync::RwLock};
use tracing::{error, info};
use crate::common::{AppError, AppResult};
pub struct Proxy {
pub cert_hash: [u8; 32],
//pub cert_hash: String,
pub stream: RwLock<Option<crate::backend::Stream>>,
}
impl Proxy {
pub fn new(cert_hash: [u8; 32]) -> Self {
//pub fn new(cert_hash: String) -> Self {
Proxy {
stream: RwLock::new(None),
cert_hash,
}
}
}
#[derive(Serialize, Deserialize)]
pub struct ProxySetupParams {
pub stream: crate::backend::Stream,
}
#[derive(Serialize, Deserialize)]
pub struct ProxySetupResponse {
pub cert_hash: [u8; 32],
//pub cert_hash: String,
}
#[craft]
impl crate::proxy::Proxy {
#[craft(handler)]
pub async fn stream_setup(
self: ::std::sync::Arc<Self>,
body: salvo::oapi::extract::JsonBody<ProxySetupParams>,
) -> AppResult<Json<ProxySetupResponse>> {
let mut writer = self.stream.write().await;
*writer = Some(body.stream.clone());
info!("Configured proxy with config: {:?}", body.stream);
Ok(Json(ProxySetupResponse {
cert_hash: self.cert_hash,
}))
}
#[craft(handler)]
pub async fn stream_connect(self: ::std::sync::Arc<Self>, req: &mut Request) -> AppResult<()> {
let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(),
});
info!("WebTransport connection initiated");
let session = match req.web_transport_mut().await {
Ok(w) => w,
Err(e) => {
error!("Could not initalize WebTransport connection: {e}");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "User did not connect with a WebTransport connection".to_string(),
});
}
};
let stream = match self.stream.read().await.clone() {
Some(s) => s,
None => {
error!("Stream has not been configured, cannot connect to server");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "Proxy has not been configured yet: THIS IS A BUG".to_string(),
});
}
};
tracing::debug!(
"Connecting to stream at address {} with stream config {:?}",
stream.server_address,
stream
);
let (tx, rx) = tokio::sync::oneshot::channel();
let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
let host = stream.server_address.clone();
std::thread::spawn(move || {
let result = crate::gamestream::start_connection(stream, &host);
let _ = tx.send(result);
let _ = stop_rx.blocking_recv();
crate::gamestream::stop_connection();
});
let mut channels = match rx.await {
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
error!("Could not get gamestream communication channels: {e}");
return standard_error;
}
},
Err(e) => {
error!("Could not start connection: {e}");
return standard_error;
}
};
loop {
select! {
recv = channels.decoder_rx.recv() => {
match recv {
Some(frame) => {
info!("Got decoder packet")
}
None => {
error!("Decoder channel is None");
break;
}
}
}
}
}
//// Handle bidirectional streams
//if let Ok(Some(webtransport::server::AcceptedBi::BidiStream(_, stream))) =
// session.accept_bi().await
//{
// let (send, recv) = salvo::proto::quic::BidiStream::split(stream);
// // Process bidirectional stream data
//}
Ok(())
}
}
@@ -0,0 +1,106 @@
use anyhow::{Result, anyhow};
use salvo::{
prelude::*,
proto::quic::BidiStream,
webtransport::{self},
};
use serde::{Deserialize, Serialize};
use tracing::{error, info};
use crate::{
backend,
common::{AppError, AppResult},
};
#[derive(Serialize, Deserialize)]
pub struct ProxySetupParams {
pub stream: backend::Stream,
}
#[derive(Serialize, Deserialize)]
pub struct ProxySetupResponse {
pub cert_hash: [u8; 32],
//pub cert_hash: String,
}
async fn setup_webtransport(
req: &mut Request,
) -> Result<(
impl tokio::io::AsyncWrite + Send + Sync + 'static,
impl tokio::io::AsyncRead + Send + Sync + 'static,
//salvo::webtransport::stream::SendStream<
// impl salvo::proto::quic::SendStream<salvo::hyper::body::Bytes>,
// salvo::hyper::body::Bytes,
//>,
//salvo::webtransport::stream::RecvStream<
// impl salvo::proto::quic::RecvStream,
// salvo::hyper::body::Bytes,
//>,
)> {
let session = req.web_transport_mut().await?;
let bidirectional_stream = session
.accept_bi()
.await?
.ok_or(anyhow!("No bidirectional stream"))?;
if let webtransport::server::AcceptedBi::BidiStream(_, stream) = bidirectional_stream {
Ok(stream.split())
} else {
Err(anyhow!("bidirectional stream was of the wrong type"))
}
}
#[craft]
impl crate::proxy::Proxy {
#[craft(handler)]
pub async fn stream_setup(
self: ::std::sync::Arc<Self>,
body: salvo::oapi::extract::JsonBody<ProxySetupParams>,
) -> AppResult<Json<ProxySetupResponse>> {
let mut writer = self.stream.write().await;
*writer = Some(body.stream.clone());
info!("Configured proxy with config: {:?}", body.stream);
Ok(Json(ProxySetupResponse {
cert_hash: self.cert_hash,
}))
}
#[craft(handler)]
pub async fn stream_connect(self: ::std::sync::Arc<Self>, req: &mut Request) -> AppResult<()> {
let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(),
});
info!("WebTransport connection initiated");
let (wt_send, wt_recv) = match setup_webtransport(req).await {
Ok(w) => w,
Err(e) => {
error!("Could not upgrade connection to WebTransport: {e}");
return standard_error;
}
};
let stream = match self.stream.read().await.clone() {
Some(s) => s,
None => {
error!("Stream has not been configured, cannot connect to server");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "Proxy has not been configured yet: THIS IS A BUG".to_string(),
});
}
};
match super::proxy_main(stream, wt_send, wt_recv).await {
Ok(()) => Ok(()),
Err(e) => {
error!("Proxy main loop failed: {e}");
standard_error
}
}
}
}
@@ -0,0 +1,87 @@
use anyhow::{Context, Result};
use salvo::{conn::http1::Connection, hyper::body::Bytes, proto::WebTransportSession};
use tokio::{io::AsyncReadExt, io::AsyncWriteExt, select, sync::RwLock};
use tracing::{debug, error, info};
use crate::{backend, gamestream};
pub mod handler;
pub struct Proxy {
pub cert_hash: [u8; 32],
//pub cert_hash: String,
pub stream: RwLock<Option<backend::Stream>>,
}
impl Proxy {
pub fn new(cert_hash: [u8; 32]) -> Self {
//pub fn new(cert_hash: String) -> Self {
Proxy {
stream: RwLock::new(None),
cert_hash,
}
}
}
pub struct Channels {
gamestream_channels: gamestream::GamestreamChannels,
stop_tx: tokio::sync::oneshot::Sender<()>,
}
async fn proxy_main(
stream: backend::Stream,
mut wt_send: impl tokio::io::AsyncWrite + Send + Sync + 'static + std::marker::Unpin,
mut wt_recv: impl tokio::io::AsyncRead + Send + Sync + 'static + std::marker::Unpin,
) -> Result<()> {
debug!(
"Connecting to stream at address {} with stream config {:?}",
stream.server_address, stream
);
let mut channels = spawn_gamestream(stream).await?;
let mut buffer = vec![0; 65536].into_boxed_slice();
loop {
select! {
gamestream_packet = channels.gamestream_channels.decoder_rx.recv() => {
match gamestream_packet {
Some(frame) => {
info!("Got decoder packet");
wt_send.write_all(&[0;32]).await?;
}
None => {
error!("Decoder channel is None");
break;
}
}
},
webtransport_packet = wt_recv.read(&mut buffer) => {
info!("Got packet from client");
}
}
}
Ok(())
}
async fn spawn_gamestream(stream: backend::Stream) -> Result<Channels> {
let (tx, rx) = tokio::sync::oneshot::channel();
let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
std::thread::spawn(move || {
let result = gamestream::start_connection(&stream, &stream.server_address);
let _ = tx.send(result);
let _ = stop_rx.blocking_recv();
gamestream::stop_connection();
});
Ok(Channels {
stop_tx,
gamestream_channels: rx
.await?
.context("Could not get gamestream communication channels")?,
})
}
+7 -4
View File
@@ -6,7 +6,7 @@ use tracing::{debug, error, info};
use crate::{
common::{AppError, AppResult, get_url},
proxy::ProxySetupResponse,
proxy,
state::{GamestreamServer, StateReadAccess, StateReader},
};
@@ -59,7 +59,10 @@ fn get_server(reader: &StateReadAccess, server: &String) -> Result<Option<Gamest
Ok(servers.get(server).cloned())
}
async fn setup_proxy(port: u16, stream: crate::backend::Stream) -> Result<ProxySetupResponse> {
async fn setup_proxy(
port: u16,
stream: crate::backend::Stream,
) -> Result<proxy::handler::ProxySetupResponse> {
let url = url_constructor::UrlConstructor::new()
.scheme("https")
.host("localhost")
@@ -79,10 +82,10 @@ async fn setup_proxy(port: u16, stream: crate::backend::Stream) -> Result<ProxyS
Ok(client
.post(url)
.json(&crate::proxy::ProxySetupParams { stream })
.json(&proxy::handler::ProxySetupParams { stream })
.send()
.await?
.json::<ProxySetupResponse>()
.json::<proxy::handler::ProxySetupResponse>()
.await?)
}