AIX: Rename router

This commit is contained in:
Enrico Ros
2024-07-09 10:07:44 -07:00
parent dd3993ec4f
commit 6b11291284
3 changed files with 22 additions and 80 deletions
@@ -1,66 +1,27 @@
import { createEmptyReadableStream, safeErrorString, SERVER_DEBUG_WIRE, serverCapitalizeFirstLetter } from '~/server/wire';
import { z } from 'zod';
import { createEmptyReadableStream, safeErrorString, serverCapitalizeFirstLetter } from '~/server/wire';
import { createTRPCRouter, publicProcedure } from '~/server/api/trpc.server';
import { fetchResponseOrTRPCThrow } from '~/server/api/trpc.router.fetchers';
import type { DemuxedEvent } from './upstream.demuxers';
import { aixGenerateContentInputSchema } from '../shared/aix.shared.chat';
import { prepareUpstream } from './upstream';
import { prepareDispatch } from './dispatch/prepareDispatch';
import { IntakeHandler } from '~/modules/aix/server/intake/IntakeHandler';
import { aixAccessSchema, aixHistorySchema, aixModelSchema, aixStreamingContextSchema } from '~/modules/aix/server/intake/aix.types';
import { aixToolsPolicySchema, aixToolsSchema } from '~/modules/aix/server/intake/aix.tool.types';
// stream handler as a class
class DownstreamHandler {
private upstreamReceivedEvents: number = 0;
private debugReceivedLastMs: number | null = null;
public downstreamTerminated: boolean = false;
// export type AixGenerateContentInput = z.infer<typeof aixGenerateContentInputSchema>;
constructor(readonly prettyDialect: string) {
// ...
}
onReceivedUpstreamEvent(demuxedEvent: DemuxedEvent) {
this.upstreamReceivedEvents++;
if (SERVER_DEBUG_WIRE) {
const nowMs = Date.now();
const elapsedMs = this.debugReceivedLastMs ? nowMs - this.debugReceivedLastMs : 0;
this.debugReceivedLastMs = nowMs;
console.log(`<- SSE (${elapsedMs} ms):`, demuxedEvent);
}
}
async* handleStream() {
// ...
}
* yieldStart() {
yield {
type: 'start',
};
}
* yieldTermination(reasonId: 'upstream-close' | 'event-done' | 'parser-done') {
if (SERVER_DEBUG_WIRE || true)
console.log('|terminate|', reasonId, this.downstreamTerminated ? '(WARNING: already terminated)' : '');
if (this.downstreamTerminated) return;
yield {
type: 'done',
};
this.downstreamTerminated = true;
}
* yieldError(errorId: 'upstream-prepare' | 'upstream-fetch' | 'upstream-read' | 'upstream-parse', errorText: string, forceErrorMessage?: boolean) {
if (SERVER_DEBUG_WIRE || forceErrorMessage || true)
console.error(`[POST] /api/llms/stream: ${this.prettyDialect}: ${errorId}: ${errorText}`);
yield {
issueId: errorId,
issueText: errorText,
};
this.downstreamTerminated = true;
}
markTermination() {
this.downstreamTerminated = true;
}
}
export const aixGenerateContentInputSchema = z.object({
access: aixAccessSchema,
model: aixModelSchema,
history: aixHistorySchema,
tools: aixToolsSchema.optional(),
toolPolicy: aixToolsPolicySchema.optional(),
context: aixStreamingContextSchema,
// stream? -> discriminated via the rpc function name
});
export const aixRouter = createTRPCRouter({
@@ -76,13 +37,13 @@ export const aixRouter = createTRPCRouter({
const prettyDialect = serverCapitalizeFirstLetter(accessDialect);
// Downstream handler
const downstreamHandler = new DownstreamHandler(prettyDialect);
const downstreamHandler = new IntakeHandler(prettyDialect);
yield* downstreamHandler.yieldStart();
// Prepare the upstream
let upstreamData: ReturnType<typeof prepareUpstream>;
let upstreamData: ReturnType<typeof prepareDispatch>;
try {
upstreamData = prepareUpstream(access, model, history);
upstreamData = prepareDispatch(access, model, history);
} catch (error: any) {
yield* downstreamHandler.yieldError('upstream-prepare', `**[Service Prepare Issue] ${prettyDialect}**: ${safeErrorString(error) || 'Unknown service preparation error'}`);
return; // exit
-19
View File
@@ -1,19 +0,0 @@
import { z } from 'zod';
import { aixAccessSchema, aixHistorySchema, aixModelSchema, aixStreamingContextSchema } from '~/modules/aix/shared/aix.shared.types';
import { aixToolsPolicySchema, aixToolsSchema } from './aix.shared.tools';
/// GENERATE INPUT Schema ///
export type AixGenerateContentInput = z.infer<typeof aixGenerateContentInputSchema>;
export const aixGenerateContentInputSchema = z.object({
access: aixAccessSchema,
model: aixModelSchema,
history: aixHistorySchema,
tools: aixToolsSchema.optional(),
toolPolicy: aixToolsPolicySchema.optional(),
context: aixStreamingContextSchema,
// stream? -> discriminated via the rpc function name
});
+1 -1
View File
@@ -1,6 +1,6 @@
import { createTRPCRouter } from './trpc.server';
import { aixRouter } from '~/modules/aix/server/aix.stream.router';
import { aixRouter } from '~/modules/aix/server/aix.router';
import { backendRouter } from '~/modules/backend/backend.router';
import { elevenlabsRouter } from '~/modules/elevenlabs/elevenlabs.router';
import { googleSearchRouter } from '~/modules/google/search.router';