Next Edge/tRPC: server-side delay (improved workaround) for the 'Stream closed' issue only. #805

This commit is contained in:
Enrico Ros
2025-05-26 14:11:02 -07:00
parent e189d3e174
commit 2894c07049
6 changed files with 59 additions and 30 deletions
+3 -22
View File
@@ -2,6 +2,7 @@ import { z } from 'zod';
import { createEmptyReadableStream, createServerDebugWireEvents, safeErrorString, serverCapitalizeFirstLetter } from '~/server/wire';
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
import { AixDemuxers } from '../dispatch/stream.demuxers';
@@ -37,7 +38,7 @@ export const aixRouter = createTRPCRouter({
streaming: z.boolean(),
connectionOptions: AixWire_API.ConnectionOptions_schema.optional(),
}))
.mutation(async function* ({ input, ctx }): AsyncGenerator<AixWire_Particles.ChatGenerateOp> {
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input, ctx }): AsyncGenerator<AixWire_Particles.ChatGenerateOp> {
// Intake derived state
@@ -235,26 +236,6 @@ export const aixRouter = createTRPCRouter({
// or an error that has already been queued up for this last flush
yield* chatGenerateTx.flushParticles();
/**
* 2025-05-22: Workaround an issue that appeared in all Vercel deployments.
*
* This is an emergency fix deployed with priority.
*
* Analysis:
* - the server side saw the following exceptions on Vercel, during the call to this server-side tRPC
* streaming chatGenerateContent function:
*
* TypeError: Cannot read properties of undefined (reading 'return')
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/utils/disposable.mjs:38:0)
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/jsonl.mjs:204:0)
*
 * - we haven't isolated the cause, but it seems that awaiting the next event loop cycle suppresses
* the issue.
*
 * Ext refs: posted to the tRPC Discord on the streaming channel if anyone else saw this issue.
*/
await new Promise((resolve) => setTimeout(resolve, 0));
}),
})),
});
+3 -2
View File
@@ -6,6 +6,7 @@ import { default as TurndownService } from 'turndown';
import { load as cheerioLoad } from 'cheerio';
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
import { env } from '~/server/env';
import { workerPuppeteerDownloadFileOrThrow } from './browse.files';
@@ -71,7 +72,7 @@ export const browseRouter = createTRPCRouter({
fetchPagesStreaming: publicProcedure
.input(fetchPageInputSchema)
.mutation(async function* ({ input: { access, requests } }) {
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input: { access, requests } }) {
// get endpoint
const endpoint = (access.wssEndpoint || env.PUPPETEER_WSS_ENDPOINT || '').trim();
@@ -116,7 +117,7 @@ export const browseRouter = createTRPCRouter({
pages,
workerHost,
};
}),
})),
});
+3 -2
View File
@@ -3,6 +3,7 @@ import { z } from 'zod';
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
import { env } from '~/server/env';
import { fetchJsonOrTRPCThrow, fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
// configuration
@@ -80,7 +81,7 @@ export const elevenlabsRouter = createTRPCRouter({
*/
speech: publicProcedure
.input(speechInputSchema)
.mutation(async function* ({ input: { xiKey, text, voiceId, nonEnglish, audioStreaming, audioTurbo }, ctx }) {
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input: { xiKey, text, voiceId, nonEnglish, audioStreaming, audioTurbo }, ctx }) {
// start streaming back
yield { control: 'start' };
@@ -182,7 +183,7 @@ export const elevenlabsRouter = createTRPCRouter({
// end streaming (if a control error wasn't thrown)
yield { control: 'end' };
}),
})),
});
@@ -30,6 +30,7 @@ import { perplexityAIModelDescriptions, perplexityAIModelSort } from './models/p
import { togetherAIModelsToModelDescriptions } from './models/together.models';
import { wilreLocalAIModelsApplyOutputSchema, wireLocalAIModelsAvailableOutputSchema, wireLocalAIModelsListOutputSchema } from './localai.wiretypes';
import { xaiModelDescriptions, xaiModelSort } from './models/xai.models';
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
const openAIDialects = z.enum([
@@ -274,7 +275,7 @@ export const llmOpenAIRouter = createTRPCRouter({
/* [OpenAI/LocalAI] images/generations */
createImages: publicProcedure
.input(createImagesInputSchema)
.mutation(async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
const { access, generationConfig: config, editConfig } = input;
@@ -394,7 +395,7 @@ export const llmOpenAIRouter = createTRPCRouter({
},
};
}
}),
})),
/* [OpenAI] check for content policy violations */
+3 -2
View File
@@ -1,6 +1,7 @@
import { z } from 'zod';
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
import { env } from '~/server/env';
import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
@@ -27,7 +28,7 @@ export const prodiaRouter = createTRPCRouter({
height: z.number().optional(),
seed: z.number().optional(),
}))
.query(async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
.query(delayPostAsyncGeneratorOnEdge(0, async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
const key = (input.prodiaKey || env.PRODIA_API_KEY || '').trim();
if (!key)
@@ -138,6 +139,6 @@ export const prodiaRouter = createTRPCRouter({
},
};
}),
})),
});
+44
View File
@@ -0,0 +1,44 @@
/// Next.js Edge Runtime check - won't activate otherwise
// Ambient declaration of the 'EdgeRuntime' global: Vercel's Edge Runtime defines it
// (as a string), while Node.js leaves it undefined — this is how the wrapper below
// detects where it is running.
declare global {
  // noinspection ES6ConvertVarToLetConst
  // 'var' (not let/const) is required for a global-scope augmentation to attach to globalThis
  var EdgeRuntime: string | undefined;
}
/**
* 2025-05-22: Workaround an issue that appeared in all Vercel deployments.
* https://github.com/enricoros/big-AGI/issues/805
*
* This is an emergency workaround for the 'Stream closed' issue, while the 6 Exceptions are still
* thrown on each tRPC call.
*
* Analysis:
* - the server side saw the following exceptions on Vercel, during the call to this server-side tRPC
* streaming chatGenerateContent function:
*
* TypeError: Cannot read properties of undefined (reading 'return')
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/utils/disposable.mjs:38:0)
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/jsonl.mjs:204:0)
*
 * - we haven't isolated the cause, but it seems that awaiting the next event loop cycle suppresses
* the issue.
*
 * Ext refs: posted to the tRPC Discord on the streaming channel if anyone else saw this issue.
*/
export function delayPostAsyncGeneratorOnEdge<TArgs, TYield>(
  delayMs: number,
  originalAsyncGeneratorFn: (args: TArgs) => AsyncGenerator<TYield>,
): (args: TArgs) => AsyncGenerator<TYield> {
  return async function* edgeDelayedGenerator(args: TArgs): AsyncGenerator<TYield> {

    // delegate every value (and .throw()/.return() semantics) to the wrapped generator
    yield* originalAsyncGeneratorFn(args);

    // only on the Next.js Edge Runtime: hold the stream open for `delayMs` (a 0ms
    // setTimeout still defers to the next event loop cycle, which suppresses the issue)
    const runningOnEdge = typeof EdgeRuntime === 'string';
    if (runningOnEdge && delayMs >= 0)
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
  };
}