f49f470f80
Generate a random 256-bit token when spawning a proxy process, pass it as a CLI argument, and return it to the client in the stream start response. The proxy validates the token on WebTransport connect and consumes it after first use, preventing replay. A wrong token attempt also consumes the token for security. Includes 5 unit tests for token validation logic. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
144 lines
4.6 KiB
Rust
144 lines
4.6 KiB
Rust
use anyhow::{Result, anyhow};
|
|
|
|
use salvo::{
|
|
prelude::*,
|
|
proto::quic::BidiStream,
|
|
webtransport::{self},
|
|
};
|
|
use serde::{Deserialize, Serialize};
|
|
use tracing::{error, info};
|
|
|
|
use crate::{
|
|
backend,
|
|
common::{AppError, AppResult},
|
|
};
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
pub struct ProxySetupParams {
|
|
pub stream: backend::Stream,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
pub struct ProxySetupResponse {
|
|
pub cert_hash: [u8; 32],
|
|
//pub cert_hash: String,
|
|
}
|
|
|
|
async fn setup_webtransport(
|
|
req: &mut Request,
|
|
) -> Result<(
|
|
impl tokio::io::AsyncWrite + Send + Sync + 'static,
|
|
impl tokio::io::AsyncRead + Send + Sync + 'static,
|
|
h3_datagram::datagram_handler::DatagramSender<
|
|
<h3_quinn::Connection as h3_datagram::quic_traits::DatagramConnectionExt<
|
|
salvo::hyper::body::Bytes,
|
|
>>::SendDatagramHandler,
|
|
salvo::hyper::body::Bytes,
|
|
>,
|
|
//salvo::webtransport::stream::SendStream<
|
|
// impl salvo::proto::quic::SendStream<salvo::hyper::body::Bytes>,
|
|
// salvo::hyper::body::Bytes,
|
|
//>,
|
|
//salvo::webtransport::stream::RecvStream<
|
|
// impl salvo::proto::quic::RecvStream,
|
|
// salvo::hyper::body::Bytes,
|
|
//>,
|
|
)> {
|
|
let session = req.web_transport_mut().await?;
|
|
|
|
let datagram_send = session.datagram_sender();
|
|
|
|
let bidirectional_stream = session
|
|
.accept_bi()
|
|
.await?
|
|
.ok_or(anyhow!("No bidirectional stream"))?;
|
|
|
|
if let webtransport::server::AcceptedBi::BidiStream(_, stream) = bidirectional_stream {
|
|
let (stream_send, stream_recv) = stream.split();
|
|
Ok((stream_send, stream_recv, datagram_send))
|
|
} else {
|
|
Err(anyhow!("bidirectional stream was of the wrong type"))
|
|
}
|
|
}
|
|
|
|
#[craft]
|
|
impl crate::proxy::Proxy {
|
|
#[craft(handler)]
|
|
pub async fn stream_setup(
|
|
self: ::std::sync::Arc<Self>,
|
|
body: salvo::oapi::extract::JsonBody<ProxySetupParams>,
|
|
) -> AppResult<Json<ProxySetupResponse>> {
|
|
let mut writer = self.stream.write().await;
|
|
*writer = Some(body.stream.clone());
|
|
|
|
info!("Configured proxy with config: {:?}", body.stream);
|
|
|
|
Ok(Json(ProxySetupResponse {
|
|
cert_hash: self.cert_hash,
|
|
}))
|
|
}
|
|
|
|
#[craft(handler)]
|
|
pub async fn stream_connect(self: ::std::sync::Arc<Self>, req: &mut Request) -> AppResult<()> {
|
|
let standard_error = Err(crate::common::AppError {
|
|
status_code: StatusCode::INTERNAL_SERVER_ERROR,
|
|
description: "Could not start stream".to_string(),
|
|
});
|
|
|
|
// Validate single-use stream token
|
|
let provided_token = req.query::<String>("token").unwrap_or_default();
|
|
{
|
|
let mut token_guard = self.stream_token.write().await;
|
|
match token_guard.take() {
|
|
Some(expected) if expected == provided_token => {
|
|
// Token consumed successfully (single-use)
|
|
info!("Stream token validated and consumed");
|
|
}
|
|
Some(_) => {
|
|
error!("Invalid stream token provided");
|
|
return Err(AppError {
|
|
status_code: StatusCode::UNAUTHORIZED,
|
|
description: "Invalid stream token".to_string(),
|
|
});
|
|
}
|
|
None => {
|
|
error!("Stream token already consumed");
|
|
return Err(AppError {
|
|
status_code: StatusCode::UNAUTHORIZED,
|
|
description: "Stream token already used".to_string(),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("WebTransport connection initiated");
|
|
let (wt_stream_send, wt_stream_recv, wt_datagram_send) = match setup_webtransport(req).await
|
|
{
|
|
Ok(w) => w,
|
|
Err(e) => {
|
|
error!("Could not upgrade connection to WebTransport: {e}");
|
|
return standard_error;
|
|
}
|
|
};
|
|
|
|
let stream = match self.stream.read().await.clone() {
|
|
Some(s) => s,
|
|
None => {
|
|
error!("Stream has not been configured, cannot connect to server");
|
|
return Err(AppError {
|
|
status_code: StatusCode::BAD_REQUEST,
|
|
description: "Proxy has not been configured yet: THIS IS A BUG".to_string(),
|
|
});
|
|
}
|
|
};
|
|
|
|
match super::proxy_main(stream, wt_stream_send, wt_stream_recv, wt_datagram_send).await {
|
|
Ok(()) => Ok(()),
|
|
Err(e) => {
|
|
error!("Proxy main loop failed: {e}");
|
|
standard_error
|
|
}
|
|
}
|
|
}
|
|
}
|