AIX: demuxers: extract eventsource-parser

This commit is contained in:
Enrico Ros
2025-03-17 02:30:46 -07:00
parent 4890a90641
commit 0f9c02e249
4 changed files with 86 additions and 73 deletions
+2 -2
View File
@@ -4,11 +4,11 @@ import { createEmptyReadableStream, createServerDebugWireEvents, safeErrorString
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
import { AixDemuxers } from '../dispatch/stream.demuxers';
import { AixWire_API, AixWire_API_ChatContentGenerate, AixWire_Particles } from './aix.wiretypes';
import { ChatGenerateTransmitter } from '../dispatch/chatGenerate/ChatGenerateTransmitter';
import { PerformanceProfiler } from '../dispatch/PerformanceProfiler';
import { createChatGenerateDispatch } from '../dispatch/chatGenerate/chatGenerate.dispatch';
import { createStreamDemuxer } from '../dispatch/stream.demuxers';
import { heartbeatsWhileAwaiting } from '../dispatch/heartbeatsWhileAwaiting';
@@ -150,7 +150,7 @@ export const aixRouter = createTRPCRouter({
// STREAM the response to the client
const dispatchReader = (dispatchResponse.body || createEmptyReadableStream()).getReader();
const dispatchDecoder = new TextDecoder('utf-8', { fatal: false /* malformed data -> “ ” (U+FFFD) */ });
const dispatchDemuxer = createStreamDemuxer(dispatch.demuxerFormat);
const dispatchDemuxer = AixDemuxers.createStreamDemuxer(dispatch.demuxerFormat);
const dispatchParser = dispatch.chatGenerateParse;
// Data pump: AI Service -- (dispatch) --> Server -- (intake) --> Client
@@ -4,7 +4,7 @@ import { ollamaAccess } from '~/modules/llms/server/ollama/ollama.router';
import { openAIAccess } from '~/modules/llms/server/openai/openai.router';
import type { AixAPI_Access, AixAPI_Model, AixAPIChatGenerate_Request } from '../../api/aix.wiretypes';
import type { StreamDemuxerFormat } from '../stream.demuxers';
import type { AixDemuxers } from '../stream.demuxers';
import { GeminiWire_API_Generate_Content } from '../wiretypes/gemini.wiretypes';
@@ -29,7 +29,7 @@ export type ChatGenerateParseFunction = (partTransmitter: IParticleTransmitter,
*/
export function createChatGenerateDispatch(access: AixAPI_Access, model: AixAPI_Model, chatGenerate: AixAPIChatGenerate_Request, streaming: boolean): {
request: { url: string, headers: HeadersInit, body: object },
demuxerFormat: StreamDemuxerFormat;
demuxerFormat: AixDemuxers.StreamDemuxerFormat;
chatGenerateParse: ChatGenerateParseFunction;
} {
@@ -0,0 +1,44 @@
import { createParser as createEventsourceParser, type EventSourceMessage, ParseError } from 'eventsource-parser';
import { AIX_SECURITY_ONLY_IN_DEV_BUILDS } from '../api/aix.router';
import type { AixDemuxers } from './stream.demuxers';
/**
* NOTE: this uses the `eventsource-parser` library, which is compliant, but not fast.
* When possible, use the _createFastEventSourceDemuxer
*
* Creates a parser for an EventSource stream (e.g. OpenAI's format).
* Uses the renowned `eventsource-parser` library.
*
* Note that we only use the 'feed' function and not 'reset', as we recreate the object per-call.
*/
export function _createEventSourceDemuxer(): AixDemuxers.StreamDemuxer {
let buffer: AixDemuxers.DemuxedEvent[] = [];
const parser = createEventsourceParser({
onEvent: (event: EventSourceMessage) => {
buffer.push({ type: 'event', name: event.event || undefined, data: event.data });
},
onRetry: (interval: number) => {
buffer.push({ type: 'reconnect-interval', data: '' + interval });
},
onError: (error: ParseError) => {
console.warn(`stream.demuxers: parser error (${error.type}):`, error.field, error.value, error.line);
},
onComment: (comment: string) => {
if (AIX_SECURITY_ONLY_IN_DEV_BUILDS)
console.log('[DEV] stream.demuxers: parser comment (safe to ignore):', comment);
},
});
return {
demux: (chunk: string) => {
parser.feed(chunk);
const bufferCopy = buffer;
buffer = [];
return bufferCopy;
},
remaining: () => '',
};
}
@@ -1,86 +1,55 @@
import { createParser as createEventsourceParser, type EventSourceMessage, ParseError } from 'eventsource-parser';
import { AIX_SECURITY_ONLY_IN_DEV_BUILDS } from '~/modules/aix/server/api/aix.router';
/**
* The format of the stream: 'sse' or 'json-nl'
* - 'sse' is the default format, and is used by all vendors except Ollama
* - 'json-nl' is used by Ollama
*/
export type StreamDemuxerFormat = 'sse' | 'json-nl' | null;
import { _createEventSourceDemuxer } from './stream.demuxer.sse';
/**
* Creates a demuxer for a stream of events.
* The demuxer is stateful and accumulates data until a full event is available.
*/
export function createStreamDemuxer(format: StreamDemuxerFormat): StreamDemuxer {
switch (format) {
case 'sse':
return _createEventSourceDemuxer();
case 'json-nl':
return _createJsonNlDemuxer();
case null:
return _nullStreamDemuxerWarn;
export namespace AixDemuxers {
/**
* The format of the stream: 'sse' or 'json-nl'
* - 'sse' is the default format, and is used by all vendors except Ollama
* - 'json-nl' is used by Ollama
*/
export type StreamDemuxerFormat = 'sse' | 'json-nl' | null;
/**
* Creates a demuxer for a stream of events.
* The demuxer is stateful and accumulates data until a full event is available.
*/
export function createStreamDemuxer(format: StreamDemuxerFormat): StreamDemuxer {
switch (format) {
case 'sse':
return _createEventSourceDemuxer();
case 'json-nl':
return _createJsonNlDemuxer();
case null:
return _nullStreamDemuxerWarn;
}
}
}
export type DemuxedEvent = {
type: 'event' | 'reconnect-interval';
name?: string;
data: string;
};
type StreamDemuxer = {
demux: (chunk: string) => DemuxedEvent[];
remaining: () => string;
};
/**
* Creates a parser for an EventSource stream (e.g. OpenAI's format).
* Uses the renowned `eventsource-parser` library.
*
* Note that we only use the 'feed' function and not 'reset', as we recreate the object per-call.
*/
function _createEventSourceDemuxer(): StreamDemuxer {
let buffer: DemuxedEvent[] = [];
const parser = createEventsourceParser({
onEvent: (event: EventSourceMessage) => {
buffer.push({ type: 'event', name: event.event || undefined, data: event.data });
},
onRetry: (interval: number) => {
buffer.push({ type: 'reconnect-interval', data: '' + interval });
},
onError: (error: ParseError) => {
console.warn(`stream.demuxers: parser error (${error.type}):`, error.field, error.value, error.line);
},
onComment: (comment: string) => {
if (AIX_SECURITY_ONLY_IN_DEV_BUILDS)
console.log('[DEV] stream.demuxers: parser comment (safe to ignore):', comment);
},
});
return {
demux: (chunk: string) => {
parser.feed(chunk);
const bufferCopy = buffer;
buffer = [];
return bufferCopy;
},
remaining: () => '',
export type DemuxedEvent = {
type: 'event' | 'reconnect-interval';
name?: string;
data: string;
};
export type StreamDemuxer = {
demux: (chunk: string) => DemuxedEvent[];
remaining: () => string;
};
}
/**
* Creates a parser for a 'JSON\n' non-event stream, to be swapped with an EventSource parser.
* Ollama is the only vendor that uses this format.
*/
function _createJsonNlDemuxer(): StreamDemuxer {
function _createJsonNlDemuxer(): AixDemuxers.StreamDemuxer {
let buffer = '';
return {
demux: (chunk: string): DemuxedEvent[] => {
demux: (chunk: string): AixDemuxers.DemuxedEvent[] => {
buffer += chunk;
if (!buffer.endsWith('\n')) return [];
@@ -98,7 +67,7 @@ function _createJsonNlDemuxer(): StreamDemuxer {
}
const _nullStreamDemuxerWarn: StreamDemuxer = {
const _nullStreamDemuxerWarn: AixDemuxers.StreamDemuxer = {
demux: () => {
console.warn('Null demuxer called - shall not happen, as it is only created in non-streaming');
return [];