AIX: ❤|awaited ops

This commit is contained in:
Enrico Ros
2025-03-16 17:43:07 -07:00
parent ab55804039
commit 6faa468ed3
4 changed files with 62 additions and 6 deletions
@@ -197,6 +197,9 @@ export class ContentReassembler {
// PartParticleOp
case 'p' in op:
switch (op.p) {
case '❤':
// ignore the heartbeats
break;
case 'tr_':
this.onAppendReasoningText(op);
break;
+11 -6
View File
@@ -6,10 +6,15 @@ import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
import { AixWire_API, AixWire_API_ChatContentGenerate, AixWire_Particles } from './aix.wiretypes';
import { ChatGenerateTransmitter } from '../dispatch/chatGenerate/ChatGenerateTransmitter';
import { awaitWithHeartbeats } from '../dispatch/awaitWithHeartbeats';
import { createChatGenerateDispatch } from '../dispatch/chatGenerate/chatGenerate.dispatch';
import { createStreamDemuxer } from '../dispatch/stream.demuxers';
// configuration
const HEARTBEAT_TIMEOUT_MS = 10_000;
export const aixRouter = createTRPCRouter({
/**
@@ -59,8 +64,8 @@ export const aixRouter = createTRPCRouter({
yield* chatGenerateTx.emitParticles();
}
// Blocking fetch - may timeout, for instance with long Anthriopic requests (>25s on Vercel)
dispatchResponse = await fetchResponseOrTRPCThrow({
// Blocking fetch with heartbeats - combats timeouts, for instance with long Anthriopic requests (>25s on Vercel)
dispatchResponse = yield* awaitWithHeartbeats(fetchResponseOrTRPCThrow({
url: dispatch.request.url,
method: 'POST',
headers: dispatch.request.headers,
@@ -68,7 +73,7 @@ export const aixRouter = createTRPCRouter({
signal: intakeAbortSignal,
name: `Aix.${prettyDialect}`,
throwWithoutName: true,
});
}), HEARTBEAT_TIMEOUT_MS);
} catch (error: any) {
// Handle expected dispatch abortion while the first fetch hasn't even completed
@@ -94,8 +99,8 @@ export const aixRouter = createTRPCRouter({
if (!streaming) {
let dispatchBody: string | undefined = undefined;
try {
// Read the full response body
dispatchBody = await dispatchResponse.text();
// Read the full response body with heartbeats
dispatchBody = yield* awaitWithHeartbeats(dispatchResponse.text(), HEARTBEAT_TIMEOUT_MS);
serverDebugIncomingPackets?.onMessage(dispatchBody);
// Parse the response in full
@@ -125,7 +130,7 @@ export const aixRouter = createTRPCRouter({
// Read AI Service chunk
let dispatchChunk: string;
try {
const { done, value } = await dispatchReader.read();
const { done, value } = yield* awaitWithHeartbeats(dispatchReader.read(), HEARTBEAT_TIMEOUT_MS);
// Handle normal dispatch stream closure (no more data, AI Service closed the stream)
if (done) {
@@ -572,6 +572,7 @@ export namespace AixWire_Particles {
| { t: string }; // special: incremental text, but with a more optimized/succinct representation compared to { p: 't_', i_t: string }
export type PartParticleOp =
| { p: '❤' } // heart beat
| { p: 'tr_', _t: string, weak?: 'tag' } // reasoning text, incremental; could be a 'weak' detection, e.g. heuristic from '<think>' rather than API-provided
| { p: 'trs', signature: string } // reasoning signature
| { p: 'trr_', _data: string } // reasoning raw (or redacted) data
@@ -0,0 +1,47 @@
/**
* Awaits a promise while sending ❤
*
* Maintains connection liveliness during long-running operations such as
* long fetches (e.g. Anthropic on large context) or long reads (e.g.
* image generation by Gemini Image).
*
* @param operationPromise Promise to await with heartbeat protection
* @param timeoutMs Time in ms between heartbeats (if 0, no heartbeats)
* @returns The same result as awaiting the promise
*/
export async function* awaitWithHeartbeats<TOut>(operationPromise: Promise<TOut>, timeoutMs: number) {
if (!timeoutMs) return await operationPromise;
// holds the outcome in either state
const operationWrapper = operationPromise
.then(result => ({ type: 'resolved' as const, result }))
.catch(error => ({ type: 'rejected' as const, error }));
while (true) {
// setup next ❤ timeout
const heartbeatPromise = new Promise<'❤'>(resolve => {
setTimeout(() => resolve('❤'), timeoutMs);
});
// race ❤|operation
const winner = await Promise.race([
operationWrapper,
heartbeatPromise,
]);
// if the operation won, great, we're done
if (winner !== '❤')
break;
// otherwise send the ❤
yield { p: '❤' as const };
}
// return the actual result (or throw if rejected)
const wrappedResult = await operationWrapper;
if (wrappedResult.type === 'rejected')
throw wrappedResult.error;
return wrappedResult.result;
}