From 3667425c613462428636e80e24efeb7d1feb7552 Mon Sep 17 00:00:00 2001 From: Enrico Ros Date: Thu, 23 Oct 2025 05:13:06 -0700 Subject: [PATCH] AIX: Refactor - Client - Retry State Machine --- src/modules/aix/client/aix.client.retry.ts | 138 +++++++++++++++------ src/modules/aix/client/aix.client.ts | 44 +++---- src/modules/aix/server/api/aix.router.ts | 2 +- 3 files changed, 121 insertions(+), 63 deletions(-) diff --git a/src/modules/aix/client/aix.client.retry.ts b/src/modules/aix/client/aix.client.retry.ts index 2e6556b69..4c5817f92 100644 --- a/src/modules/aix/client/aix.client.retry.ts +++ b/src/modules/aix/client/aix.client.retry.ts @@ -1,71 +1,127 @@ +import type { DMessageGenerator } from '~/common/stores/chat/chat.message'; + + +// configuration +const RETRY_PROFILES = { + network: { baseDelay: 500, maxDelay: 8000, jitter: 0.25 }, // Network interruptions + server: { baseDelay: 1000, maxDelay: 30000, jitter: 0.5 }, // Server overload: 429, 503, 502 + transient: { baseDelay: 200, maxDelay: 2000, jitter: 0.1 }, // Quick transient errors: other 5xx +} as const; + + // internal types -type AixRetryDecision = false | { strategy: AixRetryStrategy; delayMs: number; attemptNumber: number }; -type AixRetryStrategy = 'resume' | 'reconnect'; -type AixAbortableDelayResult = 'completed' | 'aborted'; +type RetryDecision = false | { strategy: RetryStrategy; delayMs: number; attemptNumber: number }; +type RetryStrategy = 'resume' | 'reconnect'; +type StepResult = 'completed' | 'aborted'; + +type _ResumeHandle = DMessageGenerator['upstreamHandle']; /** - * Minimal retry logic for AIX _aixChatGenerateContent_LL. - * - supports 'reconnect', for instance to loop while servers are busy - * - supports 'resume', for instance to recover a network breakage while a responses is still being generated + * Retry/resume logic for AIX _aixChatGenerateContent_LL. + * - supports 'reconnect' for server busy/overload scenarios with intelligent backoff + * - supports 'resume' for network interruptions with upstream handle management */ export class AixStreamRetry { - private attempts = 0; + + private mAttempts = 0; + private mResumeHandle: _ResumeHandle; constructor( - private readonly maxResumeAttempts = 4, - private readonly maxReconnectAttempts = 0, // disabled by default - ) {} + private readonly maxReconnectAttempts = 0, + private readonly maxResumeAttempts = 0, + ) { + } - /** Determines if error is retryable and returns strategy + delay. */ - shallRetry(errorType: 'client-aborted' | 'net-disconnected' | 'request-exceeded' | 'response-captive' | 'net-unknown', hasUpstreamHandle: boolean): AixRetryDecision { + // resume handle management - // only retry supported errors - const supportedErrors = [ - 'net-disconnected', - // 'net-unknown', // NOTE: we only support a hard disconnect for now, for safety - // 'client-aborted', 'request-exceeded', 'response-captive' // absolutely not retryable - ]; - if (!supportedErrors.includes(errorType)) - return false; + get resumeHandle() { + return this.mResumeHandle; + } - // determine strategy - const strategy: AixRetryStrategy = hasUpstreamHandle ? 'resume' : 'reconnect'; + set resumeHandle(handle: _ResumeHandle | undefined) { + if (!handle) return; + this.mResumeHandle = handle; + } + + /** + * Determines if error is retryable and returns strategy + delay + */ + shallRetry(errorType: 'client-aborted' | 'net-disconnected' | 'request-exceeded' | 'response-captive' | 'net-unknown', maybeStatusCode?: number): RetryDecision { + + // determine strategy - based on availability of resume handle + const strategy: RetryStrategy = this.mResumeHandle ? 'resume' : 'reconnect'; const maxAttempts = strategy === 'resume' ? this.maxResumeAttempts : this.maxReconnectAttempts; + // check if we've exceeded max attempts + if (this.mAttempts >= maxAttempts) + return false; + + + // retry profile selection + let profile; + switch (errorType) { + + // never retry these + case 'client-aborted': + case 'request-exceeded': + case 'response-captive': + return false; + + case 'net-disconnected': + profile = RETRY_PROFILES.network; // Network disconnections are always retryable + break; + + case 'net-unknown': + if (typeof maybeStatusCode !== 'number') + return false; + + // unknown errors: check status code (if any) to determine if retryable + if (maybeStatusCode === 429 || maybeStatusCode === 503 || maybeStatusCode === 502) + profile = RETRY_PROFILES.server; // Server overload/unavailable - use longer backoff + else if (maybeStatusCode >= 500) + profile = RETRY_PROFILES.transient; // Other 5xx errors - quick retry with transient profile + else + return false; + break; + + default: + const _exhaustiveCheck: never = errorType; + console.warn(`[DEV] AixStreamRetry.shallRetry: unhandled errorType '${errorType}'`); + return false; + } + + // calculate delay with exponential backoff and jitter + let delayMs = Math.min(profile.baseDelay * Math.pow(2, this.mAttempts), profile.maxDelay); + + // add jitter to prevent thundering herd + if (profile.jitter > 0) { + const jitterAmount = delayMs * profile.jitter * (Math.random() * 2 - 1); + delayMs = Math.max(Math.round(delayMs + jitterAmount), 1); + } + // returns the strategy if below max attempts - return this.attempts >= maxAttempts ? false : { + return { strategy, - delayMs: 500 * Math.pow(2, this.attempts), // exponential backoff: 500ms, 1s, 2s, 4s - attemptNumber: this.attempts + 1, // 1-based for external use + delayMs, + attemptNumber: this.mAttempts + 1, // 1-based for external use }; } - recordAttempt(): void { - this.attempts++; - } - - // isRetrying(): boolean { - // return this.attempts > 0; - // } - - // getCurrentAttempt(): number { - // return this.attempts; - // } - /** - * Abort-aware delay helper: waits for the specified delay or until abort signal fires. - * @returns 'completed' if delay finished, 'aborted' if cancelled + * Performs a delayed step with abort support. */ - static async abortableDelay(delayMs: number, abortSignal: AbortSignal): Promise { + async delayedStep(delayMs: number, abortSignal: AbortSignal): Promise { if (abortSignal.aborted || delayMs <= 0) return delayMs === 0 ? 'completed' : 'aborted'; - return await new Promise((resolve) => { + return await new Promise((resolve) => { const onAbort = () => { clearTimeout(timer); resolve('aborted'); }; const timer = setTimeout(() => { abortSignal.removeEventListener('abort', onAbort); + // record the attempt only after successful delay completion + this.mAttempts++; resolve('completed'); }, delayMs); abortSignal.addEventListener('abort', onAbort, { once: true }); diff --git a/src/modules/aix/client/aix.client.ts b/src/modules/aix/client/aix.client.ts index ad7568ad6..4851b3c88 100644 --- a/src/modules/aix/client/aix.client.ts +++ b/src/modules/aix/client/aix.client.ts @@ -6,6 +6,7 @@ import { DLLM, DLLMId, LLM_IF_HOTFIX_NoTemperature, LLM_IF_OAI_Responses, LLM_IF import { DMetricsChatGenerate_Lg, metricsChatGenerateLgToMd, metricsComputeChatGenerateCostsMd } from '~/common/stores/metrics/metrics.chatgenerate'; import { DModelParameterValues, getAllModelParameterValues } from '~/common/stores/llms/llms.parameters'; import { apiStream } from '~/common/util/trpc.client'; +import { capitalizeFirstLetter } from '~/common/util/textUtils'; import { createErrorContentFragment, DMessageContentFragment, DMessageErrorPart, DMessageVoidFragment, isContentFragment, isErrorPart } from '~/common/stores/chat/chat.fragments'; import { findLLMOrThrow } from '~/common/stores/llms/store-llms'; import { getAixInspectorEnabled } from '~/common/stores/store-ui'; @@ -643,9 +644,7 @@ async function _aixChatGenerateContent_LL( // Retry/Reconnect - low-level state machine - const retry = new AixStreamRetry(0, 0); - let upstreamHandle: DMessageGenerator['upstreamHandle']; - + const rsm = new AixStreamRetry(0, 0); while (true) { @@ -677,15 +676,9 @@ async function _aixChatGenerateContent_LL( try { - // tRPC Aix Chat Generation (streaming) API - inside the try block for deployment path errors - const particleStream = upstreamHandle ? - await apiStream.aix.chatGenerateContentResume.mutate({ - access: aixAccess, - resumeHandle: upstreamHandle, - context: aixContext, - streaming: true, - connectionOptions: aixConnectionOptions, - }, { signal: abortSignal }) : + const particleStream = !rsm.resumeHandle ? + + // AIX tRPC Streaming Generation from Chat input await apiStream.aix.chatGenerateContent.mutate({ access: aixAccess, model: aixModel, @@ -693,6 +686,15 @@ async function _aixChatGenerateContent_LL( context: aixContext, streaming: getLabsDevNoStreaming() ? false : aixStreaming, // [DEV] disable streaming if set in the UX (testing) connectionOptions: aixConnectionOptions, + }, { signal: abortSignal }) : + + // AIX tRPC Streaming re-attachment from handle - for low-level auto-resume + await apiStream.aix.reattachContent.mutate({ + access: aixAccess, + resumeHandle: rsm.resumeHandle, + context: aixContext, + streaming: true, + connectionOptions: aixConnectionOptions, }, { signal: abortSignal }); /** @@ -721,12 +723,15 @@ async function _aixChatGenerateContent_LL( // stop the deadline decimator, as we're into error handling mode now sendContentUpdate?.stop?.(); + // store the resume handle, if got one + if (accumulator_LL.genUpstreamHandle) rsm.resumeHandle = accumulator_LL.genUpstreamHandle; + // classify error const { errorType, errorMessage } = aixClassifyStreamingError(error, abortSignal.aborted, !!accumulator_LL.fragments.length); + const maybeErrorStatusCode = error?.status || error?.response?.status || undefined; - // retry decision - check for handle from either current iteration or previous - const hasUpstreamHandler = !!accumulator_LL.genUpstreamHandle || !!upstreamHandle; - const shallRetry = retry.shallRetry(errorType, hasUpstreamHandler); + // retry decision + const shallRetry = rsm.shallRetry(errorType, maybeErrorStatusCode); if (!shallRetry) { // NOT retryable: e.g. client-abort, or missing handle @@ -740,19 +745,16 @@ async function _aixChatGenerateContent_LL( // fragment-notify of our ongoing retry attempt try { - await reassembler.setClientExcepted(`**Retrying** (${shallRetry.attemptNumber}) because: ${errorMessage}`); + await reassembler.setClientExcepted(`**${capitalizeFirstLetter(shallRetry.strategy)}** (attempt ${shallRetry.attemptNumber}) in ${Math.round(shallRetry.delayMs / 1000)}s: ${errorMessage}`); await onGenerateContentUpdate?.(accumulator_LL, false /* partial */); } catch (e) { // .. ignore the notification error } // delay then RETRY - const delayResult = await AixStreamRetry.abortableDelay(shallRetry.delayMs, abortSignal); - if (delayResult === 'completed') { - if (accumulator_LL.genUpstreamHandle) upstreamHandle = accumulator_LL.genUpstreamHandle; // update only if fresher - retry.recordAttempt(); + const stepResult = await rsm.delayedStep(shallRetry.delayMs, abortSignal); + if (stepResult === 'completed') continue; // -> Loop - } // user-aborted during retry-backoff await reassembler.setClientAborted().catch(console.error); diff --git a/src/modules/aix/server/api/aix.router.ts b/src/modules/aix/server/api/aix.router.ts index 0da722228..d59ce1884 100644 --- a/src/modules/aix/server/api/aix.router.ts +++ b/src/modules/aix/server/api/aix.router.ts @@ -323,7 +323,7 @@ export const aixRouter = createTRPCRouter({ * Chat content generation RESUME, streaming only. * Reconnects to an in-progress response by its ID - OpenAI Responses API only. */ - chatGenerateContentResume: publicProcedure + reattachContent: publicProcedure .input(z.object({ access: AixWire_API.Access_schema, resumeHandle: AixWire_API.ResumeHandle_schema, // resume has a handle instead of 'model + chatGenerate'