mirror of
https://github.com/enricoros/big-AGI.git
synced 2026-05-10 21:50:14 -07:00
Next Edge/tRPC: server-side delay (improved workaround) for the 'Stream closed' issue only. #805
This commit is contained in:
@@ -2,6 +2,7 @@ import { z } from 'zod';
|
||||
|
||||
import { createEmptyReadableStream, createServerDebugWireEvents, safeErrorString, serverCapitalizeFirstLetter } from '~/server/wire';
|
||||
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
|
||||
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
|
||||
import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
|
||||
|
||||
import { AixDemuxers } from '../dispatch/stream.demuxers';
|
||||
@@ -37,7 +38,7 @@ export const aixRouter = createTRPCRouter({
|
||||
streaming: z.boolean(),
|
||||
connectionOptions: AixWire_API.ConnectionOptions_schema.optional(),
|
||||
}))
|
||||
.mutation(async function* ({ input, ctx }): AsyncGenerator<AixWire_Particles.ChatGenerateOp> {
|
||||
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input, ctx }): AsyncGenerator<AixWire_Particles.ChatGenerateOp> {
|
||||
|
||||
|
||||
// Intake derived state
|
||||
@@ -235,26 +236,6 @@ export const aixRouter = createTRPCRouter({
|
||||
// or an error that has already been queued up for this last flush
|
||||
yield* chatGenerateTx.flushParticles();
|
||||
|
||||
/**
|
||||
* 2025-05-22: Workaround an issue that appeared in all Vercel deployments.
|
||||
*
|
||||
* This is an emergency fix deployed with priority.
|
||||
*
|
||||
* Analysis:
|
||||
* - the server side saw the following exceptions on Vercel, during the call to this server-side tRPC
|
||||
* streaming chatGenerateContent function:
|
||||
*
|
||||
* TypeError: Cannot read properties of undefined (reading 'return')
|
||||
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/utils/disposable.mjs:38:0)
|
||||
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/jsonl.mjs:204:0)
|
||||
*
|
||||
* - we haven't isolated the cause, but seems that awaiting for the next event loop cycle suppresses
|
||||
* the issue.
|
||||
*
|
||||
* Ext refs: posted to the tRCP Discord on the streaming channel if anyone else saw this issue.
|
||||
*/
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
}),
|
||||
})),
|
||||
|
||||
});
|
||||
|
||||
@@ -6,6 +6,7 @@ import { default as TurndownService } from 'turndown';
|
||||
import { load as cheerioLoad } from 'cheerio';
|
||||
|
||||
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
|
||||
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
|
||||
import { env } from '~/server/env';
|
||||
|
||||
import { workerPuppeteerDownloadFileOrThrow } from './browse.files';
|
||||
@@ -71,7 +72,7 @@ export const browseRouter = createTRPCRouter({
|
||||
|
||||
fetchPagesStreaming: publicProcedure
|
||||
.input(fetchPageInputSchema)
|
||||
.mutation(async function* ({ input: { access, requests } }) {
|
||||
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input: { access, requests } }) {
|
||||
|
||||
// get endpoint
|
||||
const endpoint = (access.wssEndpoint || env.PUPPETEER_WSS_ENDPOINT || '').trim();
|
||||
@@ -116,7 +117,7 @@ export const browseRouter = createTRPCRouter({
|
||||
pages,
|
||||
workerHost,
|
||||
};
|
||||
}),
|
||||
})),
|
||||
|
||||
});
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import { z } from 'zod';
|
||||
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
|
||||
import { env } from '~/server/env';
|
||||
import { fetchJsonOrTRPCThrow, fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
|
||||
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
|
||||
|
||||
|
||||
// configuration
|
||||
@@ -80,7 +81,7 @@ export const elevenlabsRouter = createTRPCRouter({
|
||||
*/
|
||||
speech: publicProcedure
|
||||
.input(speechInputSchema)
|
||||
.mutation(async function* ({ input: { xiKey, text, voiceId, nonEnglish, audioStreaming, audioTurbo }, ctx }) {
|
||||
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input: { xiKey, text, voiceId, nonEnglish, audioStreaming, audioTurbo }, ctx }) {
|
||||
|
||||
// start streaming back
|
||||
yield { control: 'start' };
|
||||
@@ -182,7 +183,7 @@ export const elevenlabsRouter = createTRPCRouter({
|
||||
|
||||
// end streaming (if a control error wasn't thrown)
|
||||
yield { control: 'end' };
|
||||
}),
|
||||
})),
|
||||
|
||||
});
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ import { perplexityAIModelDescriptions, perplexityAIModelSort } from './models/p
|
||||
import { togetherAIModelsToModelDescriptions } from './models/together.models';
|
||||
import { wilreLocalAIModelsApplyOutputSchema, wireLocalAIModelsAvailableOutputSchema, wireLocalAIModelsListOutputSchema } from './localai.wiretypes';
|
||||
import { xaiModelDescriptions, xaiModelSort } from './models/xai.models';
|
||||
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
|
||||
|
||||
|
||||
const openAIDialects = z.enum([
|
||||
@@ -274,7 +275,7 @@ export const llmOpenAIRouter = createTRPCRouter({
|
||||
/* [OpenAI/LocalAI] images/generations */
|
||||
createImages: publicProcedure
|
||||
.input(createImagesInputSchema)
|
||||
.mutation(async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
|
||||
.mutation(delayPostAsyncGeneratorOnEdge(0, async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
|
||||
|
||||
const { access, generationConfig: config, editConfig } = input;
|
||||
|
||||
@@ -394,7 +395,7 @@ export const llmOpenAIRouter = createTRPCRouter({
|
||||
},
|
||||
};
|
||||
}
|
||||
}),
|
||||
})),
|
||||
|
||||
|
||||
/* [OpenAI] check for content policy violations */
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
import { createTRPCRouter, publicProcedure } from '~/server/trpc/trpc.server';
|
||||
import { delayPostAsyncGeneratorOnEdge } from '~/server/trpc/trpc.next-edge';
|
||||
import { env } from '~/server/env';
|
||||
import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
|
||||
|
||||
@@ -27,7 +28,7 @@ export const prodiaRouter = createTRPCRouter({
|
||||
height: z.number().optional(),
|
||||
seed: z.number().optional(),
|
||||
}))
|
||||
.query(async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
|
||||
.query(delayPostAsyncGeneratorOnEdge(0, async function* ({ input }): AsyncGenerator<T2ICreateImageAsyncStreamOp> {
|
||||
|
||||
const key = (input.prodiaKey || env.PRODIA_API_KEY || '').trim();
|
||||
if (!key)
|
||||
@@ -138,6 +139,6 @@ export const prodiaRouter = createTRPCRouter({
|
||||
},
|
||||
};
|
||||
|
||||
}),
|
||||
})),
|
||||
|
||||
});
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
/// Next.js Edge Runtime check - won't activate otherwise
|
||||
declare global {
|
||||
// noinspection ES6ConvertVarToLetConst
|
||||
var EdgeRuntime: string | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* 2025-05-22: Workaround an issue that appeared in all Vercel deployments.
|
||||
* https://github.com/enricoros/big-AGI/issues/805
|
||||
*
|
||||
* This is an emergency workaround for the 'Stream closed' issue, while the 6 Exceptions are still
|
||||
* thrown on each tRPC call.
|
||||
*
|
||||
* Analysis:
|
||||
* - the server side saw the following exceptions on Vercel, during the call to this server-side tRPC
|
||||
* streaming chatGenerateContent function:
|
||||
*
|
||||
* TypeError: Cannot read properties of undefined (reading 'return')
|
||||
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/utils/disposable.mjs:38:0)
|
||||
* at (node_modules/@trpc/server/dist/unstable-core-do-not-import/stream/jsonl.mjs:204:0)
|
||||
*
|
||||
* - we haven't isolated the cause, but seems that awaiting for the next event loop cycle suppresses
|
||||
* the issue.
|
||||
*
|
||||
* Ext refs: posted to the tRCP Discord on the streaming channel if anyone else saw this issue.
|
||||
*/
|
||||
export function delayPostAsyncGeneratorOnEdge<TArgs, TYield>(
|
||||
delayMs: number,
|
||||
originalAsyncGeneratorFn: (args: TArgs) => AsyncGenerator<TYield>,
|
||||
): (args: TArgs) => AsyncGenerator<TYield> {
|
||||
return async function* wrappedAsyncGenerator(args: TArgs): AsyncGenerator<TYield> {
|
||||
|
||||
yield* originalAsyncGeneratorFn(args);
|
||||
|
||||
// [EdgeRuntime]
|
||||
if (typeof EdgeRuntime === 'string') {
|
||||
if (delayMs >= 0)
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
}
|
||||
|
||||
// explicitly return void
|
||||
return;
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user