AIX: Refactor - Client - Retry & Resume

This refactor allows for low-level looping on the client side.

This can be used for network errors between server<>upstream reported as particles,
as well as for client<>server connections.

One special case of this is the OpenAI system for reattaching to detached (background) requests,
or, as an alternative, re-fetching them from the server once completed.
This commit is contained in:
Enrico Ros
2025-10-23 04:26:06 -07:00
parent a0b549855f
commit fd0ab93744
3 changed files with 211 additions and 151 deletions
+2 -2
View File
@@ -46,12 +46,12 @@ export class ContentReassembler {
constructor(
private readonly accumulator: AixChatGenerateContent_LL,
private readonly onAccumulatorUpdated?: () => MaybePromise<void>,
enableDebugContext?: AixClientDebugger.Context,
inspectorContext?: AixClientDebugger.Context,
private readonly wireAbortSignal?: AbortSignal,
) {
// [SUDO] Debugging the request, last-write-wins for the global (displayed in the UI)
this.debuggerFrameId = !enableDebugContext ? null : aixClientDebugger_init(enableDebugContext);
this.debuggerFrameId = !inspectorContext ? null : aixClientDebugger_init(inspectorContext);
}
@@ -0,0 +1,75 @@
// internal types
type AixRetryDecision = false | { strategy: AixRetryStrategy; delayMs: number; attemptNumber: number };
type AixRetryStrategy = 'resume' | 'reconnect';
type AixAbortableDelayResult = 'completed' | 'aborted';

/**
 * Minimal retry logic for AIX _aixChatGenerateContent_LL.
 * - supports 'reconnect', for instance to loop while servers are busy
 * - supports 'resume', for instance to recover a network breakage while a response is still being generated
 */
export class AixStreamRetry {

  // errors eligible for retry - everything else falls through to the normal error path
  private static readonly RETRYABLE_ERRORS: string[] = [
    'net-disconnected',
    // 'net-unknown', // NOTE: we only support a hard disconnect for now, for safety
    // 'client-aborted', 'request-exceeded', 'response-captive' // absolutely not retryable
  ];

  // number of attempts recorded so far (via recordAttempt)
  private attempts = 0;

  constructor(
    private readonly maxResumeAttempts = 4,
    private readonly maxReconnectAttempts = 0, // disabled by default
  ) {}

  /**
   * Determines if error is retryable and returns strategy + delay.
   * @param errorType classification of the streaming error (see aixClassifyStreamingError)
   * @param hasUpstreamHandle true when an upstream handle is available, enabling 'resume' instead of 'reconnect'
   * @returns false when not retryable (unsupported error or attempts exhausted), otherwise the strategy, backoff delay and 1-based attempt number
   */
  shallRetry(errorType: 'client-aborted' | 'net-disconnected' | 'request-exceeded' | 'response-captive' | 'net-unknown', hasUpstreamHandle: boolean): AixRetryDecision {

    // only retry supported errors
    if (!AixStreamRetry.RETRYABLE_ERRORS.includes(errorType))
      return false;

    // determine strategy: with an upstream handle we can resume the same generation, otherwise we reconnect from scratch
    const strategy: AixRetryStrategy = hasUpstreamHandle ? 'resume' : 'reconnect';
    const maxAttempts = strategy === 'resume' ? this.maxResumeAttempts : this.maxReconnectAttempts;

    // returns the strategy if below max attempts
    return this.attempts >= maxAttempts ? false : {
      strategy,
      delayMs: 500 * Math.pow(2, this.attempts), // exponential backoff: 500ms, 1s, 2s, 4s
      attemptNumber: this.attempts + 1, // 1-based for external use
    };
  }

  /** Records that a retry attempt was consumed - call after deciding to loop. */
  recordAttempt(): void {
    this.attempts++;
  }

  // isRetrying(): boolean {
  //   return this.attempts > 0;
  // }

  // getCurrentAttempt(): number {
  //   return this.attempts;
  // }

  /**
   * Abort-aware delay helper: waits for the specified delay or until abort signal fires.
   * @returns 'completed' if delay finished, 'aborted' if cancelled
   */
  static async abortableDelay(delayMs: number, abortSignal: AbortSignal): Promise<AixAbortableDelayResult> {

    // fast paths - note: an already-aborted signal wins over a zero/negative delay
    // (the former single-expression fast path returned 'completed' for an aborted signal when delayMs was 0,
    //  and 'aborted' for a negative delay on a live signal)
    if (abortSignal.aborted) return 'aborted';
    if (delayMs <= 0) return 'completed';

    return await new Promise<AixAbortableDelayResult>((resolve) => {
      const onAbort = () => {
        clearTimeout(timer);
        resolve('aborted');
      };
      const timer = setTimeout(() => {
        abortSignal.removeEventListener('abort', onAbort);
        resolve('completed');
      }, delayMs);
      abortSignal.addEventListener('abort', onAbort, { once: true });
    });
  }

}
+134 -149
View File
@@ -1,32 +1,30 @@
import { TRPCClientError } from '@trpc/client';
import { findServiceAccessOrThrow } from '~/modules/llms/vendors/vendor.helpers';
import type { DMessage, DMessageGenerator } from '~/common/stores/chat/chat.message';
import type { MaybePromise } from '~/common/types/useful.types';
import { DLLM, DLLMId, LLM_IF_HOTFIX_NoTemperature, LLM_IF_OAI_Responses, LLM_IF_Outputs_Audio, LLM_IF_Outputs_Image, LLM_IF_Outputs_NoText } from '~/common/stores/llms/llms.types';
import { apiStream } from '~/common/util/trpc.client';
import { DMetricsChatGenerate_Lg, metricsChatGenerateLgToMd, metricsComputeChatGenerateCostsMd } from '~/common/stores/metrics/metrics.chatgenerate';
import { DModelParameterValues, getAllModelParameterValues } from '~/common/stores/llms/llms.parameters';
import { apiStream } from '~/common/util/trpc.client';
import { createErrorContentFragment, DMessageContentFragment, DMessageErrorPart, DMessageVoidFragment, isContentFragment, isErrorPart } from '~/common/stores/chat/chat.fragments';
import { findLLMOrThrow } from '~/common/stores/llms/store-llms';
import { getAixInspector } from '~/common/stores/store-ui';
import { getAixInspectorEnabled } from '~/common/stores/store-ui';
import { getLabsDevNoStreaming } from '~/common/stores/store-ux-labs';
import { metricsStoreAddChatGenerate } from '~/common/stores/metrics/store-metrics';
import { presentErrorToHumans } from '~/common/util/errorUtils';
import { webGeolocationCached } from '~/common/util/webGeolocationUtils';
// NOTE: pay particular attention to the "import type", as this is importing from the server-side Zod definitions
import type { AixAPI_Access, AixAPI_Context_ChatGenerate, AixAPI_Model, AixAPIChatGenerate_Request } from '../server/api/aix.wiretypes';
import type { AixAPI_Access, AixAPI_ConnectionOptions_ChatGenerate, AixAPI_Context_ChatGenerate, AixAPI_Model, AixAPIChatGenerate_Request } from '../server/api/aix.wiretypes';
import { aixCGR_ChatSequence_FromDMessagesOrThrow, aixCGR_FromSimpleText, aixCGR_SystemMessage_FromDMessageOrThrow, AixChatGenerate_TextMessages, clientHotFixGenerateRequest_ApplyAll } from './aix.client.chatGenerateRequest';
import { AixStreamRetry } from './aix.client.retry';
import { ContentReassembler } from './ContentReassembler';
import { aixCGR_ChatSequence_FromDMessagesOrThrow, aixCGR_FromSimpleText, aixCGR_SystemMessage_FromDMessageOrThrow, AixChatGenerate_TextMessages, clientHotFixGenerateRequest_ApplyAll } from './aix.client.chatGenerateRequest';
import { aixClassifyStreamingError } from './aix.client.errors';
import { withDecimator } from './withDecimator';
// configuration
export const DEBUG_PARTICLES = false;
const AIX_CLIENT_DEV_ASSERTS = process.env.NODE_ENV === 'development';
export function aixCreateChatGenerateContext(name: AixAPI_Context_ChatGenerate['name'], ref: string | '_DEV_'): AixAPI_Context_ChatGenerate {
@@ -48,7 +46,7 @@ export function aixCreateModelFromLLMOptions(
// destructure input with the overrides
const {
llmRef, llmTemperature, llmResponseTokens, llmTopP, llmForceNoStream,
llmRef, llmTemperature, llmResponseTokens, llmTopP,
llmVndAnt1MContext, llmVndAntSkills, llmVndAntThinkingBudget, llmVndAntWebFetch, llmVndAntWebSearch,
llmVndGeminiAspectRatio, llmVndGeminiGoogleSearch, llmVndGeminiShowThoughts, llmVndGeminiThinkingBudget,
llmVndOaiReasoningEffort, llmVndOaiReasoningEffort4, llmVndOaiRestoreMarkdown, llmVndOaiVerbosity, llmVndOaiWebSearchContext, llmVndOaiWebSearchGeolocation, llmVndOaiImageGeneration,
@@ -101,7 +99,6 @@ export function aixCreateModelFromLLMOptions(
...(hotfixOmitTemperature ? { temperature: null } : llmTemperature !== undefined ? { temperature: llmTemperature } : {}),
...(llmResponseTokens /* null: similar to undefined, will omit the value */ ? { maxTokens: llmResponseTokens } : {}),
...(llmTopP !== undefined ? { topP: llmTopP } : {}),
...(llmForceNoStream ? { forceNoStream: llmForceNoStream } : {}),
...(llmVndAntThinkingBudget !== undefined ? { vndAntThinkingBudget: llmVndAntThinkingBudget } : {}),
...(llmVndAnt1MContext ? { vndAnt1MContext: llmVndAnt1MContext } : {}),
...(llmVndAntSkills ? { vndAntSkills: llmVndAntSkills } : {}),
@@ -289,7 +286,7 @@ export async function aixChatGenerateText_Simple(
// Client-side late stage model HotFixes
const { shallDisableStreaming } = clientHotFixGenerateRequest_ApplyAll(llm.interfaces, aixChatGenerate, llmParameters.llmRef || llm.id);
if (shallDisableStreaming)
if (shallDisableStreaming || aixModel.forceNoStream)
aixStreaming = false;
@@ -455,7 +452,7 @@ export async function aixChatGenerateContent_DMessage<TServiceSettings extends o
// Client-side late stage model HotFixes
const { shallDisableStreaming } = clientHotFixGenerateRequest_ApplyAll(llm.interfaces, aixChatGenerate, llmParameters.llmRef || llm.id);
if (shallDisableStreaming)
if (shallDisableStreaming || aixModel.forceNoStream)
aixStreaming = false;
@@ -620,170 +617,158 @@ async function _aixChatGenerateContent_LL(
onGenerateContentUpdate?: (accumulator: AixChatGenerateContent_LL, isDone: boolean) => MaybePromise<void>,
): Promise<AixChatGenerateContent_LL> {
// Inspector support - can be requested by the client, but granted on the server side
const inspectorEnabled = getAixInspectorEnabled();
const inspectorContext = inspectorEnabled ? { contextName: aixContext.name, contextRef: aixContext.ref } : undefined;
/**
* FIXME: implement client selection of resumability - aixAccess option?
* For now we turn it on for Responses API for select kinds of request.
*/
const requestResumability = !!aixModel.vndOaiResponsesAPI &&
(['conversation', 'beam-scatter', 'beam-gather'] satisfies (AixAPI_Context_ChatGenerate['name'] | string)[]).includes(aixContext.name);
const aixConnectionOptions: AixAPI_ConnectionOptions_ChatGenerate = {
...inspectorEnabled && { debugDispatchRequest: true, debugProfilePerformance: true },
// FIXME: disabled until clearly working
// ...requestResumability && { enableResumability: true },
} as const;
// Aix Low-Level Chat Generation Accumulator
const accumulator_LL: AixChatGenerateContent_LL = {
fragments: [],
/* rest start as undefined (missing in reality) */
};
const sendContentUpdate = !onGenerateContentUpdate ? undefined : withDecimator(throttleParallelThreads ?? 0, 'aicChatGenerateContent', async () => {
/**
* We want the first update to have actual content.
* However note that we won't be sending out the model name very fast this way,
* but it's probably what we want because of the ParticleIndicators (VFX!)
*/
if (!accumulator_LL.fragments.length)
return;
await onGenerateContentUpdate(accumulator_LL, false);
});
// Retry/Reconnect - low-level state machine
const retry = new AixStreamRetry(0, 0);
let upstreamHandle: DMessageGenerator['upstreamHandle'];
/**
* DEBUG note: early we were filtering (aixContext.name === 'conversation'), but with the new debugger we don't
* - AIX inspector is now independent from sudo mode
* - every request thereafter both sends back the Aix server-side dispatch packet, and appends all the particles received by the client side
*/
const requestServerDebugging = getAixInspector();
const debugContext = !requestServerDebugging ? undefined : { contextName: aixContext.name, contextRef: aixContext.ref };
/**
* TODO: implement client selection of resumability.
* For now we turn it on for Responses API for select kinds of request.
*/
const requestResumability = (false as boolean) && !!aixModel.vndOaiResponsesAPI &&
(['conversation', 'beam-scatter', 'beam-gather'] satisfies (AixAPI_Context_ChatGenerate['name'] | string)[]).includes(aixContext.name);
while (true) {
/**
* Particles Reassembler.
* - uses this accumulator
* - calls a partial update callback with built-in decimation
* - optional. forwards particles to the debugger
* - abort will interrupt the fetch, and also the reassembly (for pieces coming still down the wire)
*/
const reassembler = new ContentReassembler(
accumulator_LL,
sendContentUpdate,
debugContext,
abortSignal,
);
const sendContentUpdate = !onGenerateContentUpdate ? undefined : withDecimator(throttleParallelThreads ?? 0, 'aicChatGenerateContent', async () => {
/**
* We want the first update to have actual content.
* However note that we won't be sending out the model name very fast this way,
* but it's probably what we want because of the ParticleIndicators (VFX!)
*/
if (!accumulator_LL.fragments.length)
return;
try {
// tRPC Aix Chat Generation (streaming) API - inside the try block for deployment path errors
const particleStream = await apiStream.aix.chatGenerateContent.mutate({
access: aixAccess,
model: aixModel,
chatGenerate: aixChatGenerate,
context: aixContext,
streaming: getLabsDevNoStreaming() ? false : aixStreaming, // [DEV] disable streaming if set in the UX (testing)
...((requestResumability || requestServerDebugging) && {
connectionOptions: {
...requestResumability && {
/**
* Request a resumable connection, if the model/service supports it.
*/
enableResumability: true,
},
...requestServerDebugging && {
/**
* Request an echo of the upstream AIX dispatch request.
* Fulfillment is decided by the server, and 'production' builds will NOT include 'headers', just the 'body'.
*/
debugDispatchRequest: true,
/**
* Request profiling data for a streaming call: time spent preparing, connecting, waiting, receiving, etc.
* Fulfillment is decided by the server, and won't be available on 'production' builds.
*/
debugProfilePerformance: true,
},
},
}),
}, {
signal: abortSignal,
await onGenerateContentUpdate(accumulator_LL, false);
});
/**
* Reassemble the particles by enqueueing them as they come in.
* Processing is done asynchronously and in batches.
*
* Workaround: we cannot use asyncs inside the 'for...await' loop, as we'd get
* a 'closed connection' exception thrown when looping and a slow operation.
* Particles Reassembler.
* - uses this accumulator
* - calls a partial update callback with built-in decimation
* - optional. forwards particles to the debugger
* - abort will interrupt the fetch, and also the reassembly (for pieces coming still down the wire)
*/
for await (const particle of particleStream)
reassembler.enqueueWireParticle(particle);
const reassembler = new ContentReassembler(
accumulator_LL, // FIXME: TEMP: moved the accumulator outside to keep appending to it (recreating new ContentReassembler each retry)
sendContentUpdate,
inspectorContext,
abortSignal,
);
// stop the deadline decimator before the await, as we're done basically
sendContentUpdate?.stop?.();
try {
// synchronize any pending async tasks
await reassembler.waitForWireComplete();
// tRPC Aix Chat Generation (streaming) API - inside the try block for deployment path errors
const particleStream = upstreamHandle ?
await apiStream.aix.chatGenerateContentResume.mutate({
access: aixAccess,
resumeHandle: upstreamHandle,
context: aixContext,
streaming: true,
connectionOptions: aixConnectionOptions,
}, { signal: abortSignal }) :
await apiStream.aix.chatGenerateContent.mutate({
access: aixAccess,
model: aixModel,
chatGenerate: aixChatGenerate,
context: aixContext,
streaming: getLabsDevNoStreaming() ? false : aixStreaming, // [DEV] disable streaming if set in the UX (testing)
connectionOptions: aixConnectionOptions,
}, { signal: abortSignal });
} catch (error: any) {
/**
* Stream Consumption Loop - MUST be synchronous (no awaits).
*
* Critical: This loop only enqueues particles without awaiting processing.
* If we await async work here, tRPC closes the connection while we're blocked,
* causing "closed connection" exceptions when resuming. Processing happens in
* ContentReassembler's background promise chain.
*
* Error handling split:
* - This catch: tRPC/network errors (connection, stream, abort)
* - Reassembler catch: processing errors (malformed particles, async work)
*/
for await (const particle of particleStream)
reassembler.enqueueWireParticle(particle);
// stop the deadline decimator, as we're into error handling mode now
sendContentUpdate?.stop?.();
// stop the deadline decimator before the await, as we're done basically
sendContentUpdate?.stop?.();
// something else broke, likely a User Abort, or an Aix server error (e.g. tRPC)
const isUserAbort = abortSignal.aborted;
const isErrorAbort = (error instanceof Error) && (error.name === 'AbortError' || (error.cause instanceof DOMException && error.cause.name === 'AbortError'));
if (isUserAbort || isErrorAbort) {
if (isUserAbort !== isErrorAbort)
if (AIX_CLIENT_DEV_ASSERTS)
console.error(`[DEV] Aix streaming AbortError mismatch (${isUserAbort}, ${isErrorAbort})`, { error: error });
await reassembler.setClientAborted().catch(console.error /* never */);
} else {
// NOTE: this code path has also been almost replicated on `ContentReassembler.#processWireBacklog.catch() {...}`
if (AIX_CLIENT_DEV_ASSERTS)
console.error('[DEV] Aix streaming Error:', { error });
// synchronize any pending async tasks
await reassembler.waitForWireComplete();
// Special case: request too large: this is a TRPCClientError, and we can show a user-friendly message
let errorHandled = false;
if (error instanceof TRPCClientError) {
switch (error.cause?.message) {
/**
* The body of the response was "Request Entity Too Large".
* - this caused trpc, in ...stream/jsonl.ts, function createConsumerStream, to throw an error due to parsing the line as JSON
* - "const head = JSON.parse(line);"
* - as the error bubbles up to here, and cannot be handled by the superjson transformer either, which happens after this
*/
case `Unexpected token 'R', "Request En"... is not valid JSON`:
await reassembler.setClientExcepted(`**Request too large**: Your message or attachments exceed the 4.5MB limit of the Vercel edge network. Tip: use the cleanup button in the right pane to hide messages, remove large attachments or reduce conversation length.`).catch(console.error);
errorHandled = true;
break;
} catch (error: any) {
/**
* This happened many times in the past with captive portals and the like. Let's just improve the messaging here.
*/
case `Unexpected token '<', "<!DOCTYPE "... is not valid JSON`:
await reassembler.setClientExcepted(`**Network issue**: The network returned an HTML page instead of expected data. This can be a WiFi signin page, a proxy or browser extension, or a temporary gateway error. Please **refresh and try again**, or check your connection and disable blockers. Additional details may be available in the browser console.`).catch(console.error);
errorHandled = true;
break;
// stop the deadline decimator, as we're into error handling mode now
sendContentUpdate?.stop?.();
// classify error
const { errorType, errorMessage } = aixClassifyStreamingError(error, abortSignal.aborted, !!accumulator_LL.fragments.length);
// retry decision - check for handle from either current iteration or previous
const hasUpstreamHandler = !!accumulator_LL.genUpstreamHandle || !!upstreamHandle;
const shallRetry = retry.shallRetry(errorType, hasUpstreamHandler);
if (!shallRetry) {
// NOT retryable: e.g. client-abort, or missing handle
if (errorType === 'client-aborted')
await reassembler.setClientAborted().catch(console.error /* never */);
else
await reassembler.setClientExcepted(errorMessage).catch(console.error);
// ... fall through (traditional single path)
} else {
// fragment-notify of our ongoing retry attempt
try {
await reassembler.setClientExcepted(`**Retrying** (${shallRetry.attemptNumber}) because: ${errorMessage}`);
await onGenerateContentUpdate?.(accumulator_LL, false /* partial */);
} catch (e) {
// .. ignore the notification error
}
}
// Special case: network error (TypeError) - when the client is disconnected (Vercel 5min timeout, Mobile timeout / disconnect, etc)
if (!errorHandled && (error instanceof TypeError) && error.message === 'network error') {
// await reassembler.setClientExcepted(`Network error: **connection interrupted**.`).catch(console.error);
await reassembler.setClientExcepted('An unexpected issue occurred: **network error**.').catch(console.error);
errorHandled = true;
}
// delay then RETRY
const delayResult = await AixStreamRetry.abortableDelay(shallRetry.delayMs, abortSignal);
if (delayResult === 'completed') {
if (accumulator_LL.genUpstreamHandle) upstreamHandle = accumulator_LL.genUpstreamHandle; // update only if fresher
retry.recordAttempt();
continue; // -> Loop
}
// user-aborted during retry-backoff
await reassembler.setClientAborted().catch(console.error);
// ... fall through (aborted during backoff)
// Only show the generic error if we haven't handled it specifically
if (!errorHandled) {
const showAsBold = !!accumulator_LL.fragments.length;
const errorText = (presentErrorToHumans(error, showAsBold, true) || 'Unknown error').replace('[TRPCClientError]', '');
await reassembler.setClientExcepted(`An unexpected error occurred: ${errorText} Please retry.`).catch(console.error /* never */);
}
}
// NOTE: sooner or later we fall through on this code path, maybe looped or not, maybe with good data or maybe with reassembled errors...
// and we're done
reassembler.finalizeAccumulator();
// final update bypasses decimation entirely and contains complete content
await onGenerateContentUpdate?.(accumulator_LL, true /* Last message, done */);
// return the final accumulated message
return accumulator_LL;
}
// and we're done
reassembler.finalizeAccumulator();
// final update bypasses decimation entirely and contains complete content
await onGenerateContentUpdate?.(accumulator_LL, true /* Last message, done */);
// return the final accumulated message
return accumulator_LL;
}