mirror of
https://github.com/enricoros/big-AGI.git
synced 2026-05-10 21:50:14 -07:00
AIX: dispatch: improve chatGenerate structure
This commit is contained in:
@@ -5,7 +5,8 @@ import { createTRPCRouter, publicProcedure } from '~/server/api/trpc.server';
|
||||
import { fetchResponseOrTRPCThrow } from '~/server/api/trpc.router.fetchers';
|
||||
|
||||
import { IntakeHandler } from './intake/IntakeHandler';
|
||||
import { dispatchChatGenerate } from './dispatch/chatGenerate/dispatchChatGenerate';
|
||||
import { createChatGenerateDispatch } from './dispatch/chatGenerate/chatGenerate.dispatch';
|
||||
import { createStreamDemuxer } from './dispatch/stream.demuxers';
|
||||
import { intake_Access_Schema, intake_ChatGenerateRequest_Schema, intake_ContextChatStream_Schema, intake_Model_Schema } from './intake/schemas.intake.api';
|
||||
|
||||
|
||||
@@ -39,9 +40,9 @@ export const aixRouter = createTRPCRouter({
|
||||
|
||||
|
||||
// Prepare the dispatch
|
||||
let dispatch: ReturnType<typeof dispatchChatGenerate>;
|
||||
let dispatch: ReturnType<typeof createChatGenerateDispatch>;
|
||||
try {
|
||||
dispatch = dispatchChatGenerate(access, model, chatGenerate, streaming);
|
||||
dispatch = createChatGenerateDispatch(access, model, chatGenerate, streaming);
|
||||
|
||||
// TEMP for debugging without requiring a full server restart
|
||||
if (input._debugRequestBody)
|
||||
@@ -82,7 +83,7 @@ export const aixRouter = createTRPCRouter({
|
||||
if (!streaming) {
|
||||
try {
|
||||
const dispatchBody = await dispatchResponse.text();
|
||||
const messageAction = dispatch.parser(dispatchBody);
|
||||
const messageAction = dispatch.chatGenerateParse(dispatchBody);
|
||||
yield* intakeHandler.yieldDmaOps(messageAction, prettyDialect);
|
||||
} catch (error: any) {
|
||||
yield* intakeHandler.yieldError('dispatch-read', `**[Service Issue] ${prettyDialect}**: ${safeErrorString(error) || 'Unknown service reading error'}`);
|
||||
@@ -94,6 +95,8 @@ export const aixRouter = createTRPCRouter({
|
||||
// STREAM the response to the client
|
||||
const dispatchReader = (dispatchResponse.body || createEmptyReadableStream()).getReader();
|
||||
const dispatchDecoder = new TextDecoder('utf-8', { fatal: false /* malformed data -> “ ” (U+FFFD) */ });
|
||||
const dispatchDemuxer = createStreamDemuxer(dispatch.demuxerFormat);
|
||||
const dispatchParser = dispatch.chatGenerateParse;
|
||||
|
||||
// Data pump: AI Service -- (dispatch) --> Server -- (intake) --> Client
|
||||
do {
|
||||
@@ -125,7 +128,7 @@ export const aixRouter = createTRPCRouter({
|
||||
|
||||
|
||||
// Demux the chunk into 0 or more events
|
||||
for (const demuxedItem of dispatch.demuxer.demux(dispatchChunk)) {
|
||||
for (const demuxedItem of dispatchDemuxer.demux(dispatchChunk)) {
|
||||
intakeHandler.onReceivedDispatchEvent(demuxedItem);
|
||||
|
||||
// ignore events post termination
|
||||
@@ -146,7 +149,7 @@ export const aixRouter = createTRPCRouter({
|
||||
}
|
||||
|
||||
try {
|
||||
const messageAction = dispatch.parser(demuxedItem.data, demuxedItem.name);
|
||||
const messageAction = dispatchParser(demuxedItem.data, demuxedItem.name);
|
||||
yield* intakeHandler.yieldDmaOps(messageAction, prettyDialect);
|
||||
} catch (error: any) {
|
||||
console.warn('[chatGenerateContent] Error parsing dispatch stream event:', demuxedItem, error);
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
// configuration
// Emoji markers shared by the chat-generate parsers, which import all four of them.
// NOTE(review): presumably the ISSUE_* values end up as the `symbol` of 'issue' actions and
// TEXT_SYMBOL_MAX_TOKENS marks output cut off by a token limit - confirm against the parsers.
|
||||
export const ISSUE_SYMBOL = '❌';
|
||||
export const ISSUE_SYMBOL_PROMPT_BLOCKED = '🚫';
|
||||
export const ISSUE_SYMBOL_RECITATION = '🦜';
|
||||
export const TEXT_SYMBOL_MAX_TOKENS = '🧱';
|
||||
+14
-13
@@ -6,14 +6,15 @@ import type { Intake_Access, Intake_ChatGenerateRequest, Intake_Model } from '..
|
||||
import { intakeToAnthropicMessageCreate } from './anthropic/anthropic.adapters';
|
||||
import { intakeToOpenAIMessageCreate } from './openai/oai.adapters';
|
||||
|
||||
import { createDispatchDemuxer, nullDispatchDemuxer } from './chatGenerate.demuxers';
|
||||
import { createDispatchParserAnthropicMessage, createDispatchParserAnthropicNS, createDispatchParserOpenAI, DispatchParser } from './chatGenerate.parsers';
|
||||
import type { ChatGenerateParseFunction } from './chatGenerate.types';
|
||||
import type { StreamDemuxerFormat } from '../stream.demuxers';
|
||||
import { createAnthropicMessageParser, createAnthropicMessageParserNS, createOpenAIMessageCreateParser } from './chatGenerate.parsers';
|
||||
|
||||
|
||||
export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model, chatGenerate: Intake_ChatGenerateRequest, streaming: boolean): {
|
||||
export function createChatGenerateDispatch(access: Intake_Access, model: Intake_Model, chatGenerate: Intake_ChatGenerateRequest, streaming: boolean): {
|
||||
request: { url: string, headers: HeadersInit, body: object },
|
||||
demuxer: ReturnType<typeof createDispatchDemuxer>;
|
||||
parser: DispatchParser;
|
||||
demuxerFormat: StreamDemuxerFormat;
|
||||
chatGenerateParse: ChatGenerateParseFunction;
|
||||
} {
|
||||
|
||||
switch (access.dialect) {
|
||||
@@ -23,8 +24,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model,
|
||||
...anthropicAccess(access, '/v1/messages'),
|
||||
body: intakeToAnthropicMessageCreate(model, chatGenerate, streaming),
|
||||
},
|
||||
demuxer: streaming ? createDispatchDemuxer('sse') : nullDispatchDemuxer,
|
||||
parser: streaming ? createDispatchParserAnthropicMessage() : createDispatchParserAnthropicNS(),
|
||||
demuxerFormat: streaming ? 'sse' : null,
|
||||
chatGenerateParse: streaming ? createAnthropicMessageParser() : createAnthropicMessageParserNS(),
|
||||
};
|
||||
|
||||
case 'gemini':
|
||||
@@ -34,8 +35,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model,
|
||||
// ...geminiAccess(access, model.id, streaming ? geminiModelsStreamGenerateContentPath : geminiModelsGenerateContentPath),
|
||||
// // body: geminiGenerateContentTextPayload(model, _hist, access.minSafetyLevel, 1),
|
||||
// },
|
||||
// demuxer: streaming ? createDispatchDemuxer('sse') : nullDispatchDemuxer,
|
||||
// parser: createDispatchParserGemini(model.id.replace('models/', '')),
|
||||
// demuxerFormat: streaming ? 'sse' : null,
|
||||
// chatGenerateParse: createDispatchParserGemini(model.id.replace('models/', '')),
|
||||
// };
|
||||
|
||||
case 'ollama':
|
||||
@@ -45,8 +46,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model,
|
||||
// ...ollamaAccess(access, OLLAMA_PATH_CHAT),
|
||||
// body: ollamaChatCompletionPayload(model, _hist, access.ollamaJson, streaming),
|
||||
// },
|
||||
// demuxer: streaming ? createDispatchDemuxer('json-nl') : nullDispatchDemuxer,
|
||||
// parser: createDispatchParserOllama(),
|
||||
// demuxerFormat: streaming ? 'json-nl' : null,
|
||||
// chatGenerateParse: createDispatchParserOllama(),
|
||||
// };
|
||||
|
||||
case 'azure':
|
||||
@@ -65,8 +66,8 @@ export function dispatchChatGenerate(access: Intake_Access, model: Intake_Model,
|
||||
...openAIAccess(access, model.id, '/v1/chat/completions'),
|
||||
body: intakeToOpenAIMessageCreate(access.dialect, model, chatGenerate, false, streaming),
|
||||
},
|
||||
demuxer: streaming ? createDispatchDemuxer('sse') : nullDispatchDemuxer,
|
||||
parser: createDispatchParserOpenAI(),
|
||||
demuxerFormat: streaming ? 'sse' : null,
|
||||
chatGenerateParse: createOpenAIMessageCreateParser(),
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -7,43 +7,13 @@ import { geminiGeneratedContentResponseSchema, geminiHarmProbabilitySortFunction
|
||||
import { openaiWire_ChatCompletionChunkResponse_Schema } from './openai/oai.wiretypes';
|
||||
import { wireOllamaChunkedOutputSchema } from './ollama/ollama.wiretypes';
|
||||
|
||||
|
||||
// configuration
|
||||
const ISSUE_SYMBOL = '❌';
|
||||
const ISSUE_SYMBOL_PROMPT_BLOCKED = '🚫';
|
||||
const ISSUE_SYMBOL_RECITATION = '🦜';
|
||||
const TEXT_SYMBOL_MAX_TOKENS = '🧱';
|
||||
|
||||
|
||||
export type DispatchMessageAction = {
|
||||
op: 'text',
|
||||
text: string;
|
||||
} | {
|
||||
op: 'issue';
|
||||
issue: string;
|
||||
symbol: string;
|
||||
} | {
|
||||
op: 'parser-close';
|
||||
} | {
|
||||
op: 'set';
|
||||
value: {
|
||||
model?: string;
|
||||
stats?: {
|
||||
chatInTokens?: number; // -1: unknown
|
||||
chatOutTokens: number;
|
||||
chatOutRate?: number;
|
||||
timeInner?: number;
|
||||
timeOuter?: number;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
export type DispatchParser = (eventData: string, eventName?: string) => Generator<DispatchMessageAction>;
|
||||
import type { ChatGenerateMessageAction, ChatGenerateParseFunction } from './chatGenerate.types';
|
||||
import { ISSUE_SYMBOL, ISSUE_SYMBOL_PROMPT_BLOCKED, ISSUE_SYMBOL_RECITATION, TEXT_SYMBOL_MAX_TOKENS } from './chatGenerate.config';
|
||||
|
||||
|
||||
/// Stream Parsers
|
||||
|
||||
export function createDispatchParserAnthropicMessage(): DispatchParser {
|
||||
export function createAnthropicMessageParser(): ChatGenerateParseFunction {
|
||||
let responseMessage: AnthropicWire_MessageResponse;
|
||||
let hasErrored = false;
|
||||
let messageStartTime: number | undefined = undefined;
|
||||
@@ -52,7 +22,7 @@ export function createDispatchParserAnthropicMessage(): DispatchParser {
|
||||
// Note: at this stage, the parser only returns the text content as text, which is streamed as text
|
||||
// to the client. It is however building in parallel the responseMessage object, which is not
|
||||
// yet used, but contains token counts, for instance.
|
||||
return function* (eventData: string, eventName?: string): Generator<DispatchMessageAction> {
|
||||
return function* (eventData: string, eventName?: string): Generator<ChatGenerateMessageAction> {
|
||||
|
||||
// if we've errored, we should not be receiving more data
|
||||
if (hasErrored)
|
||||
@@ -182,10 +152,10 @@ export function createDispatchParserAnthropicMessage(): DispatchParser {
|
||||
}
|
||||
|
||||
|
||||
export function createDispatchParserAnthropicNS(): DispatchParser {
|
||||
export function createAnthropicMessageParserNS(): ChatGenerateParseFunction {
|
||||
let messageStartTime: number = Date.now();
|
||||
|
||||
return function* (fullData: string): Generator<DispatchMessageAction> {
|
||||
return function* (fullData: string): Generator<ChatGenerateMessageAction> {
|
||||
|
||||
// parse with validation (e.g. type: 'message' && role: 'assistant')
|
||||
const {
|
||||
@@ -246,11 +216,11 @@ function explainGeminiSafetyIssues(safetyRatings?: GeminiSafetyRatings): string
|
||||
.join(', ');
|
||||
}
|
||||
|
||||
export function createDispatchParserGemini(modelName: string): DispatchParser {
|
||||
export function createGeminiParser(modelName: string): ChatGenerateParseFunction {
|
||||
let hasBegun = false;
|
||||
|
||||
// this can throw, it's caught by the caller
|
||||
return function* (eventData): Generator<DispatchMessageAction> {
|
||||
return function* (eventData): Generator<ChatGenerateMessageAction> {
|
||||
|
||||
// parse the JSON chunk
|
||||
const wireGenerationChunk = JSON.parse(eventData);
|
||||
@@ -318,10 +288,10 @@ export function createDispatchParserGemini(modelName: string): DispatchParser {
|
||||
}
|
||||
|
||||
|
||||
export function createDispatchParserOllama(): DispatchParser {
|
||||
export function createOllamaParser(): ChatGenerateParseFunction {
|
||||
let hasBegun = false;
|
||||
|
||||
return function* (eventData: string): Generator<DispatchMessageAction> {
|
||||
return function* (eventData: string): Generator<ChatGenerateMessageAction> {
|
||||
|
||||
// parse the JSON chunk
|
||||
let wireJsonChunk: any;
|
||||
@@ -365,12 +335,12 @@ export function createDispatchParserOllama(): DispatchParser {
|
||||
}
|
||||
|
||||
|
||||
export function createDispatchParserOpenAI(): DispatchParser {
|
||||
export function createOpenAIMessageCreateParser(): ChatGenerateParseFunction {
|
||||
let hasBegun = false;
|
||||
let hasWarned = false;
|
||||
// NOTE: could compute rate (tok/s) from the first textful event to the last (to ignore the prefill time)
|
||||
|
||||
return function* (eventData: string): Generator<DispatchMessageAction> {
|
||||
return function* (eventData: string): Generator<ChatGenerateMessageAction> {
|
||||
|
||||
// Throws on malformed event data
|
||||
const json = openaiWire_ChatCompletionChunkResponse_Schema.parse(JSON.parse(eventData));
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
/**
 * One action produced by a chat-generate parse function.
 * This is a union discriminated on `op`; each variant carries only the fields listed with it.
 */
export type ChatGenerateMessageAction = {
|
||||
// 'text': a piece of text, carried in `text`
op: 'text',
|
||||
text: string;
|
||||
} | {
|
||||
// 'issue': a description in `issue` plus a `symbol` string
// NOTE(review): `symbol` is presumably one of the ISSUE_SYMBOL* constants - confirm in the parsers
op: 'issue';
|
||||
issue: string;
|
||||
symbol: string;
|
||||
} | {
|
||||
// 'parser-close': carries no payload
// NOTE(review): presumably tells the consumer that the parser is done - confirm in IntakeHandler.yieldDmaOps
op: 'parser-close';
|
||||
} | {
|
||||
// 'set': metadata update; both `model` and `stats` are optional
op: 'set';
|
||||
value: {
|
||||
model?: string;
|
||||
// when `stats` is present, `chatOutTokens` is the only required field
stats?: {
|
||||
chatInTokens?: number; // -1: unknown
|
||||
chatOutTokens: number;
|
||||
chatOutRate?: number;
|
||||
timeInner?: number;
|
||||
timeOuter?: number;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
/**
 * Signature shared by the chat-generate parsers: takes the payload as `eventData` and an optional
 * `eventName`, and returns a generator of ChatGenerateMessageAction.
 * Callers pass either the whole response body (non-streaming) or the data and name of one
 * demuxed event (streaming).
 */
export type ChatGenerateParseFunction = (eventData: string, eventName?: string) => Generator<ChatGenerateMessageAction>;
|
||||
+33
-25
@@ -1,11 +1,28 @@
|
||||
import { createParser as createEventsourceParser } from 'eventsource-parser';
|
||||
|
||||
/**
|
||||
* Event stream formats
|
||||
* - 'sse' is the default format, and is used by all vendors except Ollama
|
||||
* - 'json-nl' is used by Ollama
|
||||
* The format of the stream: 'sse' or 'json-nl'
|
||||
* - 'sse' is the default format, and is used by all vendors except Ollama
|
||||
* - 'json-nl' is used by Ollama
|
||||
*/
|
||||
type DispatchDemuxFormat = 'sse' | 'json-nl';
|
||||
export type StreamDemuxerFormat = 'sse' | 'json-nl' | null;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a demuxer for a stream of events.
|
||||
* The demuxer is stateful and accumulates data until a full event is available.
*
* Mapping of `format` to the returned demuxer:
* - 'sse': a new EventSource demuxer
* - 'json-nl': a new JSON-newline demuxer
* - null: the shared `_nullStreamDemuxerWarn` object, which logs a warning and returns no events
*
* @param format the stream format, or null when no stream demuxing is expected
* @returns a StreamDemuxer exposing `demux` and `remaining`
|
||||
*/
|
||||
export function createStreamDemuxer(format: StreamDemuxerFormat): StreamDemuxer {
|
||||
// no default branch: the three cases cover every member of StreamDemuxerFormat
switch (format) {
|
||||
case 'sse':
|
||||
return _createEventSourceDemuxer();
|
||||
case 'json-nl':
|
||||
return _createJsonNlDemuxer();
|
||||
case null:
|
||||
// not created per call: the same module-level object is returned every time
return _nullStreamDemuxerWarn;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export type DemuxedEvent = {
|
||||
type: 'event' | 'reconnect-interval';
|
||||
@@ -13,37 +30,19 @@ export type DemuxedEvent = {
|
||||
data: string;
|
||||
};
|
||||
|
||||
type DispatchDemuxer = {
|
||||
// Shape returned by createStreamDemuxer and by the private demuxer factories in this file.
type StreamDemuxer = {
|
||||
// takes one chunk of text and returns the events extracted from it (possibly none)
demux: (chunk: string) => DemuxedEvent[];
|
||||
// returns a string; the JSON-newline demuxer returns its internal buffer, the null demuxer returns ''
remaining: () => string;
|
||||
};
|
||||
|
||||
|
||||
export function createDispatchDemuxer(format: DispatchDemuxFormat) {
|
||||
switch (format) {
|
||||
case 'sse':
|
||||
return _createEventSourceDemuxer();
|
||||
case 'json-nl':
|
||||
return _createJsonNlDemuxer();
|
||||
}
|
||||
}
|
||||
|
||||
export const nullDispatchDemuxer: DispatchDemuxer = {
|
||||
demux: () => {
|
||||
console.warn('Null demuxer called - shall not happen, as it is only created in non-streaming');
|
||||
return [];
|
||||
},
|
||||
remaining: () => '',
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Creates a parser for an EventSource stream (e.g. OpenAI's format).
|
||||
* Uses the renowned `eventsource-parser` library.
|
||||
*
|
||||
* Note that we only use the 'feed' function and not 'reset', as we recreate the object per-call.
|
||||
*/
|
||||
function _createEventSourceDemuxer(): DispatchDemuxer {
|
||||
function _createEventSourceDemuxer(): StreamDemuxer {
|
||||
let buffer: DemuxedEvent[] = [];
|
||||
const parser = createEventsourceParser((event) => {
|
||||
switch (event.type) {
|
||||
@@ -71,7 +70,7 @@ function _createEventSourceDemuxer(): DispatchDemuxer {
|
||||
* Creates a parser for a 'JSON\n' non-event stream, to be swapped with an EventSource parser.
|
||||
* Ollama is the only vendor that uses this format.
|
||||
*/
|
||||
function _createJsonNlDemuxer(): DispatchDemuxer {
|
||||
function _createJsonNlDemuxer(): StreamDemuxer {
|
||||
let buffer = '';
|
||||
|
||||
return {
|
||||
@@ -91,3 +90,12 @@ function _createJsonNlDemuxer(): DispatchDemuxer {
|
||||
remaining: () => buffer,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/**
 * Placeholder demuxer returned by createStreamDemuxer when the format is null.
 * Calling `demux` is treated as a programming error: it logs a warning and yields no events.
 */
const _nullStreamDemuxerWarn: StreamDemuxer = {
|
||||
demux: () => {
|
||||
console.warn('Null demuxer called - shall not happen, as it is only created in non-streaming');
|
||||
return [];
|
||||
},
|
||||
// holds no buffer, so there is never leftover data
remaining: () => '',
|
||||
};
|
||||
@@ -1,7 +1,7 @@
|
||||
import { SERVER_DEBUG_WIRE } from '~/server/wire';
|
||||
|
||||
import type { DemuxedEvent } from '../dispatch/chatGenerate/chatGenerate.demuxers';
|
||||
import type { DispatchMessageAction } from '../dispatch/chatGenerate/chatGenerate.parsers';
|
||||
import type { ChatGenerateMessageAction } from '../dispatch/chatGenerate/chatGenerate.types';
|
||||
import type { DemuxedEvent } from '../dispatch/stream.demuxers';
|
||||
|
||||
|
||||
// type IntakeProtoObject = IntakeControlProtoObject | IntakeEventProtoObject;
|
||||
@@ -44,7 +44,7 @@ export class IntakeHandler {
|
||||
yield op;
|
||||
}
|
||||
|
||||
* yieldDmaOps(parsedEvents: Generator<DispatchMessageAction>, prettyDialect: string) {
|
||||
* yieldDmaOps(parsedEvents: Generator<ChatGenerateMessageAction>, prettyDialect: string) {
|
||||
for (const dma of parsedEvents) {
|
||||
// console.log('parsed dispatch:', dma);
|
||||
// TODO: massively rework this into a good protocol
|
||||
|
||||
Reference in New Issue
Block a user