AIX: Refactor - Client - Retry State Machine

This commit is contained in:
Enrico Ros
2025-10-23 05:13:06 -07:00
parent fd0ab93744
commit 3667425c61
3 changed files with 121 additions and 63 deletions
+97 -41
View File
@@ -1,71 +1,127 @@
import type { DMessageGenerator } from '~/common/stores/chat/chat.message';
// configuration
// Backoff profiles: baseDelay/maxDelay are milliseconds; jitter is a fraction of the
// computed delay applied as a symmetric random offset (±jitter * delay) to avoid thundering herd
const RETRY_PROFILES = {
network: { baseDelay: 500, maxDelay: 8000, jitter: 0.25 }, // Network interruptions
server: { baseDelay: 1000, maxDelay: 30000, jitter: 0.5 }, // Server overload: 429, 503, 502
transient: { baseDelay: 200, maxDelay: 2000, jitter: 0.1 }, // Quick transient errors: other 5xx
} as const;
// internal types
type AixRetryDecision = false | { strategy: AixRetryStrategy; delayMs: number; attemptNumber: number };
type AixRetryStrategy = 'resume' | 'reconnect';
type AixAbortableDelayResult = 'completed' | 'aborted';
type RetryDecision = false | { strategy: RetryStrategy; delayMs: number; attemptNumber: number };
type RetryStrategy = 'resume' | 'reconnect';
type StepResult = 'completed' | 'aborted';
type _ResumeHandle = DMessageGenerator['upstreamHandle'];
/**
* Minimal retry logic for AIX _aixChatGenerateContent_LL.
* - supports 'reconnect', for instance to loop while servers are busy
* - supports 'resume', for instance to recover a network breakage while a response is still being generated
* Retry/resume logic for AIX _aixChatGenerateContent_LL.
* - supports 'reconnect' for server busy/overload scenarios with intelligent backoff
* - supports 'resume' for network interruptions with upstream handle management
*/
export class AixStreamRetry {
private attempts = 0;
private mAttempts = 0;
private mResumeHandle: _ResumeHandle;
constructor(
private readonly maxResumeAttempts = 4,
private readonly maxReconnectAttempts = 0, // disabled by default
) {}
private readonly maxReconnectAttempts = 0,
private readonly maxResumeAttempts = 0,
) {
}
/** Determines if error is retryable and returns strategy + delay. */
shallRetry(errorType: 'client-aborted' | 'net-disconnected' | 'request-exceeded' | 'response-captive' | 'net-unknown', hasUpstreamHandle: boolean): AixRetryDecision {
// resume handle management
// only retry supported errors
const supportedErrors = [
'net-disconnected',
// 'net-unknown', // NOTE: we only support a hard disconnect for now, for safety
// 'client-aborted', 'request-exceeded', 'response-captive' // absolutely not retryable
];
if (!supportedErrors.includes(errorType))
return false;
/** The most recently stored upstream resume handle, or undefined if none was captured yet. */
get resumeHandle() {
return this.mResumeHandle;
}
// determine strategy
const strategy: AixRetryStrategy = hasUpstreamHandle ? 'resume' : 'reconnect';
/**
 * Stores an upstream resume handle for later stream re-attachment.
 * Falsy values are ignored, so a previously captured handle is never cleared by accident.
 */
set resumeHandle(handle: _ResumeHandle | undefined) {
if (handle)
this.mResumeHandle = handle;
}
/**
* Determines if error is retryable and returns strategy + delay
*/
shallRetry(errorType: 'client-aborted' | 'net-disconnected' | 'request-exceeded' | 'response-captive' | 'net-unknown', maybeStatusCode?: number): RetryDecision {
// determine strategy - based on availability of resume handle
const strategy: RetryStrategy = this.mResumeHandle ? 'resume' : 'reconnect';
const maxAttempts = strategy === 'resume' ? this.maxResumeAttempts : this.maxReconnectAttempts;
// check if we've exceeded max attempts
if (this.mAttempts >= maxAttempts)
return false;
// retry profile selection
let profile;
switch (errorType) {
// never retry these
case 'client-aborted':
case 'request-exceeded':
case 'response-captive':
return false;
case 'net-disconnected':
profile = RETRY_PROFILES.network; // Network disconnections are always retryable
break;
case 'net-unknown':
if (typeof maybeStatusCode !== 'number')
return false;
// unknown errors: check status code (if any) to determine if retryable
if (maybeStatusCode === 429 || maybeStatusCode === 503 || maybeStatusCode === 502)
profile = RETRY_PROFILES.server; // Server overload/unavailable - use longer backoff
else if (maybeStatusCode >= 500)
profile = RETRY_PROFILES.transient; // Other 5xx errors - quick retry with transient profile
else
return false;
break;
default:
const _exhaustiveCheck: never = errorType;
console.warn(`[DEV] AixStreamRetry.shallRetry: unhandled errorType '${errorType}'`);
return false;
}
// calculate delay with exponential backoff and jitter
let delayMs = Math.min(profile.baseDelay * Math.pow(2, this.mAttempts), profile.maxDelay);
// add jitter to prevent thundering herd
if (profile.jitter > 0) {
const jitterAmount = delayMs * profile.jitter * (Math.random() * 2 - 1);
delayMs = Math.max(Math.round(delayMs + jitterAmount), 1);
}
// returns the strategy if below max attempts
return this.attempts >= maxAttempts ? false : {
return {
strategy,
delayMs: 500 * Math.pow(2, this.attempts), // exponential backoff: 500ms, 1s, 2s, 4s
attemptNumber: this.attempts + 1, // 1-based for external use
delayMs,
attemptNumber: this.mAttempts + 1, // 1-based for external use
};
}
recordAttempt(): void {
this.attempts++;
}
// isRetrying(): boolean {
// return this.attempts > 0;
// }
// getCurrentAttempt(): number {
// return this.attempts;
// }
/**
* Abort-aware delay helper: waits for the specified delay or until abort signal fires.
* @returns 'completed' if delay finished, 'aborted' if cancelled
* Performs a delayed step with abort support.
*/
static async abortableDelay(delayMs: number, abortSignal: AbortSignal): Promise<AixAbortableDelayResult> {
async delayedStep(delayMs: number, abortSignal: AbortSignal): Promise<StepResult> {
if (abortSignal.aborted || delayMs <= 0) return delayMs === 0 ? 'completed' : 'aborted';
return await new Promise<AixAbortableDelayResult>((resolve) => {
return await new Promise<StepResult>((resolve) => {
const onAbort = () => {
clearTimeout(timer);
resolve('aborted');
};
const timer = setTimeout(() => {
abortSignal.removeEventListener('abort', onAbort);
// record the attempt only after successful delay completion
this.mAttempts++;
resolve('completed');
}, delayMs);
abortSignal.addEventListener('abort', onAbort, { once: true });
+23 -21
View File
@@ -6,6 +6,7 @@ import { DLLM, DLLMId, LLM_IF_HOTFIX_NoTemperature, LLM_IF_OAI_Responses, LLM_IF
import { DMetricsChatGenerate_Lg, metricsChatGenerateLgToMd, metricsComputeChatGenerateCostsMd } from '~/common/stores/metrics/metrics.chatgenerate';
import { DModelParameterValues, getAllModelParameterValues } from '~/common/stores/llms/llms.parameters';
import { apiStream } from '~/common/util/trpc.client';
import { capitalizeFirstLetter } from '~/common/util/textUtils';
import { createErrorContentFragment, DMessageContentFragment, DMessageErrorPart, DMessageVoidFragment, isContentFragment, isErrorPart } from '~/common/stores/chat/chat.fragments';
import { findLLMOrThrow } from '~/common/stores/llms/store-llms';
import { getAixInspectorEnabled } from '~/common/stores/store-ui';
@@ -643,9 +644,7 @@ async function _aixChatGenerateContent_LL(
// Retry/Reconnect - low-level state machine
const retry = new AixStreamRetry(0, 0);
let upstreamHandle: DMessageGenerator['upstreamHandle'];
const rsm = new AixStreamRetry(0, 0);
while (true) {
@@ -677,15 +676,9 @@ async function _aixChatGenerateContent_LL(
try {
// tRPC Aix Chat Generation (streaming) API - inside the try block for deployment path errors
const particleStream = upstreamHandle ?
await apiStream.aix.chatGenerateContentResume.mutate({
access: aixAccess,
resumeHandle: upstreamHandle,
context: aixContext,
streaming: true,
connectionOptions: aixConnectionOptions,
}, { signal: abortSignal }) :
const particleStream = !rsm.resumeHandle ?
// AIX tRPC Streaming Generation from Chat input
await apiStream.aix.chatGenerateContent.mutate({
access: aixAccess,
model: aixModel,
@@ -693,6 +686,15 @@ async function _aixChatGenerateContent_LL(
context: aixContext,
streaming: getLabsDevNoStreaming() ? false : aixStreaming, // [DEV] disable streaming if set in the UX (testing)
connectionOptions: aixConnectionOptions,
}, { signal: abortSignal }) :
// AIX tRPC Streaming re-attachment from handle - for low-level auto-resume
await apiStream.aix.reattachContent.mutate({
access: aixAccess,
resumeHandle: rsm.resumeHandle,
context: aixContext,
streaming: true,
connectionOptions: aixConnectionOptions,
}, { signal: abortSignal });
/**
@@ -721,12 +723,15 @@ async function _aixChatGenerateContent_LL(
// stop the deadline decimator, as we're into error handling mode now
sendContentUpdate?.stop?.();
// store the resume handle, if got one
if (accumulator_LL.genUpstreamHandle) rsm.resumeHandle = accumulator_LL.genUpstreamHandle;
// classify error
const { errorType, errorMessage } = aixClassifyStreamingError(error, abortSignal.aborted, !!accumulator_LL.fragments.length);
const maybeErrorStatusCode = error?.status || error?.response?.status || undefined;
// retry decision - check for handle from either current iteration or previous
const hasUpstreamHandler = !!accumulator_LL.genUpstreamHandle || !!upstreamHandle;
const shallRetry = retry.shallRetry(errorType, hasUpstreamHandler);
// retry decision
const shallRetry = rsm.shallRetry(errorType, maybeErrorStatusCode);
if (!shallRetry) {
// NOT retryable: e.g. client-abort, or missing handle
@@ -740,19 +745,16 @@ async function _aixChatGenerateContent_LL(
// fragment-notify of our ongoing retry attempt
try {
await reassembler.setClientExcepted(`**Retrying** (${shallRetry.attemptNumber}) because: ${errorMessage}`);
await reassembler.setClientExcepted(`**${capitalizeFirstLetter(shallRetry.strategy)}** (attempt ${shallRetry.attemptNumber}) in ${Math.round(shallRetry.delayMs / 1000)}s: ${errorMessage}`);
await onGenerateContentUpdate?.(accumulator_LL, false /* partial */);
} catch (e) {
// .. ignore the notification error
}
// delay then RETRY
const delayResult = await AixStreamRetry.abortableDelay(shallRetry.delayMs, abortSignal);
if (delayResult === 'completed') {
if (accumulator_LL.genUpstreamHandle) upstreamHandle = accumulator_LL.genUpstreamHandle; // update only if fresher
retry.recordAttempt();
const stepResult = await rsm.delayedStep(shallRetry.delayMs, abortSignal);
if (stepResult === 'completed')
continue; // -> Loop
}
// user-aborted during retry-backoff
await reassembler.setClientAborted().catch(console.error);
+1 -1
View File
@@ -323,7 +323,7 @@ export const aixRouter = createTRPCRouter({
* Chat content generation RESUME, streaming only.
* Reconnects to an in-progress response by its ID - OpenAI Responses API only.
*/
chatGenerateContentResume: publicProcedure
reattachContent: publicProcedure
.input(z.object({
access: AixWire_API.Access_schema,
resumeHandle: AixWire_API.ResumeHandle_schema, // resume has a handle instead of 'model + chatGenerate'