backend and frontend: support out of order chunks + now it's performant on chrome

This commit is contained in:
2025-08-12 02:20:46 -06:00
parent 7afd8db8d8
commit e80543144a
21 changed files with 876 additions and 253 deletions
+2 -1
View File
@@ -2,7 +2,8 @@
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
export { DecodeUnit } from './video-update/decode-unit.js';
export { DecodeUnitBuffer } from './video-update/decode-unit-buffer.js';
export { DecodeUnitStart } from './video-update/decode-unit-start.js';
export { FrameType } from './video-update/frame-type.js';
export { Setup } from './video-update/setup.js';
export { Update } from './video-update/update.js';
@@ -0,0 +1,100 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import * as flatbuffers from 'flatbuffers';
export class DecodeUnitBuffer {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):DecodeUnitBuffer {
this.bb_pos = i;
this.bb = bb;
return this;
}
static getRootAsDecodeUnitBuffer(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitBuffer):DecodeUnitBuffer {
return (obj || new DecodeUnitBuffer()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
static getSizePrefixedRootAsDecodeUnitBuffer(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitBuffer):DecodeUnitBuffer {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new DecodeUnitBuffer()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
frameNumber():bigint {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
bufferIndex():bigint {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
bufferOffset():bigint {
const offset = this.bb!.__offset(this.bb_pos, 8);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
data(index: number):number|null {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0;
}
dataLength():number {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
}
dataArray():Uint8Array|null {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null;
}
static startDecodeUnitBuffer(builder:flatbuffers.Builder) {
builder.startObject(4);
}
static addFrameNumber(builder:flatbuffers.Builder, frameNumber:bigint) {
builder.addFieldInt64(0, frameNumber, BigInt('0'));
}
static addBufferIndex(builder:flatbuffers.Builder, bufferIndex:bigint) {
builder.addFieldInt64(1, bufferIndex, BigInt('0'));
}
static addBufferOffset(builder:flatbuffers.Builder, bufferOffset:bigint) {
builder.addFieldInt64(2, bufferOffset, BigInt('0'));
}
static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) {
builder.addFieldOffset(3, dataOffset, 0);
}
static createDataVector(builder:flatbuffers.Builder, data:number[]|Uint8Array):flatbuffers.Offset {
builder.startVector(1, data.length, 1);
for (let i = data.length - 1; i >= 0; i--) {
builder.addInt8(data[i]!);
}
return builder.endVector();
}
static startDataVector(builder:flatbuffers.Builder, numElems:number) {
builder.startVector(1, numElems, 1);
}
static endDecodeUnitBuffer(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}
static createDecodeUnitBuffer(builder:flatbuffers.Builder, frameNumber:bigint, bufferIndex:bigint, bufferOffset:bigint, dataOffset:flatbuffers.Offset):flatbuffers.Offset {
DecodeUnitBuffer.startDecodeUnitBuffer(builder);
DecodeUnitBuffer.addFrameNumber(builder, frameNumber);
DecodeUnitBuffer.addBufferIndex(builder, bufferIndex);
DecodeUnitBuffer.addBufferOffset(builder, bufferOffset);
DecodeUnitBuffer.addData(builder, dataOffset);
return DecodeUnitBuffer.endDecodeUnitBuffer(builder);
}
}
@@ -0,0 +1,91 @@
// automatically generated by the FlatBuffers compiler, do not modify
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import * as flatbuffers from 'flatbuffers';
import { FrameType } from '../video-update/frame-type.js';
export class DecodeUnitStart {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):DecodeUnitStart {
this.bb_pos = i;
this.bb = bb;
return this;
}
static getRootAsDecodeUnitStart(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitStart):DecodeUnitStart {
return (obj || new DecodeUnitStart()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
static getSizePrefixedRootAsDecodeUnitStart(bb:flatbuffers.ByteBuffer, obj?:DecodeUnitStart):DecodeUnitStart {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new DecodeUnitStart()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}
frameNumber():bigint {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
frameType():FrameType {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.readInt8(this.bb_pos + offset) : FrameType.PFRAME;
}
numBuffers():bigint {
const offset = this.bb!.__offset(this.bb_pos, 8);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
receiveTimeMs():number {
const offset = this.bb!.__offset(this.bb_pos, 10);
return offset ? this.bb!.readUint16(this.bb_pos + offset) : 0;
}
fullLength():bigint {
const offset = this.bb!.__offset(this.bb_pos, 12);
return offset ? this.bb!.readUint64(this.bb_pos + offset) : BigInt('0');
}
static startDecodeUnitStart(builder:flatbuffers.Builder) {
builder.startObject(5);
}
static addFrameNumber(builder:flatbuffers.Builder, frameNumber:bigint) {
builder.addFieldInt64(0, frameNumber, BigInt('0'));
}
static addFrameType(builder:flatbuffers.Builder, frameType:FrameType) {
builder.addFieldInt8(1, frameType, FrameType.PFRAME);
}
static addNumBuffers(builder:flatbuffers.Builder, numBuffers:bigint) {
builder.addFieldInt64(2, numBuffers, BigInt('0'));
}
static addReceiveTimeMs(builder:flatbuffers.Builder, receiveTimeMs:number) {
builder.addFieldInt16(3, receiveTimeMs, 0);
}
static addFullLength(builder:flatbuffers.Builder, fullLength:bigint) {
builder.addFieldInt64(4, fullLength, BigInt('0'));
}
static endDecodeUnitStart(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}
static createDecodeUnitStart(builder:flatbuffers.Builder, frameNumber:bigint, frameType:FrameType, numBuffers:bigint, receiveTimeMs:number, fullLength:bigint):flatbuffers.Offset {
DecodeUnitStart.startDecodeUnitStart(builder);
DecodeUnitStart.addFrameNumber(builder, frameNumber);
DecodeUnitStart.addFrameType(builder, frameType);
DecodeUnitStart.addNumBuffers(builder, numBuffers);
DecodeUnitStart.addReceiveTimeMs(builder, receiveTimeMs);
DecodeUnitStart.addFullLength(builder, fullLength);
return DecodeUnitStart.endDecodeUnitStart(builder);
}
}
+12 -8
View File
@@ -2,37 +2,41 @@
/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */
import { DecodeUnit } from '../video-update/decode-unit.js';
import { DecodeUnitBuffer } from '../video-update/decode-unit-buffer.js';
import { DecodeUnitStart } from '../video-update/decode-unit-start.js';
import { Setup } from '../video-update/setup.js';
export enum Update {
NONE = 0,
Setup = 1,
DecodeUnit = 2
DecodeUnitStart = 2,
DecodeUnitBuffer = 3
}
export function unionToUpdate(
type: Update,
accessor: (obj:DecodeUnit|Setup) => DecodeUnit|Setup|null
): DecodeUnit|Setup|null {
accessor: (obj:DecodeUnitBuffer|DecodeUnitStart|Setup) => DecodeUnitBuffer|DecodeUnitStart|Setup|null
): DecodeUnitBuffer|DecodeUnitStart|Setup|null {
switch(Update[type]) {
case 'NONE': return null;
case 'Setup': return accessor(new Setup())! as Setup;
case 'DecodeUnit': return accessor(new DecodeUnit())! as DecodeUnit;
case 'DecodeUnitStart': return accessor(new DecodeUnitStart())! as DecodeUnitStart;
case 'DecodeUnitBuffer': return accessor(new DecodeUnitBuffer())! as DecodeUnitBuffer;
default: return null;
}
}
export function unionListToUpdate(
type: Update,
accessor: (index: number, obj:DecodeUnit|Setup) => DecodeUnit|Setup|null,
accessor: (index: number, obj:DecodeUnitBuffer|DecodeUnitStart|Setup) => DecodeUnitBuffer|DecodeUnitStart|Setup|null,
index: number
): DecodeUnit|Setup|null {
): DecodeUnitBuffer|DecodeUnitStart|Setup|null {
switch(Update[type]) {
case 'NONE': return null;
case 'Setup': return accessor(index, new Setup())! as Setup;
case 'DecodeUnit': return accessor(index, new DecodeUnit())! as DecodeUnit;
case 'DecodeUnitStart': return accessor(index, new DecodeUnitStart())! as DecodeUnitStart;
case 'DecodeUnitBuffer': return accessor(index, new DecodeUnitBuffer())! as DecodeUnitBuffer;
default: return null;
}
}
+160 -83
View File
@@ -1,105 +1,182 @@
import { VideoUpdate } from "$lib/proto/video";
import { ByteBuffer } from "flatbuffers";
import { DecodeUnitBuffer } from "./proto/video-update";
import { DoorClosed, Video } from "lucide-svelte";
function parseData(newBuffer: Uint8Array, oldBuffer: Uint8Array): [Array<VideoUpdate.VideoUpdate>, Uint8Array<ArrayBuffer>] {
let packets = new Array<VideoUpdate.VideoUpdate>();
let unparsedData = new Uint8Array();
let data = new Uint8Array([...oldBuffer, ...newBuffer]);
let index = 0;
while (true) {
if (index >= data.length) {
break
}
const view = new DataView(data.buffer.slice(index, index + 4));
const dataLength = view.getUint32(0, true);
const slice_start_index = index + 4;
const slice_end_index = index + 4 + dataLength;
if (data.length < slice_end_index) {
unparsedData = data.slice(index, data.length);
break;
}
const dataToParse = new ByteBuffer(data.slice(slice_start_index, slice_end_index));
const videoUpdate = VideoUpdate.VideoUpdate.getRootAsVideoUpdate(dataToParse);
packets.push(videoUpdate);
index += 4 + dataLength;
}
return [packets, unparsedData];
}
export async function streamVideoFromReader(reader: ReadableStreamDefaultReader, canvasElement: OffscreenCanvas) {
function getVideoDecoder(canvasElement: OffscreenCanvas): VideoDecoder {
const canvasCtx: OffscreenCanvasRenderingContext2D | null = canvasElement.getContext('2d');
if (canvasCtx == null) {
throw new Error(`Could not get 2d canvas context`);
}
const videoDecoder = new VideoDecoder({
output: (frame) => {
//console.log(`rendering frame start: ${performance.now()}`);
//canvasElement.width = frame.displayWidth;
//canvasElement.height = frame.displayHeight;
//console.log(`rendering frame drawImage: ${performance.now()}`);
canvasCtx.drawImage(frame, 0, 0);
//console.log(`rendering frame end: ${performance.now()}`);
frame.close();
//console.log(`rendering frame close: ${performance.now()}`);
},
error: (e) => {
console.error('Decode error:', e);
}
});
return videoDecoder;
}
async function configureDecoder(videoDecoder: VideoDecoder, videoFormat: string, width: number, height: number) {
let config: VideoDecoderConfig = {
codec: videoFormat,
codedWidth: width,
codedHeight: height,
optimizeForLatency: true,
//hardwareAcceleration: "prefer-hardware",
};
const codecSupport = await VideoDecoder.isConfigSupported(config);
console.log(codecSupport);
if (codecSupport.supported) {
videoDecoder.configure(config);
} else {
throw new Error(`Could not configure decoder`);
}
}
class Decoder {
videoDecoder: VideoDecoder;
frameNumber: bigint | undefined;
frameType: EncodedAudioChunkType = "delta";
fullLength: bigint = 0n;
receiveTimeMs: number = 0;
//frameBroken: boolean = false;
//lastBufferIndex: bigint = 0n;
frameCollector: boolean[] = new Array<boolean>();
dataOffset: number = 0;
data: Uint8Array = new Uint8Array();
constructor(videoDecoder: VideoDecoder) {
this.videoDecoder = videoDecoder;
}
print() {
console.log(
`
frameNumber: ${this.frameNumber}
frameType: ${this.frameType}
fullLength: ${this.fullLength}
receiveTimeMs: ${this.receiveTimeMs}
frameCollector: ${this.frameCollector}
ts: ${performance.now()}
`
);
}
processStart(decodeUnitStart: VideoUpdate.DecodeUnitStart) {
//this.print();
const frameCompleted = !this.frameCollector.includes(false);
if (!frameCompleted) {
console.log(`Got setup packet for frame ${decodeUnitStart.frameNumber()} but the last frame has not been completed`);
}
this.frameNumber = decodeUnitStart.frameNumber();
this.frameType = "delta";
if (decodeUnitStart.frameType() == VideoUpdate.FrameType.IDR) {
this.frameType = "key";
}
this.fullLength = decodeUnitStart.fullLength();
this.receiveTimeMs = decodeUnitStart.receiveTimeMs();
//this.frameBroken = false;
//this.lastBufferIndex = -1n;
//this.dataOffset = 0;
this.frameCollector = new Array(Number(decodeUnitStart.numBuffers())).fill(false);
this.data = new Uint8Array(Number(this.fullLength));
//this.print();
//console.log(`start: `, this);
//console.log(performance.now());
}
processBuffer(decodeUnitBuffer: VideoUpdate.DecodeUnitBuffer) {
//console.log(`buffer: `, this);
//console.log(performance.now());
//this.print();
const frameNumber = decodeUnitBuffer.frameNumber();
if (this.frameNumber === undefined) {
console.log("frameNumber is undefined but we got a buffer, ignoring...");
return;
}
if (this.frameNumber != frameNumber) {
console.log(`Got buffer for frame ${frameNumber} but we are processing frame ${this.frameNumber}, ignoring...`);
return;
}
let offset = decodeUnitBuffer.bufferOffset();
for (var i = 0; i < decodeUnitBuffer.dataLength(); i++) {
this.data[Number(offset) + i] = decodeUnitBuffer.data(i)!;
}
this.frameCollector[Number(decodeUnitBuffer.bufferIndex())] = true;
const gotAllframes = !this.frameCollector.includes(false);
if (gotAllframes) {
const chunk = new EncodedVideoChunk({
//timestamp: this.receiveTimeMs,
timestamp: 0,
type: this.frameType,
data: this.data,
});
//console.log(`${ performance.now() }: Enqueing a new decode request, current queue size ${ this.videoDecoder.decodeQueueSize } `);
this.videoDecoder.decode(chunk);
}
//this.print();
}
}
export async function streamVideoFromReader(reader: ReadableStreamDefaultReader, canvasElement: OffscreenCanvas) {
const videoDecoder = getVideoDecoder(canvasElement);
try {
let unparsedData = new Uint8Array();
let decodeUnitBuffer: VideoUpdate.DecodeUnitBuffer = new VideoUpdate.DecodeUnitBuffer();
let decoder = new Decoder(videoDecoder);
const videoDecoder = new VideoDecoder({
output: (frame) => {
canvasElement.width = frame.displayWidth;
canvasElement.height = frame.displayHeight;
canvasCtx.drawImage(frame, 0, 0);
frame.close();
},
error: (e) => {
console.error('Decode error:', e);
}
});
let decodeUnit: VideoUpdate.DecodeUnit = new VideoUpdate.DecodeUnit();
while (true) {
const { value, done } = await reader.read();
if (done) break;
let [packets, remainingData] = parseData(value, unparsedData);
unparsedData = remainingData;
const dataToParse = new ByteBuffer(value);
const videoUpdate = VideoUpdate.VideoUpdate.getRootAsVideoUpdate(dataToParse);
for (let i = 0; i < packets.length; i++) {
if (packets[i].updateType() == VideoUpdate.Update.Setup) {
let setup = packets[i].update(new VideoUpdate.Setup());
let config: VideoDecoderConfig = {
codec: setup.videoFormat(),
codedWidth: setup.width(),
codedHeight: setup.height(),
};
if (videoUpdate.updateType() == VideoUpdate.Update.Setup) {
let setup = videoUpdate.update(new VideoUpdate.Setup());
await configureDecoder(videoDecoder, setup.videoFormat(), setup.width(), setup.height());
const codecSupport = await VideoDecoder.isConfigSupported(config);
if (codecSupport.supported) {
videoDecoder.configure(config);
} else {
throw new Error(`Could not configure decoder`);
}
} else if (videoUpdate.updateType() == VideoUpdate.Update.DecodeUnitStart) {
let decodeUnitStart: VideoUpdate.DecodeUnitStart = new VideoUpdate.DecodeUnitStart();
videoUpdate.update(decodeUnitStart);
decoder.processStart(decodeUnitStart);
} else if (packets[i].updateType() == VideoUpdate.Update.DecodeUnit) {
packets[i].update(decodeUnit);
} else if (videoUpdate.updateType() == VideoUpdate.Update.DecodeUnitBuffer) {
videoUpdate.update(decodeUnitBuffer);
decoder.processBuffer(decodeUnitBuffer);
let frameType: EncodedAudioChunkType = "delta";
if (decodeUnit.frameType() == VideoUpdate.FrameType.IDR) {
console.log("GOT KEYFRAME");
frameType = "key";
}
const chunk = new EncodedVideoChunk({
timestamp: decodeUnit.receiveTimeMs(),
type: frameType,
data: decodeUnit.dataArray()!,
});
videoDecoder.decode(chunk);
} else {
throw new Error(`Got packet of unknown type`);
}
} else {
throw new Error(`Got packet of unknown type`);
}
}
} catch (e) {