5 Commits

Author SHA1 Message Date
restitux b8c705554f backend: add single-use token auth for spawned stream proxies
Generate a random 256-bit token when spawning a proxy process, pass
it as a CLI argument, and return it to the client in the stream start
response. The proxy validates the token on WebTransport connect and
consumes it after first use, preventing replay. A wrong token attempt
also consumes the token for security. Includes 5 unit tests for token
validation logic.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 15:36:29 +00:00
restitux 826a3b59c9 backend: gate existing endpoints behind auth and app permissions
Move /api/pair, /api/apps, and /api/stream/start under the session
auth middleware so they require a valid session token. Add app-level
permission filtering: non-admin users only see and can stream apps
they have been explicitly granted access to. Admins bypass all
permission checks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 15:12:22 +00:00
restitux 22f9405229 backend: add user management system with SQLite database
Add authentication and authorization infrastructure:
- SQLite database (db.rs) with users, sessions, and app permissions tables
- Password hashing with argon2
- Session-based auth with random 256-bit tokens
- Auth middleware (session validation) and admin middleware
- Login/logout/me endpoints
- Admin CRUD endpoints for user and permission management
- Auto-seed default admin user on first run
- 23 unit tests covering all DB operations

Existing API endpoints are not yet gated behind auth.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 02:34:02 +00:00
restitux e80543144a backend and frontend: support out of order chunks + now it's performant on chrome 2025-08-12 02:20:46 -06:00
restitux 7afd8db8d8 backend and frontend: port frame sending to flatbuffers 2025-08-10 21:02:03 -06:00
33 changed files with 3314 additions and 234 deletions
Generated
+97
View File
@@ -93,6 +93,18 @@ version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
name = "argon2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c3610892ee6e0cbce8ae2700349fcf8f98adb0dbfbee85aec3c9179d29cc072"
dependencies = [
"base64ct",
"blake2",
"cpufeatures",
"password-hash",
]
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.7.6" version = "0.7.6"
@@ -143,6 +155,12 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "base64ct"
version = "1.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06"
[[package]] [[package]]
name = "bindgen" name = "bindgen"
version = "0.72.0" version = "0.72.0"
@@ -181,6 +199,15 @@ dependencies = [
"wyz", "wyz",
] ]
[[package]]
name = "blake2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.10.4" version = "0.10.4"
@@ -531,6 +558,18 @@ dependencies = [
"xxhash-rust", "xxhash-rust",
] ]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]] [[package]]
name = "fastrand" name = "fastrand"
version = "2.3.0" version = "2.3.0"
@@ -553,6 +592,12 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]] [[package]]
name = "foreign-types" name = "foreign-types"
version = "0.3.2" version = "0.3.2"
@@ -677,9 +722,12 @@ name = "gamestream-webtransport-proxy"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"argon2",
"directories", "directories",
"flatbuffers", "flatbuffers",
"getrandom 0.3.3", "getrandom 0.3.3",
"h3-datagram",
"h3-quinn",
"hex", "hex",
"hmac-sha256", "hmac-sha256",
"http", "http",
@@ -688,6 +736,7 @@ dependencies = [
"openssl", "openssl",
"rand 0.9.1", "rand 0.9.1",
"reqwest", "reqwest",
"rusqlite",
"salvo", "salvo",
"serde", "serde",
"serde-xml-rs", "serde-xml-rs",
@@ -832,6 +881,18 @@ name = "hashbrown"
version = "0.15.4" version = "0.15.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
dependencies = [
"foldhash",
]
[[package]]
name = "hashlink"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
dependencies = [
"hashbrown 0.15.4",
]
[[package]] [[package]]
name = "headers" name = "headers"
@@ -1254,6 +1315,17 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "libsqlite3-sys"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbb8270bb4060bd76c6e96f20c52d80620f1d82a3470885694e41e0f81ef6fe7"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.9.4" version = "0.9.4"
@@ -1547,6 +1619,17 @@ dependencies = [
"windows-targets 0.52.6", "windows-targets 0.52.6",
] ]
[[package]]
name = "password-hash"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166"
dependencies = [
"base64ct",
"rand_core 0.6.4",
"subtle",
]
[[package]] [[package]]
name = "path-slash" name = "path-slash"
version = "0.2.1" version = "0.2.1"
@@ -1987,6 +2070,20 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "rusqlite"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e34486da88d8e051c7c0e23c3f15fd806ea8546260aa2fec247e97242ec143"
dependencies = [
"bitflags",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec",
]
[[package]] [[package]]
name = "rust-embed" name = "rust-embed"
version = "8.6.0" version = "8.6.0"
+10
View File
@@ -0,0 +1,10 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
export { DecodeUnitBuffer } from './video-update/decode-unit-buffer.js';
export { DecodeUnitStart } from './video-update/decode-unit-start.js';
export { FrameType } from './video-update/frame-type.js';
export { Setup } from './video-update/setup.js';
export { Update } from './video-update/update.js';
export { VideoUpdate } from './video-update/video-update.js';
@@ -0,0 +1,100 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import * as flatbuffers from 'flatbuffers';
export class DecodeUnitBuffer {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):DecodeUnitBuffer {
this.bb_pos = i;
this.bb = bb;
return this;
}
static getRootAsDecodeUnitBuffer(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitBuffer):DecodeUnitBuffer {
return (obj || new DecodeUnitBuffer()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
static getSizePrefixedRootAsDecodeUnitBuffer(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitBuffer):DecodeUnitBuffer {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new DecodeUnitBuffer()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
frameNumber():bigint {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
bufferIndex():bigint {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
bufferOffset():bigint {
const offset = this.bb!.__offset(this.bb_pos, 8);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
data(index: number):number|null {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0;
}
dataLength():number {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
}
dataArray():Uint8Array|null {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null;
}
static startDecodeUnitBuffer(builder:flatbuffers.Builder) {
builder.startObject(4);
}
static addFrameNumber(builder:flatbuffers.Builder, frameNumber:bigint) {
builder.addFieldInt64(0, frameNumber, BigInt('0'));
}
static addBufferIndex(builder:flatbuffers.Builder, bufferIndex:bigint) {
builder.addFieldInt64(1, bufferIndex, BigInt('0'));
}
static addBufferOffset(builder:flatbuffers.Builder, bufferOffset:bigint) {
builder.addFieldInt64(2, bufferOffset, BigInt('0'));
}
static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) {
builder.addFieldOffset(3, dataOffset, 0);
}
static createDataVector(builder:flatbuffers.Builder, data:number[]|Uint8Array):flatbuffers.Offset {
builder.startVector(1, data.length, 1);
for (let i = data.length - 1; i >= 0; i--) {
builder.addInt8(data[i]!);
}
return builder.endVector();
}
static startDataVector(builder:flatbuffers.Builder, numElems:number) {
builder.startVector(1, numElems, 1);
}
static endDecodeUnitBuffer(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}
static createDecodeUnitBuffer(builder:flatbuffers.Builder, frameNumber:bigint, bufferIndex:bigint, bufferOffset:bigint, dataOffset:flatbuffers.Offset):flatbuffers.Offset {
DecodeUnitBuffer.startDecodeUnitBuffer(builder);
DecodeUnitBuffer.addFrameNumber(builder, frameNumber);
DecodeUnitBuffer.addBufferIndex(builder, bufferIndex);
DecodeUnitBuffer.addBufferOffset(builder, bufferOffset);
DecodeUnitBuffer.addData(builder, dataOffset);
return DecodeUnitBuffer.endDecodeUnitBuffer(builder);
}
}
@@ -0,0 +1,91 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import * as flatbuffers from 'flatbuffers';
import { FrameType } from '../video-update/frame-type.js';
export class DecodeUnitStart {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):DecodeUnitStart {
this.bb_pos = i;
this.bb = bb;
return this;
}
static getRootAsDecodeUnitStart(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitStart):DecodeUnitStart {
return (obj || new DecodeUnitStart()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
static getSizePrefixedRootAsDecodeUnitStart(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitStart):DecodeUnitStart {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new DecodeUnitStart()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
frameNumber():bigint {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
frameType():FrameType {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.readInt8(this.bb_pos + offset) : FrameType.PFRAME;
}
numBuffers():bigint {
const offset = this.bb!.__offset(this.bb_pos, 8);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
receiveTimeMs():number {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.readUint16(this.bb_pos + offset) : 0;
}
fullLength():bigint {
const offset = this.bb!.__offset(this.bb_pos, 12);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
static startDecodeUnitStart(builder:flatbuffers.Builder) {
builder.startObject(5);
}
static addFrameNumber(builder:flatbuffers.Builder, frameNumber:bigint) {
builder.addFieldInt64(0, frameNumber, BigInt('0'));
}
static addFrameType(builder:flatbuffers.Builder, frameType:FrameType) {
builder.addFieldInt8(1, frameType, FrameType.PFRAME);
}
static addNumBuffers(builder:flatbuffers.Builder, numBuffers:bigint) {
builder.addFieldInt64(2, numBuffers, BigInt('0'));
}
static addReceiveTimeMs(builder:flatbuffers.Builder, receiveTimeMs:number) {
builder.addFieldInt16(3, receiveTimeMs, 0);
}
static addFullLength(builder:flatbuffers.Builder, fullLength:bigint) {
builder.addFieldInt64(4, fullLength, BigInt('0'));
}
static endDecodeUnitStart(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}
static createDecodeUnitStart(builder:flatbuffers.Builder, frameNumber:bigint, frameType:FrameType, numBuffers:bigint, receiveTimeMs:number, fullLength:bigint):flatbuffers.Offset {
DecodeUnitStart.startDecodeUnitStart(builder);
DecodeUnitStart.addFrameNumber(builder, frameNumber);
DecodeUnitStart.addFrameType(builder, frameType);
DecodeUnitStart.addNumBuffers(builder, numBuffers);
DecodeUnitStart.addReceiveTimeMs(builder, receiveTimeMs);
DecodeUnitStart.addFullLength(builder, fullLength);
return DecodeUnitStart.endDecodeUnitStart(builder);
}
}
@@ -0,0 +1,103 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import * as flatbuffers from 'flatbuffers';
import { FrameType } from '../video-update/frame-type.js';
export class DecodeUnit {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):DecodeUnit {
this.bb_pos = i;
this.bb = bb;
return this;
}
static getRootAsDecodeUnit(bb:flatbuffers.ByteBuffer, obj?:DecodeUnit):DecodeUnit {
return (obj || new DecodeUnit()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
static getSizePrefixedRootAsDecodeUnit(bb:flatbuffers.ByteBuffer, obj?:DecodeUnit):DecodeUnit {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new DecodeUnit()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
frameNumber():number {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint16(this.bb_pos + offset) : 0;
}
frameType():FrameType {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.readInt8(this.bb_pos + offset) : FrameType.PFRAME;
}
receiveTimeMs():number {
const offset = this.bb!.__offset(this.bb_pos, 8);
return offset ? this.bb!.readUint16(this.bb_pos + offset) : 0;
}
data(index: number):number|null {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0;
}
dataLength():number {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
}
dataArray():Uint8Array|null {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null;
}
static startDecodeUnit(builder:flatbuffers.Builder) {
builder.startObject(4);
}
static addFrameNumber(builder:flatbuffers.Builder, frameNumber:number) {
builder.addFieldInt16(0, frameNumber, 0);
}
static addFrameType(builder:flatbuffers.Builder, frameType:FrameType) {
builder.addFieldInt8(1, frameType, FrameType.PFRAME);
}
static addReceiveTimeMs(builder:flatbuffers.Builder, receiveTimeMs:number) {
builder.addFieldInt16(2, receiveTimeMs, 0);
}
static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) {
builder.addFieldOffset(3, dataOffset, 0);
}
static createDataVector(builder:flatbuffers.Builder, data:number[]|Uint8Array):flatbuffers.Offset {
builder.startVector(1, data.length, 1);
for (let i = data.length - 1; i >= 0; i--) {
builder.addInt8(data[i]!);
}
return builder.endVector();
}
static startDataVector(builder:flatbuffers.Builder, numElems:number) {
builder.startVector(1, numElems, 1);
}
static endDecodeUnit(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}
static createDecodeUnit(builder:flatbuffers.Builder, frameNumber:number, frameType:FrameType, receiveTimeMs:number, dataOffset:flatbuffers.Offset):flatbuffers.Offset {
DecodeUnit.startDecodeUnit(builder);
DecodeUnit.addFrameNumber(builder, frameNumber);
DecodeUnit.addFrameType(builder, frameType);
DecodeUnit.addReceiveTimeMs(builder, receiveTimeMs);
DecodeUnit.addData(builder, dataOffset);
return DecodeUnit.endDecodeUnit(builder);
}
}
@@ -0,0 +1,8 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
export enum FrameType {
PFRAME = 0,
IDR = 1
}
@@ -0,0 +1,80 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import * as flatbuffers from 'flatbuffers';
export class Setup {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):Setup {
this.bb_pos = i;
this.bb = bb;
return this;
}
static getRootAsSetup(bb:flatbuffers.ByteBuffer, obj?:Setup):Setup {
return (obj || new Setup()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
static getSizePrefixedRootAsSetup(bb:flatbuffers.ByteBuffer, obj?:Setup):Setup {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new Setup()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
videoFormat():string|null
videoFormat(optionalEncoding:flatbuffers.Encoding):string|Uint8Array|null
videoFormat(optionalEncoding?:any):string|Uint8Array|null {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.__string(this.bb_pos + offset, optionalEncoding) : null;
}
width():number {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.readUint16(this.bb_pos + offset) : 0;
}
height():number {
const offset = this.bb!.__offset(this.bb_pos, 8);
return offset ? this.bb!.readUint16(this.bb_pos + offset) : 0;
}
redrawRate():number {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.readUint16(this.bb_pos + offset) : 0;
}
static startSetup(builder:flatbuffers.Builder) {
builder.startObject(4);
}
static addVideoFormat(builder:flatbuffers.Builder, videoFormatOffset:flatbuffers.Offset) {
builder.addFieldOffset(0, videoFormatOffset, 0);
}
static addWidth(builder:flatbuffers.Builder, width:number) {
builder.addFieldInt16(1, width, 0);
}
static addHeight(builder:flatbuffers.Builder, height:number) {
builder.addFieldInt16(2, height, 0);
}
static addRedrawRate(builder:flatbuffers.Builder, redrawRate:number) {
builder.addFieldInt16(3, redrawRate, 0);
}
static endSetup(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}
static createSetup(builder:flatbuffers.Builder, videoFormatOffset:flatbuffers.Offset, width:number, height:number, redrawRate:number):flatbuffers.Offset {
Setup.startSetup(builder);
Setup.addVideoFormat(builder, videoFormatOffset);
Setup.addWidth(builder, width);
Setup.addHeight(builder, height);
Setup.addRedrawRate(builder, redrawRate);
return Setup.endSetup(builder);
}
}
@@ -0,0 +1,42 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import { DecodeUnitBuffer } from '../video-update/decode-unit-buffer.js';
import { DecodeUnitStart } from '../video-update/decode-unit-start.js';
import { Setup } from '../video-update/setup.js';
export enum Update {
NONE = 0,
Setup = 1,
DecodeUnitStart = 2,
DecodeUnitBuffer = 3
}
export function unionToUpdate(
type: Update,
accessor: (obj:DecodeUnitBuffer|DecodeUnitStart|Setup) => DecodeUnitBuffer|DecodeUnitStart|Setup|null
): DecodeUnitBuffer|DecodeUnitStart|Setup|null {
switch(Update[type]) {
case 'NONE': return null;
case 'Setup': return accessor(new Setup())! as Setup;
case 'DecodeUnitStart': return accessor(new DecodeUnitStart())! as DecodeUnitStart;
case 'DecodeUnitBuffer': return accessor(new DecodeUnitBuffer())! as DecodeUnitBuffer;
default: return null;
}
}
export function unionListToUpdate(
type: Update,
accessor: (index: number, obj:DecodeUnitBuffer|DecodeUnitStart|Setup) => DecodeUnitBuffer|DecodeUnitStart|Setup|null,
index: number
): DecodeUnitBuffer|DecodeUnitStart|Setup|null {
switch(Update[type]) {
case 'NONE': return null;
case 'Setup': return accessor(index, new Setup())! as Setup;
case 'DecodeUnitStart': return accessor(index, new DecodeUnitStart())! as DecodeUnitStart;
case 'DecodeUnitBuffer': return accessor(index, new DecodeUnitBuffer())! as DecodeUnitBuffer;
default: return null;
}
}
@@ -0,0 +1,69 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import * as flatbuffers from 'flatbuffers';
import { Update, unionToUpdate, unionListToUpdate } from '../video-update/update.js';
export class VideoUpdate {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):VideoUpdate {
this.bb_pos = i;
this.bb = bb;
return this;
}
static getRootAsVideoUpdate(bb:flatbuffers.ByteBuffer, obj?:VideoUpdate):VideoUpdate {
return (obj || new VideoUpdate()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
static getSizePrefixedRootAsVideoUpdate(bb:flatbuffers.ByteBuffer, obj?:VideoUpdate):VideoUpdate {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new VideoUpdate()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
updateType():Update {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint8(this.bb_pos + offset) : Update.NONE;
}
update<T extends flatbuffers.Table>(obj:any):any|null {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.__union(obj, this.bb_pos + offset) : null;
}
static startVideoUpdate(builder:flatbuffers.Builder) {
builder.startObject(2);
}
static addUpdateType(builder:flatbuffers.Builder, updateType:Update) {
builder.addFieldInt8(0, updateType, Update.NONE);
}
static addUpdate(builder:flatbuffers.Builder, updateOffset:flatbuffers.Offset) {
builder.addFieldOffset(1, updateOffset, 0);
}
static endVideoUpdate(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}
static finishVideoUpdateBuffer(builder:flatbuffers.Builder, offset:flatbuffers.Offset) {
builder.finish(offset);
}
static finishSizePrefixedVideoUpdateBuffer(builder:flatbuffers.Builder, offset:flatbuffers.Offset) {
builder.finish(offset, undefined, true);
}
static createVideoUpdate(builder:flatbuffers.Builder, updateType:Update, updateOffset:flatbuffers.Offset):flatbuffers.Offset {
VideoUpdate.startVideoUpdate(builder);
VideoUpdate.addUpdateType(builder, updateType);
VideoUpdate.addUpdate(builder, updateOffset);
return VideoUpdate.endVideoUpdate(builder);
}
}
+5
View File
@@ -0,0 +1,5 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
export * as VideoUpdate from './video-update.js';
+146 -105
View File
@@ -1,143 +1,184 @@
type Setup = { import { VideoUpdate } from "$lib/proto/video";
video_format: string, import { ByteBuffer } from "flatbuffers";
width: number, import { DecodeUnitBuffer } from "./proto/video-update";
height: number, import { DoorClosed, Video } from "lucide-svelte";
redraw_rate: number,
dr_flags: number,
}
type SetupPacket = { function getVideoDecoder(canvasElement: OffscreenCanvas): VideoDecoder {
Setup: Setup
}
type DecodeBuffer = {
buffer_bype: string,
data: Array<number>,
}
type DecodeUnit = {
frame_number: number,
frame_type: string,
buffer: DecodeBuffer,
receieve_time_ms: number,
}
type DecodeUnitPacket = {
DecodeUnit: DecodeUnit
}
function parseData(newBuffer: Uint8Array, oldBuffer: Uint8Array): [Array<Object>, Uint8Array<ArrayBuffer>] {
let packets = new Array<Object>();
let unparsedData = new Uint8Array();
let data = new Uint8Array([...oldBuffer, ...newBuffer]);
let index = 0;
while (true) {
if (index >= data.length) {
break
}
const view = new DataView(data.buffer.slice(index, index + 4));
const dataLength = view.getUint32(0, true);
const slice_start_index = index + 4;
const slice_end_index = index + 4 + dataLength;
if (data.length < slice_end_index) {
unparsedData = new Uint8Array(data.buffer.slice(index, data.length));
break;
}
const dataToParse = data.buffer.slice(slice_start_index, slice_end_index);
const decoder = new TextDecoder('utf-8');
const jsonString = decoder.decode(dataToParse);
packets.push(JSON.parse(jsonString));
index += 4 + dataLength;
}
return [packets, unparsedData];
}
export async function streamVideoFromReader(reader: ReadableStreamDefaultReader, canvasElement: OffscreenCanvas) {
const canvasCtx: OffscreenCanvasRenderingContext2D | null = canvasElement.getContext('2d'); const canvasCtx: OffscreenCanvasRenderingContext2D | null = canvasElement.getContext('2d');
if (canvasCtx == null) { if (canvasCtx == null) {
throw new Error(`Could not get 2d canvas context`); throw new Error(`Could not get 2d canvas context`);
} }
try {
let unparsedData = new Uint8Array();
const videoDecoder = new VideoDecoder({ const videoDecoder = new VideoDecoder({
output: (frame) => { output: (frame) => {
// Set canvas dimensions to match the frame //console.log(`rendering frame start: ${performance.now()}`);
canvasElement.width = frame.displayWidth; //canvasElement.width = frame.displayWidth;
canvasElement.height = frame.displayHeight; //canvasElement.height = frame.displayHeight;
// Draw the decoded frame to canvas //console.log(`rendering frame drawImage: ${performance.now()}`);
canvasCtx.drawImage(frame, 0, 0); canvasCtx.drawImage(frame, 0, 0);
// Important: close the frame to free memory //console.log(`rendering frame end: ${performance.now()}`);
frame.close(); frame.close();
//console.log(`rendering frame close: ${performance.now()}`);
}, },
error: (e) => { error: (e) => {
console.error('Decode error:', e); console.error('Decode error:', e);
} }
}); });
return videoDecoder;
while (true) {
const { value, done } = await reader.read();
if (done) break;
let [packets, remainingData] = parseData(value, unparsedData);
unparsedData = remainingData;
for (let i = 0; i < packets.length; i++) {
if (Object.hasOwn(packets[i], "Setup")) {
let packet = packets[i] as SetupPacket;
let config: VideoDecoderConfig | undefined = undefined;
if (packet.Setup.video_format == "H264") {
config = {
//codec: 'avc1.42E01E', // H.264 codec
codec: 'avc1.4D002A', // H.264 codec
codedWidth: packet.Setup.width,
codedHeight: packet.Setup.height,
};
} else {
throw new Error(`Unsupported video codec ${packet.Setup.video_format}`);
} }
async function configureDecoder(videoDecoder: VideoDecoder, videoFormat: string, width: number, height: number) {
let config: VideoDecoderConfig = {
codec: videoFormat,
codedWidth: width,
codedHeight: height,
optimizeForLatency: true,
//hardwareAcceleration: "prefer-hardware",
};
const codecSupport = await VideoDecoder.isConfigSupported(config); const codecSupport = await VideoDecoder.isConfigSupported(config);
console.log(codecSupport);
if (codecSupport.supported) { if (codecSupport.supported) {
videoDecoder.configure(config); videoDecoder.configure(config);
} else { } else {
throw new Error(`Could not configure decoder`); throw new Error(`Could not configure decoder`);
} }
} else if (Object.hasOwn(packets[i], "DecodeUnit")) {
let packet = packets[i] as DecodeUnitPacket;
let frame_type: EncodedAudioChunkType = "delta";
if (packet.DecodeUnit.frame_type == "IDR") {
frame_type = "key";
} }
class Decoder {
videoDecoder: VideoDecoder;
frameNumber: bigint | undefined;
frameType: EncodedAudioChunkType = "delta";
fullLength: bigint = 0n;
receiveTimeMs: number = 0;
//frameBroken: boolean = false;
//lastBufferIndex: bigint = 0n;
frameCollector: boolean[] = new Array<boolean>();
dataOffset: number = 0;
data: Uint8Array = new Uint8Array();
constructor(videoDecoder: VideoDecoder) {
this.videoDecoder = videoDecoder;
}
print() {
console.log(
`
frameNumber: ${this.frameNumber}
frameType: ${this.frameType}
fullLength: ${this.fullLength}
receiveTimeMs: ${this.receiveTimeMs}
frameCollector: ${this.frameCollector}
ts: ${performance.now()}
`
);
}
processStart(decodeUnitStart: VideoUpdate.DecodeUnitStart) {
//this.print();
const frameCompleted = !this.frameCollector.includes(false);
if (!frameCompleted) {
console.log(`Got setup packet for frame ${decodeUnitStart.frameNumber()} but the last frame has not been completed`);
}
this.frameNumber = decodeUnitStart.frameNumber();
this.frameType = "delta";
if (decodeUnitStart.frameType() == VideoUpdate.FrameType.IDR) {
this.frameType = "key";
}
this.fullLength = decodeUnitStart.fullLength();
this.receiveTimeMs = decodeUnitStart.receiveTimeMs();
//this.frameBroken = false;
//this.lastBufferIndex = -1n;
//this.dataOffset = 0;
this.frameCollector = new Array(Number(decodeUnitStart.numBuffers())).fill(false);
this.data = new Uint8Array(Number(this.fullLength));
//this.print();
//console.log(`start: `, this);
//console.log(performance.now());
}
// Copy one buffer of frame data into the reassembly buffer; once every
// expected buffer has arrived, wrap the bytes in an EncodedVideoChunk and
// hand it to the VideoDecoder.
//
// Fix: the original copied the payload one byte at a time through the
// flatbuffers indexed accessor (`data(i)`), which re-walks the vtable per
// byte. We now bulk-copy via the generated `dataArray()` typed-array view
// when available, keeping the indexed loop only as a fallback.
processBuffer(decodeUnitBuffer: VideoUpdate.DecodeUnitBuffer) {
    const frameNumber = decodeUnitBuffer.frameNumber();
    if (this.frameNumber === undefined) {
        console.log("frameNumber is undefined but we got a buffer, ignoring...");
        return;
    }
    if (this.frameNumber != frameNumber) {
        console.log(`Got buffer for frame ${frameNumber} but we are processing frame ${this.frameNumber}, ignoring...`);
        return;
    }
    const offset = Number(decodeUnitBuffer.bufferOffset());
    const payload = decodeUnitBuffer.dataArray();
    if (payload) {
        // Single memcpy-style copy of the whole buffer.
        this.data.set(payload, offset);
    } else {
        for (let i = 0; i < decodeUnitBuffer.dataLength(); i++) {
            this.data[offset + i] = decodeUnitBuffer.data(i)!;
        }
    }
    this.frameCollector[Number(decodeUnitBuffer.bufferIndex())] = true;
    // All buffers present: submit the completed frame for decoding.
    if (!this.frameCollector.includes(false)) {
        const chunk = new EncodedVideoChunk({
            //timestamp: this.receiveTimeMs,
            timestamp: 0,
            type: this.frameType,
            data: this.data,
        });
        this.videoDecoder.decode(chunk);
    }
}
}
export async function streamVideoFromReader(reader: ReadableStreamDefaultReader, canvasElement: OffscreenCanvas) {
const videoDecoder = getVideoDecoder(canvasElement);
try {
let decodeUnitBuffer: VideoUpdate.DecodeUnitBuffer = new VideoUpdate.DecodeUnitBuffer();
let decoder = new Decoder(videoDecoder);
while (true) {
const { value, done } = await reader.read();
if (done) break;
const dataToParse = new ByteBuffer(value);
const videoUpdate = VideoUpdate.VideoUpdate.getRootAsVideoUpdate(dataToParse);
if (videoUpdate.updateType() == VideoUpdate.Update.Setup) {
let setup = videoUpdate.update(new VideoUpdate.Setup());
await configureDecoder(videoDecoder, setup.videoFormat(), setup.width(), setup.height());
} else if (videoUpdate.updateType() == VideoUpdate.Update.DecodeUnitStart) {
let decodeUnitStart: VideoUpdate.DecodeUnitStart = new VideoUpdate.DecodeUnitStart();
videoUpdate.update(decodeUnitStart);
decoder.processStart(decodeUnitStart);
} else if (videoUpdate.updateType() == VideoUpdate.Update.DecodeUnitBuffer) {
videoUpdate.update(decodeUnitBuffer);
decoder.processBuffer(decodeUnitBuffer);
} else { } else {
throw new Error(`Got packet of unknown type`); throw new Error(`Got packet of unknown type`);
} }
} }
}
} catch (e) { } catch (e) {
var error = <Error>e; var error = <Error>e;
console.error('Error connecting to stream:', error); console.error('Error connecting to stream:', error);
+2
View File
@@ -23,6 +23,8 @@
let streamData = await getStreamData(app.id, server_name); let streamData = await getStreamData(app.id, server_name);
streamStore.Url = streamData.Url; streamStore.Url = streamData.Url;
streamStore.CertHash = streamData.CertHash; streamStore.CertHash = streamData.CertHash;
streamStore.Width = streamData.Width;
streamStore.Height = streamData.Height;
console.log(`Stream data retrieved. Navigating to /stream.`); console.log(`Stream data retrieved. Navigating to /stream.`);
await goto('/stream'); await goto('/stream');
+11 -32
View File
@@ -1,51 +1,30 @@
//Setup {
// video_format: VideoFormat,
// width: u64,
// height: u64,
// redraw_rate: u64,
// dr_flags: i32,
//},
//DecodeUnit {
// frame_number: u64,
// frame_type: FrameType,
// host_processing_latency: u16,
// receieve_time_ms: u64,
// enqueue_time_ms: u64,
// presentation_time: u64,
// full_length: usize,
// //buffers: Vec<Buffer>,
// buffer: Buffer,
// index: u64,
// hdr_active: bool,
// colorspace: u8,
//},
type StreamData = { type StreamData = {
Url: string, Url: string,
CertHash: Array<number>, CertHash: Array<number>,
Width: number,
Height: number,
} }
export async function getStreamData(appId: number, server_name: string): Promise<StreamData> { export async function getStreamData(appId: number, server_name: string): Promise<StreamData> {
try { try {
// Create the POST request payload // Create the POST request payload
const width = 1920;
const height = 1080;
const payload = { const payload = {
id: appId, id: appId,
server: server_name, server: server_name,
server_mode: { server_mode: {
fps: 60, fps: 60,
width: 1920, width: width,
height: 1080, height: height,
}, },
stream_config: { stream_config: {
bitrate_kbps: 1024 * 10, bitrate_kbps: 1024 * 10 * 5,
mode: { mode: {
fps: 60, fps: 60,
width: 1920, width: width,
height: 1080, height: height,
} }
} }
}; };
@@ -66,7 +45,7 @@ export async function getStreamData(appId: number, server_name: string): Promise
const streamDataResp = await response.json(); const streamDataResp = await response.json();
console.log('Stream started:', streamDataResp); console.log('Stream started:', streamDataResp);
let streamData: StreamData = { Url: streamDataResp.url, CertHash: streamDataResp.cert_hash }; let streamData: StreamData = { Url: streamDataResp.url, CertHash: streamDataResp.cert_hash, Width: width, Height: height };
return streamData; return streamData;
@@ -1,4 +1,6 @@
export const streamStore = $state({ export const streamStore = $state({
Url: '', Url: '',
CertHash: [0], CertHash: [0],
Width: 0,
Height: 0,
}); });
+3 -1
View File
@@ -4,6 +4,8 @@
$: url = streamStore.Url; $: url = streamStore.Url;
$: certHash = streamStore.CertHash; $: certHash = streamStore.CertHash;
$: width = streamStore.Width;
$: height = streamStore.Height;
</script> </script>
<svelte:head> <svelte:head>
@@ -13,7 +15,7 @@
<!--<section> <!--<section>
</section>--> </section>-->
<Stream {url} {certHash} /> <Stream {url} {certHash} {width} {height} />
<style> <style>
section { section {
+12 -2
View File
@@ -6,16 +6,26 @@
interface Props { interface Props {
url: string; url: string;
certHash: Array<number>; certHash: Array<number>;
width: number;
height: number;
} }
let { url, certHash }: Props = $props(); let { url, certHash, width, height }: Props = $props();
let loading = $state(true); let loading = $state(true);
let fullscreen = $state(false); let fullscreen = $state(false);
let gameplayView: HTMLDivElement; let gameplayView: HTMLDivElement;
let gameplayCanvas: HTMLCanvasElement; let gameplayCanvas: HTMLCanvasElement;
async function startStream() { async function startStream() {
await startWebtransportStream(url, certHash, gameplayCanvas, gameplayCanvas, gameplayCanvas); await startWebtransportStream(
url,
certHash,
width,
height,
gameplayCanvas,
gameplayCanvas,
gameplayCanvas
);
} }
async function requestFullscreen() { async function requestFullscreen() {
+15 -6
View File
@@ -4,12 +4,12 @@ import CanvasWorker from "$lib/canvas.worker?worker";
export async function getStreamTransport(url: string, certHash: Array<number>): Promise<WebTransport> { export async function getStreamTransport(url: string, certHash: Array<number>): Promise<WebTransport> {
let certHashArray = new Uint8Array(certHash); let certHashArray = new Uint8Array(certHash);
// Check if WebTransport is supported
if (!window.WebTransport) { if (!window.WebTransport) {
throw new Error('WebTransport is not supported in this browser'); throw new Error('WebTransport is not supported in this browser');
} }
const transport = new WebTransport(url, { const transport = new WebTransport(url, {
//congestionControl: "low-latency",
serverCertificateHashes: [ serverCertificateHashes: [
{ {
algorithm: "sha-256", algorithm: "sha-256",
@@ -19,9 +19,8 @@ export async function getStreamTransport(url: string, certHash: Array<number>):
}); });
console.log('Connecting to WebTransport at ', url); console.log('Connecting to WebTransport at ', url);
// Wait for the connection to be ready
await transport.ready; await transport.ready;
console.log('WebTransport connection established'); console.log(`WebTransport connection established`);
return transport; return transport;
} }
@@ -43,19 +42,29 @@ export async function spawnWorker(gameplayCanvas: HTMLCanvasElement, reader: Rea
export async function startWebtransportStream( export async function startWebtransportStream(
url: string, url: string,
certHash: Array<number>, certHash: Array<number>,
width: number,
height: number,
gameplayCanvas: HTMLCanvasElement, gameplayCanvas: HTMLCanvasElement,
keyEventElement: HTMLElement, keyEventElement: HTMLElement,
mouseElement: HTMLElement, mouseElement: HTMLElement,
) { ) {
console.log(width, height);
gameplayCanvas.width = width;
gameplayCanvas.height = height;
console.log(`Connecting to stream at ${url} with cert_hash ${certHash}`); console.log(`Connecting to stream at ${url} with cert_hash ${certHash}`);
const transport = await getStreamTransport(url, certHash); const transport = await getStreamTransport(url, certHash);
const datagrams = transport.datagrams;
datagrams.incomingHighWaterMark = 20000;
const stream = await transport.createBidirectionalStream(); const stream = await transport.createBidirectionalStream();
const reader = stream.readable //const reader = stream.readable
const datagramReader = datagrams.readable
const writer = stream.writable.getWriter(); const writer = stream.writable.getWriter();
spawnWorker(gameplayCanvas, reader); spawnWorker(gameplayCanvas, datagramReader);
keyEventElement.addEventListener("keydown", (event: KeyboardEvent) => { sendKeyboardEvent(writer, event, KeyAction.DOWN) }); keyEventElement.addEventListener("keydown", (event: KeyboardEvent) => { sendKeyboardEvent(writer, event, KeyAction.DOWN) });
keyEventElement.addEventListener("keyup", (event: KeyboardEvent) => { sendKeyboardEvent(writer, event, KeyAction.UP) }); keyEventElement.addEventListener("keyup", (event: KeyboardEvent) => { sendKeyboardEvent(writer, event, KeyAction.UP) });
@@ -67,7 +76,7 @@ export async function startWebtransportStream(
mouseElement.addEventListener("click", async () => { mouseElement.addEventListener("click", async () => {
console.log("Requesting pointer lock"); console.log("Requesting pointer lock");
await mouseElement.requestPointerLock({ await mouseElement.requestPointerLock({
unadjustedMovement: true, //unadjustedMovement: true,
}); });
console.log("Pointer lock aquired"); console.log("Pointer lock aquired");
}) })
+4
View File
@@ -5,9 +5,12 @@ edition = "2024"
[dependencies] [dependencies]
anyhow = "1.0.98" anyhow = "1.0.98"
argon2 = "0.5"
directories = "6.0.0" directories = "6.0.0"
flatbuffers = "25.2.10" flatbuffers = "25.2.10"
getrandom = { version = "0.3.3", features = ["std"] } getrandom = { version = "0.3.3", features = ["std"] }
h3-datagram = "0.0.2"
h3-quinn = "0.0.10"
hex = "0.4.3" hex = "0.4.3"
hmac-sha256 = "1.1.12" hmac-sha256 = "1.1.12"
http = "1.3.1" http = "1.3.1"
@@ -15,6 +18,7 @@ libc = "0.2.174"
moonlight-common-c-sys = { path = "../moonlight-common-c-sys" } moonlight-common-c-sys = { path = "../moonlight-common-c-sys" }
openssl = "0.10.73" openssl = "0.10.73"
rand = "0.9.1" rand = "0.9.1"
rusqlite = { version = "0.34", features = ["bundled"] }
reqwest = { version = "0.12.20", features = [ reqwest = { version = "0.12.20", features = [
"rustls-tls", "rustls-tls",
"native-tls", "native-tls",
+25 -1
View File
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use tracing::{debug, error}; use tracing::{debug, error};
use crate::{ use crate::{
auth,
common, common,
common::{AppError, AppResult}, common::{AppError, AppResult},
responses, responses,
@@ -45,7 +46,17 @@ struct GetAppsResponse {
#[craft] #[craft]
impl crate::backend::Backend { impl crate::backend::Backend {
#[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))] #[craft(endpoint(status_codes(StatusCode::OK, StatusCode::INTERNAL_SERVER_ERROR)))]
pub async fn get_apps(self: ::std::sync::Arc<Self>) -> AppResult<Json<GetAppsResponse>> { pub async fn get_apps(self: ::std::sync::Arc<Self>, depot: &mut Depot) -> AppResult<Json<GetAppsResponse>> {
let user = match auth::get_user_from_depot(depot) {
Some(u) => u.clone(),
None => {
error!("get_apps reached without authenticated user in depot");
return Err(AppError {
status_code: StatusCode::UNAUTHORIZED,
description: "Not authenticated".to_string(),
});
}
};
let standard_error = Err(AppError { let standard_error = Err(AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR, status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "failed to get available apps".to_string(), description: "failed to get available apps".to_string(),
@@ -143,6 +154,19 @@ impl crate::backend::Backend {
get_apps_resp.apps.insert(server.name, resp_vec); get_apps_resp.apps.insert(server.name, resp_vec);
} }
// Filter apps by user permissions (admins see everything)
if !user.is_admin {
let permissions = self.db.get_permissions(&user.id).unwrap_or_default();
for (server_name, apps) in get_apps_resp.apps.iter_mut() {
apps.retain(|app| {
permissions.iter().any(|p| {
p.server == *server_name && p.app_id == app.id as i64
})
});
}
get_apps_resp.apps.retain(|_, apps| !apps.is_empty());
}
Ok(Json(get_apps_resp)) Ok(Json(get_apps_resp))
} }
} }
+326
View File
@@ -0,0 +1,326 @@
use std::sync::Arc;
use salvo::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::error;
use crate::common::{AppError, AppResult};
use crate::db::{AppPermission, Db, User};
const SESSION_MAX_AGE_SECONDS: i64 = 7 * 24 * 3600; // 7 days

// Key used to store the authenticated user in the Salvo Depot
const USER_DEPOT_KEY: &str = "authenticated_user";

/// Fetch the `User` that `SessionAuthMiddleware` stashed in the depot for
/// this request, or `None` if the request was never authenticated.
pub fn get_user_from_depot(depot: &Depot) -> Option<&User> {
    match depot.get::<User>(USER_DEPOT_KEY) {
        Ok(user) => Some(user),
        Err(_) => None,
    }
}
// -- Middleware --

/// Salvo middleware that authenticates requests by validating the
/// `Authorization: Bearer <token>` header against the sessions table.
pub struct SessionAuthMiddleware {
    // Shared handle to the SQLite-backed auth store.
    pub db: Arc<Db>,
}
#[handler]
impl SessionAuthMiddleware {
async fn handle(&self, req: &mut Request, depot: &mut Depot, res: &mut Response, ctrl: &mut FlowCtrl) {
let token = req
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "));
let token = match token {
Some(t) => t,
None => {
res.status_code(StatusCode::UNAUTHORIZED);
Json(serde_json::json!({"description": "Missing or invalid Authorization header"})).render(res);
ctrl.skip_rest();
return;
}
};
match self.db.validate_session(token) {
Ok(Some(user)) => {
depot.insert(USER_DEPOT_KEY, user);
}
Ok(None) => {
res.status_code(StatusCode::UNAUTHORIZED);
Json(serde_json::json!({"description": "Invalid or expired session"})).render(res);
ctrl.skip_rest();
return;
}
Err(e) => {
error!("Session validation error: {e}");
res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
Json(serde_json::json!({"description": "Internal server error"})).render(res);
ctrl.skip_rest();
return;
}
}
}
}
/// Middleware that allows a request through only when the depot already
/// contains an authenticated admin user (run it after `SessionAuthMiddleware`).
pub struct AdminCheckMiddleware;

#[handler]
impl AdminCheckMiddleware {
    async fn handle(&self, _req: &mut Request, depot: &mut Depot, res: &mut Response, ctrl: &mut FlowCtrl) {
        // Decide the failure status/message in one place; admins fall through.
        let (code, msg) = match get_user_from_depot(depot) {
            None => (StatusCode::UNAUTHORIZED, "Not authenticated"),
            Some(user) if !user.is_admin => (StatusCode::FORBIDDEN, "Admin access required"),
            Some(_) => return, // admin user: let the request proceed
        };
        res.status_code(code);
        Json(serde_json::json!({"description": msg})).render(res);
        ctrl.skip_rest();
    }
}
// -- Request/Response types --

/// Credentials submitted to the login endpoint.
#[derive(Deserialize, ToSchema)]
pub struct LoginRequest {
    pub username: String,
    pub password: String,
}

/// Successful login: the bearer token the client sends on subsequent requests.
#[derive(Serialize, ToSchema)]
pub struct LoginResponse {
    pub token: String,
}

/// Identity of the calling user plus their granted app permissions.
#[derive(Serialize, ToSchema)]
pub struct MeResponse {
    pub username: String,
    pub is_admin: bool,
    pub permissions: Vec<AppPermission>,
}

/// Admin request body for creating a new user account.
#[derive(Deserialize, ToSchema)]
pub struct CreateUserRequest {
    pub username: String,
    pub password: String,
    pub is_admin: bool,
}

/// Admin request body for updating a user; `None` fields are left unchanged.
#[derive(Deserialize, ToSchema)]
pub struct UpdateUserRequest {
    pub password: Option<String>,
    pub is_admin: Option<bool>,
}

/// Admin request body that replaces a user's entire permission set
/// (the DB layer deletes existing rows before inserting these).
#[derive(Deserialize, ToSchema)]
pub struct SetPermissionsRequest {
    pub permissions: Vec<AppPermission>,
}
// -- Auth endpoint handlers --

#[craft]
impl crate::backend::Backend {
    /// Login: verify username/password and mint a new session token.
    ///
    /// Unknown user and wrong password are indistinguishable to the client:
    /// `verify_password` returns `Ok(None)` for both, and both map to the
    /// same 401 message.
    #[craft(handler)]
    pub async fn login(
        self: Arc<Self>,
        body: salvo::oapi::extract::JsonBody<LoginRequest>,
    ) -> AppResult<Json<LoginResponse>> {
        let user = match self.db.verify_password(&body.username, &body.password) {
            Ok(Some(u)) => u,
            Ok(None) => {
                return Err(AppError {
                    status_code: StatusCode::UNAUTHORIZED,
                    description: "Invalid username or password".to_string(),
                });
            }
            Err(e) => {
                error!("Login error: {e}");
                return Err(AppError {
                    status_code: StatusCode::INTERNAL_SERVER_ERROR,
                    description: "Internal server error".to_string(),
                });
            }
        };
        let token = match self.db.create_session(&user.id, SESSION_MAX_AGE_SECONDS) {
            Ok(t) => t,
            Err(e) => {
                error!("Session creation error: {e}");
                return Err(AppError {
                    status_code: StatusCode::INTERNAL_SERVER_ERROR,
                    description: "Internal server error".to_string(),
                });
            }
        };
        Ok(Json(LoginResponse { token }))
    }

    /// Logout: delete the caller's session token.
    ///
    /// Idempotent best-effort: a missing or unknown token still returns ok,
    /// and deletion errors are deliberately ignored.
    #[craft(handler)]
    pub async fn logout(self: Arc<Self>, req: &mut Request) -> AppResult<Json<serde_json::Value>> {
        let token = req
            .headers()
            .get("authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|v| v.strip_prefix("Bearer "))
            .unwrap_or("");
        let _ = self.db.delete_session(token);
        Ok(Json(serde_json::json!({"status": "ok"})))
    }

    /// "Who am I": return the authenticated user's identity and permissions.
    /// Relies on `SessionAuthMiddleware` having stored the user in the depot.
    #[craft(handler)]
    pub async fn me(self: Arc<Self>, depot: &mut Depot) -> AppResult<Json<MeResponse>> {
        let user = match get_user_from_depot(depot) {
            Some(u) => u.clone(),
            None => {
                return Err(AppError {
                    status_code: StatusCode::UNAUTHORIZED,
                    description: "Not authenticated".to_string(),
                });
            }
        };
        // Permission lookup failure degrades to an empty list rather than a 500.
        let permissions = self.db.get_permissions(&user.id).unwrap_or_default();
        Ok(Json(MeResponse {
            username: user.username,
            is_admin: user.is_admin,
            permissions,
        }))
    }

    // -- Admin endpoint handlers --

    /// Admin: list all user accounts.
    #[craft(handler)]
    pub async fn admin_list_users(self: Arc<Self>) -> AppResult<Json<Vec<User>>> {
        match self.db.list_users() {
            Ok(users) => Ok(Json(users)),
            Err(e) => {
                error!("List users error: {e}");
                Err(AppError {
                    status_code: StatusCode::INTERNAL_SERVER_ERROR,
                    description: "Failed to list users".to_string(),
                })
            }
        }
    }

    /// Admin: create a new user. Duplicate usernames surface as 400 since the
    /// likely cause is a UNIQUE-constraint violation in the DB layer.
    #[craft(handler)]
    pub async fn admin_create_user(
        self: Arc<Self>,
        body: salvo::oapi::extract::JsonBody<CreateUserRequest>,
    ) -> AppResult<Json<User>> {
        match self
            .db
            .create_user(&body.username, &body.password, body.is_admin)
        {
            Ok(user) => Ok(Json(user)),
            Err(e) => {
                error!("Create user error: {e}");
                Err(AppError {
                    status_code: StatusCode::BAD_REQUEST,
                    description: format!("Failed to create user: {e}"),
                })
            }
        }
    }

    /// Admin: update the user identified by the `id` route param; only the
    /// fields present in the body are changed.
    #[craft(handler)]
    pub async fn admin_update_user(
        self: Arc<Self>,
        req: &mut Request,
        body: salvo::oapi::extract::JsonBody<UpdateUserRequest>,
    ) -> AppResult<Json<serde_json::Value>> {
        let user_id = req.param::<String>("id").unwrap_or_default();
        match self
            .db
            .update_user(&user_id, body.password.as_deref(), body.is_admin)
        {
            Ok(true) => Ok(Json(serde_json::json!({"status": "ok"}))),
            Ok(false) => Err(AppError {
                status_code: StatusCode::NOT_FOUND,
                description: "User not found".to_string(),
            }),
            Err(e) => {
                error!("Update user error: {e}");
                Err(AppError {
                    status_code: StatusCode::INTERNAL_SERVER_ERROR,
                    description: "Failed to update user".to_string(),
                })
            }
        }
    }

    /// Admin: delete the user identified by the `id` route param.
    #[craft(handler)]
    pub async fn admin_delete_user(
        self: Arc<Self>,
        req: &mut Request,
    ) -> AppResult<Json<serde_json::Value>> {
        let user_id = req.param::<String>("id").unwrap_or_default();
        match self.db.delete_user(&user_id) {
            Ok(true) => Ok(Json(serde_json::json!({"status": "ok"}))),
            Ok(false) => Err(AppError {
                status_code: StatusCode::NOT_FOUND,
                description: "User not found".to_string(),
            }),
            Err(e) => {
                error!("Delete user error: {e}");
                Err(AppError {
                    status_code: StatusCode::INTERNAL_SERVER_ERROR,
                    description: "Failed to delete user".to_string(),
                })
            }
        }
    }

    /// Admin: list the app permissions granted to the user in the `id` param.
    #[craft(handler)]
    pub async fn admin_get_permissions(
        self: Arc<Self>,
        req: &mut Request,
    ) -> AppResult<Json<Vec<AppPermission>>> {
        let user_id = req.param::<String>("id").unwrap_or_default();
        match self.db.get_permissions(&user_id) {
            Ok(perms) => Ok(Json(perms)),
            Err(e) => {
                error!("Get permissions error: {e}");
                Err(AppError {
                    status_code: StatusCode::INTERNAL_SERVER_ERROR,
                    description: "Failed to get permissions".to_string(),
                })
            }
        }
    }

    /// Admin: replace the entire permission set for the user in the `id` param.
    #[craft(handler)]
    pub async fn admin_set_permissions(
        self: Arc<Self>,
        req: &mut Request,
        body: salvo::oapi::extract::JsonBody<SetPermissionsRequest>,
    ) -> AppResult<Json<serde_json::Value>> {
        let user_id = req.param::<String>("id").unwrap_or_default();
        match self.db.set_permissions(&user_id, &body.permissions) {
            Ok(()) => Ok(Json(serde_json::json!({"status": "ok"}))),
            Err(e) => {
                error!("Set permissions error: {e}");
                Err(AppError {
                    status_code: StatusCode::INTERNAL_SERVER_ERROR,
                    description: "Failed to set permissions".to_string(),
                })
            }
        }
    }
}
@@ -5,6 +5,7 @@ use salvo::oapi::ToSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::db::Db;
use crate::state::StateFile; use crate::state::StateFile;
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -89,14 +90,25 @@ pub struct Backend {
pub state: StateFile, pub state: StateFile,
pub streams: RwLock<HashMap<uuid::Uuid, Stream>>, pub streams: RwLock<HashMap<uuid::Uuid, Stream>>,
pub port: u16, pub port: u16,
pub db: Db,
} }
impl Backend { impl Backend {
pub fn new(port: u16) -> Result<Self> { pub fn new(port: u16) -> Result<Self> {
let project_dirs =
directories::ProjectDirs::from("xyz", "ohea", "gamestream-webtransport-proxy")
.ok_or(anyhow::anyhow!("Could not get project dirs"))?;
let data_dir = project_dirs.data_dir();
std::fs::create_dir_all(data_dir)?;
let db_path = data_dir.join("auth.db");
let db = Db::open(&db_path)?;
Ok(Backend { Ok(Backend {
state: StateFile::new()?, state: StateFile::new()?,
streams: RwLock::new(HashMap::new()), streams: RwLock::new(HashMap::new()),
port, port,
db,
}) })
} }
} }
+29 -3
View File
@@ -120,15 +120,41 @@ fn generate_http_cert_and_key(
cert_builder.set_version(2)?; cert_builder.set_version(2)?;
let serial = openssl::bn::BigNum::from_u32(1)?;
let asn_serial = openssl::asn1::Asn1Integer::from_bn(&serial)?;
cert_builder.set_serial_number(&asn_serial)?;
// Set subject (Distinguished Name) // Set subject (Distinguished Name)
let mut name_builder = X509NameBuilder::new()?; let mut name_builder = X509NameBuilder::new()?;
name_builder.append_entry_by_text("CN", "mumble-web self-signed")?; name_builder.append_entry_by_text("CN", "localhost")?;
let subject_name = name_builder.build(); let subject_name = name_builder.build();
cert_builder.set_subject_name(&subject_name)?; cert_builder.set_subject_name(&subject_name)?;
// Set issuer (same as subject for self-signed)
cert_builder.set_issuer_name(&subject_name)?; cert_builder.set_issuer_name(&subject_name)?;
let context = cert_builder.x509v3_context(None, None);
let mut san = openssl::x509::extension::SubjectAlternativeName::new();
san.dns("localhost");
let san_extension = san.build(&context)?;
let key_usage = openssl::x509::extension::KeyUsage::new()
.digital_signature()
.key_encipherment()
.build()?;
let ext_key_usage = openssl::x509::extension::ExtendedKeyUsage::new()
.server_auth()
.build()?;
// Add Subject Key Identifier
let subject_key_id = openssl::x509::extension::SubjectKeyIdentifier::new().build(&context)?;
cert_builder.append_extension(san_extension)?;
cert_builder.append_extension(key_usage)?;
cert_builder.append_extension(ext_key_usage)?;
cert_builder.append_extension(subject_key_id)?;
cert_builder.set_not_before(&now)?; cert_builder.set_not_before(&now)?;
cert_builder.set_not_after(&expiration_time)?; cert_builder.set_not_after(&expiration_time)?;
cert_builder.set_pubkey(&key)?; cert_builder.set_pubkey(&key)?;
+659
View File
@@ -0,0 +1,659 @@
use std::path::Path;
use std::sync::Mutex;
use anyhow::{Context, Result};
use argon2::{
Argon2,
password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString, rand_core::OsRng},
};
use salvo::oapi::ToSchema;
use serde::{Deserialize, Serialize};
/// A user account as exposed to callers; the password hash is deliberately
/// not part of this struct (it is only read inside `verify_password`).
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct User {
    pub id: String,
    pub username: String,
    pub is_admin: bool,
    // Stored as a Unix-epoch-seconds decimal string (see `now_iso8601`).
    pub created_at: String,
}

/// Grants one user access to one app (`app_id`) on one named server.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct AppPermission {
    pub server: String,
    pub app_id: i64,
}

/// A login session row; `token` is the bearer credential. Timestamps are
/// Unix-epoch-seconds decimal strings (see `now_iso8601` / `future_iso8601`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    pub token: String,
    pub user_id: String,
    pub created_at: String,
    pub expires_at: String,
}

/// SQLite-backed store for users, sessions, and per-app permissions.
/// A single connection behind a `Mutex` serializes all database access.
pub struct Db {
    conn: Mutex<rusqlite::Connection>,
}
impl Db {
    /// Open (or create) the database file at `path` and ensure the schema exists.
    pub fn open(path: &Path) -> Result<Self> {
        let conn = rusqlite::Connection::open(path)?;
        let db = Db {
            conn: Mutex::new(conn),
        };
        db.init()?;
        Ok(db)
    }

    /// Create the tables if missing and enable foreign-key enforcement.
    /// The PRAGMA is per-connection; this connection lives as long as `Db`.
    fn init(&self) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute_batch("PRAGMA foreign_keys = ON;")?;
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS users (
                id TEXT PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                password TEXT NOT NULL,
                is_admin INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS user_app_permissions (
                user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                server TEXT NOT NULL,
                app_id INTEGER NOT NULL,
                PRIMARY KEY (user_id, server, app_id)
            );
            CREATE TABLE IF NOT EXISTS sessions (
                token TEXT PRIMARY KEY,
                user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                created_at TEXT NOT NULL,
                expires_at TEXT NOT NULL
            );",
        )?;
        Ok(())
    }

    /// If no users exist, create a default `admin` account with a random
    /// password and return `(username, password)` so it can be shown once.
    /// Returns `Ok(None)` when any user already exists.
    pub fn seed_admin_if_needed(&self) -> Result<Option<(String, String)>> {
        let conn = self.conn.lock().unwrap();
        let count: i64 = conn.query_row("SELECT COUNT(*) FROM users", [], |row| row.get(0))?;
        if count > 0 {
            return Ok(None);
        }
        // Release the lock before create_user(), which locks again.
        drop(conn);
        let password = generate_random_password();
        let user = self.create_user("admin", &password, true)?;
        Ok(Some((user.username, password)))
    }

    /// Insert a new user with an Argon2-hashed password. Fails if the
    /// username is already taken (UNIQUE constraint).
    pub fn create_user(&self, username: &str, password: &str, is_admin: bool) -> Result<User> {
        let id = uuid::Uuid::new_v4().to_string();
        let password_hash = hash_password(password)?;
        let created_at = now_iso8601();
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO users (id, username, password, is_admin, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
            rusqlite::params![id, username, password_hash, is_admin as i32, created_at],
        ).context("Failed to create user (username may already exist)")?;
        Ok(User {
            id,
            username: username.to_string(),
            is_admin,
            created_at,
        })
    }

    /// Verify `password` for `username` against the stored Argon2 hash.
    /// Returns `Ok(None)` for both "no such user" and "wrong password" so
    /// callers cannot distinguish the two.
    pub fn verify_password(&self, username: &str, password: &str) -> Result<Option<User>> {
        let conn = self.conn.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT id, username, password, is_admin, created_at FROM users WHERE username = ?1",
        )?;
        let mut rows = stmt.query(rusqlite::params![username])?;
        let row = match rows.next()? {
            Some(r) => r,
            None => return Ok(None),
        };
        let id: String = row.get(0)?;
        let uname: String = row.get(1)?;
        let stored_hash: String = row.get(2)?;
        let is_admin: bool = row.get::<_, i32>(3)? != 0;
        let created_at: String = row.get(4)?;
        let parsed_hash =
            PasswordHash::new(&stored_hash).map_err(|e| anyhow::anyhow!("Invalid hash: {e}"))?;
        if Argon2::default()
            .verify_password(password.as_bytes(), &parsed_hash)
            .is_err()
        {
            return Ok(None);
        }
        Ok(Some(User {
            id,
            username: uname,
            is_admin,
            created_at,
        }))
    }

    /// Fetch a user by id; `Ok(None)` when the id is unknown.
    pub fn get_user(&self, user_id: &str) -> Result<Option<User>> {
        let conn = self.conn.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT id, username, is_admin, created_at FROM users WHERE id = ?1",
        )?;
        let mut rows = stmt.query(rusqlite::params![user_id])?;
        match rows.next()? {
            Some(row) => Ok(Some(User {
                id: row.get(0)?,
                username: row.get(1)?,
                is_admin: row.get::<_, i32>(2)? != 0,
                created_at: row.get(3)?,
            })),
            None => Ok(None),
        }
    }

    /// List all users, ordered by username.
    pub fn list_users(&self) -> Result<Vec<User>> {
        let conn = self.conn.lock().unwrap();
        let mut stmt =
            conn.prepare("SELECT id, username, is_admin, created_at FROM users ORDER BY username")?;
        let users = stmt
            .query_map([], |row| {
                Ok(User {
                    id: row.get(0)?,
                    username: row.get(1)?,
                    is_admin: row.get::<_, i32>(2)? != 0,
                    created_at: row.get(3)?,
                })
            })?
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(users)
    }

    /// Update a user's password and/or admin flag; `None` fields are skipped.
    /// Returns `Ok(true)` iff the user exists.
    ///
    /// Fix: previously, calling this with both fields `None` read
    /// `conn.changes()` without having run any statement, so it reported the
    /// row count of whatever statement last ran on the connection. That case
    /// is now answered with an explicit existence check.
    pub fn update_user(
        &self,
        user_id: &str,
        new_password: Option<&str>,
        new_is_admin: Option<bool>,
    ) -> Result<bool> {
        let conn = self.conn.lock().unwrap();
        let mut ran_update = false;
        if let Some(password) = new_password {
            let hash = hash_password(password)?;
            conn.execute(
                "UPDATE users SET password = ?1 WHERE id = ?2",
                rusqlite::params![hash, user_id],
            )?;
            ran_update = true;
        }
        if let Some(is_admin) = new_is_admin {
            conn.execute(
                "UPDATE users SET is_admin = ?1 WHERE id = ?2",
                rusqlite::params![is_admin as i32, user_id],
            )?;
            ran_update = true;
        }
        if !ran_update {
            // Nothing to change: just report whether the user exists.
            let exists: i64 = conn.query_row(
                "SELECT COUNT(*) FROM users WHERE id = ?1",
                rusqlite::params![user_id],
                |row| row.get(0),
            )?;
            return Ok(exists > 0);
        }
        // changes() reflects the most recent UPDATE above: > 0 iff the id matched.
        Ok(conn.changes() > 0)
    }

    /// Delete a user; their sessions and permissions are removed by the
    /// `ON DELETE CASCADE` foreign keys.
    pub fn delete_user(&self, user_id: &str) -> Result<bool> {
        let conn = self.conn.lock().unwrap();
        // Defensive: init() already enabled foreign_keys on this connection,
        // but the cascade is load-bearing here, so re-assert it.
        conn.execute("PRAGMA foreign_keys = ON;", [])?;
        let rows = conn.execute("DELETE FROM users WHERE id = ?1", rusqlite::params![user_id])?;
        Ok(rows > 0)
    }

    // Session management

    /// Create a session for `user_id` expiring `max_age_seconds` from now;
    /// returns the random 256-bit hex token.
    pub fn create_session(&self, user_id: &str, max_age_seconds: i64) -> Result<String> {
        let token = generate_session_token();
        let created_at = now_iso8601();
        let expires_at = future_iso8601(max_age_seconds);
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO sessions (token, user_id, created_at, expires_at) VALUES (?1, ?2, ?3, ?4)",
            rusqlite::params![token, user_id, created_at, expires_at],
        )?;
        Ok(token)
    }

    /// Resolve `token` to its user if the session exists and has not expired.
    ///
    /// NOTE: `expires_at > ?2` compares decimal epoch-second strings as TEXT;
    /// this is correct while both values have the same digit count (epoch
    /// seconds stay 10 digits until the year 2286).
    pub fn validate_session(&self, token: &str) -> Result<Option<User>> {
        let conn = self.conn.lock().unwrap();
        let now = now_iso8601();
        let mut stmt = conn.prepare(
            "SELECT u.id, u.username, u.is_admin, u.created_at
             FROM sessions s
             JOIN users u ON s.user_id = u.id
             WHERE s.token = ?1 AND s.expires_at > ?2",
        )?;
        let mut rows = stmt.query(rusqlite::params![token, now])?;
        match rows.next()? {
            Some(row) => Ok(Some(User {
                id: row.get(0)?,
                username: row.get(1)?,
                is_admin: row.get::<_, i32>(2)? != 0,
                created_at: row.get(3)?,
            })),
            None => Ok(None),
        }
    }

    /// Delete one session; `Ok(true)` iff the token existed.
    pub fn delete_session(&self, token: &str) -> Result<bool> {
        let conn = self.conn.lock().unwrap();
        let rows = conn.execute(
            "DELETE FROM sessions WHERE token = ?1",
            rusqlite::params![token],
        )?;
        Ok(rows > 0)
    }

    /// Remove all expired sessions; returns how many rows were deleted.
    pub fn cleanup_expired_sessions(&self) -> Result<usize> {
        let conn = self.conn.lock().unwrap();
        let now = now_iso8601();
        let rows = conn.execute(
            "DELETE FROM sessions WHERE expires_at <= ?1",
            rusqlite::params![now],
        )?;
        Ok(rows)
    }

    // Permission management

    /// Replace a user's entire permission set.
    ///
    /// Fix: the delete + inserts now run inside a transaction so a failed
    /// insert can no longer leave the user with a half-replaced set.
    pub fn set_permissions(&self, user_id: &str, permissions: &[AppPermission]) -> Result<()> {
        let mut conn = self.conn.lock().unwrap();
        let tx = conn.transaction()?;
        tx.execute(
            "DELETE FROM user_app_permissions WHERE user_id = ?1",
            rusqlite::params![user_id],
        )?;
        {
            let mut stmt = tx.prepare(
                "INSERT INTO user_app_permissions (user_id, server, app_id) VALUES (?1, ?2, ?3)",
            )?;
            for perm in permissions {
                stmt.execute(rusqlite::params![user_id, perm.server, perm.app_id])?;
            }
        } // statement dropped here so the transaction can commit
        tx.commit()?;
        Ok(())
    }

    /// List the app permissions granted to `user_id`.
    pub fn get_permissions(&self, user_id: &str) -> Result<Vec<AppPermission>> {
        let conn = self.conn.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT server, app_id FROM user_app_permissions WHERE user_id = ?1",
        )?;
        let perms = stmt
            .query_map(rusqlite::params![user_id], |row| {
                Ok(AppPermission {
                    server: row.get(0)?,
                    app_id: row.get(1)?,
                })
            })?
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(perms)
    }

    /// True when `user_id` may stream `app_id` on `server`: admins always
    /// pass; others need an explicit permission row.
    ///
    /// Errors (rather than returning false) when `user_id` does not exist,
    /// because the is_admin query_row finds no row.
    pub fn check_app_permission(
        &self,
        user_id: &str,
        server: &str,
        app_id: i64,
    ) -> Result<bool> {
        // Check if user is admin first
        let conn = self.conn.lock().unwrap();
        let is_admin: i32 = conn.query_row(
            "SELECT is_admin FROM users WHERE id = ?1",
            rusqlite::params![user_id],
            |row| row.get(0),
        )?;
        if is_admin != 0 {
            return Ok(true);
        }
        let count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM user_app_permissions WHERE user_id = ?1 AND server = ?2 AND app_id = ?3",
            rusqlite::params![user_id, server, app_id],
            |row| row.get(0),
        )?;
        Ok(count > 0)
    }
}
/// Hash `password` with Argon2 (default params) and a fresh random salt,
/// returning the self-describing PHC string for storage.
fn hash_password(password: &str) -> Result<String> {
    let salt = SaltString::generate(&mut OsRng);
    Argon2::default()
        .hash_password(password.as_bytes(), &salt)
        .map(|hash| hash.to_string())
        .map_err(|e| anyhow::anyhow!("Failed to hash password: {e}"))
}
fn generate_session_token() -> String {
let mut bytes = [0u8; 32];
openssl::rand::rand_bytes(&mut bytes).expect("Failed to generate random bytes");
hex::encode(bytes)
}
fn generate_random_password() -> String {
let mut bytes = [0u8; 16];
openssl::rand::rand_bytes(&mut bytes).expect("Failed to generate random bytes");
hex::encode(bytes)
}
/// Current time as Unix epoch seconds rendered as a decimal string.
///
/// NOTE(review): despite the name, there is no ISO-8601 formatting here —
/// the value is plain epoch seconds. Session-expiry comparisons depend on
/// this exact representation, so do not change the format without migrating
/// stored rows.
fn now_iso8601() -> String {
    let elapsed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is before the Unix epoch");
    elapsed.as_secs().to_string()
}
/// Time `seconds_from_now` seconds in the future, as a Unix-epoch-seconds
/// decimal string (same representation as `now_iso8601`; the name is
/// historical — no ISO-8601 formatting is involved).
fn future_iso8601(seconds_from_now: i64) -> String {
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_secs() as i64;
    let target = now_secs + seconds_from_now;
    target.to_string()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fresh in-memory SQLite database with the full schema applied.
    /// Every test gets its own isolated store, so tests cannot interfere.
    fn test_db() -> Db {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        let db = Db {
            conn: Mutex::new(conn),
        };
        db.init().unwrap();
        db
    }

    // ----- user CRUD and password verification -----

    #[test]
    fn test_create_and_get_user() {
        let db = test_db();
        let user = db.create_user("alice", "password123", false).unwrap();
        assert_eq!(user.username, "alice");
        assert!(!user.is_admin);
        // Round-trip by id returns the same user.
        let fetched = db.get_user(&user.id).unwrap().unwrap();
        assert_eq!(fetched.username, "alice");
        assert_eq!(fetched.id, user.id);
    }

    #[test]
    fn test_verify_correct_password() {
        let db = test_db();
        db.create_user("bob", "secret", false).unwrap();
        let result = db.verify_password("bob", "secret").unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().username, "bob");
    }

    #[test]
    fn test_verify_wrong_password() {
        // A wrong password is Ok(None), not an error.
        let db = test_db();
        db.create_user("bob", "secret", false).unwrap();
        let result = db.verify_password("bob", "wrong").unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_verify_nonexistent_user() {
        // An unknown username is also Ok(None) — indistinguishable from a
        // wrong password at the API surface.
        let db = test_db();
        let result = db.verify_password("nobody", "pass").unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_duplicate_username_rejected() {
        let db = test_db();
        db.create_user("alice", "pass1", false).unwrap();
        let result = db.create_user("alice", "pass2", false);
        assert!(result.is_err());
    }

    #[test]
    fn test_list_users() {
        let db = test_db();
        // Inserted out of alphabetical order on purpose.
        db.create_user("charlie", "pass", false).unwrap();
        db.create_user("alice", "pass", true).unwrap();
        let users = db.list_users().unwrap();
        assert_eq!(users.len(), 2);
        assert_eq!(users[0].username, "alice"); // list_users sorts by username
        assert_eq!(users[1].username, "charlie");
    }

    #[test]
    fn test_update_user_password() {
        let db = test_db();
        let user = db.create_user("dave", "oldpass", false).unwrap();
        db.update_user(&user.id, Some("newpass"), None).unwrap();
        // Old credential is invalidated, new one works.
        assert!(db.verify_password("dave", "oldpass").unwrap().is_none());
        assert!(db.verify_password("dave", "newpass").unwrap().is_some());
    }

    #[test]
    fn test_update_user_admin_status() {
        // `None` password leaves the credential untouched; only the admin
        // flag changes.
        let db = test_db();
        let user = db.create_user("eve", "pass", false).unwrap();
        assert!(!user.is_admin);
        db.update_user(&user.id, None, Some(true)).unwrap();
        let updated = db.get_user(&user.id).unwrap().unwrap();
        assert!(updated.is_admin);
    }

    #[test]
    fn test_delete_user() {
        // delete_user returns whether a row was actually removed.
        let db = test_db();
        let user = db.create_user("frank", "pass", false).unwrap();
        assert!(db.delete_user(&user.id).unwrap());
        assert!(db.get_user(&user.id).unwrap().is_none());
    }

    #[test]
    fn test_delete_nonexistent_user() {
        let db = test_db();
        assert!(!db.delete_user("nonexistent-id").unwrap());
    }

    // ----- sessions -----

    #[test]
    fn test_create_and_validate_session() {
        let db = test_db();
        let user = db.create_user("grace", "pass", false).unwrap();
        let token = db.create_session(&user.id, 3600).unwrap();
        // Validation resolves the token back to its owning user.
        let validated = db.validate_session(&token).unwrap();
        assert!(validated.is_some());
        assert_eq!(validated.unwrap().username, "grace");
    }

    #[test]
    fn test_expired_session_rejected() {
        let db = test_db();
        let user = db.create_user("heidi", "pass", false).unwrap();
        // Negative TTL creates a session that expired 10 seconds ago.
        let token = db.create_session(&user.id, -10).unwrap();
        let validated = db.validate_session(&token).unwrap();
        assert!(validated.is_none());
    }

    #[test]
    fn test_invalid_token_rejected() {
        let db = test_db();
        let validated = db.validate_session("bogus-token").unwrap();
        assert!(validated.is_none());
    }

    #[test]
    fn test_delete_session() {
        // Logout path: deleting a session invalidates its token.
        let db = test_db();
        let user = db.create_user("ivan", "pass", false).unwrap();
        let token = db.create_session(&user.id, 3600).unwrap();
        assert!(db.delete_session(&token).unwrap());
        assert!(db.validate_session(&token).unwrap().is_none());
    }

    // ----- cascade behavior on user deletion -----

    #[test]
    fn test_delete_user_cascades_sessions() {
        // Deleting a user must invalidate all of their sessions.
        let db = test_db();
        let user = db.create_user("judy", "pass", false).unwrap();
        let token = db.create_session(&user.id, 3600).unwrap();
        db.delete_user(&user.id).unwrap();
        assert!(db.validate_session(&token).unwrap().is_none());
    }

    #[test]
    fn test_delete_user_cascades_permissions() {
        let db = test_db();
        let user = db.create_user("karl", "pass", false).unwrap();
        db.set_permissions(
            &user.id,
            &[AppPermission {
                server: "srv".to_string(),
                app_id: 1,
            }],
        )
        .unwrap();
        db.delete_user(&user.id).unwrap();
        // Permissions table should be empty for this user
        let perms = db.get_permissions(&user.id).unwrap();
        assert!(perms.is_empty());
    }

    // ----- per-app permissions -----

    #[test]
    fn test_set_and_get_permissions() {
        let db = test_db();
        let user = db.create_user("laura", "pass", false).unwrap();
        let perms = vec![
            AppPermission {
                server: "server1".to_string(),
                app_id: 10,
            },
            AppPermission {
                server: "server1".to_string(),
                app_id: 20,
            },
        ];
        db.set_permissions(&user.id, &perms).unwrap();
        let fetched = db.get_permissions(&user.id).unwrap();
        assert_eq!(fetched.len(), 2);
    }

    #[test]
    fn test_set_permissions_replaces_existing() {
        // set_permissions is replace-all semantics, not append.
        let db = test_db();
        let user = db.create_user("mike", "pass", false).unwrap();
        db.set_permissions(
            &user.id,
            &[AppPermission {
                server: "s1".to_string(),
                app_id: 1,
            }],
        )
        .unwrap();
        db.set_permissions(
            &user.id,
            &[AppPermission {
                server: "s2".to_string(),
                app_id: 2,
            }],
        )
        .unwrap();
        let perms = db.get_permissions(&user.id).unwrap();
        assert_eq!(perms.len(), 1);
        assert_eq!(perms[0].server, "s2");
        assert_eq!(perms[0].app_id, 2);
    }

    #[test]
    fn test_check_app_permission_allowed() {
        let db = test_db();
        let user = db.create_user("nancy", "pass", false).unwrap();
        db.set_permissions(
            &user.id,
            &[AppPermission {
                server: "srv".to_string(),
                app_id: 42,
            }],
        )
        .unwrap();
        assert!(db.check_app_permission(&user.id, "srv", 42).unwrap());
    }

    #[test]
    fn test_check_app_permission_denied() {
        let db = test_db();
        let user = db.create_user("oscar", "pass", false).unwrap();
        assert!(!db.check_app_permission(&user.id, "srv", 42).unwrap());
    }

    #[test]
    fn test_check_app_permission_admin_bypass() {
        let db = test_db();
        let user = db.create_user("pat", "pass", true).unwrap();
        // Admin has no explicit permissions but should pass
        assert!(db.check_app_permission(&user.id, "srv", 42).unwrap());
    }

    // ----- maintenance and bootstrap -----

    #[test]
    fn test_cleanup_expired_sessions() {
        let db = test_db();
        let user = db.create_user("quinn", "pass", false).unwrap();
        let _expired = db.create_session(&user.id, -10).unwrap();
        let valid = db.create_session(&user.id, 3600).unwrap();
        // Only the expired session is removed; count reflects that.
        let cleaned = db.cleanup_expired_sessions().unwrap();
        assert_eq!(cleaned, 1);
        // Valid session should still work
        assert!(db.validate_session(&valid).unwrap().is_some());
    }

    #[test]
    fn test_seed_admin_if_needed() {
        let db = test_db();
        // First call should create admin
        let result = db.seed_admin_if_needed().unwrap();
        assert!(result.is_some());
        let (username, password) = result.unwrap();
        assert_eq!(username, "admin");
        assert!(!password.is_empty());
        // Verify can login with generated password
        let user = db.verify_password("admin", &password).unwrap().unwrap();
        assert!(user.is_admin);
        // Second call should be a no-op
        let result = db.seed_admin_if_needed().unwrap();
        assert!(result.is_none());
    }
}
@@ -40,7 +40,7 @@ pub fn stream_config(stream: &crate::backend::Stream) -> _STREAM_CONFIGURATION {
height: stream.stream_config.mode.height, height: stream.stream_config.mode.height,
fps: stream.stream_config.mode.fps, fps: stream.stream_config.mode.fps,
bitrate: stream.stream_config.bitrate_kbps, bitrate: stream.stream_config.bitrate_kbps,
packetSize: 512, packetSize: 1024,
streamingRemotely: STREAM_CFG_AUTO, streamingRemotely: STREAM_CFG_AUTO,
audioConfiguration: (0x3 << 16) | (2 << 8) | 0xCA, audioConfiguration: (0x3 << 16) | (2 << 8) | 0xCA,
supportedVideoFormats: VIDEO_FORMAT_H264, supportedVideoFormats: VIDEO_FORMAT_H264,
@@ -12,7 +12,7 @@ use tokio::sync::mpsc;
use tracing::{debug, error}; use tracing::{debug, error};
#[derive(Serialize)] #[derive(Serialize)]
enum FrameType { pub enum FrameType {
PFRAME, PFRAME,
IDR, IDR,
} }
@@ -32,7 +32,7 @@ impl TryFrom<i32> for FrameType {
} }
#[derive(Serialize)] #[derive(Serialize)]
enum VideoFormat { pub enum VideoFormat {
H264, H264,
H264_HIGH8_444, H264_HIGH8_444,
H265, H265,
@@ -92,8 +92,8 @@ impl TryFrom<i32> for BufferType {
} }
#[derive(Serialize)] #[derive(Serialize)]
struct Buffer { pub struct Buffer {
data: Vec<u8>, pub data: Vec<u8>,
buffer_type: BufferType, buffer_type: BufferType,
} }
@@ -119,23 +119,27 @@ pub enum RendererMessage {
redraw_rate: u64, redraw_rate: u64,
dr_flags: i32, dr_flags: i32,
}, },
DecodeUnit { DecodeUnitStart {
frame_number: u64, frame_number: u64,
frame_type: FrameType, frame_type: FrameType,
num_buffers: u64,
host_processing_latency: u16, host_processing_latency: u16,
receieve_time_ms: u64, receive_time_ms: u64,
enqueue_time_ms: u64, enqueue_time_ms: u64,
presentation_time: u64, presentation_time: u64,
full_length: usize, full_length: u64,
//buffers: Vec<Buffer>,
buffer: Buffer,
index: u64,
hdr_active: bool, hdr_active: bool,
colorspace: u8, colorspace: u8,
}, },
DecodeUnitBuffer {
frame_number: u64,
buffer_index: u64,
buffer_offset: u64,
buffer: Buffer,
},
} }
impl RendererMessage { impl RendererMessage {
@@ -155,23 +159,43 @@ impl RendererMessage {
}) })
} }
fn from_decode_unit(decode_unit: _DECODE_UNIT) -> Result<Self> { fn from_decode_unit(decode_unit: _DECODE_UNIT) -> Result<Vec<Self>> {
//fn from_decode_unit(decode_unit: _DECODE_UNIT) -> Result<Vec<Self>> { let mut messages = Vec::new();
let mut buffer = Vec::new();
//let mut buffers = Vec::new();
if decode_unit.bufferList.is_null() { if decode_unit.bufferList.is_null() {
return Err(anyhow!("DecodeUnit bufferList is null")); return Err(anyhow!("DecodeUnit bufferList is null"));
} }
let frame_number = <u64>::try_from(decode_unit.frameNumber)?;
messages.push(RendererMessage::DecodeUnitStart {
frame_number,
frame_type: FrameType::try_from(decode_unit.frameType)?,
num_buffers: 0,
host_processing_latency: decode_unit.frameHostProcessingLatency,
receive_time_ms: decode_unit.receiveTimeMs,
enqueue_time_ms: decode_unit.enqueueTimeMs,
presentation_time: decode_unit.presentationTimeMs as u64,
full_length: <u64>::try_from(decode_unit.fullLength)?,
hdr_active: decode_unit.hdrActive,
colorspace: decode_unit.colorspace,
});
let mut next = unsafe { *decode_unit.bufferList }; let mut next = unsafe { *decode_unit.bufferList };
let mut index = 0; let mut index = 0;
let mut offset = 0;
loop { loop {
let mut b = Buffer::try_from(next)?; let b = Buffer::try_from(next)?;
buffer.append(&mut b.data); let buffer_len = b.data.len() as u64;
//buffers.push(msg); messages.push(RendererMessage::DecodeUnitBuffer {
frame_number,
buffer_index: index,
buffer_offset: offset,
buffer: b,
});
offset = offset + buffer_len;
index = index + 1; index = index + 1;
if next.next.is_null() { if next.next.is_null() {
break; break;
@@ -180,22 +204,15 @@ impl RendererMessage {
next = unsafe { *next.next }; next = unsafe { *next.next };
} }
Ok(RendererMessage::DecodeUnit { if let RendererMessage::DecodeUnitStart {
frame_number: <u64>::try_from(decode_unit.frameNumber)?, ref mut num_buffers,
frame_type: FrameType::try_from(decode_unit.frameType)?, ..
host_processing_latency: decode_unit.frameHostProcessingLatency, } = messages[0]
receieve_time_ms: decode_unit.receiveTimeMs, {
enqueue_time_ms: decode_unit.enqueueTimeMs, *num_buffers = index;
presentation_time: decode_unit.presentationTimeMs as u64, }
full_length: <usize>::try_from(decode_unit.fullLength)?,
buffer: Buffer { Ok(messages)
data: buffer,
buffer_type: BufferType::PICDATA,
},
index,
hdr_active: decode_unit.hdrActive,
colorspace: decode_unit.colorspace,
})
} }
} }
@@ -261,23 +278,29 @@ extern "C" fn submit_decode_unit_cb(decode_unit: PDECODE_UNIT) -> std::os::raw::
return -1; return -1;
} }
let decode_unit = unsafe { *decode_unit }; let decode_unit = unsafe { *decode_unit };
//debug!("decode unit bytes: {}", decode_unit.fullLength);
let message = match RendererMessage::from_decode_unit(decode_unit) { let messages = match RendererMessage::from_decode_unit(decode_unit) {
Ok(m) => m, Ok(m) => m,
Err(e) => { Err(e) => {
error!("Cannot construct RendererMessage: {e}"); error!("Cannot construct RendererMessage: {e}");
return -1; return -1;
} }
}; };
debug!(
"got decode unit with {} buffers: {:?}",
messages.len() - 1,
std::time::Instant::now()
);
send_message(message) for msg in messages {
//for msg in messages { let ret = send_message(msg);
// let ret = send_message(msg); if ret != 0 {
// if ret != 0 { return ret;
// return ret; }
// } }
//} //debug!("dispatched decode unit: {:?}", std::time::Instant::now());
//0 0
} }
pub fn decoder_callbacks() -> Result<(DECODER_RENDERER_CALLBACKS, mpsc::Receiver<RendererMessage>)> pub fn decoder_callbacks() -> Result<(DECODER_RENDERER_CALLBACKS, mpsc::Receiver<RendererMessage>)>
@@ -2,7 +2,7 @@ use anyhow::{Result, anyhow};
use tokio::sync::mpsc; use tokio::sync::mpsc;
mod config; mod config;
mod decoder; pub mod decoder;
#[derive(Debug)] #[derive(Debug)]
pub struct GamestreamChannels { pub struct GamestreamChannels {
+69 -8
View File
@@ -3,9 +3,11 @@ use salvo::logging::Logger;
use salvo::prelude::*; use salvo::prelude::*;
mod apps; mod apps;
mod auth;
mod backend; mod backend;
mod certs; mod certs;
mod common; mod common;
mod db;
mod gamestream; mod gamestream;
mod pair; mod pair;
mod proxy; mod proxy;
@@ -40,12 +42,70 @@ fn create_static_handler() -> impl Handler {
async fn run_backend(port: u16) -> Result<()> { async fn run_backend(port: u16) -> Result<()> {
let backend = backend::Backend::new(port)?; let backend = backend::Backend::new(port)?;
// Seed default admin user if no users exist
if let Some((username, password)) = backend.db.seed_admin_if_needed()? {
tracing::info!("Created default admin user: {username}");
println!("===========================================");
println!(" Default admin credentials:");
println!(" Username: {username}");
println!(" Password: {password}");
println!("===========================================");
}
// Clean up expired sessions on startup
if let Ok(cleaned) = backend.db.cleanup_expired_sessions() {
if cleaned > 0 {
tracing::info!("Cleaned up {cleaned} expired sessions");
}
}
let backend_arc = std::sync::Arc::new(backend); let backend_arc = std::sync::Arc::new(backend);
let auth_middleware = auth::SessionAuthMiddleware {
db: std::sync::Arc::new(
db::Db::open(
&directories::ProjectDirs::from("xyz", "ohea", "gamestream-webtransport-proxy")
.ok_or(anyhow!("Could not get project dirs"))?
.data_dir()
.join("auth.db"),
)?,
),
};
let router = Router::new() let router = Router::new()
.push(Router::with_path("api/pair").post(backend_arc.post_pair())) // Public auth routes
.push(Router::with_path("api/apps").get(backend_arc.get_apps())) .push(Router::with_path("api/auth/login").post(backend_arc.login()))
.push(Router::with_path("api/stream/start").post(backend_arc.post_stream_start())) // Authenticated routes
.push(
Router::with_path("api")
.hoop(auth_middleware)
.push(Router::with_path("auth/logout").post(backend_arc.logout()))
.push(Router::with_path("auth/me").get(backend_arc.me()))
.push(Router::with_path("pair").post(backend_arc.post_pair()))
.push(Router::with_path("apps").get(backend_arc.get_apps()))
.push(Router::with_path("stream/start").post(backend_arc.post_stream_start()))
// Admin-only routes
.push(
Router::with_path("admin")
.hoop(auth::AdminCheckMiddleware)
.push(
Router::with_path("users")
.get(backend_arc.admin_list_users())
.post(backend_arc.admin_create_user()),
)
.push(
Router::with_path("users/<id>")
.put(backend_arc.admin_update_user())
.delete(backend_arc.admin_delete_user()),
)
.push(
Router::with_path("users/<id>/permissions")
.get(backend_arc.admin_get_permissions())
.put(backend_arc.admin_set_permissions()),
),
),
)
.push(Router::with_path("{*path}").get(create_static_handler())); .push(Router::with_path("{*path}").get(create_static_handler()));
let doc = OpenApi::new("test api", "0.0.1").merge_router(&router); let doc = OpenApi::new("test api", "0.0.1").merge_router(&router);
let router = router let router = router
@@ -64,11 +124,9 @@ async fn run_backend(port: u16) -> Result<()> {
Ok(()) Ok(())
} }
async fn run_proxy(port: u16, stream_id: uuid::Uuid) -> Result<()> { async fn run_proxy(port: u16, stream_id: uuid::Uuid, stream_token: String) -> Result<()> {
let (config, cert_hash) = certs::get_webtransport_stream_config(stream_id)?; let (config, cert_hash) = certs::get_webtransport_stream_config(stream_id)?;
//let config = certs::get_http_stream_config()?; let proxy = proxy::Proxy::new(cert_hash, stream_token);
//let cert_hash = [0; 32];
let proxy = proxy::Proxy::new(cert_hash);
let proxy_arc = std::sync::Arc::new(proxy); let proxy_arc = std::sync::Arc::new(proxy);
let router = Router::new() let router = Router::new()
@@ -108,8 +166,11 @@ async fn main() -> anyhow::Result<()> {
.nth(3) .nth(3)
.ok_or(anyhow!("Cert ID argument missing"))?, .ok_or(anyhow!("Cert ID argument missing"))?,
)?; )?;
let stream_token = std::env::args()
.nth(4)
.ok_or(anyhow!("Stream token argument missing"))?;
run_proxy(port, stream_id).await run_proxy(port, stream_id, stream_token).await
} }
_ => Err(anyhow!("Unknown mode: {mode}")), _ => Err(anyhow!("Unknown mode: {mode}")),
} }
@@ -29,6 +29,12 @@ async fn setup_webtransport(
) -> Result<( ) -> Result<(
impl tokio::io::AsyncWrite + Send + Sync + 'static, impl tokio::io::AsyncWrite + Send + Sync + 'static,
impl tokio::io::AsyncRead + Send + Sync + 'static, impl tokio::io::AsyncRead + Send + Sync + 'static,
h3_datagram::datagram_handler::DatagramSender<
<h3_quinn::Connection as h3_datagram::quic_traits::DatagramConnectionExt<
salvo::hyper::body::Bytes,
>>::SendDatagramHandler,
salvo::hyper::body::Bytes,
>,
//salvo::webtransport::stream::SendStream< //salvo::webtransport::stream::SendStream<
// impl salvo::proto::quic::SendStream<salvo::hyper::body::Bytes>, // impl salvo::proto::quic::SendStream<salvo::hyper::body::Bytes>,
// salvo::hyper::body::Bytes, // salvo::hyper::body::Bytes,
@@ -39,13 +45,17 @@ async fn setup_webtransport(
//>, //>,
)> { )> {
let session = req.web_transport_mut().await?; let session = req.web_transport_mut().await?;
let datagram_send = session.datagram_sender();
let bidirectional_stream = session let bidirectional_stream = session
.accept_bi() .accept_bi()
.await? .await?
.ok_or(anyhow!("No bidirectional stream"))?; .ok_or(anyhow!("No bidirectional stream"))?;
if let webtransport::server::AcceptedBi::BidiStream(_, stream) = bidirectional_stream { if let webtransport::server::AcceptedBi::BidiStream(_, stream) = bidirectional_stream {
Ok(stream.split()) let (stream_send, stream_recv) = stream.split();
Ok((stream_send, stream_recv, datagram_send))
} else { } else {
Err(anyhow!("bidirectional stream was of the wrong type")) Err(anyhow!("bidirectional stream was of the wrong type"))
} }
@@ -75,8 +85,21 @@ impl crate::proxy::Proxy {
description: "Could not start stream".to_string(), description: "Could not start stream".to_string(),
}); });
// Validate single-use stream token via the shared helper so this
// handler and its unit tests exercise the same code path.
let provided_token = req.query::<String>("token").unwrap_or_default();
if let Err(msg) = super::validate_stream_token(&self, &provided_token).await {
error!("Stream token validation failed: {msg}");
return Err(AppError {
status_code: StatusCode::UNAUTHORIZED,
description: msg,
});
}
info!("Stream token validated and consumed");
info!("WebTransport connection initiated"); info!("WebTransport connection initiated");
let (wt_send, wt_recv) = match setup_webtransport(req).await { let (wt_stream_send, wt_stream_recv, wt_datagram_send) = match setup_webtransport(req).await
{
Ok(w) => w, Ok(w) => w,
Err(e) => { Err(e) => {
error!("Could not upgrade connection to WebTransport: {e}"); error!("Could not upgrade connection to WebTransport: {e}");
@@ -95,7 +118,7 @@ impl crate::proxy::Proxy {
} }
}; };
match super::proxy_main(stream, wt_send, wt_recv).await { match super::proxy_main(stream, wt_stream_send, wt_stream_recv, wt_datagram_send).await {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(e) => { Err(e) => {
error!("Proxy main loop failed: {e}"); error!("Proxy main loop failed: {e}");
+83 -10
View File
@@ -7,19 +7,20 @@ use crate::{backend, gamestream};
pub mod handler; pub mod handler;
mod input; mod input;
mod packet_parser; mod packet_parser;
mod video;
pub struct Proxy { pub struct Proxy {
pub cert_hash: [u8; 32], pub cert_hash: [u8; 32],
//pub cert_hash: String,
pub stream: RwLock<Option<backend::Stream>>, pub stream: RwLock<Option<backend::Stream>>,
pub stream_token: RwLock<Option<String>>,
} }
impl Proxy { impl Proxy {
pub fn new(cert_hash: [u8; 32]) -> Self { pub fn new(cert_hash: [u8; 32], stream_token: String) -> Self {
//pub fn new(cert_hash: String) -> Self {
Proxy { Proxy {
stream: RwLock::new(None), stream: RwLock::new(None),
cert_hash, cert_hash,
stream_token: RwLock::new(Some(stream_token)),
} }
} }
} }
@@ -33,6 +34,12 @@ async fn proxy_main(
stream: backend::Stream, stream: backend::Stream,
mut wt_send: impl tokio::io::AsyncWrite + Send + Sync + 'static + std::marker::Unpin, mut wt_send: impl tokio::io::AsyncWrite + Send + Sync + 'static + std::marker::Unpin,
mut wt_recv: impl tokio::io::AsyncRead + Send + Sync + 'static + std::marker::Unpin, mut wt_recv: impl tokio::io::AsyncRead + Send + Sync + 'static + std::marker::Unpin,
mut wt_datagram_send: h3_datagram::datagram_handler::DatagramSender<
<h3_quinn::Connection as h3_datagram::quic_traits::DatagramConnectionExt<
salvo::hyper::body::Bytes,
>>::SendDatagramHandler,
salvo::hyper::body::Bytes,
>,
) -> Result<()> { ) -> Result<()> {
debug!( debug!(
"Connecting to stream at address {} with stream config {:?}", "Connecting to stream at address {} with stream config {:?}",
@@ -42,20 +49,14 @@ async fn proxy_main(
let mut channels = spawn_gamestream(stream).await?; let mut channels = spawn_gamestream(stream).await?;
let mut packet_buffer = packet_parser::PacketBuffer::new(); let mut packet_buffer = packet_parser::PacketBuffer::new();
//let mut buffer = vec![0; 65536].into_boxed_slice();
let mut buffer = [0u8; 65536]; let mut buffer = [0u8; 65536];
//let mut buffer = vec![0; 65536].into_boxed_slice();
loop { loop {
select! { select! {
gamestream_packet = channels.gamestream_channels.decoder_rx.recv() => { gamestream_packet = channels.gamestream_channels.decoder_rx.recv() => {
match gamestream_packet { match gamestream_packet {
Some(frame) => { Some(frame) => {
let frame_json = serde_json::to_vec(&frame)?; video::send_video_update(&frame, &mut wt_datagram_send).await?;
let frame_json_len: u32 = <u32>::try_from(frame_json.len())?;
wt_send.write_all(&frame_json_len.to_le_bytes()).await?;
wt_send.write_all(&frame_json).await?;
} }
None => { None => {
error!("Decoder channel is None"); error!("Decoder channel is None");
@@ -77,6 +78,22 @@ async fn proxy_main(
Ok(()) Ok(())
} }
/// Validate a provided token against the stored token. Consumes the token on success (single-use).
/// Returns Ok(()) if valid, Err with description if invalid or already consumed.
pub async fn validate_stream_token(proxy: &Proxy, provided: &str) -> std::result::Result<(), String> {
let mut token_guard = proxy.stream_token.write().await;
match token_guard.take() {
Some(expected) if expected == provided => Ok(()),
Some(_) => {
// Wrong token: still consumed by the `take()` above. Any validation
// attempt — correct or not — invalidates the token, so a wrong
// guess cannot be followed by a correct one.
Err("Invalid stream token".to_string())
}
None => Err("Stream token already used".to_string()),
}
}
async fn spawn_gamestream(stream: backend::Stream) -> Result<Channels> { async fn spawn_gamestream(stream: backend::Stream) -> Result<Channels> {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>(); let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
@@ -98,3 +115,59 @@ async fn spawn_gamestream(stream: backend::Stream) -> Result<Channels> {
.context("Could not get gamestream communication channels")?, .context("Could not get gamestream communication channels")?,
}) })
} }
#[cfg(test)]
mod tests {
use super::*;
fn make_proxy(token: &str) -> Proxy {
Proxy {
cert_hash: [0u8; 32],
stream: RwLock::new(None),
stream_token: RwLock::new(Some(token.to_string())),
}
}
#[tokio::test]
async fn test_valid_token_accepted() {
let proxy = make_proxy("abc123");
let result = validate_stream_token(&proxy, "abc123").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_wrong_token_rejected() {
let proxy = make_proxy("abc123");
let result = validate_stream_token(&proxy, "wrong").await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Invalid stream token");
}
#[tokio::test]
async fn test_missing_token_rejected() {
let proxy = make_proxy("abc123");
let result = validate_stream_token(&proxy, "").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_token_consumed_after_use() {
let proxy = make_proxy("abc123");
let first = validate_stream_token(&proxy, "abc123").await;
assert!(first.is_ok());
let second = validate_stream_token(&proxy, "abc123").await;
assert!(second.is_err());
assert_eq!(second.unwrap_err(), "Stream token already used");
}
#[tokio::test]
async fn test_wrong_attempt_consumes_token() {
let proxy = make_proxy("abc123");
// Wrong token attempt should consume it
let _ = validate_stream_token(&proxy, "wrong").await;
// Correct token should also fail now
let result = validate_stream_token(&proxy, "abc123").await;
assert!(result.is_err());
}
}
@@ -0,0 +1,185 @@
use anyhow::Result;
use tracing::debug;
use crate::gamestream;
use video_generated::video_update;
mod video_generated;
/// Build a flatbuffer-encoded `Setup` VideoUpdate describing the stream.
///
/// `_video_format` is currently ignored: the codec string is hard-coded to
/// H.264 Main profile ("avc1.4D401E"). The original bound the parameter and
/// then immediately shadowed it with the hard-coded string, leaving it
/// silently unused; the leading underscore makes that explicit until
/// per-format codec negotiation is implemented.
///
/// NOTE(review): `width`/`height`/`redraw_rate` are narrowed with `as u16`
/// to match the flatbuffer schema, which silently truncates values above
/// 65535 — confirm upstream bounds.
fn create_setup_videoupdate(
    _video_format: &gamestream::decoder::VideoFormat,
    width: u64,
    height: u64,
    redraw_rate: u64,
) -> Vec<u8> {
    let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024);
    // TODO: derive the codec string from `_video_format` instead of
    // hardcoding H.264 Main profile.
    let video_format = Some(builder.create_string("avc1.4D401E"));
    let setup = video_update::Setup::create(
        &mut builder,
        &video_update::SetupArgs {
            video_format,
            width: width as u16,
            height: height as u16,
            redraw_rate: redraw_rate as u16,
        },
    );
    let video_update = video_update::VideoUpdate::create(
        &mut builder,
        &video_update::VideoUpdateArgs {
            update_type: video_update::Update::Setup,
            update: Some(setup.as_union_value()),
        },
    );
    builder.finish(video_update, None);
    builder.finished_data().to_vec()
}
/// Build a flatbuffer-encoded `DecodeUnitStart` VideoUpdate announcing a new
/// frame: its sequence number, frame type, how many buffer chunks follow,
/// and the total payload length so the client can preallocate.
///
/// NOTE(review): the schema stores `receive_time_ms` as u16, so the u64
/// millisecond value is truncated with `as u16` — confirm the client only
/// uses it modulo 65536 (e.g. for latency deltas).
fn create_decodeunitstart_videoupdate(
    frame_number: u64,
    frame_type: &gamestream::decoder::FrameType,
    num_buffers: u64,
    receive_time_ms: u64,
    full_length: u64,
) -> Vec<u8> {
    let mut fbb = flatbuffers::FlatBufferBuilder::with_capacity(1024);
    // Map the decoder's frame-type enum onto the flatbuffer enum.
    let fb_frame_type = match frame_type {
        gamestream::decoder::FrameType::IDR => video_update::FrameType::IDR,
        gamestream::decoder::FrameType::PFRAME => video_update::FrameType::PFRAME,
    };
    let start = video_update::DecodeUnitStart::create(
        &mut fbb,
        &video_update::DecodeUnitStartArgs {
            frame_number,
            frame_type: fb_frame_type,
            num_buffers,
            receive_time_ms: receive_time_ms as u16,
            full_length,
        },
    );
    let update = video_update::VideoUpdate::create(
        &mut fbb,
        &video_update::VideoUpdateArgs {
            update_type: video_update::Update::DecodeUnitStart,
            update: Some(start.as_union_value()),
        },
    );
    fbb.finish(update, None);
    fbb.finished_data().to_vec()
}
/// Build a flatbuffer-encoded `DecodeUnitBuffer` VideoUpdate carrying one
/// chunk of a frame's data. The chunk is addressed by frame number, chunk
/// index, and byte offset so the client can reassemble chunks that arrive
/// out of order.
fn create_decodeunitbuffer_videoupdate(
    frame_number: u64,
    buffer_index: u64,
    buffer_offset: u64,
    buffer: &gamestream::decoder::Buffer,
) -> Vec<u8> {
    let mut fbb = flatbuffers::FlatBufferBuilder::with_capacity(1024);
    // The payload vector must be created before the table that references it.
    let payload = fbb.create_vector(&buffer.data);
    let chunk = video_update::DecodeUnitBuffer::create(
        &mut fbb,
        &video_update::DecodeUnitBufferArgs {
            frame_number,
            buffer_index,
            buffer_offset,
            data: Some(payload),
        },
    );
    let update = video_update::VideoUpdate::create(
        &mut fbb,
        &video_update::VideoUpdateArgs {
            update_type: video_update::Update::DecodeUnitBuffer,
            update: Some(chunk.as_union_value()),
        },
    );
    fbb.finish(update, None);
    fbb.finished_data().to_vec()
}
/// Serialize one decoder message as a flatbuffer `VideoUpdate` and push it
/// to the client over a WebTransport datagram.
///
/// Datagrams are unreliable and unordered; the client reassembles frames
/// using the `frame_number` / `buffer_index` / `buffer_offset` fields
/// carried in the encoded messages.
///
/// Cleanups vs. the original: removed the dead `print_time` local and the
/// commented-out trace blocks; replaced unused destructured match fields
/// with `..` (they triggered unused-variable warnings); dropped the
/// per-datagram `debug!("sent start: …")` line, which fired for every
/// update type (not just starts) and constructed an `Instant` per datagram.
///
/// # Errors
/// Returns an error if the datagram cannot be queued for sending.
pub async fn send_video_update(
    frame: &gamestream::decoder::RendererMessage,
    wt_datagram_send: &mut h3_datagram::datagram_handler::DatagramSender<
        <h3_quinn::Connection as h3_datagram::quic_traits::DatagramConnectionExt<
            salvo::hyper::body::Bytes,
        >>::SendDatagramHandler,
        salvo::hyper::body::Bytes,
    >,
) -> Result<()> {
    let buffer = match frame {
        gamestream::decoder::RendererMessage::Setup {
            video_format,
            width,
            height,
            redraw_rate,
            ..
        } => create_setup_videoupdate(video_format, *width, *height, *redraw_rate),
        gamestream::decoder::RendererMessage::DecodeUnitStart {
            frame_number,
            frame_type,
            num_buffers,
            receive_time_ms,
            full_length,
            ..
        } => create_decodeunitstart_videoupdate(
            *frame_number,
            frame_type,
            *num_buffers,
            *receive_time_ms,
            *full_length,
        ),
        gamestream::decoder::RendererMessage::DecodeUnitBuffer {
            frame_number,
            buffer_index,
            buffer_offset,
            buffer,
        } => create_decodeunitbuffer_videoupdate(
            *frame_number,
            *buffer_index,
            *buffer_offset,
            buffer,
        ),
    };
    // The datagram API takes an owned Bytes payload.
    let bytes = salvo::hyper::body::Bytes::copy_from_slice(&buffer);
    wt_datagram_send.send_datagram(bytes)?;
    Ok(())
}
@@ -0,0 +1,925 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod video_update {
use core::mem;
use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")]
pub const ENUM_MIN_FRAME_TYPE: i8 = 0;
#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")]
pub const ENUM_MAX_FRAME_TYPE: i8 = 1;
#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")]
#[allow(non_camel_case_types)]
pub const ENUM_VALUES_FRAME_TYPE: [FrameType; 2] = [
FrameType::PFRAME,
FrameType::IDR,
];
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
#[repr(transparent)]
pub struct FrameType(pub i8);
#[allow(non_upper_case_globals)]
impl FrameType {
pub const PFRAME: Self = Self(0);
pub const IDR: Self = Self(1);
pub const ENUM_MIN: i8 = 0;
pub const ENUM_MAX: i8 = 1;
pub const ENUM_VALUES: &'static [Self] = &[
Self::PFRAME,
Self::IDR,
];
/// Returns the variant's name or "" if unknown.
pub fn variant_name(self) -> Option<&'static str> {
match self {
Self::PFRAME => Some("PFRAME"),
Self::IDR => Some("IDR"),
_ => None,
}
}
}
impl core::fmt::Debug for FrameType {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
if let Some(name) = self.variant_name() {
f.write_str(name)
} else {
f.write_fmt(format_args!("<UNKNOWN {:?}>", self.0))
}
}
}
// Zero-copy read of a `FrameType` scalar at `loc` within `buf`.
impl<'a> flatbuffers::Follow<'a> for FrameType {
  type Inner = Self;
  #[inline]
  unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
    let b = flatbuffers::read_scalar_at::<i8>(buf, loc);
    Self(b)
  }
}
// Writes the wrapped scalar directly into the builder's buffer.
impl flatbuffers::Push for FrameType {
  type Output = FrameType;
  #[inline]
  unsafe fn push(&self, dst: &mut [u8], _written_len: usize) {
    flatbuffers::emplace_scalar::<i8>(dst, self.0);
  }
}
// Byte-order conversion; FlatBuffers stores scalars little-endian on the wire.
impl flatbuffers::EndianScalar for FrameType {
  type Scalar = i8;
  #[inline]
  fn to_little_endian(self) -> i8 {
    self.0.to_le()
  }
  #[inline]
  #[allow(clippy::wrong_self_convention)]
  fn from_little_endian(v: i8) -> Self {
    let b = i8::from_le(v);
    Self(b)
  }
}
// Verification delegates to the underlying i8; any bit pattern is acceptable.
impl<'a> flatbuffers::Verifiable for FrameType {
  #[inline]
  fn run_verifier(
    v: &mut flatbuffers::Verifier, pos: usize
  ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
    use self::flatbuffers::Verifiable;
    i8::run_verifier(v, pos)
  }
}
impl flatbuffers::SimpleToVerifyInSlice for FrameType {}
// Legacy (pre-2021 flatc) module-level enum helpers for the `Update` union
// discriminant. Retained for backward compatibility only; prefer the
// associated constants on `Update`.
#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")]
pub const ENUM_MIN_UPDATE: u8 = 0;
#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")]
pub const ENUM_MAX_UPDATE: u8 = 3;
#[deprecated(since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021.")]
#[allow(non_camel_case_types)]
pub const ENUM_VALUES_UPDATE: [Update; 4] = [
  Update::NONE,
  Update::Setup,
  Update::DecodeUnitStart,
  Update::DecodeUnitBuffer,
];
/// Discriminant for the schema union `Update` (`NONE`, `Setup`,
/// `DecodeUnitStart`, `DecodeUnitBuffer`). Stored as a raw `u8` so
/// discriminants from newer schemas are preserved rather than rejected.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
#[repr(transparent)]
pub struct Update(pub u8);
#[allow(non_upper_case_globals)]
impl Update {
  pub const NONE: Self = Self(0);
  pub const Setup: Self = Self(1);
  pub const DecodeUnitStart: Self = Self(2);
  pub const DecodeUnitBuffer: Self = Self(3);
  pub const ENUM_MIN: u8 = 0;
  pub const ENUM_MAX: u8 = 3;
  pub const ENUM_VALUES: &'static [Self] = &[
    Self::NONE,
    Self::Setup,
    Self::DecodeUnitStart,
    Self::DecodeUnitBuffer,
  ];
  /// Returns the variant's name, or `None` for an unrecognized discriminant.
  pub fn variant_name(self) -> Option<&'static str> {
    match self.0 {
      0 => Some("NONE"),
      1 => Some("Setup"),
      2 => Some("DecodeUnitStart"),
      3 => Some("DecodeUnitBuffer"),
      _ => None,
    }
  }
}
impl core::fmt::Debug for Update {
  /// Prints the variant name when known, `<UNKNOWN n>` otherwise.
  fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
    match self.variant_name() {
      Some(name) => f.write_str(name),
      None => write!(f, "<UNKNOWN {:?}>", self.0),
    }
  }
}
// Zero-copy read of an `Update` discriminant at `loc` within `buf`.
impl<'a> flatbuffers::Follow<'a> for Update {
  type Inner = Self;
  #[inline]
  unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
    let b = flatbuffers::read_scalar_at::<u8>(buf, loc);
    Self(b)
  }
}
// Writes the wrapped scalar directly into the builder's buffer.
impl flatbuffers::Push for Update {
  type Output = Update;
  #[inline]
  unsafe fn push(&self, dst: &mut [u8], _written_len: usize) {
    flatbuffers::emplace_scalar::<u8>(dst, self.0);
  }
}
// Byte-order conversion; FlatBuffers stores scalars little-endian on the wire.
impl flatbuffers::EndianScalar for Update {
  type Scalar = u8;
  #[inline]
  fn to_little_endian(self) -> u8 {
    self.0.to_le()
  }
  #[inline]
  #[allow(clippy::wrong_self_convention)]
  fn from_little_endian(v: u8) -> Self {
    let b = u8::from_le(v);
    Self(b)
  }
}
// Verification delegates to the underlying u8; any bit pattern is acceptable.
impl<'a> flatbuffers::Verifiable for Update {
  #[inline]
  fn run_verifier(
    v: &mut flatbuffers::Verifier, pos: usize
  ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
    use self::flatbuffers::Verifiable;
    u8::run_verifier(v, pos)
  }
}
impl flatbuffers::SimpleToVerifyInSlice for Update {}
// Marker type used by generated code when referring to the `Update` union
// payload as a table offset; never instantiated at runtime.
pub struct UpdateUnionTableOffset {}
// Uninhabited marker type for `WIPOffset<Setup>` values.
pub enum SetupOffset {}
/// Read-only view over a `Setup` table inside a FlatBuffers byte buffer.
/// Cheap to copy: it holds only a reference to the buffer plus an offset.
#[derive(Copy, Clone, PartialEq)]
pub struct Setup<'a> {
  pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for Setup<'a> {
  type Inner = Setup<'a>;
  #[inline]
  unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
    Self { _tab: flatbuffers::Table::new(buf, loc) }
  }
}
impl<'a> Setup<'a> {
  // vtable slot offsets for each field, in schema declaration order.
  pub const VT_VIDEO_FORMAT: flatbuffers::VOffsetT = 4;
  pub const VT_WIDTH: flatbuffers::VOffsetT = 6;
  pub const VT_HEIGHT: flatbuffers::VOffsetT = 8;
  pub const VT_REDRAW_RATE: flatbuffers::VOffsetT = 10;
  // Wraps an already-located table. Caller must ensure `table` points at a
  // valid `Setup` table (see the union accessors on `VideoUpdate`).
  #[inline]
  pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
    Setup { _tab: table }
  }
  /// Serializes a `Setup` table from `args` into `_fbb` and returns its
  /// offset. Scalar fields are added largest-first for tighter packing.
  #[allow(unused_mut)]
  pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
    _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
    args: &'args SetupArgs<'args>
  ) -> flatbuffers::WIPOffset<Setup<'bldr>> {
    let mut builder = SetupBuilder::new(_fbb);
    if let Some(x) = args.video_format { builder.add_video_format(x); }
    builder.add_redraw_rate(args.redraw_rate);
    builder.add_height(args.height);
    builder.add_width(args.width);
    builder.finish()
  }
  /// Optional string field; `None` when absent from the buffer.
  #[inline]
  pub fn video_format(&self) -> Option<&'a str> {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<&str>>(Setup::VT_VIDEO_FORMAT, None)}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn width(&self) -> u16 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u16>(Setup::VT_WIDTH, Some(0)).unwrap()}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn height(&self) -> u16 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u16>(Setup::VT_HEIGHT, Some(0)).unwrap()}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn redraw_rate(&self) -> u16 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u16>(Setup::VT_REDRAW_RATE, Some(0)).unwrap()}
  }
}
// Structural verification of a `Setup` table: checks the vtable and each
// field slot (all fields optional, hence `false` for "required").
impl flatbuffers::Verifiable for Setup<'_> {
  #[inline]
  fn run_verifier(
    v: &mut flatbuffers::Verifier, pos: usize
  ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
    use self::flatbuffers::Verifiable;
    v.visit_table(pos)?
     .visit_field::<flatbuffers::ForwardsUOffset<&str>>("video_format", Self::VT_VIDEO_FORMAT, false)?
     .visit_field::<u16>("width", Self::VT_WIDTH, false)?
     .visit_field::<u16>("height", Self::VT_HEIGHT, false)?
     .visit_field::<u16>("redraw_rate", Self::VT_REDRAW_RATE, false)?
     .finish();
    Ok(())
  }
}
/// Owned argument bundle for [`Setup::create`]. The lifetime ties the
/// string offset to the builder that allocated it.
pub struct SetupArgs<'a> {
  pub video_format: Option<flatbuffers::WIPOffset<&'a str>>,
  pub width: u16,
  pub height: u16,
  pub redraw_rate: u16,
}
impl<'a> Default for SetupArgs<'a> {
  // Defaults mirror the schema defaults: absent string, zero scalars.
  #[inline]
  fn default() -> Self {
    SetupArgs {
      video_format: None,
      width: 0,
      height: 0,
      redraw_rate: 0,
    }
  }
}
/// Incremental builder for a `Setup` table. `new` starts the table in the
/// underlying `FlatBufferBuilder`; `finish` closes it and yields the offset.
pub struct SetupBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
  fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
  start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> SetupBuilder<'a, 'b, A> {
  // `push_slot_always` is used for the string: an offset field is always
  // written when provided, regardless of default comparison.
  #[inline]
  pub fn add_video_format(&mut self, video_format: flatbuffers::WIPOffset<&'b str>) {
    self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Setup::VT_VIDEO_FORMAT, video_format);
  }
  // Scalar slots are elided from the buffer when equal to the default (0).
  #[inline]
  pub fn add_width(&mut self, width: u16) {
    self.fbb_.push_slot::<u16>(Setup::VT_WIDTH, width, 0);
  }
  #[inline]
  pub fn add_height(&mut self, height: u16) {
    self.fbb_.push_slot::<u16>(Setup::VT_HEIGHT, height, 0);
  }
  #[inline]
  pub fn add_redraw_rate(&mut self, redraw_rate: u16) {
    self.fbb_.push_slot::<u16>(Setup::VT_REDRAW_RATE, redraw_rate, 0);
  }
  #[inline]
  pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> SetupBuilder<'a, 'b, A> {
    let start = _fbb.start_table();
    SetupBuilder {
      fbb_: _fbb,
      start_: start,
    }
  }
  #[inline]
  pub fn finish(self) -> flatbuffers::WIPOffset<Setup<'a>> {
    let o = self.fbb_.end_table(self.start_);
    flatbuffers::WIPOffset::new(o.value())
  }
}
/// Debug-formats a `Setup` view by reading every field through its accessor.
impl core::fmt::Debug for Setup<'_> {
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    f.debug_struct("Setup")
      .field("video_format", &self.video_format())
      .field("width", &self.width())
      .field("height", &self.height())
      .field("redraw_rate", &self.redraw_rate())
      .finish()
  }
}
// Uninhabited marker type for `WIPOffset<DecodeUnitStart>` values.
pub enum DecodeUnitStartOffset {}
/// Read-only view over a `DecodeUnitStart` table inside a FlatBuffers buffer.
#[derive(Copy, Clone, PartialEq)]
pub struct DecodeUnitStart<'a> {
  pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for DecodeUnitStart<'a> {
  type Inner = DecodeUnitStart<'a>;
  #[inline]
  unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
    Self { _tab: flatbuffers::Table::new(buf, loc) }
  }
}
impl<'a> DecodeUnitStart<'a> {
  // vtable slot offsets for each field, in schema declaration order.
  pub const VT_FRAME_NUMBER: flatbuffers::VOffsetT = 4;
  pub const VT_FRAME_TYPE: flatbuffers::VOffsetT = 6;
  pub const VT_NUM_BUFFERS: flatbuffers::VOffsetT = 8;
  pub const VT_RECEIVE_TIME_MS: flatbuffers::VOffsetT = 10;
  pub const VT_FULL_LENGTH: flatbuffers::VOffsetT = 12;
  // Wraps an already-located table. Caller must ensure `table` points at a
  // valid `DecodeUnitStart` table.
  #[inline]
  pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
    DecodeUnitStart { _tab: table }
  }
  /// Serializes a `DecodeUnitStart` table from `args` into `_fbb` and
  /// returns its offset. Fields are added widest-first (u64, u16, enum byte)
  /// for alignment-friendly packing.
  #[allow(unused_mut)]
  pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
    _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
    args: &'args DecodeUnitStartArgs
  ) -> flatbuffers::WIPOffset<DecodeUnitStart<'bldr>> {
    let mut builder = DecodeUnitStartBuilder::new(_fbb);
    builder.add_full_length(args.full_length);
    builder.add_num_buffers(args.num_buffers);
    builder.add_frame_number(args.frame_number);
    builder.add_receive_time_ms(args.receive_time_ms);
    builder.add_frame_type(args.frame_type);
    builder.finish()
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn frame_number(&self) -> u64 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u64>(DecodeUnitStart::VT_FRAME_NUMBER, Some(0)).unwrap()}
  }
  /// Enum field; falls back to the schema default (`PFRAME`) when absent.
  #[inline]
  pub fn frame_type(&self) -> FrameType {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<FrameType>(DecodeUnitStart::VT_FRAME_TYPE, Some(FrameType::PFRAME)).unwrap()}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn num_buffers(&self) -> u64 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u64>(DecodeUnitStart::VT_NUM_BUFFERS, Some(0)).unwrap()}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn receive_time_ms(&self) -> u16 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u16>(DecodeUnitStart::VT_RECEIVE_TIME_MS, Some(0)).unwrap()}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn full_length(&self) -> u64 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u64>(DecodeUnitStart::VT_FULL_LENGTH, Some(0)).unwrap()}
  }
}
// Structural verification of a `DecodeUnitStart` table: checks the vtable
// and each field slot (all fields optional, hence `false` for "required").
impl flatbuffers::Verifiable for DecodeUnitStart<'_> {
  #[inline]
  fn run_verifier(
    v: &mut flatbuffers::Verifier, pos: usize
  ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
    use self::flatbuffers::Verifiable;
    v.visit_table(pos)?
     .visit_field::<u64>("frame_number", Self::VT_FRAME_NUMBER, false)?
     .visit_field::<FrameType>("frame_type", Self::VT_FRAME_TYPE, false)?
     .visit_field::<u64>("num_buffers", Self::VT_NUM_BUFFERS, false)?
     .visit_field::<u16>("receive_time_ms", Self::VT_RECEIVE_TIME_MS, false)?
     .visit_field::<u64>("full_length", Self::VT_FULL_LENGTH, false)?
     .finish();
    Ok(())
  }
}
/// Owned argument bundle for [`DecodeUnitStart::create`]. All fields are
/// plain scalars, so no builder lifetime is needed.
pub struct DecodeUnitStartArgs {
  pub frame_number: u64,
  pub frame_type: FrameType,
  pub num_buffers: u64,
  pub receive_time_ms: u16,
  pub full_length: u64,
}
/// Defaults mirror the schema defaults: zero scalars, `FrameType::PFRAME`.
// NOTE(review): the generated impl declared an unconstrained lifetime
// parameter `<'a>` even though `DecodeUnitStartArgs` holds no borrows;
// removed here (clippy::extra_unused_lifetimes). No behavior change.
impl Default for DecodeUnitStartArgs {
  #[inline]
  fn default() -> Self {
    DecodeUnitStartArgs {
      frame_number: 0,
      frame_type: FrameType::PFRAME,
      num_buffers: 0,
      receive_time_ms: 0,
      full_length: 0,
    }
  }
}
/// Incremental builder for a `DecodeUnitStart` table. `new` starts the table
/// in the underlying `FlatBufferBuilder`; `finish` closes it and yields the
/// offset.
pub struct DecodeUnitStartBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
  fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
  start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> DecodeUnitStartBuilder<'a, 'b, A> {
  // Scalar slots are elided from the buffer when equal to their default.
  #[inline]
  pub fn add_frame_number(&mut self, frame_number: u64) {
    self.fbb_.push_slot::<u64>(DecodeUnitStart::VT_FRAME_NUMBER, frame_number, 0);
  }
  // Default for the enum slot is `PFRAME` (discriminant 0).
  #[inline]
  pub fn add_frame_type(&mut self, frame_type: FrameType) {
    self.fbb_.push_slot::<FrameType>(DecodeUnitStart::VT_FRAME_TYPE, frame_type, FrameType::PFRAME);
  }
  #[inline]
  pub fn add_num_buffers(&mut self, num_buffers: u64) {
    self.fbb_.push_slot::<u64>(DecodeUnitStart::VT_NUM_BUFFERS, num_buffers, 0);
  }
  #[inline]
  pub fn add_receive_time_ms(&mut self, receive_time_ms: u16) {
    self.fbb_.push_slot::<u16>(DecodeUnitStart::VT_RECEIVE_TIME_MS, receive_time_ms, 0);
  }
  #[inline]
  pub fn add_full_length(&mut self, full_length: u64) {
    self.fbb_.push_slot::<u64>(DecodeUnitStart::VT_FULL_LENGTH, full_length, 0);
  }
  #[inline]
  pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> DecodeUnitStartBuilder<'a, 'b, A> {
    let start = _fbb.start_table();
    DecodeUnitStartBuilder {
      fbb_: _fbb,
      start_: start,
    }
  }
  #[inline]
  pub fn finish(self) -> flatbuffers::WIPOffset<DecodeUnitStart<'a>> {
    let o = self.fbb_.end_table(self.start_);
    flatbuffers::WIPOffset::new(o.value())
  }
}
/// Debug-formats a `DecodeUnitStart` view by reading every field through
/// its accessor.
impl core::fmt::Debug for DecodeUnitStart<'_> {
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    f.debug_struct("DecodeUnitStart")
      .field("frame_number", &self.frame_number())
      .field("frame_type", &self.frame_type())
      .field("num_buffers", &self.num_buffers())
      .field("receive_time_ms", &self.receive_time_ms())
      .field("full_length", &self.full_length())
      .finish()
  }
}
// Uninhabited marker type for `WIPOffset<DecodeUnitBuffer>` values.
pub enum DecodeUnitBufferOffset {}
/// Read-only view over a `DecodeUnitBuffer` table (one chunk of frame data)
/// inside a FlatBuffers buffer.
#[derive(Copy, Clone, PartialEq)]
pub struct DecodeUnitBuffer<'a> {
  pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for DecodeUnitBuffer<'a> {
  type Inner = DecodeUnitBuffer<'a>;
  #[inline]
  unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
    Self { _tab: flatbuffers::Table::new(buf, loc) }
  }
}
impl<'a> DecodeUnitBuffer<'a> {
  // vtable slot offsets for each field, in schema declaration order.
  pub const VT_FRAME_NUMBER: flatbuffers::VOffsetT = 4;
  pub const VT_BUFFER_INDEX: flatbuffers::VOffsetT = 6;
  pub const VT_BUFFER_OFFSET: flatbuffers::VOffsetT = 8;
  pub const VT_DATA: flatbuffers::VOffsetT = 10;
  // Wraps an already-located table. Caller must ensure `table` points at a
  // valid `DecodeUnitBuffer` table.
  #[inline]
  pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
    DecodeUnitBuffer { _tab: table }
  }
  /// Serializes a `DecodeUnitBuffer` table from `args` into `_fbb` and
  /// returns its offset.
  #[allow(unused_mut)]
  pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
    _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
    args: &'args DecodeUnitBufferArgs<'args>
  ) -> flatbuffers::WIPOffset<DecodeUnitBuffer<'bldr>> {
    let mut builder = DecodeUnitBufferBuilder::new(_fbb);
    builder.add_buffer_offset(args.buffer_offset);
    builder.add_buffer_index(args.buffer_index);
    builder.add_frame_number(args.frame_number);
    if let Some(x) = args.data { builder.add_data(x); }
    builder.finish()
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn frame_number(&self) -> u64 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u64>(DecodeUnitBuffer::VT_FRAME_NUMBER, Some(0)).unwrap()}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn buffer_index(&self) -> u64 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u64>(DecodeUnitBuffer::VT_BUFFER_INDEX, Some(0)).unwrap()}
  }
  /// Scalar field; falls back to the schema default (0) when absent.
  #[inline]
  pub fn buffer_offset(&self) -> u64 {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<u64>(DecodeUnitBuffer::VT_BUFFER_OFFSET, Some(0)).unwrap()}
  }
  /// Optional byte-vector payload; `None` when absent from the buffer.
  #[inline]
  pub fn data(&self) -> Option<flatbuffers::Vector<'a, u8>> {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(DecodeUnitBuffer::VT_DATA, None)}
  }
}
// Structural verification of a `DecodeUnitBuffer` table: checks the vtable
// and each field slot (all fields optional, hence `false` for "required").
impl flatbuffers::Verifiable for DecodeUnitBuffer<'_> {
  #[inline]
  fn run_verifier(
    v: &mut flatbuffers::Verifier, pos: usize
  ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
    use self::flatbuffers::Verifiable;
    v.visit_table(pos)?
     .visit_field::<u64>("frame_number", Self::VT_FRAME_NUMBER, false)?
     .visit_field::<u64>("buffer_index", Self::VT_BUFFER_INDEX, false)?
     .visit_field::<u64>("buffer_offset", Self::VT_BUFFER_OFFSET, false)?
     .visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("data", Self::VT_DATA, false)?
     .finish();
    Ok(())
  }
}
/// Owned argument bundle for [`DecodeUnitBuffer::create`]. The lifetime ties
/// the `data` vector offset to the builder that allocated it.
pub struct DecodeUnitBufferArgs<'a> {
  pub frame_number: u64,
  pub buffer_index: u64,
  pub buffer_offset: u64,
  pub data: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for DecodeUnitBufferArgs<'a> {
  // Defaults mirror the schema defaults: zero scalars, absent payload.
  #[inline]
  fn default() -> Self {
    DecodeUnitBufferArgs {
      frame_number: 0,
      buffer_index: 0,
      buffer_offset: 0,
      data: None,
    }
  }
}
/// Incremental builder for a `DecodeUnitBuffer` table. `new` starts the
/// table in the underlying `FlatBufferBuilder`; `finish` closes it and
/// yields the offset.
pub struct DecodeUnitBufferBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
  fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
  start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> DecodeUnitBufferBuilder<'a, 'b, A> {
  // Scalar slots are elided from the buffer when equal to the default (0).
  #[inline]
  pub fn add_frame_number(&mut self, frame_number: u64) {
    self.fbb_.push_slot::<u64>(DecodeUnitBuffer::VT_FRAME_NUMBER, frame_number, 0);
  }
  #[inline]
  pub fn add_buffer_index(&mut self, buffer_index: u64) {
    self.fbb_.push_slot::<u64>(DecodeUnitBuffer::VT_BUFFER_INDEX, buffer_index, 0);
  }
  #[inline]
  pub fn add_buffer_offset(&mut self, buffer_offset: u64) {
    self.fbb_.push_slot::<u64>(DecodeUnitBuffer::VT_BUFFER_OFFSET, buffer_offset, 0);
  }
  // Offset fields are always written when provided (`push_slot_always`).
  #[inline]
  pub fn add_data(&mut self, data: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
    self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(DecodeUnitBuffer::VT_DATA, data);
  }
  #[inline]
  pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> DecodeUnitBufferBuilder<'a, 'b, A> {
    let start = _fbb.start_table();
    DecodeUnitBufferBuilder {
      fbb_: _fbb,
      start_: start,
    }
  }
  #[inline]
  pub fn finish(self) -> flatbuffers::WIPOffset<DecodeUnitBuffer<'a>> {
    let o = self.fbb_.end_table(self.start_);
    flatbuffers::WIPOffset::new(o.value())
  }
}
/// Debug-formats a `DecodeUnitBuffer` view by reading every field through
/// its accessor (the raw `data` vector is printed via its own Debug impl).
impl core::fmt::Debug for DecodeUnitBuffer<'_> {
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    f.debug_struct("DecodeUnitBuffer")
      .field("frame_number", &self.frame_number())
      .field("buffer_index", &self.buffer_index())
      .field("buffer_offset", &self.buffer_offset())
      .field("data", &self.data())
      .finish()
  }
}
// Uninhabited marker type for `WIPOffset<VideoUpdate>` values.
pub enum VideoUpdateOffset {}
/// Read-only view over the root `VideoUpdate` table: a tagged union of
/// `Setup`, `DecodeUnitStart`, or `DecodeUnitBuffer`.
#[derive(Copy, Clone, PartialEq)]
pub struct VideoUpdate<'a> {
  pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for VideoUpdate<'a> {
  type Inner = VideoUpdate<'a>;
  #[inline]
  unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
    Self { _tab: flatbuffers::Table::new(buf, loc) }
  }
}
impl<'a> VideoUpdate<'a> {
  // vtable slot offsets: the union occupies two slots, discriminant + payload.
  pub const VT_UPDATE_TYPE: flatbuffers::VOffsetT = 4;
  pub const VT_UPDATE: flatbuffers::VOffsetT = 6;
  // Wraps an already-located table. Caller must ensure `table` points at a
  // valid `VideoUpdate` table.
  #[inline]
  pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
    VideoUpdate { _tab: table }
  }
  /// Serializes a `VideoUpdate` table from `args` into `_fbb` and returns
  /// its offset. Callers are responsible for keeping `update_type` and the
  /// `update` payload consistent.
  #[allow(unused_mut)]
  pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
    _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
    args: &'args VideoUpdateArgs
  ) -> flatbuffers::WIPOffset<VideoUpdate<'bldr>> {
    let mut builder = VideoUpdateBuilder::new(_fbb);
    if let Some(x) = args.update { builder.add_update(x); }
    builder.add_update_type(args.update_type);
    builder.finish()
  }
  /// Union discriminant; `Update::NONE` when absent.
  #[inline]
  pub fn update_type(&self) -> Update {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<Update>(VideoUpdate::VT_UPDATE_TYPE, Some(Update::NONE)).unwrap()}
  }
  /// Raw (untyped) union payload; use the `update_as_*` accessors to get a
  /// typed view.
  #[inline]
  pub fn update(&self) -> Option<flatbuffers::Table<'a>> {
    // Safety:
    // Created from valid Table for this object
    // which contains a valid value in this slot
    unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Table<'a>>>(VideoUpdate::VT_UPDATE, None)}
  }
  /// Returns the payload as `Setup` if the discriminant matches, else `None`.
  #[inline]
  #[allow(non_snake_case)]
  pub fn update_as_setup(&self) -> Option<Setup<'a>> {
    if self.update_type() == Update::Setup {
      self.update().map(|t| {
        // Safety:
        // Created from a valid Table for this object
        // Which contains a valid union in this slot
        unsafe { Setup::init_from_table(t) }
      })
    } else {
      None
    }
  }
  /// Returns the payload as `DecodeUnitStart` if the discriminant matches,
  /// else `None`.
  #[inline]
  #[allow(non_snake_case)]
  pub fn update_as_decode_unit_start(&self) -> Option<DecodeUnitStart<'a>> {
    if self.update_type() == Update::DecodeUnitStart {
      self.update().map(|t| {
        // Safety:
        // Created from a valid Table for this object
        // Which contains a valid union in this slot
        unsafe { DecodeUnitStart::init_from_table(t) }
      })
    } else {
      None
    }
  }
  /// Returns the payload as `DecodeUnitBuffer` if the discriminant matches,
  /// else `None`.
  #[inline]
  #[allow(non_snake_case)]
  pub fn update_as_decode_unit_buffer(&self) -> Option<DecodeUnitBuffer<'a>> {
    if self.update_type() == Update::DecodeUnitBuffer {
      self.update().map(|t| {
        // Safety:
        // Created from a valid Table for this object
        // Which contains a valid union in this slot
        unsafe { DecodeUnitBuffer::init_from_table(t) }
      })
    } else {
      None
    }
  }
}
// Structural verification of a `VideoUpdate` table. The union is verified by
// reading the discriminant and dispatching to the matching payload verifier;
// unknown discriminants are accepted (forward compatibility).
impl flatbuffers::Verifiable for VideoUpdate<'_> {
  #[inline]
  fn run_verifier(
    v: &mut flatbuffers::Verifier, pos: usize
  ) -> Result<(), flatbuffers::InvalidFlatbuffer> {
    use self::flatbuffers::Verifiable;
    v.visit_table(pos)?
     .visit_union::<Update, _>("update_type", Self::VT_UPDATE_TYPE, "update", Self::VT_UPDATE, false, |key, v, pos| {
        match key {
          Update::Setup => v.verify_union_variant::<flatbuffers::ForwardsUOffset<Setup>>("Update::Setup", pos),
          Update::DecodeUnitStart => v.verify_union_variant::<flatbuffers::ForwardsUOffset<DecodeUnitStart>>("Update::DecodeUnitStart", pos),
          Update::DecodeUnitBuffer => v.verify_union_variant::<flatbuffers::ForwardsUOffset<DecodeUnitBuffer>>("Update::DecodeUnitBuffer", pos),
          _ => Ok(()),
        }
     })?
     .finish();
    Ok(())
  }
}
/// Owned argument bundle for [`VideoUpdate::create`]: the union discriminant
/// plus an untyped offset to the already-serialized payload table.
pub struct VideoUpdateArgs {
  pub update_type: Update,
  pub update: Option<flatbuffers::WIPOffset<flatbuffers::UnionWIPOffset>>,
}
/// Defaults mirror the schema defaults: `NONE` discriminant, no payload.
// NOTE(review): the generated impl declared an unconstrained lifetime
// parameter `<'a>` even though `VideoUpdateArgs` holds no borrows; removed
// here (clippy::extra_unused_lifetimes). No behavior change.
impl Default for VideoUpdateArgs {
  #[inline]
  fn default() -> Self {
    VideoUpdateArgs {
      update_type: Update::NONE,
      update: None,
    }
  }
}
/// Incremental builder for a `VideoUpdate` table. `new` starts the table in
/// the underlying `FlatBufferBuilder`; `finish` closes it and yields the
/// offset.
pub struct VideoUpdateBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
  fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
  start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> VideoUpdateBuilder<'a, 'b, A> {
  // Discriminant slot is elided when equal to the default (`NONE`).
  #[inline]
  pub fn add_update_type(&mut self, update_type: Update) {
    self.fbb_.push_slot::<Update>(VideoUpdate::VT_UPDATE_TYPE, update_type, Update::NONE);
  }
  // Union payload offset is always written when provided.
  #[inline]
  pub fn add_update(&mut self, update: flatbuffers::WIPOffset<flatbuffers::UnionWIPOffset>) {
    self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(VideoUpdate::VT_UPDATE, update);
  }
  #[inline]
  pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> VideoUpdateBuilder<'a, 'b, A> {
    let start = _fbb.start_table();
    VideoUpdateBuilder {
      fbb_: _fbb,
      start_: start,
    }
  }
  #[inline]
  pub fn finish(self) -> flatbuffers::WIPOffset<VideoUpdate<'a>> {
    let o = self.fbb_.end_table(self.start_);
    flatbuffers::WIPOffset::new(o.value())
  }
}
// Debug for the union root: dispatches on the discriminant so the payload is
// printed with its concrete type; an offset present under the wrong
// discriminant is reported as an invalid-flatbuffer marker string.
impl core::fmt::Debug for VideoUpdate<'_> {
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    let mut ds = f.debug_struct("VideoUpdate");
    ds.field("update_type", &self.update_type());
    match self.update_type() {
      Update::Setup => {
        if let Some(x) = self.update_as_setup() {
          ds.field("update", &x)
        } else {
          ds.field("update", &"InvalidFlatbuffer: Union discriminant does not match value.")
        }
      },
      Update::DecodeUnitStart => {
        if let Some(x) = self.update_as_decode_unit_start() {
          ds.field("update", &x)
        } else {
          ds.field("update", &"InvalidFlatbuffer: Union discriminant does not match value.")
        }
      },
      Update::DecodeUnitBuffer => {
        if let Some(x) = self.update_as_decode_unit_buffer() {
          ds.field("update", &x)
        } else {
          ds.field("update", &"InvalidFlatbuffer: Union discriminant does not match value.")
        }
      },
      _ => {
        // NONE or an unknown discriminant: no typed payload to show.
        let x: Option<()> = None;
        ds.field("update", &x)
      },
    };
    ds.finish()
  }
}
#[inline]
/// Verifies that a buffer of bytes contains a `VideoUpdate`
/// and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_video_update_unchecked`.
pub fn root_as_video_update(buf: &[u8]) -> Result<VideoUpdate, flatbuffers::InvalidFlatbuffer> {
  flatbuffers::root::<VideoUpdate>(buf)
}
#[inline]
/// Verifies that a buffer of bytes contains a size prefixed
/// `VideoUpdate` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `size_prefixed_root_as_video_update_unchecked`.
pub fn size_prefixed_root_as_video_update(buf: &[u8]) -> Result<VideoUpdate, flatbuffers::InvalidFlatbuffer> {
  flatbuffers::size_prefixed_root::<VideoUpdate>(buf)
}
#[inline]
/// Verifies, with the given options, that a buffer of bytes
/// contains a `VideoUpdate` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_video_update_unchecked`.
pub fn root_as_video_update_with_opts<'b, 'o>(
  opts: &'o flatbuffers::VerifierOptions,
  buf: &'b [u8],
) -> Result<VideoUpdate<'b>, flatbuffers::InvalidFlatbuffer> {
  flatbuffers::root_with_opts::<VideoUpdate<'b>>(opts, buf)
}
#[inline]
/// Verifies, with the given verifier options, that a buffer of
/// bytes contains a size prefixed `VideoUpdate` and returns
/// it. Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_video_update_unchecked`.
pub fn size_prefixed_root_as_video_update_with_opts<'b, 'o>(
  opts: &'o flatbuffers::VerifierOptions,
  buf: &'b [u8],
) -> Result<VideoUpdate<'b>, flatbuffers::InvalidFlatbuffer> {
  flatbuffers::size_prefixed_root_with_opts::<VideoUpdate<'b>>(opts, buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a VideoUpdate and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid `VideoUpdate`.
pub unsafe fn root_as_video_update_unchecked(buf: &[u8]) -> VideoUpdate {
  flatbuffers::root_unchecked::<VideoUpdate>(buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a size prefixed VideoUpdate and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid size prefixed `VideoUpdate`.
pub unsafe fn size_prefixed_root_as_video_update_unchecked(buf: &[u8]) -> VideoUpdate {
  flatbuffers::size_prefixed_root_unchecked::<VideoUpdate>(buf)
}
#[inline]
/// Finishes `fbb` with `root` as the buffer root (no file identifier).
pub fn finish_video_update_buffer<'a, 'b, A: flatbuffers::Allocator + 'a>(
    fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
    root: flatbuffers::WIPOffset<VideoUpdate<'a>>) {
  fbb.finish(root, None);
}
#[inline]
/// Finishes `fbb` with `root` as the buffer root, prepending a size prefix.
pub fn finish_size_prefixed_video_update_buffer<'a, 'b, A: flatbuffers::Allocator + 'a>(fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, root: flatbuffers::WIPOffset<VideoUpdate<'a>>) {
  fbb.finish_size_prefixed(root, None);
}
} // pub mod VideoUpdate
+49 -3
View File
@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use crate::{ use crate::{
auth,
common::{AppError, AppResult, get_url}, common::{AppError, AppResult, get_url},
proxy, responses, proxy, responses,
state::{GamestreamServer, StateReadAccess, StateReader}, state::{GamestreamServer, StateReadAccess, StateReader},
@@ -24,7 +25,7 @@ struct PostStreamStartParams {
struct PostStreamStartResponse { struct PostStreamStartResponse {
url: String, url: String,
cert_hash: [u8; 32], cert_hash: [u8; 32],
//cert_hash: String, stream_token: String,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@@ -81,12 +82,40 @@ impl crate::backend::Backend {
self: ::std::sync::Arc<Self>, self: ::std::sync::Arc<Self>,
body: salvo::oapi::extract::JsonBody<PostStreamStartParams>, body: salvo::oapi::extract::JsonBody<PostStreamStartParams>,
req: &mut Request, req: &mut Request,
depot: &mut Depot,
) -> AppResult<Json<PostStreamStartResponse>> { ) -> AppResult<Json<PostStreamStartResponse>> {
let standard_error = Err(crate::common::AppError { let standard_error = Err(crate::common::AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR, status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(), description: "Could not start stream".to_string(),
}); });
// Check app permission
let user = match auth::get_user_from_depot(depot) {
Some(u) => u.clone(),
None => {
error!("post_stream_start reached without authenticated user in depot");
return Err(AppError {
status_code: StatusCode::UNAUTHORIZED,
description: "Not authenticated".to_string(),
});
}
};
if !user.is_admin {
match self.db.check_app_permission(&user.id, &body.server, body.id as i64) {
Ok(true) => {}
Ok(false) => {
return Err(AppError {
status_code: StatusCode::FORBIDDEN,
description: "You do not have permission to access this application".to_string(),
});
}
Err(e) => {
error!("Permission check error: {e}");
return standard_error;
}
}
}
let reader = self.state.read().await; let reader = self.state.read().await;
let server = match get_server(&reader, &body.server) { let server = match get_server(&reader, &body.server) {
@@ -272,6 +301,19 @@ impl crate::backend::Backend {
let port = self.port + <u16>::try_from((*writer).len()).unwrap(); let port = self.port + <u16>::try_from((*writer).len()).unwrap();
// Generate single-use stream token for proxy authentication
let stream_token = {
let mut bytes = [0u8; 32];
openssl::rand::rand_bytes(&mut bytes).map_err(|e| {
error!("Failed to generate stream token: {e}");
AppError {
status_code: StatusCode::INTERNAL_SERVER_ERROR,
description: "Could not start stream".to_string(),
}
})?;
hex::encode(bytes)
};
// Spawn WebTransport proxy // Spawn WebTransport proxy
let binary_path = match std::env::current_exe() { let binary_path = match std::env::current_exe() {
Ok(b) => b, Ok(b) => b,
@@ -285,7 +327,7 @@ impl crate::backend::Backend {
stream_id, port stream_id, port
); );
match tokio::process::Command::new(binary_path) match tokio::process::Command::new(binary_path)
.args(["proxy", &port.to_string(), &stream_id.to_string()]) .args(["proxy", &port.to_string(), &stream_id.to_string(), &stream_token])
.spawn() .spawn()
{ {
Ok(_) => (), Ok(_) => (),
@@ -315,7 +357,10 @@ impl crate::backend::Backend {
let webtransport_url = url_constructor::UrlConstructor::new() let webtransport_url = url_constructor::UrlConstructor::new()
.scheme("https") .scheme("https")
.host(host) // TODO: this is hardcoded to 127.0.0.1 to fix problems with
// tls certificates and IPv6 in chrome. This needs to eventually be fixed
// but I don't actually know what the fix is
.host("127.0.0.1")
.port(port) .port(port)
.subdir("api/stream/connect") .subdir("api/stream/connect")
.build(); .build();
@@ -323,6 +368,7 @@ impl crate::backend::Backend {
let post_stream_response = PostStreamStartResponse { let post_stream_response = PostStreamStartResponse {
url: webtransport_url, url: webtransport_url,
cert_hash: setup_resp.cert_hash, cert_hash: setup_resp.cert_hash,
stream_token,
}; };
Ok(Json(post_stream_response)) Ok(Json(post_stream_response))
+43
View File
@@ -0,0 +1,43 @@
// FlatBuffers schema for the video stream channel. Each wire message is one
// `VideoUpdate` root table carrying exactly one `Update` union payload.
namespace VideoUpdate;
// Stream configuration (codec name, dimensions, refresh rate).
table Setup {
  video_format: string;
  width: uint16;
  height: uint16;
  redraw_rate: uint16;
}
// Frame kind: delta frame (PFRAME) or keyframe (IDR).
enum FrameType: byte {
  PFRAME,
  IDR,
}
// Announces a new frame and how many data chunks will follow for it.
table DecodeUnitStart {
  frame_number: uint64;
  frame_type: FrameType;
  num_buffers: uint64;
  receive_time_ms: uint16;
  full_length: uint64;
}
// One chunk of a frame's data; frame_number/buffer_index/buffer_offset let
// the receiver reassemble chunks that arrive out of order.
table DecodeUnitBuffer {
  frame_number: uint64;
  buffer_index: uint64;
  buffer_offset: uint64;
  data: [ubyte];
}
// Tagged payload: exactly one of the above per message.
union Update {
  Setup:Setup,
  DecodeUnitStart:DecodeUnitStart,
  DecodeUnitBuffer:DecodeUnitBuffer,
}
table VideoUpdate {
  update: Update;
}
root_type VideoUpdate;