meta: WebTransport now works

- added a small frontend for starting
- added logic to serve the frontend
- split out the gamestream logic into a separate process
- added logic to scaffold the separate proxy process
This commit is contained in:
2025-07-20 14:07:01 -06:00
parent 188005ab11
commit a11b4deb31
13 changed files with 767 additions and 156 deletions
Generated
+35
View File
@@ -424,6 +424,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
dependencies = [ dependencies = [
"powerfmt", "powerfmt",
"serde",
] ]
[[package]] [[package]]
@@ -669,6 +670,7 @@ dependencies = [
"directories", "directories",
"getrandom 0.3.3", "getrandom 0.3.3",
"hex", "hex",
"hmac-sha256",
"libc", "libc",
"moonlight-common-c-sys", "moonlight-common-c-sys",
"openssl", "openssl",
@@ -858,6 +860,12 @@ dependencies = [
"digest", "digest",
] ]
[[package]]
name = "hmac-sha256"
version = "1.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425"
[[package]] [[package]]
name = "http" name = "http"
version = "1.3.1" version = "1.3.1"
@@ -1527,6 +1535,12 @@ dependencies = [
"windows-targets 0.52.6", "windows-targets 0.52.6",
] ]
[[package]]
name = "path-slash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
[[package]] [[package]]
name = "pem" name = "pem"
version = "3.0.5" version = "3.0.5"
@@ -2115,6 +2129,7 @@ dependencies = [
"salvo-jwt-auth", "salvo-jwt-auth",
"salvo-oapi", "salvo-oapi",
"salvo-proxy", "salvo-proxy",
"salvo-serve-static",
"salvo_core", "salvo_core",
"salvo_extra", "salvo_extra",
] ]
@@ -2257,6 +2272,26 @@ dependencies = [
"syn 2.0.104", "syn 2.0.104",
] ]
[[package]]
name = "salvo-serve-static"
version = "0.80.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a98e8c27387f5b28ce66c337ae5772268c6d5f728b605fad4355cd4b58f6974b"
dependencies = [
"hex",
"mime",
"mime-infer",
"path-slash",
"percent-encoding",
"rust-embed",
"salvo_core",
"serde",
"serde_json",
"time",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "salvo_core" name = "salvo_core"
version = "0.80.0" version = "0.80.0"
+9 -1
View File
@@ -8,6 +8,7 @@ anyhow = "1.0.98"
directories = "6.0.0" directories = "6.0.0"
getrandom = { version = "0.3.3", features = ["std"] } getrandom = { version = "0.3.3", features = ["std"] }
hex = "0.4.3" hex = "0.4.3"
hmac-sha256 = "1.1.12"
libc = "0.2.174" libc = "0.2.174"
moonlight-common-c-sys = { path = "../moonlight-common-c-sys" } moonlight-common-c-sys = { path = "../moonlight-common-c-sys" }
openssl = "0.10.73" openssl = "0.10.73"
@@ -15,8 +16,15 @@ rand = "0.9.1"
reqwest = { version = "0.12.20", features = [ reqwest = { version = "0.12.20", features = [
"rustls-tls", "rustls-tls",
"native-tls", "native-tls",
"json",
], default-features = false } ], default-features = false }
salvo = { version = "0.80.0", features = ["oapi", "craft", "logging", "quinn"] } salvo = { version = "0.80.0", features = [
"oapi",
"craft",
"logging",
"quinn",
"serve-static",
] }
serde = { version = "1.0.219", features = ["serde_derive"] } serde = { version = "1.0.219", features = ["serde_derive"] }
serde-xml-rs = "0.8.1" serde-xml-rs = "0.8.1"
serde_json = "1.0.140" serde_json = "1.0.140"
+1 -1
View File
@@ -37,7 +37,7 @@ struct GetAppsResponse {
} }
#[craft] #[craft]
impl crate::server::Server { impl crate::backend::Backend {
#[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))] #[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))]
pub async fn get_apps(self: ::std::sync::Arc<Self>) -> AppResult<Json<GetAppsResponse>> { pub async fn get_apps(self: ::std::sync::Arc<Self>) -> AppResult<Json<GetAppsResponse>> {
let standard_error = Err(crate::common::AppError { let standard_error = Err(crate::common::AppError {
@@ -2,12 +2,12 @@ use std::collections::HashMap;
use anyhow::Result; use anyhow::Result;
use salvo::oapi::ToSchema; use salvo::oapi::ToSchema;
use serde::Deserialize; use serde::{Deserialize, Serialize};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::state::StateFile; use crate::state::StateFile;
#[derive(Debug, Clone)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct InputCrypto { pub struct InputCrypto {
pub aes_key: [u8; 16], pub aes_key: [u8; 16],
pub aes_iv: [u8; 16], pub aes_iv: [u8; 16],
@@ -50,7 +50,7 @@ impl InputCrypto {
} }
} }
#[derive(Debug, Clone, ToSchema, Deserialize)] #[derive(Debug, Clone, ToSchema, Serialize, Deserialize)]
pub struct Mode { pub struct Mode {
pub width: i32, pub width: i32,
pub height: i32, pub height: i32,
@@ -63,20 +63,20 @@ impl Mode {
} }
} }
#[derive(Debug, Clone, ToSchema, Deserialize)] #[derive(Debug, Clone, ToSchema, Serialize, Deserialize)]
pub struct StreamConfig { pub struct StreamConfig {
pub mode: Mode, pub mode: Mode,
pub bitrate_kbps: i32, pub bitrate_kbps: i32,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stream { pub struct Stream {
pub id: uuid::Uuid, pub id: uuid::Uuid,
pub url: url::Url, pub url: url::Url,
pub game_session: u64, pub game_session: u64,
pub server_name: String, pub server_address: String,
pub input_crypto: InputCrypto, pub input_crypto: InputCrypto,
pub stream_config: StreamConfig, pub stream_config: StreamConfig,
@@ -85,16 +85,18 @@ pub struct Stream {
pub server_codec_mode_support: i32, pub server_codec_mode_support: i32,
} }
pub struct Server { pub struct Backend {
pub state: StateFile, pub state: StateFile,
pub streams: RwLock<HashMap<uuid::Uuid, Stream>>, pub streams: RwLock<HashMap<uuid::Uuid, Stream>>,
pub port: u16,
} }
impl Server { impl Backend {
pub fn new() -> Result<Self> { pub fn new(port: u16) -> Result<Self> {
Ok(Server { Ok(Backend {
state: StateFile::new()?, state: StateFile::new()?,
streams: RwLock::new(HashMap::new()), streams: RwLock::new(HashMap::new()),
port,
}) })
} }
} }
+64 -8
View File
@@ -3,11 +3,14 @@ use std::io::Write;
use std::path::PathBuf; use std::path::PathBuf;
use anyhow::Result; use anyhow::Result;
use openssl::ec::{EcGroup, EcKey};
use openssl::hash::MessageDigest; use openssl::hash::MessageDigest;
use openssl::nid::Nid;
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa; use openssl::rsa::Rsa;
use openssl::x509::X509; use openssl::x509::{X509, X509NameBuilder};
use salvo::conn::rustls::{Keycert, RustlsConfig}; use salvo::conn::rustls::{Keycert, RustlsConfig};
use tracing::debug;
pub fn get_and_create_cert_dir() -> Result<PathBuf> { pub fn get_and_create_cert_dir() -> Result<PathBuf> {
let project_dirs = let project_dirs =
@@ -31,7 +34,7 @@ pub fn get_gamestream_cert_and_key() -> Result<(X509, PKey<Private>)> {
pub fn get_http_stream_config() -> Result<RustlsConfig> { pub fn get_http_stream_config() -> Result<RustlsConfig> {
let (cert, key) = match load_cert_and_key_from_disk("http-cert", "http-key") { let (cert, key) = match load_cert_and_key_from_disk("http-cert", "http-key") {
Ok((cert, key)) => (cert, key), Ok((cert, key)) => (cert, key),
Err(_) => generate_http_cert_and_key()?, Err(_) => generate_http_cert_and_key(false, "http-cert", "http-key")?,
}; };
Ok(RustlsConfig::new( Ok(RustlsConfig::new(
@@ -41,6 +44,34 @@ pub fn get_http_stream_config() -> Result<RustlsConfig> {
)) ))
} }
fn calculate_cert_hash(cert: &X509) -> Result<[u8; 32]> {
//fn calculate_cert_hash(cert: &X509) -> Result<String> {
let cert_hash = openssl::hash::hash(openssl::hash::MessageDigest::sha256(), &cert.to_der()?)?;
//Ok(hex::encode(cert_hash))
debug!("cert_hash: {}", hex::encode(cert_hash));
Ok(hmac_sha256::Hash::hash(&cert.to_der()?))
}
pub fn get_webtransport_stream_config(id: uuid::Uuid) -> Result<(RustlsConfig, [u8; 32])> {
//pub fn get_webtransport_stream_config(id: uuid::Uuid) -> Result<(RustlsConfig, String)> {
let (cert_filename, key_filename) = (&format!("{id}-cert"), &format!("{id}-key"));
let (cert, key) = match load_cert_and_key_from_disk(cert_filename, key_filename) {
Ok((cert, key)) => (cert, key),
Err(_) => generate_http_cert_and_key(true, cert_filename, key_filename)?,
};
let hash = calculate_cert_hash(&cert)?;
Ok((
RustlsConfig::new(
Keycert::new()
.cert(cert.to_pem()?)
.key(key.private_key_to_pem_pkcs8()?),
),
hash,
))
}
fn load_cert_and_key_from_disk( fn load_cert_and_key_from_disk(
cert_filename: &str, cert_filename: &str,
key_filename: &str, key_filename: &str,
@@ -59,9 +90,19 @@ fn load_cert_and_key_from_disk(
Ok((cert, key)) Ok((cert, key))
} }
fn generate_http_cert_and_key() -> Result<(X509, PKey<Private>)> { fn generate_http_cert_and_key(
let rsa = Rsa::generate(2048)?; web_transport: bool,
let key = PKey::from_rsa(rsa)?; cert_filename: &str,
key_filename: &str,
) -> Result<(X509, PKey<Private>)> {
let key = if web_transport {
let group = EcGroup::from_curve_name(Nid::X9_62_PRIME256V1)?;
let eckey = EcKey::generate(&group)?;
PKey::from_ec_key(eckey)?
} else {
let rsa = Rsa::generate(2048)?;
PKey::from_rsa(rsa)?
};
let mut cert_builder = X509::builder()?; let mut cert_builder = X509::builder()?;
@@ -70,16 +111,31 @@ fn generate_http_cert_and_key() -> Result<(X509, PKey<Private>)> {
.expect("Time went backwards") .expect("Time went backwards")
.as_secs(); .as_secs();
let now = openssl::asn1::Asn1Time::from_unix(now_unix as i64)?; let now = openssl::asn1::Asn1Time::from_unix(now_unix as i64)?;
let thirteen_days_from_now = openssl::asn1::Asn1Time::days_from_now(13)?;
let expiration_time = if web_transport {
openssl::asn1::Asn1Time::days_from_now(12)?
} else {
openssl::asn1::Asn1Time::days_from_now(365 * 5)?
};
cert_builder.set_version(2)?; cert_builder.set_version(2)?;
// Set subject (Distinguished Name)
let mut name_builder = X509NameBuilder::new()?;
name_builder.append_entry_by_text("CN", "mumble-web self-signed")?;
let subject_name = name_builder.build();
cert_builder.set_subject_name(&subject_name)?;
// Set issuer (same as subject for self-signed)
cert_builder.set_issuer_name(&subject_name)?;
cert_builder.set_not_before(&now)?; cert_builder.set_not_before(&now)?;
cert_builder.set_not_after(&thirteen_days_from_now)?; cert_builder.set_not_after(&expiration_time)?;
cert_builder.set_pubkey(&key)?; cert_builder.set_pubkey(&key)?;
cert_builder.sign(&key, MessageDigest::sha256())?; cert_builder.sign(&key, MessageDigest::sha256())?;
let cert = cert_builder.build(); let cert = cert_builder.build();
save_cert_and_key_to_disk(&cert, &key, "http-cert", "http-key")?; save_cert_and_key_to_disk(&cert, &key, cert_filename, key_filename)?;
Ok((cert, key)) Ok((cert, key))
} }
@@ -32,7 +32,7 @@ pub fn server_info(
}) })
} }
pub fn stream_config(stream: &crate::server::Stream) -> _STREAM_CONFIGURATION { pub fn stream_config(stream: &crate::backend::Stream) -> _STREAM_CONFIGURATION {
let (aes_key, aes_iv) = stream.input_crypto.as_stream_config_params(); let (aes_key, aes_iv) = stream.input_crypto.as_stream_config_params();
_STREAM_CONFIGURATION { _STREAM_CONFIGURATION {
@@ -10,7 +10,7 @@ pub struct GamestreamChannels {
} }
pub fn start_connection( pub fn start_connection(
stream: crate::server::Stream, stream: crate::backend::Stream,
address: &str, address: &str,
) -> Result<GamestreamChannels> { ) -> Result<GamestreamChannels> {
let mut server_info = config::server_info( let mut server_info = config::server_info(
+86 -18
View File
@@ -1,43 +1,82 @@
use anyhow::{Result, anyhow};
use salvo::logging::Logger; use salvo::logging::Logger;
use salvo::prelude::*; use salvo::prelude::*;
mod apps; mod apps;
mod backend;
mod certs; mod certs;
mod common; mod common;
mod gamestream; mod gamestream;
mod pair; mod pair;
mod server; mod proxy;
mod state; mod state;
mod stream; mod stream;
#[tokio::main] use salvo::serve_static::{StaticDir, static_embed};
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let server = server::Server::new()?; #[cfg(not(debug_assertions))]
let server_arc = std::sync::Arc::new(server); use rust_embed::RustEmbed;
// Only compile this in release builds
#[cfg(not(debug_assertions))]
#[derive(RustEmbed)]
#[folder = "webroot"]
struct Assets;
#[cfg(debug_assertions)]
fn create_static_handler() -> StaticDir {
// Debug build: serve live files from filesystem
StaticDir::new(["webroot"])
.defaults("index.html")
.auto_list(false)
}
#[cfg(not(debug_assertions))]
fn create_static_handler() -> impl Handler {
// Release build: serve embedded files
static_embed::<Assets>().fallback("index.html")
}
async fn run_backend(port: u16) -> Result<()> {
let backend = backend::Backend::new(port)?;
let backend_arc = std::sync::Arc::new(backend);
let router = Router::new() let router = Router::new()
.push(Router::with_path("pair").post(server_arc.post_pair())) .push(Router::with_path("api/pair").post(backend_arc.post_pair()))
.push(Router::with_path("apps").get(server_arc.get_apps())) .push(Router::with_path("api/apps").get(backend_arc.get_apps()))
.push(Router::with_path("stream/start").post(server_arc.post_stream_start())) .push(Router::with_path("api/stream/start").post(backend_arc.post_stream_start()))
.push( .push(Router::with_path("{*path}").get(create_static_handler()));
Router::with_path("stream/connect/{stream_id}").post(server_arc.post_stream_connect()),
);
let doc = OpenApi::new("test api", "0.0.1").merge_router(&router); let doc = OpenApi::new("test api", "0.0.1").merge_router(&router);
let router = router let router = router
.unshift(doc.into_router("/api-doc/openapi.json")) .unshift(doc.into_router("/api-doc/openapi.json"))
.unshift(SwaggerUi::new("/api-doc/openapi.json").into_router("/swagger-ui")); .unshift(SwaggerUi::new("/api-doc/openapi.json").into_router("/swagger-ui"));
let service = Service::new(router).hoop(Logger::new()); let service = Service::new(router).hoop(Logger::new());
//let config = RustlsConfig::new(Keycert::new().cert(cert.as_slice()).key(key.as_slice()));
let config = certs::get_http_stream_config()?; let config = certs::get_http_stream_config()?;
let listener = TcpListener::new("0.0.0.0:3000").rustls(config.clone()); let listener = TcpListener::new(("0.0.0.0", port))
let acceptor = QuinnListener::new(config, ("0.0.0.0", 5800)) .rustls(config.clone())
.bind()
.await;
salvo::Server::new(listener).serve(service).await;
Ok(())
}
async fn run_proxy(port: u16, stream_id: uuid::Uuid) -> Result<()> {
let (config, cert_hash) = certs::get_webtransport_stream_config(stream_id)?;
//let config = certs::get_http_stream_config()?;
//let cert_hash = [0; 32];
let proxy = proxy::Proxy::new(cert_hash);
let proxy_arc = std::sync::Arc::new(proxy);
let router = Router::new()
.push(Router::with_path("api/stream/setup").post(proxy_arc.stream_setup()))
.push(Router::with_path("api/stream/connect/").goal(proxy_arc.stream_connect()));
let service = Service::new(router).hoop(Logger::new());
let listener = TcpListener::new(("0.0.0.0", port)).rustls(config.clone());
let acceptor = QuinnListener::new(config, ("0.0.0.0", port))
.join(listener) .join(listener)
.bind() .bind()
.await; .await;
@@ -45,3 +84,32 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let mode = std::env::args()
.nth(1)
.ok_or(anyhow!("Mode argument missing"))?;
let port = std::env::args()
.nth(2)
.ok_or(anyhow!("Port argument missing"))?
.parse::<u16>()?;
match mode.as_str() {
"backend" => run_backend(port).await,
"proxy" => {
let stream_id = uuid::Uuid::parse_str(
&std::env::args()
.nth(3)
.ok_or(anyhow!("Cert ID argument missing"))?,
)?;
run_proxy(port, stream_id).await
}
_ => Err(anyhow!("Unknown mode: {mode}")),
}
}
+1 -1
View File
@@ -291,7 +291,7 @@ async fn send_client_pairing_secret(
} }
#[craft] #[craft]
impl crate::server::Server { impl crate::backend::Backend {
#[craft(endpoint(status_codes( #[craft(endpoint(status_codes(
StatusCode::OK, StatusCode::OK,
StatusCode::BAD_REQUEST, StatusCode::BAD_REQUEST,
+143
View File
@@ -0,0 +1,143 @@
use salvo::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::{select, sync::RwLock};
use tracing::{error, info};
use crate::common::{AppError, AppResult};
pub struct Proxy {
pub cert_hash: [u8; 32],
//pub cert_hash: String,
pub stream: RwLock<Option<crate::backend::Stream>>,
}
impl Proxy {
pub fn new(cert_hash: [u8; 32]) -> Self {
//pub fn new(cert_hash: String) -> Self {
Proxy {
stream: RwLock::new(None),
cert_hash,
}
}
}
#[derive(Serialize, Deserialize)]
pub struct ProxySetupParams {
pub stream: crate::backend::Stream,
}
#[derive(Serialize, Deserialize)]
pub struct ProxySetupResponse {
pub cert_hash: [u8; 32],
//pub cert_hash: String,
}
#[craft]
impl crate::proxy::Proxy {
#[craft(handler)]
pub async fn stream_setup(
self: ::std::sync::Arc<Self>,
body: salvo::oapi::extract::JsonBody<ProxySetupParams>,
) -> AppResult<Json<ProxySetupResponse>> {
let mut writer = self.stream.write().await;
*writer = Some(body.stream.clone());
info!("Configured proxy with config: {:?}", body.stream);
Ok(Json(ProxySetupResponse {
cert_hash: self.cert_hash,
}))
}
#[craft(handler)]
pub async fn stream_connect(self: ::std::sync::Arc<Self>, req: &mut Request) -> AppResult<()> {
let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(),
});
info!("WebTransport connection initiated");
let session = match req.web_transport_mut().await {
Ok(w) => w,
Err(e) => {
error!("Could not initalize WebTransport connection: {e}");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "User did not connect with a WebTransport connection".to_string(),
});
}
};
let stream = match self.stream.read().await.clone() {
Some(s) => s,
None => {
error!("Stream has not been configured, cannot connect to server");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "Proxy has not been configured yet: THIS IS A BUG".to_string(),
});
}
};
tracing::debug!(
"Connecting to stream at address {} with stream config {:?}",
stream.server_address,
stream
);
let (tx, rx) = tokio::sync::oneshot::channel();
let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
let host = stream.server_address.clone();
std::thread::spawn(move || {
let result = crate::gamestream::start_connection(stream, &host);
let _ = tx.send(result);
let _ = stop_rx.blocking_recv();
crate::gamestream::stop_connection();
});
let mut channels = match rx.await {
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
error!("Could not get gamestream communication channels: {e}");
return standard_error;
}
},
Err(e) => {
error!("Could not start connection: {e}");
return standard_error;
}
};
loop {
select! {
recv = channels.decoder_rx.recv() => {
match recv {
Some(frame) => {
info!("Got decoder packet")
}
None => {
error!("Decoder channel is None");
break;
}
}
}
}
}
//// Handle bidirectional streams
//if let Ok(Some(webtransport::server::AcceptedBi::BidiStream(_, stream))) =
// session.accept_bi().await
//{
// let (send, recv) = salvo::proto::quic::BidiStream::split(stream);
// // Process bidirectional stream data
//}
Ok(())
}
}
+84 -115
View File
@@ -1,14 +1,12 @@
use std::thread;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use moonlight_common_c_sys::SCM_H264; use moonlight_common_c_sys::SCM_H264;
use salvo::prelude::*; use salvo::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{select, sync::oneshot};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use crate::{ use crate::{
common::{AppError, AppResult, get_url}, common::{AppError, AppResult, get_url},
proxy::ProxySetupResponse,
state::{GamestreamServer, StateReadAccess, StateReader}, state::{GamestreamServer, StateReadAccess, StateReader},
}; };
@@ -16,13 +14,15 @@ use crate::{
struct PostStreamStartParams { struct PostStreamStartParams {
server: String, server: String,
id: u64, id: u64,
stream_config: crate::server::StreamConfig, stream_config: crate::backend::StreamConfig,
server_mode: Option<crate::server::Mode>, server_mode: Option<crate::backend::Mode>,
} }
#[derive(Serialize, ToSchema)] #[derive(Serialize, ToSchema)]
struct PostStreamStartResponse { struct PostStreamStartResponse {
stream_id: uuid::Uuid, url: String,
cert_hash: [u8; 32],
//cert_hash: String,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@@ -59,12 +59,40 @@ fn get_server(reader: &StateReadAccess, server: &String) -> Result<Option<Gamest
Ok(servers.get(server).cloned()) Ok(servers.get(server).cloned())
} }
async fn setup_proxy(port: u16, stream: crate::backend::Stream) -> Result<ProxySetupResponse> {
let url = url_constructor::UrlConstructor::new()
.scheme("https")
.host("localhost")
.port(port)
.subdir("api/stream/setup")
.build();
let client = reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.build()?;
//let resp = client.post(url).send().await?;
//let text = resp.text().await?;
//debug!(text);
//Ok(serde_json::from_str(&text)?)
Ok(client
.post(url)
.json(&crate::proxy::ProxySetupParams { stream })
.send()
.await?
.json::<ProxySetupResponse>()
.await?)
}
#[craft] #[craft]
impl crate::server::Server { impl crate::backend::Backend {
#[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))] #[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))]
pub async fn post_stream_start( pub async fn post_stream_start(
self: ::std::sync::Arc<Self>, self: ::std::sync::Arc<Self>,
body: salvo::oapi::extract::JsonBody<PostStreamStartParams>, body: salvo::oapi::extract::JsonBody<PostStreamStartParams>,
req: &mut Request,
) -> AppResult<Json<PostStreamStartResponse>> { ) -> AppResult<Json<PostStreamStartResponse>> {
let standard_error = Err(crate::common::AppError { let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR, status_code: StatusCode::INTERNAL_SERVER_ERROR,
@@ -119,7 +147,7 @@ impl crate::server::Server {
//"%s://%s:%d/serverinfo?uniqueid=%s&uuid=%s", //"%s://%s:%d/serverinfo?uniqueid=%s&uuid=%s",
let input_crypto = match crate::server::InputCrypto::new() { let input_crypto = match crate::backend::InputCrypto::new() {
Ok(i) => i, Ok(i) => i,
Err(e) => { Err(e) => {
error!("Could not create input crypto: {e}"); error!("Could not create input crypto: {e}");
@@ -204,11 +232,11 @@ impl crate::server::Server {
server_info.ServerCodecModeSupport server_info.ServerCodecModeSupport
}; };
let stream = crate::server::Stream { let stream = crate::backend::Stream {
id: stream_id, id: stream_id,
url: launch_response.session_url_0, url: launch_response.session_url_0,
game_session: launch_response.game_session, game_session: launch_response.game_session,
server_name: server.name.clone(), server_address: server.host.clone(),
stream_config: body.stream_config.clone(), stream_config: body.stream_config.clone(),
app_version: server_info.appversion, app_version: server_info.appversion,
gfe_version: server_info.GfeVersion, gfe_version: server_info.GfeVersion,
@@ -220,123 +248,64 @@ impl crate::server::Server {
"Launched stream {stream_id} on {} with config {stream:?}", "Launched stream {stream_id} on {} with config {stream:?}",
server.name server.name
); );
(*self.streams.write().await).insert(stream.id, stream); let mut writer = self.streams.write().await;
(*writer).insert(stream.id, stream.clone());
let post_stream_response = PostStreamStartResponse { stream_id }; let port = self.port + <u16>::try_from((*writer).len()).unwrap();
Ok(Json(post_stream_response)) // Spawn WebTransport proxy
} let binary_path = match std::env::current_exe() {
Ok(b) => b,
#[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))]
pub async fn post_stream_connect(
self: ::std::sync::Arc<Self>,
stream_id: salvo::oapi::extract::PathParam<uuid::Uuid>,
req: &mut Request,
) -> AppResult<()> {
let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(),
});
let session = match req.web_transport_mut().await {
Ok(w) => w,
Err(e) => { Err(e) => {
error!("Could not initalize WebTransport connection: {e}"); error!("Could not get binary path to spawn proxy: {e}");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "User did not connect with a WebTransport connection".to_string(),
});
}
};
let stream_id = stream_id.into_inner();
let stream = match self.streams.read().await.get(&stream_id).cloned() {
Some(s) => s,
None => {
error!("Could not find stream with id {stream_id}");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "No stream with given ID".to_string(),
});
}
};
let servers = match self.state.read().await.servers() {
Ok(s) => s,
Err(e) => {
error!("Could not get servers: {e}");
return standard_error; return standard_error;
} }
}; };
info!(
let server = match servers.get(&stream.server_name) { "Spawning proxy process for stream {} on port {}",
Some(s) => s, stream_id, port
None => {
error!(
"Could not find server {} pointed to by stream {}",
stream.server_name, stream.id
);
return standard_error;
}
};
debug!(
"Connecting to stream on server {} with stream config {:?}",
server.name, stream
); );
match tokio::process::Command::new(binary_path)
let (tx, rx) = oneshot::channel(); .args(["proxy", &port.to_string(), &stream_id.to_string()])
let (stop_tx, stop_rx) = oneshot::channel::<()>(); .spawn()
{
let host = server.host.clone(); Ok(_) => (),
thread::spawn(move || {
let result = crate::gamestream::start_connection(stream, &host);
let _ = tx.send(result);
let _ = stop_rx.blocking_recv();
crate::gamestream::stop_connection();
});
let mut channels = match rx.await {
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
error!("Could not get gamestream communication channels: {e}");
return standard_error;
}
},
Err(e) => { Err(e) => {
error!("Could not start connection: {e}"); error!("Failed to spawn proxy process: {e}");
return standard_error; return standard_error;
} }
};
loop {
select! {
recv = channels.decoder_rx.recv() => {
match recv {
Some(frame) => {
info!("Got decoder packet")
}
None => {
error!("Decoder channel is None");
break;
}
}
}
}
} }
//// Handle bidirectional streams tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
//if let Ok(Some(webtransport::server::AcceptedBi::BidiStream(_, stream))) =
// session.accept_bi().await
//{
// let (send, recv) = salvo::proto::quic::BidiStream::split(stream);
// // Process bidirectional stream data
//}
Ok(()) let setup_resp = match setup_proxy(port, stream).await {
Ok(r) => r,
Err(e) => {
error!("Could not setup proxy: {e}");
return standard_error;
}
};
let host = match req.uri_mut().host() {
Some(h) => h,
None => {
error!("Request URI does not have a host");
return standard_error;
}
};
let webtransport_url = url_constructor::UrlConstructor::new()
.scheme("https")
.host(host)
.port(port)
.subdir("api/stream/connect")
.build();
let post_stream_response = PostStreamStartResponse {
url: webtransport_url,
cert_hash: setup_resp.cert_hash,
};
Ok(Json(post_stream_response))
} }
} }
@@ -0,0 +1,125 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Game Streaming App</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
background-color: #1a1a1a;
color: white;
}
.apps-container {
display: flex;
flex-wrap: wrap;
gap: 20px;
padding: 20px 0;
}
.app-box {
position: relative;
width: 200px;
height: 280px;
cursor: pointer;
transition: all 0.3s ease;
}
.app-box:hover {
transform: translateY(-5px);
box-shadow: 0 10px 20px rgba(255, 255, 255, 0.1);
}
.app-artwork {
width: 200px;
height: 200px;
background-color: #333;
border: 2px solid #555;
border-radius: 8px;
position: relative;
overflow: hidden;
transition: border-color 0.3s ease;
}
.app-box:hover .app-artwork {
border-color: #00aaff;
}
.app-title {
text-align: center;
margin: 10px 0 5px 0;
font-size: 16px;
font-weight: bold;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.app-server {
text-align: center;
font-size: 12px;
color: #aaa;
margin: 0;
}
.play-button {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
width: 60px;
height: 60px;
background-color: rgba(0, 170, 255, 0.9);
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
opacity: 0;
transition: opacity 0.3s ease;
pointer-events: none;
}
.play-button::after {
content: '';
width: 0;
height: 0;
border-left: 20px solid white;
border-top: 12px solid transparent;
border-bottom: 12px solid transparent;
margin-left: 4px;
}
.app-box:hover .play-button {
opacity: 1;
}
.app-box.clicked {
transform: scale(0.95);
}
.loading {
text-align: center;
font-size: 18px;
margin-top: 50px;
}
.error {
color: #ff4444;
text-align: center;
font-size: 18px;
margin-top: 50px;
}
</style>
</head>
<body>
<h1>Game Streaming Platform</h1>
<div id="content">
<div class="loading">Loading applications...</div>
</div>
<script src="index.js"></script>
</body>
</html>
@@ -0,0 +1,205 @@
class GameStreamingApp {
constructor() {
this.apps = [];
this.init();
}
async init() {
try {
await this.loadApps();
this.renderApps();
} catch (error) {
this.showError('Failed to load applications: ' + error.message);
}
}
async loadApps() {
try {
const response = await fetch('/api/apps');
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
this.apps = this.processAppsData(data);
} catch (error) {
console.error('Error loading apps:', error);
throw error;
}
}
processAppsData(data) {
const apps = [];
if (data.apps) {
Object.keys(data.apps).forEach(serverName => {
data.apps[serverName].forEach(app => {
apps.push({
...app,
server: serverName
});
});
});
}
return apps;
}
renderApps() {
const contentDiv = document.getElementById('content');
if (this.apps.length === 0) {
contentDiv.innerHTML = '<div class="error">No applications found.</div>';
return;
}
const appsContainer = document.createElement('div');
appsContainer.className = 'apps-container';
this.apps.forEach(app => {
const appBox = this.createAppBox(app);
appsContainer.appendChild(appBox);
});
contentDiv.innerHTML = '';
contentDiv.appendChild(appsContainer);
}
createAppBox(app) {
const appBox = document.createElement('div');
appBox.className = 'app-box';
appBox.dataset.id = app.id;
appBox.dataset.server = app.server;
appBox.innerHTML = `
<div class="app-artwork">
<div class="play-button"></div>
</div>
<div class="app-title">${app.title}</div>
<div class="app-server">${app.server}</div>
`;
// Add click event listener
appBox.addEventListener('click', (e) => this.handleAppClick(e, app));
// Add click animation
appBox.addEventListener('mousedown', () => {
appBox.classList.add('clicked');
});
appBox.addEventListener('mouseup', () => {
setTimeout(() => {
appBox.classList.remove('clicked');
}, 150);
});
appBox.addEventListener('mouseleave', () => {
appBox.classList.remove('clicked');
});
return appBox;
}
async handleAppClick(event, app) {
event.preventDefault();
try {
console.log(`Starting stream for ${app.title} on ${app.server}`);
// Create the POST request payload
const payload = {
id: app.id,
server: app.server,
server_mode: {
fps: 60,
height: 1280,
width: 720
},
stream_config: {
bitrate_kbps: 5120,
mode: {
fps: 60,
height: 1280,
width: 720
}
}
};
// Make POST request to start stream
const response = await fetch('/api/stream/start', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(payload)
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const streamData = await response.json();
console.log('Stream started:', streamData);
if (streamData.url && streamData.cert_hash) {
await this.connectToStream(streamData.url, streamData.cert_hash);
} else {
throw new Error('Response was missing required parameters');
}
} catch (error) {
console.error('Error starting stream:', error);
alert('Failed to start stream: ' + error.message);
}
}
async connectToStream(url, cert_hash) {
const buffer = new Uint8Array(cert_hash);
console.log('Hash: ', buffer);
try {
console.log(`Connecting to stream`);
// Check if WebTransport is supported
if (!window.WebTransport) {
throw new Error('WebTransport is not supported in this browser');
}
//const url = new URL();
const transport = new WebTransport(url, {
serverCertificateHashes: [
{
algorithm: "sha-256",
value: buffer,
}
]
});
console.log('Connecting to WebTransport at ', url);
// Wait for the connection to be ready
await transport.ready;
console.log('WebTransport connection established');
// Handle connection close
transport.closed.then(() => {
console.log('WebTransport connection closed');
}).catch((error) => {
console.error('WebTransport connection closed with error:', error);
});
// You can add more WebTransport handling logic here
// For example, handling incoming streams, sending data, etc.
} catch (error) {
console.error('Error connecting to stream:', error);
alert('Failed to connect to stream: ' + error.message);
}
}
showError(message) {
const contentDiv = document.getElementById('content');
contentDiv.innerHTML = `<div class="error">${message}</div>`;
}
}
// Initialize the app when the page loads
document.addEventListener('DOMContentLoaded', () => {
new GameStreamingApp();
});