diff --git a/src/modules/aix/client/ContentReassembler.ts b/src/modules/aix/client/ContentReassembler.ts index f7bc5eb9e..b5cefe59e 100644 --- a/src/modules/aix/client/ContentReassembler.ts +++ b/src/modules/aix/client/ContentReassembler.ts @@ -197,6 +197,9 @@ export class ContentReassembler { // PartParticleOp case 'p' in op: switch (op.p) { + case '❤': + // ignore the heartbeats + break; case 'tr_': this.onAppendReasoningText(op); break; diff --git a/src/modules/aix/server/api/aix.router.ts b/src/modules/aix/server/api/aix.router.ts index 7762aec1a..7be3f2fc7 100644 --- a/src/modules/aix/server/api/aix.router.ts +++ b/src/modules/aix/server/api/aix.router.ts @@ -6,10 +6,15 @@ import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers'; import { AixWire_API, AixWire_API_ChatContentGenerate, AixWire_Particles } from './aix.wiretypes'; import { ChatGenerateTransmitter } from '../dispatch/chatGenerate/ChatGenerateTransmitter'; +import { awaitWithHeartbeats } from '../dispatch/awaitWithHeartbeats'; import { createChatGenerateDispatch } from '../dispatch/chatGenerate/chatGenerate.dispatch'; import { createStreamDemuxer } from '../dispatch/stream.demuxers'; +// configuration +const HEARTBEAT_TIMEOUT_MS = 10_000; + + export const aixRouter = createTRPCRouter({ /** @@ -59,8 +64,8 @@ export const aixRouter = createTRPCRouter({ yield* chatGenerateTx.emitParticles(); } - // Blocking fetch - may timeout, for instance with long Anthriopic requests (>25s on Vercel) - dispatchResponse = await fetchResponseOrTRPCThrow({ + // Blocking fetch with heartbeats - combats timeouts, for instance with long Anthriopic requests (>25s on Vercel) + dispatchResponse = yield* awaitWithHeartbeats(fetchResponseOrTRPCThrow({ url: dispatch.request.url, method: 'POST', headers: dispatch.request.headers, @@ -68,7 +73,7 @@ export const aixRouter = createTRPCRouter({ signal: intakeAbortSignal, name: `Aix.${prettyDialect}`, throwWithoutName: true, - }); + }), HEARTBEAT_TIMEOUT_MS); } catch (error: any) { // Handle expected dispatch abortion while the first fetch hasn't even completed @@ -94,8 +99,8 @@ export const aixRouter = createTRPCRouter({ if (!streaming) { let dispatchBody: string | undefined = undefined; try { - // Read the full response body - dispatchBody = await dispatchResponse.text(); + // Read the full response body with heartbeats + dispatchBody = yield* awaitWithHeartbeats(dispatchResponse.text(), HEARTBEAT_TIMEOUT_MS); serverDebugIncomingPackets?.onMessage(dispatchBody); // Parse the response in full @@ -125,7 +130,7 @@ export const aixRouter = createTRPCRouter({ // Read AI Service chunk let dispatchChunk: string; try { - const { done, value } = await dispatchReader.read(); + const { done, value } = yield* awaitWithHeartbeats(dispatchReader.read(), HEARTBEAT_TIMEOUT_MS); // Handle normal dispatch stream closure (no more data, AI Service closed the stream) if (done) { diff --git a/src/modules/aix/server/api/aix.wiretypes.ts b/src/modules/aix/server/api/aix.wiretypes.ts index 2d1b1de2d..e3d616fa7 100644 --- a/src/modules/aix/server/api/aix.wiretypes.ts +++ b/src/modules/aix/server/api/aix.wiretypes.ts @@ -572,6 +572,7 @@ export namespace AixWire_Particles { | { t: string }; // special: incremental text, but with a more optimized/succinct representation compared to { p: 't_', i_t: string } export type PartParticleOp = + | { p: '❤' } // heart beat | { p: 'tr_', _t: string, weak?: 'tag' } // reasoning text, incremental; could be a 'weak' detection, e.g. heuristic from '' rather than API-provided | { p: 'trs', signature: string } // reasoning signature | { p: 'trr_', _data: string } // reasoning raw (or redacted) data diff --git a/src/modules/aix/server/dispatch/awaitWithHeartbeats.ts b/src/modules/aix/server/dispatch/awaitWithHeartbeats.ts new file mode 100644 index 000000000..1d4158ca5 --- /dev/null +++ b/src/modules/aix/server/dispatch/awaitWithHeartbeats.ts @@ -0,0 +1,47 @@ +/** + * Awaits a promise while sending ❤ + * + * Maintains connection liveliness during long-running operations such as + * long fetches (e.g. Anthropic on large context) or long reads (e.g. + * image generation by Gemini Image). + * + * @param operationPromise Promise to await with heartbeat protection + * @param timeoutMs Time in ms between heartbeats (if 0, no heartbeats) + * @returns The same result as awaiting the promise + */ +export async function* awaitWithHeartbeats(operationPromise: Promise, timeoutMs: number) { + if (!timeoutMs) return await operationPromise; + + // holds the outcome in either state + const operationWrapper = operationPromise + .then(result => ({ type: 'resolved' as const, result })) + .catch(error => ({ type: 'rejected' as const, error })); + + while (true) { + + // setup next ❤ timeout + const heartbeatPromise = new Promise<'❤'>(resolve => { + setTimeout(() => resolve('❤'), timeoutMs); + }); + + // race ❤|operation + const winner = await Promise.race([ + operationWrapper, + heartbeatPromise, + ]); + + // if the operation won, great, we're done + if (winner !== '❤') + break; + + // otherwise send the ❤ + yield { p: '❤' as const }; + } + + // return the actual result (or throw if rejected) + const wrappedResult = await operationWrapper; + if (wrappedResult.type === 'rejected') + throw wrappedResult.error; + + return wrappedResult.result; +}