diff --git a/src/modules/aix/server/aix.stream.router.ts b/src/modules/aix/server/aix.router.ts similarity index 75% rename from src/modules/aix/server/aix.stream.router.ts rename to src/modules/aix/server/aix.router.ts index 73a4bab3a..aecbb6e46 100644 --- a/src/modules/aix/server/aix.stream.router.ts +++ b/src/modules/aix/server/aix.router.ts @@ -1,66 +1,27 @@ -import { createEmptyReadableStream, safeErrorString, SERVER_DEBUG_WIRE, serverCapitalizeFirstLetter } from '~/server/wire'; +import { z } from 'zod'; + +import { createEmptyReadableStream, safeErrorString, serverCapitalizeFirstLetter } from '~/server/wire'; import { createTRPCRouter, publicProcedure } from '~/server/api/trpc.server'; import { fetchResponseOrTRPCThrow } from '~/server/api/trpc.router.fetchers'; -import type { DemuxedEvent } from './upstream.demuxers'; -import { aixGenerateContentInputSchema } from '../shared/aix.shared.chat'; -import { prepareUpstream } from './upstream'; +import { prepareDispatch } from './dispatch/prepareDispatch'; +import { IntakeHandler } from '~/modules/aix/server/intake/IntakeHandler'; + +import { aixAccessSchema, aixHistorySchema, aixModelSchema, aixStreamingContextSchema } from '~/modules/aix/server/intake/aix.types'; +import { aixToolsPolicySchema, aixToolsSchema } from '~/modules/aix/server/intake/aix.tool.types'; -// stream handler as a class -class DownstreamHandler { - private upstreamReceivedEvents: number = 0; - private debugReceivedLastMs: number | null = null; - public downstreamTerminated: boolean = false; +// export type AixGenerateContentInput = z.infer; - constructor(readonly prettyDialect: string) { - // ... - } - - onReceivedUpstreamEvent(demuxedEvent: DemuxedEvent) { - this.upstreamReceivedEvents++; - if (SERVER_DEBUG_WIRE) { - const nowMs = Date.now(); - const elapsedMs = this.debugReceivedLastMs ? nowMs - this.debugReceivedLastMs : 0; - this.debugReceivedLastMs = nowMs; - console.log(`<- SSE (${elapsedMs} ms):`, demuxedEvent); - } - } - - async* handleStream() { - // ... - } - - * yieldStart() { - yield { - type: 'start', - }; - } - - * yieldTermination(reasonId: 'upstream-close' | 'event-done' | 'parser-done') { - if (SERVER_DEBUG_WIRE || true) - console.log('|terminate|', reasonId, this.downstreamTerminated ? '(WARNING: already terminated)' : ''); - if (this.downstreamTerminated) return; - yield { - type: 'done', - }; - this.downstreamTerminated = true; - } - - * yieldError(errorId: 'upstream-prepare' | 'upstream-fetch' | 'upstream-read' | 'upstream-parse', errorText: string, forceErrorMessage?: boolean) { - if (SERVER_DEBUG_WIRE || forceErrorMessage || true) - console.error(`[POST] /api/llms/stream: ${this.prettyDialect}: ${errorId}: ${errorText}`); - yield { - issueId: errorId, - issueText: errorText, - }; - this.downstreamTerminated = true; - } - - markTermination() { - this.downstreamTerminated = true; - } -} +export const aixGenerateContentInputSchema = z.object({ + access: aixAccessSchema, + model: aixModelSchema, + history: aixHistorySchema, + tools: aixToolsSchema.optional(), + toolPolicy: aixToolsPolicySchema.optional(), + context: aixStreamingContextSchema, + // stream? -> discriminated via the rpc function name +}); export const aixRouter = createTRPCRouter({ @@ -76,13 +37,13 @@ export const aixRouter = createTRPCRouter({ const prettyDialect = serverCapitalizeFirstLetter(accessDialect); // Downstream handler - const downstreamHandler = new DownstreamHandler(prettyDialect); + const downstreamHandler = new IntakeHandler(prettyDialect); yield* downstreamHandler.yieldStart(); // Prepare the upstream - let upstreamData: ReturnType; + let upstreamData: ReturnType; try { - upstreamData = prepareUpstream(access, model, history); + upstreamData = prepareDispatch(access, model, history); } catch (error: any) { yield* downstreamHandler.yieldError('upstream-prepare', `**[Service Prepare Issue] ${prettyDialect}**: ${safeErrorString(error) || 'Unknown service preparation error'}`); return; // exit diff --git a/src/modules/aix/shared/aix.shared.chat.ts b/src/modules/aix/shared/aix.shared.chat.ts deleted file mode 100644 index 869c1b021..000000000 --- a/src/modules/aix/shared/aix.shared.chat.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { z } from 'zod'; - -import { aixAccessSchema, aixHistorySchema, aixModelSchema, aixStreamingContextSchema } from '~/modules/aix/shared/aix.shared.types'; -import { aixToolsPolicySchema, aixToolsSchema } from './aix.shared.tools'; - - -/// GENERATE INPUT Schema /// - -export type AixGenerateContentInput = z.infer; - -export const aixGenerateContentInputSchema = z.object({ - access: aixAccessSchema, - model: aixModelSchema, - history: aixHistorySchema, - tools: aixToolsSchema.optional(), - toolPolicy: aixToolsPolicySchema.optional(), - context: aixStreamingContextSchema, - // stream? -> discriminated via the rpc function name -}); diff --git a/src/server/api/trpc.router-edge.ts b/src/server/api/trpc.router-edge.ts index 19c7b4bb9..6cfc2555c 100644 --- a/src/server/api/trpc.router-edge.ts +++ b/src/server/api/trpc.router-edge.ts @@ -1,6 +1,6 @@ import { createTRPCRouter } from './trpc.server'; -import { aixRouter } from '~/modules/aix/server/aix.stream.router'; +import { aixRouter } from '~/modules/aix/server/aix.router'; import { backendRouter } from '~/modules/backend/backend.router'; import { elevenlabsRouter } from '~/modules/elevenlabs/elevenlabs.router'; import { googleSearchRouter } from '~/modules/google/search.router';