diff --git a/src/modules/aix/server/api/aix.router.ts b/src/modules/aix/server/api/aix.router.ts index a950d07a5..fe9a4fa8f 100644 --- a/src/modules/aix/server/api/aix.router.ts +++ b/src/modules/aix/server/api/aix.router.ts @@ -4,11 +4,11 @@ import { createEmptyReadableStream, createServerDebugWireEvents, safeErrorString import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server'; import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers'; +import { AixDemuxers } from '../dispatch/stream.demuxers'; import { AixWire_API, AixWire_API_ChatContentGenerate, AixWire_Particles } from './aix.wiretypes'; import { ChatGenerateTransmitter } from '../dispatch/chatGenerate/ChatGenerateTransmitter'; import { PerformanceProfiler } from '../dispatch/PerformanceProfiler'; import { createChatGenerateDispatch } from '../dispatch/chatGenerate/chatGenerate.dispatch'; -import { createStreamDemuxer } from '../dispatch/stream.demuxers'; import { heartbeatsWhileAwaiting } from '../dispatch/heartbeatsWhileAwaiting'; @@ -150,7 +150,7 @@ export const aixRouter = createTRPCRouter({ // STREAM the response to the client const dispatchReader = (dispatchResponse.body || createEmptyReadableStream()).getReader(); const dispatchDecoder = new TextDecoder('utf-8', { fatal: false /* malformed data -> “ ” (U+FFFD) */ }); - const dispatchDemuxer = createStreamDemuxer(dispatch.demuxerFormat); + const dispatchDemuxer = AixDemuxers.createStreamDemuxer(dispatch.demuxerFormat); const dispatchParser = dispatch.chatGenerateParse; // Data pump: AI Service -- (dispatch) --> Server -- (intake) --> Client diff --git a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.ts b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.ts index f8838226b..82c27c17c 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.ts @@ -4,7 +4,7 @@ import { ollamaAccess } from '~/modules/llms/server/ollama/ollama.router'; import { openAIAccess } from '~/modules/llms/server/openai/openai.router'; import type { AixAPI_Access, AixAPI_Model, AixAPIChatGenerate_Request } from '../../api/aix.wiretypes'; -import type { StreamDemuxerFormat } from '../stream.demuxers'; +import type { AixDemuxers } from '../stream.demuxers'; import { GeminiWire_API_Generate_Content } from '../wiretypes/gemini.wiretypes'; @@ -29,7 +29,7 @@ export type ChatGenerateParseFunction = (partTransmitter: IParticleTransmitter, */ export function createChatGenerateDispatch(access: AixAPI_Access, model: AixAPI_Model, chatGenerate: AixAPIChatGenerate_Request, streaming: boolean): { request: { url: string, headers: HeadersInit, body: object }, - demuxerFormat: StreamDemuxerFormat; + demuxerFormat: AixDemuxers.StreamDemuxerFormat; chatGenerateParse: ChatGenerateParseFunction; } { diff --git a/src/modules/aix/server/dispatch/stream.demuxer.sse.ts b/src/modules/aix/server/dispatch/stream.demuxer.sse.ts new file mode 100644 index 000000000..392386a76 --- /dev/null +++ b/src/modules/aix/server/dispatch/stream.demuxer.sse.ts @@ -0,0 +1,44 @@ +import { createParser as createEventsourceParser, type EventSourceMessage, ParseError } from 'eventsource-parser'; + +import { AIX_SECURITY_ONLY_IN_DEV_BUILDS } from '../api/aix.router'; + +import type { AixDemuxers } from './stream.demuxers'; + + +/** + * NOTE: this uses the `eventsource-parser` library, which is compliant, but not fast. + * When possible, use the _createFastEventSourceDemuxer + * + * Creates a parser for an EventSource stream (e.g. OpenAI's format). + * Uses the renowned `eventsource-parser` library. + * + * Note that we only use the 'feed' function and not 'reset', as we recreate the object per-call. + */ +export function _createEventSourceDemuxer(): AixDemuxers.StreamDemuxer { + let buffer: AixDemuxers.DemuxedEvent[] = []; + const parser = createEventsourceParser({ + onEvent: (event: EventSourceMessage) => { + buffer.push({ type: 'event', name: event.event || undefined, data: event.data }); + }, + onRetry: (interval: number) => { + buffer.push({ type: 'reconnect-interval', data: '' + interval }); + }, + onError: (error: ParseError) => { + console.warn(`stream.demuxers: parser error (${error.type}):`, error.field, error.value, error.line); + }, + onComment: (comment: string) => { + if (AIX_SECURITY_ONLY_IN_DEV_BUILDS) + console.log('[DEV] stream.demuxers: parser comment (safe to ignore):', comment); + }, + }); + + return { + demux: (chunk: string) => { + parser.feed(chunk); + const bufferCopy = buffer; + buffer = []; + return bufferCopy; + }, + remaining: () => '', + }; +} diff --git a/src/modules/aix/server/dispatch/stream.demuxers.ts b/src/modules/aix/server/dispatch/stream.demuxers.ts index e9c543b81..9b381f50c 100644 --- a/src/modules/aix/server/dispatch/stream.demuxers.ts +++ b/src/modules/aix/server/dispatch/stream.demuxers.ts @@ -1,86 +1,55 @@ -import { createParser as createEventsourceParser, type EventSourceMessage, ParseError } from 'eventsource-parser'; -import { AIX_SECURITY_ONLY_IN_DEV_BUILDS } from '~/modules/aix/server/api/aix.router'; - -/** - * The format of the stream: 'sse' or 'json-nl' - * - 'sse' is the default format, and is used by all vendors except Ollama - * - 'json-nl' is used by Ollama - */ -export type StreamDemuxerFormat = 'sse' | 'json-nl' | null; +import { _createEventSourceDemuxer } from './stream.demuxer.sse'; -/** - * Creates a demuxer for a stream of events. - * The demuxer is stateful and accumulates data until a full event is available. - */ -export function createStreamDemuxer(format: StreamDemuxerFormat): StreamDemuxer { - switch (format) { - case 'sse': - return _createEventSourceDemuxer(); - case 'json-nl': - return _createJsonNlDemuxer(); - case null: - return _nullStreamDemuxerWarn; +export namespace AixDemuxers { + + /** + * The format of the stream: 'sse' or 'json-nl' + * - 'sse' is the default format, and is used by all vendors except Ollama + * - 'json-nl' is used by Ollama + */ + export type StreamDemuxerFormat = 'sse' | 'json-nl' | null; + + + /** + * Creates a demuxer for a stream of events. + * The demuxer is stateful and accumulates data until a full event is available. + */ + export function createStreamDemuxer(format: StreamDemuxerFormat): StreamDemuxer { + switch (format) { + case 'sse': + return _createEventSourceDemuxer(); + case 'json-nl': + return _createJsonNlDemuxer(); + case null: + return _nullStreamDemuxerWarn; + } } -} -export type DemuxedEvent = { - type: 'event' | 'reconnect-interval'; - name?: string; - data: string; -}; - -type StreamDemuxer = { - demux: (chunk: string) => DemuxedEvent[]; - remaining: () => string; -}; - - -/** - * Creates a parser for an EventSource stream (e.g. OpenAI's format). - * Uses the renowned `eventsource-parser` library. - * - * Note that we only use the 'feed' function and not 'reset', as we recreate the object per-call. - */ -function _createEventSourceDemuxer(): StreamDemuxer { - let buffer: DemuxedEvent[] = []; - const parser = createEventsourceParser({ - onEvent: (event: EventSourceMessage) => { - buffer.push({ type: 'event', name: event.event || undefined, data: event.data }); - }, - onRetry: (interval: number) => { - buffer.push({ type: 'reconnect-interval', data: '' + interval }); - }, - onError: (error: ParseError) => { - console.warn(`stream.demuxers: parser error (${error.type}):`, error.field, error.value, error.line); - }, - onComment: (comment: string) => { - if (AIX_SECURITY_ONLY_IN_DEV_BUILDS) - console.log('[DEV] stream.demuxers: parser comment (safe to ignore):', comment); - }, - }); - - return { - demux: (chunk: string) => { - parser.feed(chunk); - const bufferCopy = buffer; - buffer = []; - return bufferCopy; - }, - remaining: () => '', + export type DemuxedEvent = { + type: 'event' | 'reconnect-interval'; + name?: string; + data: string; }; + + export type StreamDemuxer = { + demux: (chunk: string) => DemuxedEvent[]; + remaining: () => string; + }; + } + /** * Creates a parser for a 'JSON\n' non-event stream, to be swapped with an EventSource parser. * Ollama is the only vendor that uses this format. */ -function _createJsonNlDemuxer(): StreamDemuxer { +function _createJsonNlDemuxer(): AixDemuxers.StreamDemuxer { let buffer = ''; return { - demux: (chunk: string): DemuxedEvent[] => { + demux: (chunk: string): AixDemuxers.DemuxedEvent[] => { buffer += chunk; if (!buffer.endsWith('\n')) return []; @@ -98,7 +67,7 @@ function _createJsonNlDemuxer(): StreamDemuxer { } -const _nullStreamDemuxerWarn: StreamDemuxer = { +const _nullStreamDemuxerWarn: AixDemuxers.StreamDemuxer = { demux: () => { console.warn('Null demuxer called - shall not happen, as it is only created in non-streaming'); return [];