backend: add support for receiving a gamestream stream

This commit is contained in:
2025-07-20 01:13:28 -06:00
parent e3892e7134
commit 188005ab11
13 changed files with 1003 additions and 126 deletions
Generated
+91
View File
@@ -572,6 +572,21 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@@ -579,6 +594,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink",
] ]
[[package]] [[package]]
@@ -587,6 +603,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.31" version = "0.3.31"
@@ -622,6 +649,7 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro", "futures-macro",
@@ -653,6 +681,7 @@ dependencies = [
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"url",
"url-constructor", "url-constructor",
"uuid", "uuid",
] ]
@@ -735,6 +764,46 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "h3"
version = "0.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10872b55cfb02a821b69dc7cf8dc6a71d6af25eb9a79662bec4a9d016056b3be"
dependencies = [
"bytes",
"fastrand",
"futures-util",
"http",
"pin-project-lite",
"tokio",
]
[[package]]
name = "h3-datagram"
version = "0.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d2c9f77921668673721ae40f17c729fc48b9e38a663858097cea547484fdf0f"
dependencies = [
"bytes",
"h3",
"pin-project-lite",
]
[[package]]
name = "h3-quinn"
version = "0.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b2e732c8d91a74731663ac8479ab505042fbf547b9a207213ab7fbcbfc4f8b4"
dependencies = [
"bytes",
"futures",
"h3",
"h3-datagram",
"quinn",
"tokio",
"tokio-util",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@@ -1617,6 +1686,7 @@ checksum = "c3bd15a6f2967aef83887dcb9fec0014580467e33720d073560cf015a5683012"
dependencies = [ dependencies = [
"bytes", "bytes",
"cfg_aliases", "cfg_aliases",
"futures-io",
"pin-project-lite", "pin-project-lite",
"quinn-proto", "quinn-proto",
"quinn-udp", "quinn-udp",
@@ -2071,6 +2141,23 @@ dependencies = [
"syn 2.0.104", "syn 2.0.104",
] ]
[[package]]
name = "salvo-http3"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8dc5ede68d4df95dbe7af4483438e739f0e8b41c679615ca864559434f24d07a"
dependencies = [
"bytes",
"futures-util",
"h3",
"h3-datagram",
"h3-quinn",
"http",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "salvo-jwt-auth" name = "salvo-jwt-auth"
version = "0.80.0" version = "0.80.0"
@@ -2184,6 +2271,7 @@ dependencies = [
"form_urlencoded", "form_urlencoded",
"futures-channel", "futures-channel",
"futures-util", "futures-util",
"h3-datagram",
"headers", "headers",
"http", "http",
"http-body-util", "http-body-util",
@@ -2199,9 +2287,11 @@ dependencies = [
"parking_lot", "parking_lot",
"percent-encoding", "percent-encoding",
"pin-project", "pin-project",
"quinn",
"rand 0.9.1", "rand 0.9.1",
"regex", "regex",
"rustls-pemfile", "rustls-pemfile",
"salvo-http3",
"salvo_macros", "salvo_macros",
"serde", "serde",
"serde-xml-rs", "serde-xml-rs",
@@ -2962,6 +3052,7 @@ dependencies = [
"form_urlencoded", "form_urlencoded",
"idna", "idna",
"percent-encoding", "percent-encoding",
"serde",
] ]
[[package]] [[package]]
+2 -1
View File
@@ -16,12 +16,13 @@ reqwest = { version = "0.12.20", features = [
"rustls-tls", "rustls-tls",
"native-tls", "native-tls",
], default-features = false } ], default-features = false }
salvo = { version = "0.80.0", features = ["oapi", "craft", "logging"] } salvo = { version = "0.80.0", features = ["oapi", "craft", "logging", "quinn"] }
serde = { version = "1.0.219", features = ["serde_derive"] } serde = { version = "1.0.219", features = ["serde_derive"] }
serde-xml-rs = "0.8.1" serde-xml-rs = "0.8.1"
serde_json = "1.0.140" serde_json = "1.0.140"
tokio = { version = "1.45.1", features = ["full"] } tokio = { version = "1.45.1", features = ["full"] }
tracing = "0.1.41" tracing = "0.1.41"
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"
url = { version = "2.5.4", features = ["serde"] }
url-constructor = "0.1.0" url-constructor = "0.1.0"
uuid = { version = "1.17.0", features = ["v4", "serde"] } uuid = { version = "1.17.0", features = ["v4", "serde"] }
+2 -2
View File
@@ -10,8 +10,8 @@ use crate::{common::AppResult, state::StateReader};
struct AppListRespApp { struct AppListRespApp {
#[serde(rename = "AppTitle")] #[serde(rename = "AppTitle")]
app_title: String, app_title: String,
#[serde(rename = "UUID")] //#[serde(rename = "UUID")]
uuid: uuid::Uuid, //uuid: uuid::Uuid,
#[serde(rename = "IsHdrSupported")] #[serde(rename = "IsHdrSupported")]
is_hdr_supported: bool, is_hdr_supported: bool,
#[serde(rename = "ID")] #[serde(rename = "ID")]
+61 -14
View File
@@ -7,6 +7,7 @@ use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa; use openssl::rsa::Rsa;
use openssl::x509::X509; use openssl::x509::X509;
use salvo::conn::rustls::{Keycert, RustlsConfig};
pub fn get_and_create_cert_dir() -> Result<PathBuf> { pub fn get_and_create_cert_dir() -> Result<PathBuf> {
let project_dirs = let project_dirs =
@@ -19,19 +20,35 @@ pub fn get_and_create_cert_dir() -> Result<PathBuf> {
Ok(cert_dir) Ok(cert_dir)
} }
pub fn get_cert_and_key() -> Result<(X509, PKey<Private>)> { pub fn get_gamestream_cert_and_key() -> Result<(X509, PKey<Private>)> {
if let Ok((cert, key)) = load_cert_and_key_from_disk() { if let Ok((cert, key)) = load_cert_and_key_from_disk("gamestream-cert", "gamestream-key") {
Ok((cert, key)) Ok((cert, key))
} else { } else {
generate_cert_and_key() generate_gamestream_cert_and_key()
} }
} }
pub fn load_cert_and_key_from_disk() -> Result<(X509, PKey<Private>)> { pub fn get_http_stream_config() -> Result<RustlsConfig> {
let (cert, key) = match load_cert_and_key_from_disk("http-cert", "http-key") {
Ok((cert, key)) => (cert, key),
Err(_) => generate_http_cert_and_key()?,
};
Ok(RustlsConfig::new(
Keycert::new()
.cert(cert.to_pem()?)
.key(key.private_key_to_pem_pkcs8()?),
))
}
fn load_cert_and_key_from_disk(
cert_filename: &str,
key_filename: &str,
) -> Result<(X509, PKey<Private>)> {
let cert_dir = get_and_create_cert_dir()?; let cert_dir = get_and_create_cert_dir()?;
let cert_filepath = cert_dir.join("cert"); let cert_filepath = cert_dir.join(cert_filename);
let key_filepath = cert_dir.join("key"); let key_filepath = cert_dir.join(key_filename);
let cert_bytes = fs::read(cert_filepath)?; let cert_bytes = fs::read(cert_filepath)?;
let key_bytes = fs::read(key_filepath)?; let key_bytes = fs::read(key_filepath)?;
@@ -42,7 +59,32 @@ pub fn load_cert_and_key_from_disk() -> Result<(X509, PKey<Private>)> {
Ok((cert, key)) Ok((cert, key))
} }
pub fn generate_cert_and_key() -> Result<(X509, PKey<Private>)> { fn generate_http_cert_and_key() -> Result<(X509, PKey<Private>)> {
let rsa = Rsa::generate(2048)?;
let key = PKey::from_rsa(rsa)?;
let mut cert_builder = X509::builder()?;
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
let now = openssl::asn1::Asn1Time::from_unix(now_unix as i64)?;
let thirteen_days_from_now = openssl::asn1::Asn1Time::days_from_now(13)?;
cert_builder.set_version(2)?;
cert_builder.set_not_before(&now)?;
cert_builder.set_not_after(&thirteen_days_from_now)?;
cert_builder.set_pubkey(&key)?;
cert_builder.sign(&key, MessageDigest::sha256())?;
let cert = cert_builder.build();
save_cert_and_key_to_disk(&cert, &key, "http-cert", "http-key")?;
Ok((cert, key))
}
fn generate_gamestream_cert_and_key() -> Result<(X509, PKey<Private>)> {
let rsa = Rsa::generate(2048)?; let rsa = Rsa::generate(2048)?;
let key = PKey::from_rsa(rsa)?; let key = PKey::from_rsa(rsa)?;
@@ -73,15 +115,20 @@ pub fn generate_cert_and_key() -> Result<(X509, PKey<Private>)> {
cert_builder.sign(&key, MessageDigest::sha256())?; cert_builder.sign(&key, MessageDigest::sha256())?;
let cert = cert_builder.build(); let cert = cert_builder.build();
save_cert_and_key_to_disk(&cert, &key)?; save_cert_and_key_to_disk(&cert, &key, "gamestream-cert", "gamestream-key")?;
Ok((cert, key)) Ok((cert, key))
} }
pub fn save_cert_and_key_to_disk(cert: &X509, key: &PKey<Private>) -> Result<()> { fn save_cert_and_key_to_disk(
cert: &X509,
key: &PKey<Private>,
cert_filename: &str,
key_filename: &str,
) -> Result<()> {
let cert_dir = get_and_create_cert_dir()?; let cert_dir = get_and_create_cert_dir()?;
let cert_filepath = cert_dir.join("cert"); let cert_filepath = cert_dir.join(cert_filename);
let key_filepath = cert_dir.join("key"); let key_filepath = cert_dir.join(key_filename);
let mut cert_file_builder = std::fs::OpenOptions::new(); let mut cert_file_builder = std::fs::OpenOptions::new();
cert_file_builder.create(true); cert_file_builder.create(true);
@@ -110,10 +157,10 @@ pub fn save_cert_and_key_to_disk(cert: &X509, key: &PKey<Private>) -> Result<()>
Ok(()) Ok(())
} }
pub fn identity() -> Result<reqwest::tls::Identity> { pub fn gamestream_identity() -> Result<reqwest::tls::Identity> {
let cert_dir = get_and_create_cert_dir()?; let cert_dir = get_and_create_cert_dir()?;
let cert_filepath = cert_dir.join("cert"); let cert_filepath = cert_dir.join("gamestream-cert");
let key_filepath = cert_dir.join("key"); let key_filepath = cert_dir.join("gamestream-key");
let cert_bytes = fs::read(cert_filepath)?; let cert_bytes = fs::read(cert_filepath)?;
let key_bytes = fs::read(key_filepath)?; let key_bytes = fs::read(key_filepath)?;
+1 -1
View File
@@ -84,7 +84,7 @@ pub async fn get_url(
http_builder = http_builder.user_agent("Mozilla/5.0"); http_builder = http_builder.user_agent("Mozilla/5.0");
http_builder = http_builder.danger_accept_invalid_certs(true); http_builder = http_builder.danger_accept_invalid_certs(true);
if with_identity { if with_identity {
let identity = crate::certs::identity()?; let identity = crate::certs::gamestream_identity()?;
http_builder = http_builder.identity(identity); http_builder = http_builder.identity(identity);
} }
@@ -0,0 +1,72 @@
use anyhow::Result;
use std::ffi::CString;
use moonlight_common_c_sys::{
_SERVER_INFORMATION, _STREAM_CONFIGURATION, COLOR_RANGE_LIMITED, COLORSPACE_REC_601,
CONNECTION_LISTENER_CALLBACKS, ENCFLG_NONE, SCM_H264, STREAM_CFG_AUTO, STREAM_CFG_LOCAL,
VIDEO_FORMAT_H264, VIDEO_FORMAT_H265,
};
unsafe extern "C" {
#[allow(unused)]
fn printf(format: *const i8, ...);
}
extern "C" fn conn_listener_stage_starting(stage: std::os::raw::c_int) {
println!("Stage starting: {}", stage);
}
pub fn server_info(
stream_url: &url::Url,
app_version: &str,
gfe_version: &str,
address: &str,
server_codec_mode_support: i32,
) -> Result<_SERVER_INFORMATION> {
Ok(_SERVER_INFORMATION {
address: CString::new(address)?.into_raw(),
serverInfoAppVersion: CString::new(app_version)?.into_raw(),
serverInfoGfeVersion: CString::new(gfe_version)?.into_raw(),
rtspSessionUrl: CString::new(stream_url.as_str())?.into_raw(),
serverCodecModeSupport: server_codec_mode_support,
})
}
pub fn stream_config(stream: &crate::server::Stream) -> _STREAM_CONFIGURATION {
let (aes_key, aes_iv) = stream.input_crypto.as_stream_config_params();
_STREAM_CONFIGURATION {
width: stream.stream_config.mode.width,
height: stream.stream_config.mode.height,
fps: stream.stream_config.mode.fps,
bitrate: stream.stream_config.bitrate_kbps,
packetSize: 512,
streamingRemotely: STREAM_CFG_AUTO,
audioConfiguration: (0x3 << 16) | (2 << 8) | 0xCA,
supportedVideoFormats: VIDEO_FORMAT_H265,
clientRefreshRateX100: 0,
colorSpace: COLORSPACE_REC_601,
colorRange: COLOR_RANGE_LIMITED,
encryptionFlags: ENCFLG_NONE,
remoteInputAesKey: aes_key,
remoteInputAesIv: aes_iv,
}
}
pub fn listener_callbacks() -> CONNECTION_LISTENER_CALLBACKS {
CONNECTION_LISTENER_CALLBACKS {
stageStarting: None,
stageComplete: None,
stageFailed: None,
connectionStarted: None,
connectionTerminated: None,
logMessage: Some(printf),
rumble: None,
connectionStatusUpdate: None,
setHdrMode: None,
rumbleTriggers: None,
setMotionEventState: None,
setControllerLED: None,
setAdaptiveTriggers: None,
}
}
@@ -0,0 +1,278 @@
use std::{slice, sync::RwLock};
use anyhow::{Result, anyhow};
use moonlight_common_c_sys::{
_DECODE_UNIT, _LENTRY, CAPABILITY_DIRECT_SUBMIT, DECODER_RENDERER_CALLBACKS, FRAME_TYPE_PFRAME,
PDECODE_UNIT, VIDEO_FORMAT_H264_HIGH8_444, VIDEO_FORMAT_H265, VIDEO_FORMAT_H265_MAIN10,
VIDEO_FORMAT_H265_REXT8_444, VIDEO_FRAME_HANDLE,
};
use salvo::{http::body::Frame, hyper::body::Buf};
use tokio::sync::mpsc;
use tracing::{debug, error};
enum FrameType {
PFRAME,
IDR,
}
impl TryFrom<i32> for FrameType {
type Error = anyhow::Error;
fn try_from(val: i32) -> Result<Self, Self::Error> {
Ok(match val {
moonlight_common_c_sys::FRAME_TYPE_PFRAME => FrameType::PFRAME,
moonlight_common_c_sys::FRAME_TYPE_IDR => FrameType::IDR,
val => {
return Err(anyhow!("Unknown frame type {val}"));
}
})
}
}
enum VideoFormat {
H264,
H264_HIGH8_444,
H265,
H265_MAIN10,
H265_REXT8_444,
H265_REXT10_444,
AV1_MAIN8,
AV1_MAIN10,
AV1_HIGH8_444,
AV1_HIGH10_444,
}
impl TryFrom<i32> for VideoFormat {
type Error = anyhow::Error;
fn try_from(val: i32) -> Result<Self, Self::Error> {
Ok(match val {
moonlight_common_c_sys::VIDEO_FORMAT_H264 => VideoFormat::H264,
moonlight_common_c_sys::VIDEO_FORMAT_H264_HIGH8_444 => VideoFormat::H264_HIGH8_444,
moonlight_common_c_sys::VIDEO_FORMAT_H265 => VideoFormat::H265,
moonlight_common_c_sys::VIDEO_FORMAT_H265_MAIN10 => VideoFormat::H265_MAIN10,
moonlight_common_c_sys::VIDEO_FORMAT_H265_REXT8_444 => VideoFormat::H265_REXT8_444,
moonlight_common_c_sys::VIDEO_FORMAT_H265_REXT10_444 => VideoFormat::H265_REXT10_444,
moonlight_common_c_sys::VIDEO_FORMAT_AV1_MAIN8 => VideoFormat::AV1_MAIN8,
moonlight_common_c_sys::VIDEO_FORMAT_AV1_MAIN10 => VideoFormat::AV1_MAIN10,
moonlight_common_c_sys::VIDEO_FORMAT_AV1_HIGH8_444 => VideoFormat::AV1_HIGH8_444,
moonlight_common_c_sys::VIDEO_FORMAT_AV1_HIGH10_444 => VideoFormat::AV1_HIGH10_444,
val => {
return Err(anyhow!("Unknown video format {val}"));
}
})
}
}
enum BufferType {
PICDATA,
SPS,
PPS,
VPS,
}
impl TryFrom<i32> for BufferType {
type Error = anyhow::Error;
fn try_from(val: i32) -> Result<Self, Self::Error> {
Ok(match val {
moonlight_common_c_sys::BUFFER_TYPE_PICDATA => BufferType::PICDATA,
moonlight_common_c_sys::BUFFER_TYPE_SPS => BufferType::SPS,
moonlight_common_c_sys::BUFFER_TYPE_PPS => BufferType::PPS,
moonlight_common_c_sys::BUFFER_TYPE_VPS => BufferType::VPS,
_ => {
return Err(anyhow!("Unknown buffer type {val}"));
}
})
}
}
struct Buffer {
data: Vec<u8>,
buffer_type: BufferType,
}
impl TryFrom<_LENTRY> for Buffer {
type Error = anyhow::Error;
fn try_from(val: _LENTRY) -> Result<Self, Self::Error> {
let length = <usize>::try_from(val.length)?;
let data = unsafe { slice::from_raw_parts(val.data as *const u8, length) }.to_vec();
let buffer_type = BufferType::try_from(val.bufferType)?;
Ok(Buffer { data, buffer_type })
}
}
pub enum RendererMessage {
Setup {
video_format: VideoFormat,
width: u64,
height: u64,
redraw_rate: u64,
dr_flags: i32,
},
DecodeUnit {
frame_number: u64,
frame_type: FrameType,
host_processing_latency: u16,
receieve_time_ms: u64,
enqueue_time_ms: u64,
presentation_time: u64,
full_length: usize,
buffers: Vec<Buffer>,
hdr_active: bool,
colorspace: u8,
},
}
impl RendererMessage {
fn from_setup_cb_args(
video_format: std::os::raw::c_int,
width: std::os::raw::c_int,
height: std::os::raw::c_int,
redraw_rate: std::os::raw::c_int,
dr_flags: std::os::raw::c_int,
) -> Result<Self> {
Ok(RendererMessage::Setup {
video_format: VideoFormat::try_from(video_format)?,
width: <u64>::try_from(width)?,
height: <u64>::try_from(height)?,
redraw_rate: <u64>::try_from(redraw_rate)?,
dr_flags,
})
}
fn from_decode_unit(decode_unit: _DECODE_UNIT) -> Result<Self> {
let mut buffers = Vec::new();
if decode_unit.bufferList.is_null() {
return Err(anyhow!("DecodeUnit bufferList is null"));
}
let mut next = unsafe { *decode_unit.bufferList };
loop {
if next.next.is_null() {
break;
}
let buffer = Buffer::try_from(next)?;
buffers.push(buffer);
next = unsafe { *next.next };
}
Ok(RendererMessage::DecodeUnit {
frame_number: <u64>::try_from(decode_unit.frameNumber)?,
frame_type: FrameType::try_from(decode_unit.frameType)?,
host_processing_latency: decode_unit.frameHostProcessingLatency,
receieve_time_ms: decode_unit.receiveTimeMs,
enqueue_time_ms: decode_unit.enqueueTimeMs,
presentation_time: decode_unit.presentationTimeMs as u64,
full_length: <usize>::try_from(decode_unit.fullLength)?,
buffers,
hdr_active: decode_unit.hdrActive,
colorspace: decode_unit.colorspace,
})
}
}
static DECODER_SENDER: RwLock<Option<mpsc::Sender<RendererMessage>>> = RwLock::new(None);
fn send_message(msg: RendererMessage) -> i32 {
let read_guard = match DECODER_SENDER.read() {
Ok(r) => r,
Err(e) => {
error!("Could not lock RendererMessage RwLock: {e}");
return -1;
}
};
match read_guard.as_ref() {
Some(sender) => match sender.blocking_send(msg) {
Ok(()) => 0,
Err(e) => {
error!("Could not send to RendererMessage channel: {e}");
-1
}
},
None => {
error!("RendererMessage sender was not initalized");
-1
}
}
}
extern "C" fn setup_cb(
video_format: std::os::raw::c_int,
width: std::os::raw::c_int,
height: std::os::raw::c_int,
redraw_rate: std::os::raw::c_int,
_context: *mut std::os::raw::c_void,
dr_flags: std::os::raw::c_int,
) -> std::os::raw::c_int {
debug!("SETUP CB");
let msg = match RendererMessage::from_setup_cb_args(
video_format,
width,
height,
redraw_rate,
dr_flags,
) {
Ok(m) => m,
Err(e) => {
error!("Cannot construct RendererMessage: {e}");
return -1;
}
};
send_message(msg)
}
extern "C" fn start_cb() {
debug!("START CB");
}
extern "C" fn submit_decode_unit_cb(decode_unit: PDECODE_UNIT) -> std::os::raw::c_int {
debug!("SUBMIT DECODE UNIT CB");
if decode_unit.is_null() {
error!("Decode unit pointer was null");
return -1;
}
let decode_unit = unsafe { *decode_unit };
let msg = match RendererMessage::from_decode_unit(decode_unit) {
Ok(m) => m,
Err(e) => {
error!("Cannot construct RendererMessage: {e}");
return -1;
}
};
send_message(msg)
}
pub fn decoder_callbacks() -> Result<(DECODER_RENDERER_CALLBACKS, mpsc::Receiver<RendererMessage>)>
{
let (tx, rx) = mpsc::channel(100);
let mut writer = DECODER_SENDER.write().unwrap();
*writer = Some(tx);
Ok((
DECODER_RENDERER_CALLBACKS {
setup: Some(setup_cb),
start: Some(start_cb),
stop: None,
cleanup: None,
submitDecodeUnit: Some(submit_decode_unit_cb),
capabilities: CAPABILITY_DIRECT_SUBMIT,
},
rx,
))
}
@@ -0,0 +1,49 @@
use anyhow::{Result, anyhow};
use tokio::sync::mpsc;
mod config;
mod decoder;
#[derive(Debug)]
pub struct GamestreamChannels {
pub decoder_rx: mpsc::Receiver<decoder::RendererMessage>,
}
pub fn start_connection(
stream: crate::server::Stream,
address: &str,
) -> Result<GamestreamChannels> {
let mut server_info = config::server_info(
&stream.url,
&stream.app_version,
&stream.gfe_version,
address,
stream.server_codec_mode_support,
)?;
let mut stream_config = config::stream_config(&stream);
let mut listener_callbacks = config::listener_callbacks();
let (mut decoder_callbacks, decoder_rx) = decoder::decoder_callbacks()?;
let ret;
unsafe {
ret = moonlight_common_c_sys::LiStartConnection(
&mut server_info,
&mut stream_config,
&mut listener_callbacks,
&mut decoder_callbacks,
std::ptr::null_mut(),
std::ptr::null_mut(),
0,
std::ptr::null_mut(),
0,
);
}
match ret {
0 => Ok(GamestreamChannels { decoder_rx }),
_ => Err(anyhow!("Gamestream connection failed: {ret}")),
}
}
pub fn stop_connection() {
unsafe { moonlight_common_c_sys::LiStopConnection() };
}
+16 -103
View File
@@ -1,112 +1,14 @@
use salvo::logging::Logger; use salvo::logging::Logger;
use salvo::prelude::*; use salvo::prelude::*;
use std::ffi::CString;
use moonlight_common_c_sys::{
_SERVER_INFORMATION, _STREAM_CONFIGURATION, COLOR_RANGE_LIMITED, COLORSPACE_REC_601,
CONNECTION_LISTENER_CALLBACKS, ENCFLG_NONE, SCM_H264, STREAM_CFG_LOCAL, VIDEO_FORMAT_H264,
};
mod apps; mod apps;
mod certs; mod certs;
mod common; mod common;
mod gamestream;
mod pair; mod pair;
mod server; mod server;
mod state; mod state;
mod stream;
#[allow(unused)]
fn get_server_info() -> _SERVER_INFORMATION {
_SERVER_INFORMATION {
// TODO: these all leak
address: CString::new("10.0.1.8")
.expect("CString::new failed")
.into_raw(),
serverInfoAppVersion: CString::new("foo").expect("CString::new failed").into_raw(),
serverInfoGfeVersion: CString::new("foo").expect("CString::new failed").into_raw(),
rtspSessionUrl: CString::new("foo").expect("CString::new failed").into_raw(),
serverCodecModeSupport: SCM_H264,
}
}
#[allow(unused)]
fn get_stream_config() -> _STREAM_CONFIGURATION {
let mut remote_input_aes_key_u8: [u8; 16] = [0; 16];
let remote_input_aes_iv: [i8; 16] = [0; 16];
getrandom::fill(&mut remote_input_aes_key_u8)
.expect("Failed to generate cryptographic random bytes");
let remote_input_aes_key: [i8; 16] =
unsafe { *(&mut remote_input_aes_key_u8 as *mut [u8; 16] as *mut [i8; 16]) };
_STREAM_CONFIGURATION {
width: 1280,
height: 720,
fps: 60,
bitrate: 50 * 1024 * 1024,
packetSize: 1024,
streamingRemotely: STREAM_CFG_LOCAL,
audioConfiguration: (0x3 << 16) | (2 << 8) | 0xCA,
supportedVideoFormats: VIDEO_FORMAT_H264,
clientRefreshRateX100: 6000,
colorSpace: COLORSPACE_REC_601,
colorRange: COLOR_RANGE_LIMITED,
encryptionFlags: ENCFLG_NONE,
remoteInputAesKey: remote_input_aes_key,
remoteInputAesIv: remote_input_aes_iv,
}
}
unsafe extern "C" {
#[allow(unused)]
fn printf(format: *const i8, ...);
}
#[allow(unused)]
fn get_listener_callbacks() -> CONNECTION_LISTENER_CALLBACKS {
CONNECTION_LISTENER_CALLBACKS {
stageStarting: None,
stageComplete: None,
stageFailed: None,
connectionStarted: None,
connectionTerminated: None,
logMessage: Some(printf),
rumble: None,
connectionStatusUpdate: None,
setHdrMode: None,
rumbleTriggers: None,
setMotionEventState: None,
setControllerLED: None,
setAdaptiveTriggers: None,
}
}
//fn barmain() {
// //let server_info = moonlight_common_c_sys::LiInitializeServerInformation();
// let mut server_info = get_server_info();
// let mut stream_config = get_stream_config();
// let mut listener_callbacks = get_listener_callbacks();
// let ret;
// unsafe {
// ret = moonlight_common_c_sys::LiStartConnection(
// &mut server_info,
// &mut stream_config,
// &mut listener_callbacks,
// std::ptr::null_mut(),
// std::ptr::null_mut(),
// std::ptr::null_mut(),
// 0,
// std::ptr::null_mut(),
// 0,
// );
// }
//
// println!("{ret}");
//
// loop {}
//
// //println!("Hello, world!");
//}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@@ -119,15 +21,26 @@ async fn main() -> anyhow::Result<()> {
let router = Router::new() let router = Router::new()
.push(Router::with_path("pair").post(server_arc.post_pair())) .push(Router::with_path("pair").post(server_arc.post_pair()))
.push(Router::with_path("apps").get(server_arc.get_apps())); .push(Router::with_path("apps").get(server_arc.get_apps()))
.push(Router::with_path("stream/start").post(server_arc.post_stream_start()))
.push(
Router::with_path("stream/connect/{stream_id}").post(server_arc.post_stream_connect()),
);
let doc = OpenApi::new("test api", "0.0.1").merge_router(&router); let doc = OpenApi::new("test api", "0.0.1").merge_router(&router);
let router = router let router = router
.unshift(doc.into_router("/api-doc/openapi.json")) .unshift(doc.into_router("/api-doc/openapi.json"))
.unshift(SwaggerUi::new("/api-doc/openapi.json").into_router("/swagger-ui")); .unshift(SwaggerUi::new("/api-doc/openapi.json").into_router("/swagger-ui"));
let service = Service::new(router).hoop(Logger::new()); let service = Service::new(router).hoop(Logger::new());
let listener = TcpListener::new("0.0.0.0:3001"); //let config = RustlsConfig::new(Keycert::new().cert(cert.as_slice()).key(key.as_slice()));
let acceptor = listener.join(TcpListener::new("0.0.0.0:3000")).bind().await;
let config = certs::get_http_stream_config()?;
let listener = TcpListener::new("0.0.0.0:3000").rustls(config.clone());
let acceptor = QuinnListener::new(config, ("0.0.0.0", 5800))
.join(listener)
.bind()
.await;
salvo::Server::new(acceptor).serve(service).await; salvo::Server::new(acceptor).serve(service).await;
Ok(()) Ok(())
+1 -1
View File
@@ -365,7 +365,7 @@ impl crate::server::Server {
} }
// Get or generate cert / private key // Get or generate cert / private key
let (cert, private_key) = match crate::certs::get_cert_and_key() { let (cert, private_key) = match crate::certs::get_gamestream_cert_and_key() {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
error!("Could not generate certs: {e}"); error!("Could not generate certs: {e}");
+87 -3
View File
@@ -1,16 +1,100 @@
use std::collections::HashMap;
use anyhow::Result; use anyhow::Result;
use salvo::oapi::ToSchema;
use serde::Deserialize;
use tokio::sync::RwLock;
use crate::state::StateFile; use crate::state::StateFile;
pub struct Server { #[derive(Debug, Clone)]
pub state: StateFile, pub struct InputCrypto {
pub aes_key: [u8; 16],
pub aes_iv: [u8; 16],
} }
impl InputCrypto {
pub fn new() -> Result<Self> {
let mut aes_key = [0u8; 16];
let mut aes_iv = [0u8; 16];
openssl::rand::rand_bytes(&mut aes_key)?;
openssl::rand::rand_bytes(&mut aes_iv)?;
Ok(InputCrypto { aes_key, aes_iv })
}
pub fn as_url_params(&self) -> (String, String) {
let aes_key_hex = hex::encode(self.aes_key);
// not sure why we have to do this but I'm just matching the embedded client behavior
// 1. Generate 16 random bytes
// 2. memcpy the first 4 bytes to a u32 (from_le_bytes)
// 3. Convert that u32 to network order (to_be) and convert it to a string with %d
// (to_string)
let aes_iv_array = <[u8; 4]>::try_from(&self.aes_iv[0..4]).unwrap();
let aes_iv_u32 = u32::from_le_bytes(aes_iv_array);
let aes_iv_string = aes_iv_u32.to_be().to_string();
(aes_key_hex, aes_iv_string)
}
pub fn as_stream_config_params(&self) -> ([i8; 16], [i8; 16]) {
let aes_key_ptr = &self.aes_key as *const [u8; 16];
let aes_key_ptr_i8 = aes_key_ptr as *const [i8; 16];
let aes_key_i8 = unsafe { *aes_key_ptr_i8 };
let aes_iv_ptr = &self.aes_iv as *const [u8; 16];
let aes_iv_ptr_i8 = aes_iv_ptr as *const [i8; 16];
let aes_iv_i8 = unsafe { *aes_iv_ptr_i8 };
(aes_key_i8, aes_iv_i8)
}
}
#[derive(Debug, Clone, ToSchema, Deserialize)]
pub struct Mode {
pub width: i32,
pub height: i32,
pub fps: i32,
}
impl Mode {
pub fn as_url_string(&self) -> String {
format!("{}x{}x{}", self.width, self.height, self.fps)
}
}
#[derive(Debug, Clone, ToSchema, Deserialize)]
pub struct StreamConfig {
pub mode: Mode,
pub bitrate_kbps: i32,
}
#[derive(Debug, Clone)]
pub struct Stream {
pub id: uuid::Uuid,
pub url: url::Url,
pub game_session: u64,
pub server_name: String,
pub input_crypto: InputCrypto,
pub stream_config: StreamConfig,
pub app_version: String,
pub gfe_version: String,
pub server_codec_mode_support: i32,
}
pub struct Server {
pub state: StateFile,
pub streams: RwLock<HashMap<uuid::Uuid, Stream>>,
}
impl Server { impl Server {
pub fn new() -> Result<Self> { pub fn new() -> Result<Self> {
Ok(Server { Ok(Server {
state: StateFile::new()? state: StateFile::new()?,
streams: RwLock::new(HashMap::new()),
}) })
} }
} }
+1 -1
View File
@@ -8,7 +8,7 @@ use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize, Clone)]
pub struct GamestreamServer { pub struct GamestreamServer {
pub name: String, pub name: String,
pub host: String, pub host: String,
+342
View File
@@ -0,0 +1,342 @@
use std::thread;
use anyhow::{Context, Result};
use moonlight_common_c_sys::SCM_H264;
use salvo::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::{select, sync::oneshot};
use tracing::{debug, error, info};
use crate::{
common::{AppError, AppResult, get_url},
state::{GamestreamServer, StateReadAccess, StateReader},
};
#[derive(Deserialize, ToSchema)]
struct PostStreamStartParams {
server: String,
id: u64,
stream_config: crate::server::StreamConfig,
server_mode: Option<crate::server::Mode>,
}
#[derive(Serialize, ToSchema)]
struct PostStreamStartResponse {
stream_id: uuid::Uuid,
}
#[derive(Deserialize)]
struct ServerInfoResponse {
hostname: String,
appversion: String,
GfeVersion: String,
uniqueid: uuid::Uuid,
HttpsPort: u16,
ExternalPort: u16,
MaxLumaPixelsHEVC: u64,
mac: String,
//ServerCommand: String,
//Permission: u64,
//VirtualDisplayCapable: bool,
//VirtualDisplayDriverReady: bool,
LocalIP: String,
ServerCodecModeSupport: i32,
PairStatus: u64,
currentgame: u64,
state: String,
}
#[derive(Deserialize)]
struct LaunchResponse {
#[serde(rename = "sessionUrl0")]
session_url_0: url::Url,
#[serde(rename = "gamesession")]
game_session: u64,
}
fn get_server(reader: &StateReadAccess, server: &String) -> Result<Option<GamestreamServer>> {
let servers = reader.servers().context("Failed to get servers")?;
Ok(servers.get(server).cloned())
}
#[craft]
impl crate::server::Server {
#[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))]
pub async fn post_stream_start(
self: ::std::sync::Arc<Self>,
body: salvo::oapi::extract::JsonBody<PostStreamStartParams>,
) -> AppResult<Json<PostStreamStartResponse>> {
let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(),
});
let reader = self.state.read().await;
let server = match get_server(&reader, &body.server) {
Ok(s) => match s {
Some(s) => s,
None => {
info!("No server with name: {}", body.server);
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "No server configured with that name.".to_string(),
});
}
},
Err(e) => {
error!("Could not get server: {e}");
return standard_error;
}
};
let unique_id = match reader.unique_id() {
Ok(u) => u,
Err(e) => {
error!("could not get unique id: {e}");
return standard_error;
}
};
let mut base_url = crate::common::base_url(
"https",
&server.host,
server.https_port(),
&unique_id,
"serverinfo",
None,
);
let server_info = crate::common::get_url(&mut base_url, true).await.unwrap();
debug!("app_info: {server_info}");
let server_info: ServerInfoResponse = match serde_xml_rs::from_str(&server_info) {
Ok(s) => s,
Err(e) => {
error!("Could not parse serverinfo response: {e}");
return standard_error;
}
};
//"%s://%s:%d/serverinfo?uniqueid=%s&uuid=%s",
let input_crypto = match crate::server::InputCrypto::new() {
Ok(i) => i,
Err(e) => {
error!("Could not create input crypto: {e}");
return standard_error;
}
};
let (remote_input_aes_key_string, remote_input_aes_iv_string) =
input_crypto.as_url_params();
let mode = match &body.server_mode {
Some(m) => m.as_url_string(),
None => body.stream_config.mode.as_url_string(),
};
let id_string = body.id.to_string();
let params = vec![
("appid", id_string.as_str()),
("mode", mode.as_str()),
("additionalStates", "1"),
("sops", "1"),
("rikey", remote_input_aes_key_string.as_str()),
("rikeyid", remote_input_aes_iv_string.as_str()),
("localAudioPlayMode", "0"),
("surroundAudioInfo", "196610"),
("remoteControllersBitmap", "0"),
("gcmap", "0"),
("corever", "1"),
];
// moonlight-embedded url params
//https://
// %s: server->serverInfo.address
// %u/ server->httpsPort,
// %s? server->currentGame ? "resume" : "launch",
// &uniqueid=%s unique_id,
// &uuid=%s uuid_str,
// &appid=%d appId
// &mode=%dx%dx%d config->width, config->height, fps
// &additionalStates=1
// &sops=%d sops
// &rikey=%s rikey_hex
// &rikeyid=%d rikeyid
// &localAudioPlayMode=%d localaudio
// &surroundAudioInfo=%d surround_info
// &remoteControllersBitmap=%d gamepad_mask
// &gcmap=%d%s%s" gamepad_mask,
let mut base_url = crate::common::base_url(
"https",
&server.host,
server.https_port(),
&unique_id,
"launch",
Some(params),
);
let resp = match get_url(&mut base_url, true).await {
Ok(r) => r,
Err(e) => {
error!("Cannot start game: {e}");
return standard_error;
}
};
debug!("/launch response: {}", resp.replace("\n", ""));
let launch_response: LaunchResponse = match serde_xml_rs::from_str(&resp) {
Ok(r) => r,
Err(e) => {
error!("Could not parse server launch response: {e}");
return standard_error;
}
};
let stream_id = uuid::Uuid::new_v4();
let server_codec_mode_support = if server_info.ServerCodecModeSupport == 0 {
SCM_H264
} else {
server_info.ServerCodecModeSupport
};
let stream = crate::server::Stream {
id: stream_id,
url: launch_response.session_url_0,
game_session: launch_response.game_session,
server_name: server.name.clone(),
stream_config: body.stream_config.clone(),
app_version: server_info.appversion,
gfe_version: server_info.GfeVersion,
server_codec_mode_support,
input_crypto,
};
info!(
"Launched stream {stream_id} on {} with config {stream:?}",
server.name
);
(*self.streams.write().await).insert(stream.id, stream);
let post_stream_response = PostStreamStartResponse { stream_id };
Ok(Json(post_stream_response))
}
#[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))]
pub async fn post_stream_connect(
self: ::std::sync::Arc<Self>,
stream_id: salvo::oapi::extract::PathParam<uuid::Uuid>,
req: &mut Request,
) -> AppResult<()> {
let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(),
});
let session = match req.web_transport_mut().await {
Ok(w) => w,
Err(e) => {
error!("Could not initalize WebTransport connection: {e}");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "User did not connect with a WebTransport connection".to_string(),
});
}
};
let stream_id = stream_id.into_inner();
let stream = match self.streams.read().await.get(&stream_id).cloned() {
Some(s) => s,
None => {
error!("Could not find stream with id {stream_id}");
return Err(AppError {
status_code: StatusCode::BAD_REQUEST,
description: "No stream with given ID".to_string(),
});
}
};
let servers = match self.state.read().await.servers() {
Ok(s) => s,
Err(e) => {
error!("Could not get servers: {e}");
return standard_error;
}
};
let server = match servers.get(&stream.server_name) {
Some(s) => s,
None => {
error!(
"Could not find server {} pointed to by stream {}",
stream.server_name, stream.id
);
return standard_error;
}
};
debug!(
"Connecting to stream on server {} with stream config {:?}",
server.name, stream
);
let (tx, rx) = oneshot::channel();
let (stop_tx, stop_rx) = oneshot::channel::<()>();
let host = server.host.clone();
thread::spawn(move || {
let result = crate::gamestream::start_connection(stream, &host);
let _ = tx.send(result);
let _ = stop_rx.blocking_recv();
crate::gamestream::stop_connection();
});
let mut channels = match rx.await {
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
error!("Could not get gamestream communication channels: {e}");
return standard_error;
}
},
Err(e) => {
error!("Could not start connection: {e}");
return standard_error;
}
};
loop {
select! {
recv = channels.decoder_rx.recv() => {
match recv {
Some(frame) => {
info!("Got decoder packet")
}
None => {
error!("Decoder channel is None");
break;
}
}
}
}
}
//// Handle bidirectional streams
//if let Ok(Some(webtransport::server::AcceptedBi::BidiStream(_, stream))) =
// session.accept_bi().await
//{
// let (send, recv) = salvo::proto::quic::BidiStream::split(stream);
// // Process bidirectional stream data
//}
Ok(())
}
}