From 77cd659b392cd391053b84e6e1359ffc27de8be3 Mon Sep 17 00:00:00 2001 From: Enrico Ros Date: Fri, 7 Nov 2025 12:03:12 -0800 Subject: [PATCH] AIX: support operation-level retrier with reassembly wipe #869 --- src/modules/aix/client/ContentReassembler.ts | 20 +++++++++++++++++++ src/modules/aix/server/api/aix.wiretypes.ts | 1 + .../chatGenerate/chatGenerate.retrier.ts | 6 +++++- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/modules/aix/client/ContentReassembler.ts b/src/modules/aix/client/ContentReassembler.ts index 500424224..5eca788c6 100644 --- a/src/modules/aix/client/ContentReassembler.ts +++ b/src/modules/aix/client/ContentReassembler.ts @@ -256,6 +256,9 @@ export class ContentReassembler { case 'issue': this.onCGIssue(op); break; + case 'retry-reset': + this.onRetryReset(op); + break; case 'set-metrics': this.onMetrics(op); break; @@ -658,6 +661,23 @@ export class ContentReassembler { this.currentTextFragmentIndex = null; } + private onRetryReset({ attempt, maxAttempts, delayMs, reason, shallClear }: Extract): void { + // operation-level retry likely requires a wipe + if (shallClear) { + this.currentTextFragmentIndex = null; + this.accumulator.fragments = []; + delete this.accumulator.genTokenStopReason; + // keep metrics/model/handle intact - may be useful for debugging retries + + // discard any pending particles from the failed attempt + this.wireParticlesBacklog.length = 0; + } + + // -> ph: show operation-level retry status + const retryMessage = `Retrying [${attempt}/${maxAttempts}] in ${Math.round(delayMs / 1000)}s - ${reason}`; + this.accumulator.fragments.push(createPlaceholderVoidFragment(retryMessage, 'ec-retry-srv-op')); + } + private onMetrics({ metrics }: Extract): void { // type check point for AixWire_Particles.CGSelectMetrics -> DMetricsChatGenerate_Lg this.accumulator.genMetricsLg = metrics; diff --git a/src/modules/aix/server/api/aix.wiretypes.ts b/src/modules/aix/server/api/aix.wiretypes.ts index 69cccb695..e84c44577 100644 --- a/src/modules/aix/server/api/aix.wiretypes.ts +++ b/src/modules/aix/server/api/aix.wiretypes.ts @@ -573,6 +573,7 @@ export namespace AixWire_Particles { // | { cg: 'start' } // not really used for now | { cg: 'end', reason: CGEndReason, tokenStopReason: GCTokenStopReason } | { cg: 'issue', issueId: CGIssueId, issueText: string } + | { cg: 'retry-reset', attempt: number, maxAttempts: number, delayMs: number, reason: string, shallClear: boolean } | { cg: 'set-metrics', metrics: CGSelectMetrics } | { cg: 'set-model', name: string } | { cg: 'set-upstream-handle', handle: { uht: 'vnd.oai.responses', responseId: string, expiresAt: number | null } } diff --git a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.retrier.ts b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.retrier.ts index 9b9ffe6d7..e3fb2ac45 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.retrier.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.retrier.ts @@ -19,6 +19,7 @@ const AIX_DEBUG_OPERATION_RETRY = true; // prints the execution retries */ export class RequestRetryError extends Error { override readonly name = 'RequestRetryError'; + constructor(message: string) { super(message); Object.setPrototypeOf(this, RequestRetryError.prototype); @@ -64,7 +65,7 @@ export async function* executeChatGenerateWithRetry( throw error; // unexpected } - // sanity: exhausted attempts - must be a Parser error + // sanity: exhausted attempts - must be a Parser error - as it shall have not thrown in this case if (attemptNumber >= maxAttempts) { if (AIX_DEBUG_OPERATION_RETRY) console.warn(`[operation.retrier] ⚠️ Retry error on final attempt (parser bug?) - ${error?.message || error}`); @@ -78,6 +79,9 @@ export async function* executeChatGenerateWithRetry( attemptNumber++; + // Emit retry-reset particle to signal client to clear accumulator and show placeholder + yield { cg: 'retry-reset', attempt: attemptNumber, maxAttempts: maxAttempts, delayMs: delayMs, reason: error?.message || 'Unknown retry error', shallClear: true }; + // If aborted during delay, let next attempt detect it and create proper terminating particle // (throwing here would bypass executor's particle-based messaging contract) await abortableDelay(delayMs, abortSignal);