diff --git a/src/modules/aix/server/aix.router.ts b/src/modules/aix/server/aix.router.ts index c60357a45..fb0f86ac7 100644 --- a/src/modules/aix/server/aix.router.ts +++ b/src/modules/aix/server/aix.router.ts @@ -5,7 +5,8 @@ import { createTRPCRouter, publicProcedure } from '~/server/api/trpc.server'; import { fetchResponseOrTRPCThrow } from '~/server/api/trpc.router.fetchers'; import { IntakeHandler } from './intake/IntakeHandler'; -import { dispatchChatGenerate } from './dispatch/chatGenerate/dispatchChatGenerate'; +import { createChatGenerateDispatch } from './dispatch/chatGenerate/chatGenerate.dispatch'; +import { createStreamDemuxer } from './dispatch/stream.demuxers'; import { intake_Access_Schema, intake_ChatGenerateRequest_Schema, intake_ContextChatStream_Schema, intake_Model_Schema } from './intake/schemas.intake.api'; @@ -39,9 +40,9 @@ export const aixRouter = createTRPCRouter({ // Prepare the dispatch - let dispatch: ReturnType; + let dispatch: ReturnType; try { - dispatch = dispatchChatGenerate(access, model, chatGenerate, streaming); + dispatch = createChatGenerateDispatch(access, model, chatGenerate, streaming); // TEMP for debugging without requiring a full server restart if (input._debugRequestBody) @@ -82,7 +83,7 @@ export const aixRouter = createTRPCRouter({ if (!streaming) { try { const dispatchBody = await dispatchResponse.text(); - const messageAction = dispatch.parser(dispatchBody); + const messageAction = dispatch.chatGenerateParse(dispatchBody); yield* intakeHandler.yieldDmaOps(messageAction, prettyDialect); } catch (error: any) { yield* intakeHandler.yieldError('dispatch-read', `**[Service Issue] ${prettyDialect}**: ${safeErrorString(error) || 'Unknown service reading error'}`); @@ -94,6 +95,8 @@ export const aixRouter = createTRPCRouter({ // STREAM the response to the client const dispatchReader = (dispatchResponse.body || createEmptyReadableStream()).getReader(); const dispatchDecoder = new TextDecoder('utf-8', { fatal: false /* malformed data -> “ ” (U+FFFD) */ }); + const dispatchDemuxer = createStreamDemuxer(dispatch.demuxerFormat); + const dispatchParser = dispatch.chatGenerateParse; // Data pump: AI Service -- (dispatch) --> Server -- (intake) --> Client do { @@ -125,7 +128,7 @@ export const aixRouter = createTRPCRouter({ // Demux the chunk into 0 or more events - for (const demuxedItem of dispatch.demuxer.demux(dispatchChunk)) { + for (const demuxedItem of dispatchDemuxer.demux(dispatchChunk)) { intakeHandler.onReceivedDispatchEvent(demuxedItem); // ignore events post termination @@ -146,7 +149,7 @@ export const aixRouter = createTRPCRouter({ } try { - const messageAction = dispatch.parser(demuxedItem.data, demuxedItem.name); + const messageAction = dispatchParser(demuxedItem.data, demuxedItem.name); yield* intakeHandler.yieldDmaOps(messageAction, prettyDialect); } catch (error: any) { console.warn('[chatGenerateContent] Error parsing dispatch stream event:', demuxedItem, error); diff --git a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.config.ts b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.config.ts new file mode 100644 index 000000000..1e2150d7d --- /dev/null +++ b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.config.ts @@ -0,0 +1,5 @@ +// configuration +export const ISSUE_SYMBOL = '❌'; +export const ISSUE_SYMBOL_PROMPT_BLOCKED = '🚫'; +export const ISSUE_SYMBOL_RECITATION = '🦜'; +export const TEXT_SYMBOL_MAX_TOKENS = '🧱'; diff --git a/src/modules/aix/server/dispatch/chatGenerate/dispatchChatGenerate.ts b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.ts similarity index 63% rename from src/modules/aix/server/dispatch/chatGenerate/dispatchChatGenerate.ts rename to src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.ts index 6d3edd694..9bbf391c3 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/dispatchChatGenerate.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch.ts @@ -6,14 +6,15 @@ import type { Intake_Access, Intake_ChatGenerateRequest, Intake_Model } from '.. import { intakeToAnthropicMessageCreate } from './anthropic/anthropic.adapters'; import { intakeToOpenAIMessageCreate } from './openai/oai.adapters'; -import { createDispatchDemuxer, nullDispatchDemuxer } from './chatGenerate.demuxers'; -import { createDispatchParserAnthropicMessage, createDispatchParserAnthropicNS, createDispatchParserOpenAI, DispatchParser } from './chatGenerate.parsers'; +import type { ChatGenerateParseFunction } from './chatGenerate.types'; +import type { StreamDemuxerFormat } from '../stream.demuxers'; +import { createAnthropicMessageParser, createAnthropicMessageParserNS, createOpenAIMessageCreateParser } from './chatGenerate.parsers'; -export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model, chatGenerate: Intake_ChatGenerateRequest, streaming: boolean): { +export function createChatGenerateDispatch(access: Intake_Access, model: Intake_Model, chatGenerate: Intake_ChatGenerateRequest, streaming: boolean): { request: { url: string, headers: HeadersInit, body: object }, - demuxer: ReturnType; - parser: DispatchParser; + demuxerFormat: StreamDemuxerFormat; + chatGenerateParse: ChatGenerateParseFunction; } { switch (access.dialect) { @@ -23,8 +24,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model, ...anthropicAccess(access, '/v1/messages'), body: intakeToAnthropicMessageCreate(model, chatGenerate, streaming), }, - demuxer: streaming ? createDispatchDemuxer('sse') : nullDispatchDemuxer, - parser: streaming ? createDispatchParserAnthropicMessage() : createDispatchParserAnthropicNS(), + demuxerFormat: streaming ? 'sse' : null, + chatGenerateParse: streaming ? createAnthropicMessageParser() : createAnthropicMessageParserNS(), }; case 'gemini': @@ -34,8 +35,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model, // ...geminiAccess(access, model.id, streaming ? geminiModelsStreamGenerateContentPath : geminiModelsGenerateContentPath), // // body: geminiGenerateContentTextPayload(model, _hist, access.minSafetyLevel, 1), // }, - // demuxer: streaming ? createDispatchDemuxer('sse') : nullDispatchDemuxer, - // parser: createDispatchParserGemini(model.id.replace('models/', '')), + // demuxerFormat: streaming ? 'sse' : null, + // chatGenerateParse: createDispatchParserGemini(model.id.replace('models/', '')), // }; case 'ollama': @@ -45,8 +46,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model, // ...ollamaAccess(access, OLLAMA_PATH_CHAT), // body: ollamaChatCompletionPayload(model, _hist, access.ollamaJson, streaming), // }, - // demuxer: streaming ? createDispatchDemuxer('json-nl') : nullDispatchDemuxer, - // parser: createDispatchParserOllama(), + // demuxerFormat: streaming ? 'json-nl' : null, + // chatGenerateParse: createDispatchParserOllama(), // }; case 'azure': @@ -65,8 +66,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model, ...openAIAccess(access, model.id, '/v1/chat/completions'), body: intakeToOpenAIMessageCreate(access.dialect, model, chatGenerate, false, streaming), }, - demuxer: streaming ? createDispatchDemuxer('sse') : nullDispatchDemuxer, - parser: createDispatchParserOpenAI(), + demuxerFormat: streaming ? 'sse' : null, + chatGenerateParse: createOpenAIMessageCreateParser(), }; } } diff --git a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.parsers.ts b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.parsers.ts index 78adcc93e..f035c6bbb 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.parsers.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.parsers.ts @@ -7,43 +7,13 @@ import { geminiGeneratedContentResponseSchema, geminiHarmProbabilitySortFunction import { openaiWire_ChatCompletionChunkResponse_Schema } from './openai/oai.wiretypes'; import { wireOllamaChunkedOutputSchema } from './ollama/ollama.wiretypes'; - -// configuration -const ISSUE_SYMBOL = '❌'; -const ISSUE_SYMBOL_PROMPT_BLOCKED = '🚫'; -const ISSUE_SYMBOL_RECITATION = '🦜'; -const TEXT_SYMBOL_MAX_TOKENS = '🧱'; - - -export type DispatchMessageAction = { - op: 'text', - text: string; -} | { - op: 'issue'; - issue: string; - symbol: string; -} | { - op: 'parser-close'; -} | { - op: 'set'; - value: { - model?: string; - stats?: { - chatInTokens?: number; // -1: unknown - chatOutTokens: number; - chatOutRate?: number; - timeInner?: number; - timeOuter?: number; - } - }; -}; - -export type DispatchParser = (eventData: string, eventName?: string) => Generator; +import type { ChatGenerateMessageAction, ChatGenerateParseFunction } from './chatGenerate.types'; +import { ISSUE_SYMBOL, ISSUE_SYMBOL_PROMPT_BLOCKED, ISSUE_SYMBOL_RECITATION, TEXT_SYMBOL_MAX_TOKENS } from './chatGenerate.config'; /// Stream Parsers -export function createDispatchParserAnthropicMessage(): DispatchParser { +export function createAnthropicMessageParser(): ChatGenerateParseFunction { let responseMessage: AnthropicWire_MessageResponse; let hasErrored = false; let messageStartTime: number | undefined = undefined; @@ -52,7 +22,7 @@ export function createDispatchParserAnthropicMessage(): DispatchParser { // Note: at this stage, the parser only returns the text content as text, which is streamed as text // to the client. It is however building in parallel the responseMessage object, which is not // yet used, but contains token counts, for instance. - return function* (eventData: string, eventName?: string): Generator { + return function* (eventData: string, eventName?: string): Generator { // if we've errored, we should not be receiving more data if (hasErrored) @@ -182,10 +152,10 @@ export function createDispatchParserAnthropicMessage(): DispatchParser { } -export function createDispatchParserAnthropicNS(): DispatchParser { +export function createAnthropicMessageParserNS(): ChatGenerateParseFunction { let messageStartTime: number = Date.now(); - return function* (fullData: string): Generator { + return function* (fullData: string): Generator { // parse with validation (e.g. type: 'message' && role: 'assistant') const { @@ -246,11 +216,11 @@ function explainGeminiSafetyIssues(safetyRatings?: GeminiSafetyRatings): string .join(', '); } -export function createDispatchParserGemini(modelName: string): DispatchParser { +export function createGeminiParser(modelName: string): ChatGenerateParseFunction { let hasBegun = false; // this can throw, it's caught by the caller - return function* (eventData): Generator { + return function* (eventData): Generator { // parse the JSON chunk const wireGenerationChunk = JSON.parse(eventData); @@ -318,10 +288,10 @@ export function createDispatchParserGemini(modelName: string): DispatchParser { } -export function createDispatchParserOllama(): DispatchParser { +export function createOllamaParser(): ChatGenerateParseFunction { let hasBegun = false; - return function* (eventData: string): Generator { + return function* (eventData: string): Generator { // parse the JSON chunk let wireJsonChunk: any; @@ -365,12 +335,12 @@ export function createDispatchParserOllama(): DispatchParser { } -export function createDispatchParserOpenAI(): DispatchParser { +export function createOpenAIMessageCreateParser(): ChatGenerateParseFunction { let hasBegun = false; let hasWarned = false; // NOTE: could compute rate (tok/s) from the first textful event to the last (to ignore the prefill time) - return function* (eventData: string): Generator { + return function* (eventData: string): Generator { // Throws on malformed event data const json = openaiWire_ChatCompletionChunkResponse_Schema.parse(JSON.parse(eventData)); diff --git a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.types.ts b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.types.ts new file mode 100644 index 000000000..7eea89ba4 --- /dev/null +++ b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.types.ts @@ -0,0 +1,24 @@ +export type ChatGenerateMessageAction = { + op: 'text', + text: string; +} | { + op: 'issue'; + issue: string; + symbol: string; +} | { + op: 'parser-close'; +} | { + op: 'set'; + value: { + model?: string; + stats?: { + chatInTokens?: number; // -1: unknown + chatOutTokens: number; + chatOutRate?: number; + timeInner?: number; + timeOuter?: number; + } + }; +}; + +export type ChatGenerateParseFunction = (eventData: string, eventName?: string) => Generator; diff --git a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.demuxers.ts b/src/modules/aix/server/dispatch/stream.demuxers.ts similarity index 74% rename from src/modules/aix/server/dispatch/chatGenerate/chatGenerate.demuxers.ts rename to src/modules/aix/server/dispatch/stream.demuxers.ts index 21c1f185c..4ca814da5 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.demuxers.ts +++ b/src/modules/aix/server/dispatch/stream.demuxers.ts @@ -1,11 +1,28 @@ import { createParser as createEventsourceParser } from 'eventsource-parser'; /** - * Event stream formats - * - 'sse' is the default format, and is used by all vendors except Ollama - * - 'json-nl' is used by Ollama + * The format of the stream: 'sse' or 'json-nl' + * - 'sse' is the default format, and is used by all vendors except Ollama + * - 'json-nl' is used by Ollama */ -type DispatchDemuxFormat = 'sse' | 'json-nl'; +export type StreamDemuxerFormat = 'sse' | 'json-nl' | null; + + +/** + * Creates a demuxer for a stream of events. + * The demuxer is stateful and accumulates data until a full event is available. + */ +export function createStreamDemuxer(format: StreamDemuxerFormat): StreamDemuxer { + switch (format) { + case 'sse': + return _createEventSourceDemuxer(); + case 'json-nl': + return _createJsonNlDemuxer(); + case null: + return _nullStreamDemuxerWarn; + } +} + export type DemuxedEvent = { type: 'event' | 'reconnect-interval'; @@ -13,37 +30,19 @@ export type DemuxedEvent = { data: string; }; -type DispatchDemuxer = { +type StreamDemuxer = { demux: (chunk: string) => DemuxedEvent[]; remaining: () => string; }; -export function createDispatchDemuxer(format: DispatchDemuxFormat) { - switch (format) { - case 'sse': - return _createEventSourceDemuxer(); - case 'json-nl': - return _createJsonNlDemuxer(); - } -} - -export const nullDispatchDemuxer: DispatchDemuxer = { - demux: () => { - console.warn('Null demuxer called - shall not happen, as it is only created in non-streaming'); - return []; - }, - remaining: () => '', -}; - - /** * Creates a parser for an EventSource stream (e.g. OpenAI's format). * Uses the renowned `eventsource-parser` library. * * Note that we only use the 'feed' function and not 'reset', as we recreate the object per-call. */ -function _createEventSourceDemuxer(): DispatchDemuxer { +function _createEventSourceDemuxer(): StreamDemuxer { let buffer: DemuxedEvent[] = []; const parser = createEventsourceParser((event) => { switch (event.type) { @@ -71,7 +70,7 @@ function _createEventSourceDemuxer(): DispatchDemuxer { * Creates a parser for a 'JSON\n' non-event stream, to be swapped with an EventSource parser. * Ollama is the only vendor that uses this format. */ -function _createJsonNlDemuxer(): DispatchDemuxer { +function _createJsonNlDemuxer(): StreamDemuxer { let buffer = ''; return { @@ -91,3 +90,12 @@ function _createJsonNlDemuxer(): DispatchDemuxer { remaining: () => buffer, }; } + + +const _nullStreamDemuxerWarn: StreamDemuxer = { + demux: () => { + console.warn('Null demuxer called - shall not happen, as it is only created in non-streaming'); + return []; + }, + remaining: () => '', +}; diff --git a/src/modules/aix/server/intake/IntakeHandler.ts b/src/modules/aix/server/intake/IntakeHandler.ts index f3977de4e..b4f6c1585 100644 --- a/src/modules/aix/server/intake/IntakeHandler.ts +++ b/src/modules/aix/server/intake/IntakeHandler.ts @@ -1,7 +1,7 @@ import { SERVER_DEBUG_WIRE } from '~/server/wire'; -import type { DemuxedEvent } from '../dispatch/chatGenerate/chatGenerate.demuxers'; -import type { DispatchMessageAction } from '../dispatch/chatGenerate/chatGenerate.parsers'; +import type { ChatGenerateMessageAction } from '../dispatch/chatGenerate/chatGenerate.types'; +import type { DemuxedEvent } from '../dispatch/stream.demuxers'; // type IntakeProtoObject = IntakeControlProtoObject | IntakeEventProtoObject; @@ -44,7 +44,7 @@ export class IntakeHandler { yield op; } - * yieldDmaOps(parsedEvents: Generator, prettyDialect: string) { + * yieldDmaOps(parsedEvents: Generator, prettyDialect: string) { for (const dma of parsedEvents) { // console.log('parsed dispatch:', dma); // TODO: massively rework this into a good protocol