From 188005ab1178e3e5d18be729a169f0faf0107275 Mon Sep 17 00:00:00 2001 From: restitux Date: Sun, 20 Jul 2025 01:13:28 -0600 Subject: [PATCH] backend: add support for receiving a gamestream stream --- Cargo.lock | 91 +++++ gamestream-webtransport-proxy/Cargo.toml | 3 +- gamestream-webtransport-proxy/src/apps.rs | 4 +- gamestream-webtransport-proxy/src/certs.rs | 75 +++- gamestream-webtransport-proxy/src/common.rs | 2 +- .../src/gamestream/config.rs | 72 ++++ .../src/gamestream/decoder.rs | 278 ++++++++++++++ .../src/gamestream/mod.rs | 49 +++ gamestream-webtransport-proxy/src/main.rs | 119 +----- gamestream-webtransport-proxy/src/pair.rs | 2 +- gamestream-webtransport-proxy/src/server.rs | 90 ++++- gamestream-webtransport-proxy/src/state.rs | 2 +- gamestream-webtransport-proxy/src/stream.rs | 342 ++++++++++++++++++ 13 files changed, 1003 insertions(+), 126 deletions(-) create mode 100644 gamestream-webtransport-proxy/src/gamestream/config.rs create mode 100644 gamestream-webtransport-proxy/src/gamestream/decoder.rs create mode 100644 gamestream-webtransport-proxy/src/gamestream/mod.rs create mode 100644 gamestream-webtransport-proxy/src/stream.rs diff --git a/Cargo.lock b/Cargo.lock index a90f797..a40eae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -572,6 +572,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -579,6 +594,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -587,6 +603,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -622,6 +649,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -653,6 +681,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "url", "url-constructor", "uuid", ] @@ -735,6 +764,46 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10872b55cfb02a821b69dc7cf8dc6a71d6af25eb9a79662bec4a9d016056b3be" +dependencies = [ + "bytes", + "fastrand", + "futures-util", + "http", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "h3-datagram" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d2c9f77921668673721ae40f17c729fc48b9e38a663858097cea547484fdf0f" +dependencies = [ + "bytes", + "h3", + "pin-project-lite", +] + +[[package]] +name = "h3-quinn" +version = "0.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2e732c8d91a74731663ac8479ab505042fbf547b9a207213ab7fbcbfc4f8b4" +dependencies = [ + "bytes", + "futures", + "h3", + "h3-datagram", + "quinn", + "tokio", + "tokio-util", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1617,6 +1686,7 @@ checksum = "c3bd15a6f2967aef83887dcb9fec0014580467e33720d073560cf015a5683012" dependencies = [ "bytes", "cfg_aliases", + "futures-io", "pin-project-lite", "quinn-proto", "quinn-udp", @@ -2071,6 +2141,23 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "salvo-http3" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8dc5ede68d4df95dbe7af4483438e739f0e8b41c679615ca864559434f24d07a" +dependencies = [ + "bytes", + "futures-util", + "h3", + "h3-datagram", + "h3-quinn", + "http", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "salvo-jwt-auth" version = "0.80.0" @@ -2184,6 +2271,7 @@ dependencies = [ "form_urlencoded", "futures-channel", "futures-util", + "h3-datagram", "headers", "http", "http-body-util", @@ -2199,9 +2287,11 @@ dependencies = [ "parking_lot", "percent-encoding", "pin-project", + "quinn", "rand 0.9.1", "regex", "rustls-pemfile", + "salvo-http3", "salvo_macros", "serde", "serde-xml-rs", @@ -2962,6 +3052,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] diff --git a/gamestream-webtransport-proxy/Cargo.toml b/gamestream-webtransport-proxy/Cargo.toml index 5075d2d..06824a2 100644 --- a/gamestream-webtransport-proxy/Cargo.toml +++ b/gamestream-webtransport-proxy/Cargo.toml @@ -16,12 +16,13 @@ reqwest = { version = "0.12.20", features = [ "rustls-tls", "native-tls", ], default-features = false } -salvo = { version = "0.80.0", features = ["oapi", "craft", "logging"] } +salvo = { version = "0.80.0", features = ["oapi", "craft", "logging", "quinn"] } serde = { version = "1.0.219", features = ["serde_derive"] } serde-xml-rs = "0.8.1" serde_json = "1.0.140" tokio = { version = "1.45.1", features = ["full"] } tracing = "0.1.41" tracing-subscriber = "0.3.19" +url = { version = "2.5.4", features = ["serde"] } url-constructor = "0.1.0" uuid = { version = "1.17.0", features = ["v4", "serde"] } diff --git a/gamestream-webtransport-proxy/src/apps.rs b/gamestream-webtransport-proxy/src/apps.rs index abe0050..359cc9d 100644 --- a/gamestream-webtransport-proxy/src/apps.rs +++ b/gamestream-webtransport-proxy/src/apps.rs @@ -10,8 +10,8 @@ use crate::{common::AppResult, state::StateReader}; struct AppListRespApp { #[serde(rename = "AppTitle")] app_title: String, - #[serde(rename = "UUID")] - uuid: uuid::Uuid, + //#[serde(rename = "UUID")] + //uuid: uuid::Uuid, #[serde(rename = "IsHdrSupported")] is_hdr_supported: bool, #[serde(rename = "ID")] diff --git a/gamestream-webtransport-proxy/src/certs.rs b/gamestream-webtransport-proxy/src/certs.rs index 41b5a22..77467ea 100644 --- a/gamestream-webtransport-proxy/src/certs.rs +++ b/gamestream-webtransport-proxy/src/certs.rs @@ -7,6 +7,7 @@ use openssl::hash::MessageDigest; use openssl::pkey::{PKey, Private}; use openssl::rsa::Rsa; use openssl::x509::X509; +use salvo::conn::rustls::{Keycert, RustlsConfig}; pub fn get_and_create_cert_dir() -> Result { let project_dirs = @@ -19,19 +20,35 @@ pub fn get_and_create_cert_dir() -> Result { Ok(cert_dir) } -pub fn get_cert_and_key() -> Result<(X509, PKey)> { - if let Ok((cert, key)) = load_cert_and_key_from_disk() { +pub fn get_gamestream_cert_and_key() -> Result<(X509, PKey)> { + if let Ok((cert, key)) = load_cert_and_key_from_disk("gamestream-cert", "gamestream-key") { Ok((cert, key)) } else { - generate_cert_and_key() + generate_gamestream_cert_and_key() } } -pub fn load_cert_and_key_from_disk() -> Result<(X509, PKey)> { +pub fn get_http_stream_config() -> Result { + let (cert, key) = match load_cert_and_key_from_disk("http-cert", "http-key") { + Ok((cert, key)) => (cert, key), + Err(_) => generate_http_cert_and_key()?, + }; + + Ok(RustlsConfig::new( + Keycert::new() + .cert(cert.to_pem()?) + .key(key.private_key_to_pem_pkcs8()?), + )) +} + +fn load_cert_and_key_from_disk( + cert_filename: &str, + key_filename: &str, +) -> Result<(X509, PKey)> { let cert_dir = get_and_create_cert_dir()?; - let cert_filepath = cert_dir.join("cert"); - let key_filepath = cert_dir.join("key"); + let cert_filepath = cert_dir.join(cert_filename); + let key_filepath = cert_dir.join(key_filename); let cert_bytes = fs::read(cert_filepath)?; let key_bytes = fs::read(key_filepath)?; @@ -42,7 +59,32 @@ pub fn load_cert_and_key_from_disk() -> Result<(X509, PKey)> { Ok((cert, key)) } -pub fn generate_cert_and_key() -> Result<(X509, PKey)> { +fn generate_http_cert_and_key() -> Result<(X509, PKey)> { + let rsa = Rsa::generate(2048)?; + let key = PKey::from_rsa(rsa)?; + + let mut cert_builder = X509::builder()?; + + let now_unix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + let now = openssl::asn1::Asn1Time::from_unix(now_unix as i64)?; + let thirteen_days_from_now = openssl::asn1::Asn1Time::days_from_now(13)?; + + cert_builder.set_version(2)?; + cert_builder.set_not_before(&now)?; + cert_builder.set_not_after(&thirteen_days_from_now)?; + cert_builder.set_pubkey(&key)?; + cert_builder.sign(&key, MessageDigest::sha256())?; + let cert = cert_builder.build(); + + save_cert_and_key_to_disk(&cert, &key, "http-cert", "http-key")?; + + Ok((cert, key)) +} + +fn generate_gamestream_cert_and_key() -> Result<(X509, PKey)> { let rsa = Rsa::generate(2048)?; let key = PKey::from_rsa(rsa)?; @@ -73,15 +115,20 @@ pub fn generate_cert_and_key() -> Result<(X509, PKey)> { cert_builder.sign(&key, MessageDigest::sha256())?; let cert = cert_builder.build(); - save_cert_and_key_to_disk(&cert, &key)?; + save_cert_and_key_to_disk(&cert, &key, "gamestream-cert", "gamestream-key")?; Ok((cert, key)) } -pub fn save_cert_and_key_to_disk(cert: &X509, key: &PKey) -> Result<()> { +fn save_cert_and_key_to_disk( + cert: &X509, + key: &PKey, + cert_filename: &str, + key_filename: &str, +) -> Result<()> { let cert_dir = get_and_create_cert_dir()?; - let cert_filepath = cert_dir.join("cert"); - let key_filepath = cert_dir.join("key"); + let cert_filepath = cert_dir.join(cert_filename); + let key_filepath = cert_dir.join(key_filename); let mut cert_file_builder = std::fs::OpenOptions::new(); cert_file_builder.create(true); @@ -110,10 +157,10 @@ pub fn save_cert_and_key_to_disk(cert: &X509, key: &PKey) -> Result<()> Ok(()) } -pub fn identity() -> Result { +pub fn gamestream_identity() -> Result { let cert_dir = get_and_create_cert_dir()?; - let cert_filepath = cert_dir.join("cert"); - let key_filepath = cert_dir.join("key"); + let cert_filepath = cert_dir.join("gamestream-cert"); + let key_filepath = cert_dir.join("gamestream-key"); let cert_bytes = fs::read(cert_filepath)?; let key_bytes = fs::read(key_filepath)?; diff --git a/gamestream-webtransport-proxy/src/common.rs b/gamestream-webtransport-proxy/src/common.rs index 440a733..b040102 100644 --- a/gamestream-webtransport-proxy/src/common.rs +++ b/gamestream-webtransport-proxy/src/common.rs @@ -84,7 +84,7 @@ pub async fn get_url( http_builder = http_builder.user_agent("Mozilla/5.0"); http_builder = http_builder.danger_accept_invalid_certs(true); if with_identity { - let identity = crate::certs::identity()?; + let identity = crate::certs::gamestream_identity()?; http_builder = http_builder.identity(identity); } diff --git a/gamestream-webtransport-proxy/src/gamestream/config.rs b/gamestream-webtransport-proxy/src/gamestream/config.rs new file mode 100644 index 0000000..57e76bd --- /dev/null +++ b/gamestream-webtransport-proxy/src/gamestream/config.rs @@ -0,0 +1,72 @@ +use anyhow::Result; +use std::ffi::CString; + +use moonlight_common_c_sys::{ + _SERVER_INFORMATION, _STREAM_CONFIGURATION, COLOR_RANGE_LIMITED, COLORSPACE_REC_601, + CONNECTION_LISTENER_CALLBACKS, ENCFLG_NONE, SCM_H264, STREAM_CFG_AUTO, STREAM_CFG_LOCAL, + VIDEO_FORMAT_H264, VIDEO_FORMAT_H265, +}; + +unsafe extern "C" { + #[allow(unused)] + fn printf(format: *const i8, ...); +} + +extern "C" fn conn_listener_stage_starting(stage: std::os::raw::c_int) { + println!("Stage starting: {}", stage); +} + +pub fn server_info( + stream_url: &url::Url, + app_version: &str, + gfe_version: &str, + address: &str, + server_codec_mode_support: i32, +) -> Result<_SERVER_INFORMATION> { + Ok(_SERVER_INFORMATION { + address: CString::new(address)?.into_raw(), + serverInfoAppVersion: CString::new(app_version)?.into_raw(), + serverInfoGfeVersion: CString::new(gfe_version)?.into_raw(), + rtspSessionUrl: CString::new(stream_url.as_str())?.into_raw(), + serverCodecModeSupport: server_codec_mode_support, + }) +} + +pub fn stream_config(stream: &crate::server::Stream) -> _STREAM_CONFIGURATION { + let (aes_key, aes_iv) = stream.input_crypto.as_stream_config_params(); + + _STREAM_CONFIGURATION { + width: stream.stream_config.mode.width, + height: stream.stream_config.mode.height, + fps: stream.stream_config.mode.fps, + bitrate: stream.stream_config.bitrate_kbps, + packetSize: 512, + streamingRemotely: STREAM_CFG_AUTO, + audioConfiguration: (0x3 << 16) | (2 << 8) | 0xCA, + supportedVideoFormats: VIDEO_FORMAT_H265, + clientRefreshRateX100: 0, + colorSpace: COLORSPACE_REC_601, + colorRange: COLOR_RANGE_LIMITED, + encryptionFlags: ENCFLG_NONE, + remoteInputAesKey: aes_key, + remoteInputAesIv: aes_iv, + } +} + +pub fn listener_callbacks() -> CONNECTION_LISTENER_CALLBACKS { + CONNECTION_LISTENER_CALLBACKS { + stageStarting: None, + stageComplete: None, + stageFailed: None, + connectionStarted: None, + connectionTerminated: None, + logMessage: Some(printf), + rumble: None, + connectionStatusUpdate: None, + setHdrMode: None, + rumbleTriggers: None, + setMotionEventState: None, + setControllerLED: None, + setAdaptiveTriggers: None, + } +} diff --git a/gamestream-webtransport-proxy/src/gamestream/decoder.rs b/gamestream-webtransport-proxy/src/gamestream/decoder.rs new file mode 100644 index 0000000..7893a49 --- /dev/null +++ b/gamestream-webtransport-proxy/src/gamestream/decoder.rs @@ -0,0 +1,278 @@ +use std::{slice, sync::RwLock}; + +use anyhow::{Result, anyhow}; +use moonlight_common_c_sys::{ + _DECODE_UNIT, _LENTRY, CAPABILITY_DIRECT_SUBMIT, DECODER_RENDERER_CALLBACKS, FRAME_TYPE_PFRAME, + PDECODE_UNIT, VIDEO_FORMAT_H264_HIGH8_444, VIDEO_FORMAT_H265, VIDEO_FORMAT_H265_MAIN10, + VIDEO_FORMAT_H265_REXT8_444, VIDEO_FRAME_HANDLE, +}; +use salvo::{http::body::Frame, hyper::body::Buf}; +use tokio::sync::mpsc; +use tracing::{debug, error}; + +enum FrameType { + PFRAME, + IDR, +} + +impl TryFrom for FrameType { + type Error = anyhow::Error; + + fn try_from(val: i32) -> Result { + Ok(match val { + moonlight_common_c_sys::FRAME_TYPE_PFRAME => FrameType::PFRAME, + moonlight_common_c_sys::FRAME_TYPE_IDR => FrameType::IDR, + val => { + return Err(anyhow!("Unknown frame type {val}")); + } + }) + } +} + +enum VideoFormat { + H264, + H264_HIGH8_444, + H265, + H265_MAIN10, + H265_REXT8_444, + H265_REXT10_444, + AV1_MAIN8, + AV1_MAIN10, + AV1_HIGH8_444, + AV1_HIGH10_444, +} + +impl TryFrom for VideoFormat { + type Error = anyhow::Error; + + fn try_from(val: i32) -> Result { + Ok(match val { + moonlight_common_c_sys::VIDEO_FORMAT_H264 => VideoFormat::H264, + moonlight_common_c_sys::VIDEO_FORMAT_H264_HIGH8_444 => VideoFormat::H264_HIGH8_444, + moonlight_common_c_sys::VIDEO_FORMAT_H265 => VideoFormat::H265, + moonlight_common_c_sys::VIDEO_FORMAT_H265_MAIN10 => VideoFormat::H265_MAIN10, + moonlight_common_c_sys::VIDEO_FORMAT_H265_REXT8_444 => VideoFormat::H265_REXT8_444, + moonlight_common_c_sys::VIDEO_FORMAT_H265_REXT10_444 => VideoFormat::H265_REXT10_444, + moonlight_common_c_sys::VIDEO_FORMAT_AV1_MAIN8 => VideoFormat::AV1_MAIN8, + moonlight_common_c_sys::VIDEO_FORMAT_AV1_MAIN10 => VideoFormat::AV1_MAIN10, + moonlight_common_c_sys::VIDEO_FORMAT_AV1_HIGH8_444 => VideoFormat::AV1_HIGH8_444, + moonlight_common_c_sys::VIDEO_FORMAT_AV1_HIGH10_444 => VideoFormat::AV1_HIGH10_444, + val => { + return Err(anyhow!("Unknown video format {val}")); + } + }) + } +} + +enum BufferType { + PICDATA, + SPS, + PPS, + VPS, +} + +impl TryFrom for BufferType { + type Error = anyhow::Error; + + fn try_from(val: i32) -> Result { + Ok(match val { + moonlight_common_c_sys::BUFFER_TYPE_PICDATA => BufferType::PICDATA, + moonlight_common_c_sys::BUFFER_TYPE_SPS => BufferType::SPS, + moonlight_common_c_sys::BUFFER_TYPE_PPS => BufferType::PPS, + moonlight_common_c_sys::BUFFER_TYPE_VPS => BufferType::VPS, + _ => { + return Err(anyhow!("Unknown buffer type {val}")); + } + }) + } +} + +struct Buffer { + data: Vec, + buffer_type: BufferType, +} + +impl TryFrom<_LENTRY> for Buffer { + type Error = anyhow::Error; + + fn try_from(val: _LENTRY) -> Result { + let length = ::try_from(val.length)?; + let data = unsafe { slice::from_raw_parts(val.data as *const u8, length) }.to_vec(); + + let buffer_type = BufferType::try_from(val.bufferType)?; + + Ok(Buffer { data, buffer_type }) + } +} + +pub enum RendererMessage { + Setup { + video_format: VideoFormat, + width: u64, + height: u64, + redraw_rate: u64, + dr_flags: i32, + }, + DecodeUnit { + frame_number: u64, + frame_type: FrameType, + + host_processing_latency: u16, + receieve_time_ms: u64, + enqueue_time_ms: u64, + presentation_time: u64, + + full_length: usize, + buffers: Vec, + + hdr_active: bool, + colorspace: u8, + }, +} + +impl RendererMessage { + fn from_setup_cb_args( + video_format: std::os::raw::c_int, + width: std::os::raw::c_int, + height: std::os::raw::c_int, + redraw_rate: std::os::raw::c_int, + dr_flags: std::os::raw::c_int, + ) -> Result { + Ok(RendererMessage::Setup { + video_format: VideoFormat::try_from(video_format)?, + width: ::try_from(width)?, + height: ::try_from(height)?, + redraw_rate: ::try_from(redraw_rate)?, + dr_flags, + }) + } + + fn from_decode_unit(decode_unit: _DECODE_UNIT) -> Result { + let mut buffers = Vec::new(); + + if decode_unit.bufferList.is_null() { + return Err(anyhow!("DecodeUnit bufferList is null")); + } + let mut next = unsafe { *decode_unit.bufferList }; + + loop { + if next.next.is_null() { + break; + } + let buffer = Buffer::try_from(next)?; + + buffers.push(buffer); + + next = unsafe { *next.next }; + } + + Ok(RendererMessage::DecodeUnit { + frame_number: ::try_from(decode_unit.frameNumber)?, + frame_type: FrameType::try_from(decode_unit.frameType)?, + host_processing_latency: decode_unit.frameHostProcessingLatency, + receieve_time_ms: decode_unit.receiveTimeMs, + enqueue_time_ms: decode_unit.enqueueTimeMs, + presentation_time: decode_unit.presentationTimeMs as u64, + full_length: ::try_from(decode_unit.fullLength)?, + buffers, + hdr_active: decode_unit.hdrActive, + colorspace: decode_unit.colorspace, + }) + } +} + +static DECODER_SENDER: RwLock>> = RwLock::new(None); + +fn send_message(msg: RendererMessage) -> i32 { + let read_guard = match DECODER_SENDER.read() { + Ok(r) => r, + Err(e) => { + error!("Could not lock RendererMessage RwLock: {e}"); + return -1; + } + }; + + match read_guard.as_ref() { + Some(sender) => match sender.blocking_send(msg) { + Ok(()) => 0, + Err(e) => { + error!("Could not send to RendererMessage channel: {e}"); + -1 + } + }, + None => { + error!("RendererMessage sender was not initalized"); + -1 + } + } +} + +extern "C" fn setup_cb( + video_format: std::os::raw::c_int, + width: std::os::raw::c_int, + height: std::os::raw::c_int, + redraw_rate: std::os::raw::c_int, + _context: *mut std::os::raw::c_void, + dr_flags: std::os::raw::c_int, +) -> std::os::raw::c_int { + debug!("SETUP CB"); + let msg = match RendererMessage::from_setup_cb_args( + video_format, + width, + height, + redraw_rate, + dr_flags, + ) { + Ok(m) => m, + Err(e) => { + error!("Cannot construct RendererMessage: {e}"); + return -1; + } + }; + + send_message(msg) +} + +extern "C" fn start_cb() { + debug!("START CB"); +} + +extern "C" fn submit_decode_unit_cb(decode_unit: PDECODE_UNIT) -> std::os::raw::c_int { + debug!("SUBMIT DECODE UNIT CB"); + if decode_unit.is_null() { + error!("Decode unit pointer was null"); + return -1; + } + let decode_unit = unsafe { *decode_unit }; + + let msg = match RendererMessage::from_decode_unit(decode_unit) { + Ok(m) => m, + Err(e) => { + error!("Cannot construct RendererMessage: {e}"); + return -1; + } + }; + + send_message(msg) +} + +pub fn decoder_callbacks() -> Result<(DECODER_RENDERER_CALLBACKS, mpsc::Receiver)> +{ + let (tx, rx) = mpsc::channel(100); + + let mut writer = DECODER_SENDER.write().unwrap(); + + *writer = Some(tx); + + Ok(( + DECODER_RENDERER_CALLBACKS { + setup: Some(setup_cb), + start: Some(start_cb), + stop: None, + cleanup: None, + submitDecodeUnit: Some(submit_decode_unit_cb), + capabilities: CAPABILITY_DIRECT_SUBMIT, + }, + rx, + )) +} diff --git a/gamestream-webtransport-proxy/src/gamestream/mod.rs b/gamestream-webtransport-proxy/src/gamestream/mod.rs new file mode 100644 index 0000000..fe60a35 --- /dev/null +++ b/gamestream-webtransport-proxy/src/gamestream/mod.rs @@ -0,0 +1,49 @@ +use anyhow::{Result, anyhow}; +use tokio::sync::mpsc; + +mod config; +mod decoder; + +#[derive(Debug)] +pub struct GamestreamChannels { + pub decoder_rx: mpsc::Receiver, +} + +pub fn start_connection( + stream: crate::server::Stream, + address: &str, +) -> Result { + let mut server_info = config::server_info( + &stream.url, + &stream.app_version, + &stream.gfe_version, + address, + stream.server_codec_mode_support, + )?; + let mut stream_config = config::stream_config(&stream); + let mut listener_callbacks = config::listener_callbacks(); + let (mut decoder_callbacks, decoder_rx) = decoder::decoder_callbacks()?; + let ret; + unsafe { + ret = moonlight_common_c_sys::LiStartConnection( + &mut server_info, + &mut stream_config, + &mut listener_callbacks, + &mut decoder_callbacks, + std::ptr::null_mut(), + std::ptr::null_mut(), + 0, + std::ptr::null_mut(), + 0, + ); + } + + match ret { + 0 => Ok(GamestreamChannels { decoder_rx }), + _ => Err(anyhow!("Gamestream connection failed: {ret}")), + } +} + +pub fn stop_connection() { + unsafe { moonlight_common_c_sys::LiStopConnection() }; +} diff --git a/gamestream-webtransport-proxy/src/main.rs b/gamestream-webtransport-proxy/src/main.rs index bac6662..26e2d86 100644 --- a/gamestream-webtransport-proxy/src/main.rs +++ b/gamestream-webtransport-proxy/src/main.rs @@ -1,112 +1,14 @@ use salvo::logging::Logger; use salvo::prelude::*; -use std::ffi::CString; - -use moonlight_common_c_sys::{ - _SERVER_INFORMATION, _STREAM_CONFIGURATION, COLOR_RANGE_LIMITED, COLORSPACE_REC_601, - CONNECTION_LISTENER_CALLBACKS, ENCFLG_NONE, SCM_H264, STREAM_CFG_LOCAL, VIDEO_FORMAT_H264, -}; mod apps; mod certs; mod common; +mod gamestream; mod pair; mod server; mod state; - -#[allow(unused)] -fn get_server_info() -> _SERVER_INFORMATION { - _SERVER_INFORMATION { - // TODO: these all leak - address: CString::new("10.0.1.8") - .expect("CString::new failed") - .into_raw(), - serverInfoAppVersion: CString::new("foo").expect("CString::new failed").into_raw(), - serverInfoGfeVersion: CString::new("foo").expect("CString::new failed").into_raw(), - rtspSessionUrl: CString::new("foo").expect("CString::new failed").into_raw(), - serverCodecModeSupport: SCM_H264, - } -} - -#[allow(unused)] -fn get_stream_config() -> _STREAM_CONFIGURATION { - let mut remote_input_aes_key_u8: [u8; 16] = [0; 16]; - let remote_input_aes_iv: [i8; 16] = [0; 16]; - - getrandom::fill(&mut remote_input_aes_key_u8) - .expect("Failed to generate cryptographic random bytes"); - - let remote_input_aes_key: [i8; 16] = - unsafe { *(&mut remote_input_aes_key_u8 as *mut [u8; 16] as *mut [i8; 16]) }; - - _STREAM_CONFIGURATION { - width: 1280, - height: 720, - fps: 60, - bitrate: 50 * 1024 * 1024, - packetSize: 1024, - streamingRemotely: STREAM_CFG_LOCAL, - audioConfiguration: (0x3 << 16) | (2 << 8) | 0xCA, - supportedVideoFormats: VIDEO_FORMAT_H264, - clientRefreshRateX100: 6000, - colorSpace: COLORSPACE_REC_601, - colorRange: COLOR_RANGE_LIMITED, - encryptionFlags: ENCFLG_NONE, - remoteInputAesKey: remote_input_aes_key, - remoteInputAesIv: remote_input_aes_iv, - } -} - -unsafe extern "C" { - #[allow(unused)] - fn printf(format: *const i8, ...); -} - -#[allow(unused)] -fn get_listener_callbacks() -> CONNECTION_LISTENER_CALLBACKS { - CONNECTION_LISTENER_CALLBACKS { - stageStarting: None, - stageComplete: None, - stageFailed: None, - connectionStarted: None, - connectionTerminated: None, - logMessage: Some(printf), - rumble: None, - connectionStatusUpdate: None, - setHdrMode: None, - rumbleTriggers: None, - setMotionEventState: None, - setControllerLED: None, - setAdaptiveTriggers: None, - } -} - -//fn barmain() { -// //let server_info = moonlight_common_c_sys::LiInitializeServerInformation(); -// let mut server_info = get_server_info(); -// let mut stream_config = get_stream_config(); -// let mut listener_callbacks = get_listener_callbacks(); -// let ret; -// unsafe { -// ret = moonlight_common_c_sys::LiStartConnection( -// &mut server_info, -// &mut stream_config, -// &mut listener_callbacks, -// std::ptr::null_mut(), -// std::ptr::null_mut(), -// std::ptr::null_mut(), -// 0, -// std::ptr::null_mut(), -// 0, -// ); -// } -// -// println!("{ret}"); -// -// loop {} -// -// //println!("Hello, world!"); -//} +mod stream; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -119,15 +21,26 @@ async fn main() -> anyhow::Result<()> { let router = Router::new() .push(Router::with_path("pair").post(server_arc.post_pair())) - .push(Router::with_path("apps").get(server_arc.get_apps())); + .push(Router::with_path("apps").get(server_arc.get_apps())) + .push(Router::with_path("stream/start").post(server_arc.post_stream_start())) + .push( + Router::with_path("stream/connect/{stream_id}").post(server_arc.post_stream_connect()), + ); let doc = OpenApi::new("test api", "0.0.1").merge_router(&router); let router = router .unshift(doc.into_router("/api-doc/openapi.json")) .unshift(SwaggerUi::new("/api-doc/openapi.json").into_router("/swagger-ui")); let service = Service::new(router).hoop(Logger::new()); - let listener = TcpListener::new("0.0.0.0:3001"); - let acceptor = listener.join(TcpListener::new("0.0.0.0:3000")).bind().await; + //let config = RustlsConfig::new(Keycert::new().cert(cert.as_slice()).key(key.as_slice())); + + let config = certs::get_http_stream_config()?; + + let listener = TcpListener::new("0.0.0.0:3000").rustls(config.clone()); + let acceptor = QuinnListener::new(config, ("0.0.0.0", 5800)) + .join(listener) + .bind() + .await; salvo::Server::new(acceptor).serve(service).await; Ok(()) diff --git a/gamestream-webtransport-proxy/src/pair.rs b/gamestream-webtransport-proxy/src/pair.rs index 4f1d365..ca8f590 100644 --- a/gamestream-webtransport-proxy/src/pair.rs +++ b/gamestream-webtransport-proxy/src/pair.rs @@ -365,7 +365,7 @@ impl crate::server::Server { } // Get or generate cert / private key - let (cert, private_key) = match crate::certs::get_cert_and_key() { + let (cert, private_key) = match crate::certs::get_gamestream_cert_and_key() { Ok(v) => v, Err(e) => { error!("Could not generate certs: {e}"); diff --git a/gamestream-webtransport-proxy/src/server.rs b/gamestream-webtransport-proxy/src/server.rs index ab154d7..00d93ec 100644 --- a/gamestream-webtransport-proxy/src/server.rs +++ b/gamestream-webtransport-proxy/src/server.rs @@ -1,16 +1,100 @@ +use std::collections::HashMap; + use anyhow::Result; +use salvo::oapi::ToSchema; +use serde::Deserialize; +use tokio::sync::RwLock; use crate::state::StateFile; -pub struct Server { - pub state: StateFile, +#[derive(Debug, Clone)] +pub struct InputCrypto { + pub aes_key: [u8; 16], + pub aes_iv: [u8; 16], } +impl InputCrypto { + pub fn new() -> Result { + let mut aes_key = [0u8; 16]; + let mut aes_iv = [0u8; 16]; + openssl::rand::rand_bytes(&mut aes_key)?; + openssl::rand::rand_bytes(&mut aes_iv)?; + + Ok(InputCrypto { aes_key, aes_iv }) + } + + pub fn as_url_params(&self) -> (String, String) { + let aes_key_hex = hex::encode(self.aes_key); + + // not sure why we have to do this but I'm just matching the embedded client behavior + // 1. Generate 16 random bytes + // 2. memcpy the first 4 bytes to a u32 (from_le_bytes) + // 3. Convert that u32 to network order (to_be) and convert it to a string with %d + // (to_string) + let aes_iv_array = <[u8; 4]>::try_from(&self.aes_iv[0..4]).unwrap(); + let aes_iv_u32 = u32::from_le_bytes(aes_iv_array); + let aes_iv_string = aes_iv_u32.to_be().to_string(); + (aes_key_hex, aes_iv_string) + } + + pub fn as_stream_config_params(&self) -> ([i8; 16], [i8; 16]) { + let aes_key_ptr = &self.aes_key as *const [u8; 16]; + let aes_key_ptr_i8 = aes_key_ptr as *const [i8; 16]; + let aes_key_i8 = unsafe { *aes_key_ptr_i8 }; + + let aes_iv_ptr = &self.aes_iv as *const [u8; 16]; + let aes_iv_ptr_i8 = aes_iv_ptr as *const [i8; 16]; + let aes_iv_i8 = unsafe { *aes_iv_ptr_i8 }; + + (aes_key_i8, aes_iv_i8) + } +} + +#[derive(Debug, Clone, ToSchema, Deserialize)] +pub struct Mode { + pub width: i32, + pub height: i32, + pub fps: i32, +} + +impl Mode { + pub fn as_url_string(&self) -> String { + format!("{}x{}x{}", self.width, self.height, self.fps) + } +} + +#[derive(Debug, Clone, ToSchema, Deserialize)] +pub struct StreamConfig { + pub mode: Mode, + pub bitrate_kbps: i32, +} + +#[derive(Debug, Clone)] +pub struct Stream { + pub id: uuid::Uuid, + + pub url: url::Url, + pub game_session: u64, + + pub server_name: String, + pub input_crypto: InputCrypto, + pub stream_config: StreamConfig, + + pub app_version: String, + pub gfe_version: String, + pub server_codec_mode_support: i32, +} + +pub struct Server { + pub state: StateFile, + pub streams: RwLock>, +} impl Server { pub fn new() -> Result { Ok(Server { - state: StateFile::new()? + state: StateFile::new()?, + streams: RwLock::new(HashMap::new()), }) } } diff --git a/gamestream-webtransport-proxy/src/state.rs b/gamestream-webtransport-proxy/src/state.rs index f837b7e..f994043 100644 --- a/gamestream-webtransport-proxy/src/state.rs +++ b/gamestream-webtransport-proxy/src/state.rs @@ -8,7 +8,7 @@ use anyhow::{Result, anyhow}; use serde::{Deserialize, Serialize}; use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct GamestreamServer { pub name: String, pub host: String, diff --git a/gamestream-webtransport-proxy/src/stream.rs b/gamestream-webtransport-proxy/src/stream.rs new file mode 100644 index 0000000..c97adcc --- /dev/null +++ b/gamestream-webtransport-proxy/src/stream.rs @@ -0,0 +1,342 @@ +use std::thread; + +use anyhow::{Context, Result}; +use moonlight_common_c_sys::SCM_H264; +use salvo::prelude::*; +use serde::{Deserialize, Serialize}; +use tokio::{select, sync::oneshot}; +use tracing::{debug, error, info}; + +use crate::{ + common::{AppError, AppResult, get_url}, + state::{GamestreamServer, StateReadAccess, StateReader}, +}; + +#[derive(Deserialize, ToSchema)] +struct PostStreamStartParams { + server: String, + id: u64, + stream_config: crate::server::StreamConfig, + server_mode: Option, +} + +#[derive(Serialize, ToSchema)] +struct PostStreamStartResponse { + stream_id: uuid::Uuid, +} + +#[derive(Deserialize)] +struct ServerInfoResponse { + hostname: String, + appversion: String, + GfeVersion: String, + uniqueid: uuid::Uuid, + HttpsPort: u16, + ExternalPort: u16, + MaxLumaPixelsHEVC: u64, + mac: String, + //ServerCommand: String, + //Permission: u64, + //VirtualDisplayCapable: bool, + //VirtualDisplayDriverReady: bool, + LocalIP: String, + ServerCodecModeSupport: i32, + PairStatus: u64, + currentgame: u64, + state: String, +} + +#[derive(Deserialize)] +struct LaunchResponse { + #[serde(rename = "sessionUrl0")] + session_url_0: url::Url, + #[serde(rename = "gamesession")] + game_session: u64, +} + +fn get_server(reader: &StateReadAccess, server: &String) -> Result> { + let servers = reader.servers().context("Failed to get servers")?; + Ok(servers.get(server).cloned()) +} + +#[craft] +impl crate::server::Server { + #[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))] + pub async fn post_stream_start( + self: ::std::sync::Arc, + body: salvo::oapi::extract::JsonBody, + ) -> AppResult> { + let standard_error = Err(crate::common::AppError { + status_code: StatusCode::INTERNAL_SERVER_ERROR, + description: "Could not start stream".to_string(), + }); + + let reader = self.state.read().await; + + let server = match get_server(&reader, &body.server) { + Ok(s) => match s { + Some(s) => s, + None => { + info!("No server with name: {}", body.server); + return Err(AppError { + status_code: StatusCode::BAD_REQUEST, + description: "No server configured with that name.".to_string(), + }); + } + }, + Err(e) => { + error!("Could not get server: {e}"); + return standard_error; + } + }; + + let unique_id = match reader.unique_id() { + Ok(u) => u, + Err(e) => { + error!("could not get unique id: {e}"); + return standard_error; + } + }; + + let mut base_url = crate::common::base_url( + "https", + &server.host, + server.https_port(), + &unique_id, + "serverinfo", + None, + ); + + let server_info = crate::common::get_url(&mut base_url, true).await.unwrap(); + debug!("app_info: {server_info}"); + let server_info: ServerInfoResponse = match serde_xml_rs::from_str(&server_info) { + Ok(s) => s, + Err(e) => { + error!("Could not parse serverinfo response: {e}"); + return standard_error; + } + }; + + //"%s://%s:%d/serverinfo?uniqueid=%s&uuid=%s", + + let input_crypto = match crate::server::InputCrypto::new() { + Ok(i) => i, + Err(e) => { + error!("Could not create input crypto: {e}"); + return standard_error; + } + }; + + let (remote_input_aes_key_string, remote_input_aes_iv_string) = + input_crypto.as_url_params(); + + let mode = match &body.server_mode { + Some(m) => m.as_url_string(), + None => body.stream_config.mode.as_url_string(), + }; + + let id_string = body.id.to_string(); + + let params = vec![ + ("appid", id_string.as_str()), + ("mode", mode.as_str()), + ("additionalStates", "1"), + ("sops", "1"), + ("rikey", remote_input_aes_key_string.as_str()), + ("rikeyid", remote_input_aes_iv_string.as_str()), + ("localAudioPlayMode", "0"), + ("surroundAudioInfo", "196610"), + ("remoteControllersBitmap", "0"), + ("gcmap", "0"), + ("corever", "1"), + ]; + + // moonlight-embedded url params + //https:// + // %s: server->serverInfo.address + // %u/ server->httpsPort, + // %s? server->currentGame ? "resume" : "launch", + // &uniqueid=%s unique_id, + // &uuid=%s uuid_str, + // &appid=%d appId + // &mode=%dx%dx%d config->width, config->height, fps + // &additionalStates=1 + // &sops=%d sops + // &rikey=%s rikey_hex + // &rikeyid=%d rikeyid + // &localAudioPlayMode=%d localaudio + // &surroundAudioInfo=%d surround_info + // &remoteControllersBitmap=%d gamepad_mask + // &gcmap=%d%s%s" gamepad_mask, + + let mut base_url = crate::common::base_url( + "https", + &server.host, + server.https_port(), + &unique_id, + "launch", + Some(params), + ); + + let resp = match get_url(&mut base_url, true).await { + Ok(r) => r, + Err(e) => { + error!("Cannot start game: {e}"); + return standard_error; + } + }; + + debug!("/launch response: {}", resp.replace("\n", "")); + + let launch_response: LaunchResponse = match serde_xml_rs::from_str(&resp) { + Ok(r) => r, + Err(e) => { + error!("Could not parse server launch response: {e}"); + return standard_error; + } + }; + + let stream_id = uuid::Uuid::new_v4(); + + let server_codec_mode_support = if server_info.ServerCodecModeSupport == 0 { + SCM_H264 + } else { + server_info.ServerCodecModeSupport + }; + + let stream = crate::server::Stream { + id: stream_id, + url: launch_response.session_url_0, + game_session: launch_response.game_session, + server_name: server.name.clone(), + stream_config: body.stream_config.clone(), + app_version: server_info.appversion, + gfe_version: server_info.GfeVersion, + server_codec_mode_support, + input_crypto, + }; + + info!( + "Launched stream {stream_id} on {} with config {stream:?}", + server.name + ); + (*self.streams.write().await).insert(stream.id, stream); + + let post_stream_response = PostStreamStartResponse { stream_id }; + + Ok(Json(post_stream_response)) + } + + #[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))] + pub async fn post_stream_connect( + self: ::std::sync::Arc, + stream_id: salvo::oapi::extract::PathParam, + req: &mut Request, + ) -> AppResult<()> { + let standard_error = Err(crate::common::AppError { + status_code: StatusCode::INTERNAL_SERVER_ERROR, + description: "Could not start stream".to_string(), + }); + + let session = match req.web_transport_mut().await { + Ok(w) => w, + Err(e) => { + error!("Could not initalize WebTransport connection: {e}"); + return Err(AppError { + status_code: StatusCode::BAD_REQUEST, + description: "User did not connect with a WebTransport connection".to_string(), + }); + } + }; + + let stream_id = stream_id.into_inner(); + let stream = match self.streams.read().await.get(&stream_id).cloned() { + Some(s) => s, + None => { + error!("Could not find stream with id {stream_id}"); + return Err(AppError { + status_code: StatusCode::BAD_REQUEST, + description: "No stream with given ID".to_string(), + }); + } + }; + + let servers = match self.state.read().await.servers() { + Ok(s) => s, + Err(e) => { + error!("Could not get servers: {e}"); + return standard_error; + } + }; + + let server = match servers.get(&stream.server_name) { + Some(s) => s, + None => { + error!( + "Could not find server {} pointed to by stream {}", + stream.server_name, stream.id + ); + return standard_error; + } + }; + + debug!( + "Connecting to stream on server {} with stream config {:?}", + server.name, stream + ); + + let (tx, rx) = oneshot::channel(); + let (stop_tx, stop_rx) = oneshot::channel::<()>(); + + let host = server.host.clone(); + thread::spawn(move || { + let result = crate::gamestream::start_connection(stream, &host); + + let _ = tx.send(result); + + let _ = stop_rx.blocking_recv(); + + crate::gamestream::stop_connection(); + }); + + let mut channels = match rx.await { + Ok(r) => match r { + Ok(r) => r, + Err(e) => { + error!("Could not get gamestream communication channels: {e}"); + return standard_error; + } + }, + Err(e) => { + error!("Could not start connection: {e}"); + return standard_error; + } + }; + + loop { + select! { + recv = channels.decoder_rx.recv() => { + match recv { + Some(frame) => { + info!("Got decoder packet") + } + None => { + error!("Decoder channel is None"); + break; + } + } + } + } + } + + //// Handle bidirectional streams + //if let Ok(Some(webtransport::server::AcceptedBi::BidiStream(_, stream))) = + // session.accept_bi().await + //{ + // let (send, recv) = salvo::proto::quic::BidiStream::split(stream); + // // Process bidirectional stream data + //} + + Ok(()) + } +}