AIX: Core: immutable structural sharing, typed outcome, improve layering

Refactor the AIX client streaming pipeline for Zustand-style immutability
and clean outcome classification, preparing for the agentic execution layer.

ContentReassembler:
- All fragment mutations go through _pushFragment/_replaceFragmentAt/_spliceFragment
  (new array refs per update, no in-place mutation)
- Generator fields (genModelName, genProviderInfraLabel, genUpstreamHandle,
  legacyGenTokenStopReason) consolidated into a single `generator` object,
  replaced immutably when particles arrive
- _classifyTermination() replaces _deriveTokenStopReasonOrAppendError() -
  pure function returning { outcome, tsr, errorMessage }
- finalizeReassembly() returns AixChatGenerateContent_LL_Result (extends
  streaming type with outcome + cgMetricsLg) instead of void
- Initial state snapshot for full reset (replaces initialGenerator field)

Type system:
- AixChatGenerateContent_LL: streaming-only (fragments + generator)
- AixChatGenerateContent_LL_Result: extends LL with outcome + cgMetricsLg
- AixChatGenerateTerminal_LL: 'completed' | 'failed' | 'aborted'
- Outcome flows LL -> L2 -> L3 without leaking into DMessage/stores
- Unified vocabulary throughout (no more success/errored mapping)

LL streaming loop:
- Restructured with break/continue for guaranteed finalizeReassembly()
- Drain in-flight processing before retry/terminal decisions
- Abort-during-retry-backoff surfaces original error (not 'aborted')
- Retryable path first, terminal fallthrough

Callers:
- Remove structuredClone() calls (structural sharing makes them unnecessary)
- Spread fragments/generator directly into stores
This commit is contained in:
Enrico Ros
2026-04-06 12:52:51 -07:00
parent 1772db5e98
commit 90ccb64bd0
7 changed files with 544 additions and 524 deletions
+2 -2
View File
@@ -250,13 +250,13 @@ export function Telephone(props: {
if (messageWasInterruptedAtStart(status.lastDMessage))
return;
// whether status.outcome === 'success' or not, we get a valid DMessage, eventually with Error Fragments inside
// whether status.outcome === 'completed' or not, we get a valid DMessage, eventually with Error Fragments inside
const fullMessage = createDMessageFromFragments('assistant', status.lastDMessage.fragments);
fullMessage.generator = status.lastDMessage.generator;
setCallMessages(messages => [...messages, fullMessage]); // [state] append assistant:call_response
// fire/forget - use 'fast' priority for real-time conversation
if (status.outcome === 'success' && finalText?.length >= 1)
if (status.outcome === 'completed' && finalText?.length >= 1)
void speakText(finalText,
undefined,
{ label: 'Call', priority: 'fast' },
+9 -10
View File
@@ -78,15 +78,14 @@ export async function runPersonaOnConversationHead(
// if (abortController.signal.aborted)
// console.warn('runPersonaOnConversationHead: Aborted', { conversationId, assistantLlmId, messageOverwrite });
// deep copy the object to avoid partial updates
let deepCopy = structuredClone(messageOverwrite);
// fragments and generator are already immutable (new refs per update) - no deep clone needed
const { fragments, ...rest } = messageOverwrite;
// [Cosmetic Logic] if the content hasn't come yet, don't replace the fragments to still show the placeholder
if (!messageComplete && deepCopy.pendingIncomplete && deepCopy.fragments?.length === 0)
delete (deepCopy as any).fragments;
const includeFragments = !!fragments?.length || messageComplete || !messageOverwrite.pendingIncomplete;
// update the message
cHandler.messageEdit(assistantMessageId, deepCopy, messageComplete, false);
cHandler.messageEdit(assistantMessageId, { ...(includeFragments && { fragments }), ...rest }, messageComplete, false);
// if requested, speak the message
autoSpeaker?.handleMessage(messageOverwrite, messageComplete);
@@ -97,12 +96,12 @@ export async function runPersonaOnConversationHead(
);
// final message update (needed only in case of error)
const lastDeepCopy = structuredClone(messageStatus.lastDMessage);
if (messageStatus.outcome === 'errored')
cHandler.messageEdit(assistantMessageId, lastDeepCopy, true, false);
const lastDMessage = messageStatus.lastDMessage;
if (messageStatus.outcome === 'failed')
cHandler.messageEdit(assistantMessageId, lastDMessage, true, false);
// special case: if the last message was aborted and had no content, delete it
if (messageWasInterruptedAtStart(lastDeepCopy)) {
if (messageWasInterruptedAtStart(lastDMessage)) {
cHandler.messagesDelete([assistantMessageId]);
// NOTE: ok to exit here, as the abort was already done
return false;
@@ -136,5 +135,5 @@ export async function runPersonaOnConversationHead(
cHandler.historyStripThinking(0);
// return true if this succeeded
return messageStatus.outcome === 'success';
return messageStatus.outcome === 'completed';
}
+2 -36
View File
@@ -280,45 +280,11 @@ export function messageWasInterruptedAtStart(message: Pick<DMessage, 'generator'
// helpers - generators
export function messageSetGenerator(message: Pick<DMessage, 'generator'>, generator: undefined | DMessageGenerator): void {
if (generator !== undefined)
message.generator = generator;
else
delete message.generator;
}
export function messageSetGeneratorNamed(message: Pick<DMessage, 'generator'>, label: 'web' | 'issue' | 'help' | string): void {
message.generator = {
mgt: 'named',
name: label,
};
}
function _messageSetGeneratorAIX(message: Pick<DMessage, 'generator'>, modelLabel: string, modelVendorId: ModelVendorId, modelId: DLLMId): void {
message.generator = {
mgt: 'aix',
name: modelLabel,
aix: {
vId: modelVendorId,
mId: modelId,
},
};
}
export function messageSetGeneratorAIX_AutoLabel(message: Pick<DMessage, 'generator'>, modelVendorId: ModelVendorId, modelId: DLLMId): void {
// Strip the serviceId prefix: 'vendor-' or 'vendor-N-' (when multiple providers of same vendor)
export function createGeneratorAIX_AutoLabel(modelVendorId: ModelVendorId, modelId: DLLMId): DMessageGenerator {
const heuristicLabel = modelId.includes('-') ? modelId.replace(/^[^-]+-(\d-)?/, '') : modelId;
_messageSetGeneratorAIX(message, heuristicLabel, modelVendorId, modelId);
return { mgt: 'aix', name: heuristicLabel, aix: { vId: modelVendorId, mId: modelId } };
}
/*export function messageUpdateGeneratorInfo(message: Pick<DMessage, 'generator'>, metrics?: DMetricsChatGenerate_Md, tokenStopReason?: DMessageGenerator['tokenStopReason']): void {
if (!message.generator) return;
if (metrics) message.generator.metrics = metrics;
if (tokenStopReason) message.generator.tokenStopReason = tokenStopReason;
}*/
// helpers - user flags
+307 -263
View File
@@ -14,7 +14,7 @@ import type { AixWire_Particles } from '../server/api/aix.wiretypes';
import type { AixClientDebugger, AixFrameId } from './debugger/memstore-aix-client-debugger';
import { aixClientDebugger_completeFrame, aixClientDebugger_init, aixClientDebugger_recordParticleReceived, aixClientDebugger_setProfilerMeasurements, aixClientDebugger_setRequest } from './debugger/reassembler-debug';
import { AixChatGenerateContent_LL, DEBUG_PARTICLES } from './aix.client';
import { AixChatGenerateContent_LL, AixChatGenerateContent_LL_Result, AixChatGenerateTerminal_LL, DEBUG_PARTICLES } from './aix.client';
import { aixClassifyReassemblyError } from './aix.client.errors';
@@ -36,37 +36,22 @@ const VP_PERSISTENCE_DELAY = 500; // persistence of vision for voidPlaceholders
/**
* Extended accumulator - adds reassembly-internal state to the output accumulator so that
* checkpointing/restore is atomic. The `_`-prefixed fields are internal to ContentReassembler;
* external code should treat this as AixChatGenerateContent_LL (structural subtype).
* Internal reassembly state - extends the streaming type with reassembly-internal fields.
* Checkpointing/restore is atomic over this entire object.
* External code sees only AixChatGenerateContent_LL (structural subtype).
*/
type ReassemblerAccumulator = AixChatGenerateContent_LL & {
type ReassemblyState = AixChatGenerateContent_LL & {
// reassembly-internal fields
/** Cursor: index of the open text fragment for appending, or null if none is open */
_textFragmentIndex: number | null;
/** Raw termination data from wire or client-side - classified at finalization */
_terminationReason: 'done-client-aborted' | 'issue-client-rpc' | AixWire_Particles.CGEndReason | undefined;
/** Raw token stop reason from the wire `end` particle */
_tokenStopReasonWire: AixWire_Particles.GCTokenStopReason | undefined;
/** set/overwritten during streaming, consumed by finalizeReassembly() */
cgMetricsLg: undefined | AixChatGenerateContent_LL_Result['cgMetricsLg'];
/** Raw termination cause: undetermined yet, client-set, or received from the wire on {cg:'end'} */
terminationReason: undefined | 'done-client-aborted' | 'issue-client-rpc' | AixWire_Particles.CGEndReason;
/** Raw token stop reason: undetermined yet or received from the wire on {cg:'end'} */
dialectStopReason: undefined | AixWire_Particles.GCTokenStopReason;
};
/** Single source of truth for the initial/blank accumulator state - all fields explicit. */
function _createEmptyAccumulatorState(): ReassemblerAccumulator {
return {
// AixChatGenerateContent_LL fields
fragments: [],
genMetricsLg: undefined,
genModelName: undefined,
genProviderInfraLabel: undefined,
genUpstreamHandle: undefined,
legacyGenTokenStopReason: undefined,
// reassembly-internal fields
_textFragmentIndex: null,
_terminationReason: undefined,
_tokenStopReasonWire: undefined,
};
}
/**
* Reassembles the content fragments and more information from the Aix ChatGenerate Particles.
@@ -81,30 +66,43 @@ export class ContentReassembler {
private isProcessing = false;
private processingPromise = Promise.resolve();
// owned accumulation state - coherent and with checkpointing support
readonly accumulator: ReassemblerAccumulator = _createEmptyAccumulatorState();
private checkpointSnapshot: undefined | ReassemblerAccumulator;
// owned reassembly state - coherent and with checkpointing support
readonly S: ReassemblyState;
private checkpointState?: ReassemblyState; // for continuation reset
private readonly initialState: ReassemblyState; // for full reset
// settable per-iteration callback
private onAccumulatorUpdated?: (accumulator: AixChatGenerateContent_LL, hasContent: boolean) => MaybePromise<void>;
private onStreamingUpdate?: (accumulator: AixChatGenerateContent_LL, hasContent: boolean) => MaybePromise<void>;
private updateContentStarted = false; // true (forever) after the first update with content, even if we have resets/continuations in the future
constructor(
inspectorTransport?: AixClientDebugger.Transport,
inspectorContext?: AixClientDebugger.Context,
initialGenerator: DMessageGenerator,
aiInspectorTransport: undefined | AixClientDebugger.Transport,
aiInspectorContext: undefined | AixClientDebugger.Context,
private readonly skipImageCompression?: boolean,
private readonly wireAbortSignal?: AbortSignal,
private readonly onInlineAudio?: (audio: { blob: Blob; mimeType: string; label: string; durationMs?: number }) => void,
) {
this.initialState = {
// AixChatGenerateContent_LL fields:
fragments: [],
generator: initialGenerator,
// reassembly-internal fields:
_textFragmentIndex: null,
cgMetricsLg: undefined,
terminationReason: undefined,
dialectStopReason: undefined,
};
this.S = { ...this.initialState }; // we trust the rest of the code to never mutate, always replace
// [AI Inspector] Debugging the request, last-write-wins for the global (displayed in the UI)
this.debuggerFrameId = !inspectorContext ? null : aixClientDebugger_init(inspectorTransport ?? 'trpc', inspectorContext);
this.debuggerFrameId = aiInspectorTransport && aiInspectorContext ? aixClientDebugger_init(aiInspectorTransport, aiInspectorContext) : null;
}
set updateCallback(callback: typeof this.onAccumulatorUpdated) {
this.onAccumulatorUpdated = callback;
set updateCallback(callback: typeof this.onStreamingUpdate) {
this.onStreamingUpdate = callback;
}
@@ -134,41 +132,53 @@ export class ContentReassembler {
}
finalizeAccumulator(): void {
finalizeReassembly(): AixChatGenerateContent_LL_Result {
// Classify termination
this.accumulator.legacyGenTokenStopReason = this._deriveTokenStopReason();
// Classify termination -> outcome + tokenStopReason + optional error message
const { outcome, tsr, errorMessage } = this._classifyTermination();
// termination -> legacy UI data pt
if (tsr) this.S.generator = { ...this.S.generator, tokenStopReason: tsr };
// termination -> User/AI issue message
if (errorMessage) this._appendErrorFragment(errorMessage);
// Fragment finalization heuristics:
// - remove placeholders for clean exists, leave them for issues or client-aborts
if (this.accumulator._terminationReason === 'done-dialect')
// - remove placeholders for clean exits, leave them for issues or client-aborts
if (this.S.terminationReason === 'done-dialect')
this._removeAllVoidPlaceholders(); // [PH-LIFECYCLE]
// - mark as completed or errored
for (const fragment of this.accumulator.fragments)
if (isVoidPlaceholderFragment(fragment) && fragment.part.opLog?.length)
for (const entry of fragment.part.opLog) {
if (entry.text?.endsWith('...')) entry.text = entry.text.slice(0, -3);
if (entry.state === 'active') {
entry.state = 'error';
entry.oTexts = [...(entry.oTexts || []), `Terminated with reason: ${this.accumulator._terminationReason ?? 'unknown'}`];
}
}
// - mark active operations as errored on non-clean terminations
if (outcome !== 'completed') {
this.S.fragments = this.S.fragments.map(fragment => {
if (!isVoidPlaceholderFragment(fragment) || !fragment.part.opLog?.length) return fragment;
const updatedOpLog = fragment.part.opLog.map(entry => {
const trimmedText = entry.text?.endsWith('...') ? entry.text.slice(0, -3) : entry.text;
if (entry.state !== 'active') return trimmedText !== entry.text ? { ...entry, text: trimmedText } : entry;
return { ...entry, text: trimmedText, state: 'error' as const, oTexts: [...(entry.oTexts || []), `Terminated with reason: ${this.S.terminationReason ?? 'unknown'}`] };
});
return { ...fragment, part: { ...fragment.part, opLog: updatedOpLog } };
});
}
// - fuse adjacent same-type fragments that were kept separate across continuation turns
// NOTE: not needed because of precise snapshotting and restoration, and upstream guarantees about completeness of fragments
// Metrics
const hadIssues = !!this.accumulator.legacyGenTokenStopReason;
metricsFinishChatGenerateLg(this.accumulator.genMetricsLg, hadIssues);
metricsFinishChatGenerateLg(this.S.cgMetricsLg, outcome !== 'completed');
// [AI Inspector] Debugging, finalize the frame
if (this.debuggerFrameId)
aixClientDebugger_completeFrame(this.debuggerFrameId);
// Return the finalized result: final fragments + generator + outcome + metrics
return {
fragments: this.S.fragments,
generator: this.S.generator,
outcome,
cgMetricsLg: this.S.cgMetricsLg,
};
}
@@ -178,11 +188,11 @@ export class ContentReassembler {
// normal user cancellation does not require error fragments
if (this.accumulator._terminationReason)
console.warn(`⚠️ [ContentReassembler] setClientAborted: overriding server termination '${this.accumulator._terminationReason}' (wire stop: ${this.accumulator._tokenStopReasonWire ?? 'none'})`);
if (this.S.terminationReason)
console.warn(`⚠️ [ContentReassembler] setClientAborted: overriding server termination '${this.S.terminationReason}' (wire stop: ${this.S.dialectStopReason ?? 'none'})`);
this.accumulator._terminationReason = 'done-client-aborted';
this.accumulator._tokenStopReasonWire = undefined; // reset, as we assume we can't know (alt: jsut leave it)
this.S.terminationReason = 'done-client-aborted';
this.S.dialectStopReason = undefined; // reset, as we assume we can't know (alt: jsut leave it)
}
setClientExcepted(errorAsText: string, errorHint?: DMessageErrorPart['hint']): void {
@@ -192,11 +202,11 @@ export class ContentReassembler {
// add the error fragment with the given message
this._appendErrorFragment(errorAsText, errorHint);
if (this.accumulator._terminationReason)
console.warn(`⚠️ [ContentReassembler] setClientExcepted: overriding server termination '${this.accumulator._terminationReason}' (wire stop: ${this.accumulator._tokenStopReasonWire ?? 'none'})`);
if (this.S.terminationReason)
console.warn(`⚠️ [ContentReassembler] setClientExcepted: overriding server termination '${this.S.terminationReason}' (wire stop: ${this.S.dialectStopReason ?? 'none'})`);
this.accumulator._terminationReason = 'issue-client-rpc';
this.accumulator._tokenStopReasonWire = undefined; // reset, as we can't assume we know (alt: jsut leave it)
this.S.terminationReason = 'issue-client-rpc';
this.S.dialectStopReason = undefined; // reset, as we can't assume we know (alt: jsut leave it)
}
async setClientRetrying(strategy: 'reconnect' | 'resume', errorMessage: string, attempt: number, maxAttempts: number, delayMs: number, causeHttp?: number, causeConn?: string) {
@@ -244,7 +254,7 @@ export class ContentReassembler {
await this.#reassembleParticle(particle);
// signal all updates
await this.onAccumulatorUpdated?.(this.accumulator, this.updateContentStarted ||= this.accumulator.fragments.length > 0);
await this.onStreamingUpdate?.(this.S, this.updateContentStarted ||= this.S.fragments.length > 0);
}
@@ -256,11 +266,11 @@ export class ContentReassembler {
// NOTE: we cannot throw here as we are part of a detached promise chain
// READ the `aixClassifyReassemblyError` that explains this in detail
//
const showAsBold = !!this.accumulator.fragments.length;
const showAsBold = !!this.S.fragments.length;
const { errorMessage } = aixClassifyReassemblyError(error, showAsBold);
this._appendReassemblyDevError(errorMessage, true);
await this.onAccumulatorUpdated?.(this.accumulator, this.updateContentStarted ||= true)?.catch(console.error);
await this.onStreamingUpdate?.(this.S, this.updateContentStarted ||= true)?.catch(console.error);
} finally {
@@ -363,10 +373,10 @@ export class ContentReassembler {
case 'aix-info':
if (op.ait === 'flow-cont') {
// break text accumulation - to reflect upstream's clean breaks of content blocks
this.accumulator._textFragmentIndex = null;
this.S._textFragmentIndex = null;
// Continuation checkpoint: create a snapshot now
this.checkpointSnapshot = structuredClone(this.accumulator);
if (DEBUG_FLOW) console.log(`[DEV] [flow] checkpoint created: ${this.accumulator.fragments.length} fragments snapshotted`);
this.checkpointState = structuredClone(this.S);
if (DEBUG_FLOW) console.log(`[DEV] [flow] checkpoint created: ${this.S.fragments.length} fragments snapshotted`);
} else
await this._removeLastVoidPlaceholderDelayed();
this.onAixInfo(op); // creates a voidPlaceholder
@@ -407,71 +417,77 @@ export class ContentReassembler {
// Appends the text to the open text part, or creates a new one if none is open
private onAppendText(particle: AixWire_Particles.TextParticleOp): void {
// add to existing TextContentFragment
const currentTextFragment = this.accumulator._textFragmentIndex !== null ? this.accumulator.fragments[this.accumulator._textFragmentIndex] : null;
if (currentTextFragment && isTextContentFragment(currentTextFragment)) {
currentTextFragment.part.text += particle.t;
// append to existing TextContentFragment
const idx = this.S._textFragmentIndex;
const currentTextFragment = idx !== null ? this.S.fragments[idx] : null;
if (idx !== null && currentTextFragment && isTextContentFragment(currentTextFragment)) {
this._replaceFragmentAt(idx, {
...currentTextFragment,
part: {
...currentTextFragment.part,
text: currentTextFragment.part.text + particle.t,
},
});
return;
}
// new TextContentFragment
const newTextFragment = createTextContentFragment(particle.t);
this.accumulator.fragments.push(newTextFragment);
this.accumulator._textFragmentIndex = this.accumulator.fragments.length - 1;
this._pushFragment(createTextContentFragment(particle.t));
this.S._textFragmentIndex = this.S.fragments.length - 1;
}
private onAppendReasoningText({ _t, restart }: Extract<AixWire_Particles.PartParticleOp, { p: 'tr_' }>): void {
// Break text accumulation
this.accumulator._textFragmentIndex = null;
this.S._textFragmentIndex = null;
// append to existing ModelAuxVoidFragment if possible
const currentFragment = this.accumulator.fragments[this.accumulator.fragments.length - 1];
const currentFragment = this.S.fragments[this.S.fragments.length - 1];
if (!restart && currentFragment && isVoidFragment(currentFragment) && isModelAuxPart(currentFragment.part)) {
const appendedPart = { ...currentFragment.part, aText: (currentFragment.part.aText || '') + _t } satisfies DVoidModelAuxPart;
this.accumulator.fragments[this.accumulator.fragments.length - 1] = { ...currentFragment, part: appendedPart };
this._replaceFragmentAt(this.S.fragments.length - 1, { ...currentFragment, part: appendedPart });
return;
}
// new ModelAuxVoidFragment
const fragment = createModelAuxVoidFragment('reasoning', _t);
this.accumulator.fragments.push(fragment);
this._pushFragment(fragment);
}
private onSetReasoningSignature({ signature }: Extract<AixWire_Particles.PartParticleOp, { p: 'trs' }>): void {
// set to existing ModelAuxVoidFragment if possible
const currentFragment = this.accumulator.fragments[this.accumulator.fragments.length - 1];
const currentFragment = this.S.fragments[this.S.fragments.length - 1];
if (currentFragment && isVoidFragment(currentFragment) && isModelAuxPart(currentFragment.part)) {
const setPart = { ...currentFragment.part, textSignature: signature } satisfies DVoidModelAuxPart;
this.accumulator.fragments[this.accumulator.fragments.length - 1] = { ...currentFragment, part: setPart };
this._replaceFragmentAt(this.S.fragments.length - 1, { ...currentFragment, part: setPart });
return;
}
// if for some reason there's no ModelAuxVoidFragment, create one
const fragment = createModelAuxVoidFragment('reasoning', '', signature);
this.accumulator.fragments.push(fragment);
this._pushFragment(fragment);
}
private onAddRedactedDataParcel({ _data }: Extract<AixWire_Particles.PartParticleOp, { p: 'trr_' }>): void {
// add to existing ModelAuxVoidFragment if possible
const currentFragment = this.accumulator.fragments[this.accumulator.fragments.length - 1];
const currentFragment = this.S.fragments[this.S.fragments.length - 1];
if (currentFragment && isVoidFragment(currentFragment) && isModelAuxPart(currentFragment.part)) {
const appendedPart = { ...currentFragment.part, redactedData: [...(currentFragment.part.redactedData || []), _data] } satisfies DVoidModelAuxPart;
this.accumulator.fragments[this.accumulator.fragments.length - 1] = { ...currentFragment, part: appendedPart };
this._replaceFragmentAt(this.S.fragments.length - 1, { ...currentFragment, part: appendedPart });
return;
}
// create a new ModelAuxVoidFragment for redacted thinking
const fragment = createModelAuxVoidFragment('reasoning', '', undefined, [_data]);
this.accumulator.fragments.push(fragment);
this._pushFragment(fragment);
}
private onStartFunctionCallInvocation(fci: Extract<AixWire_Particles.PartParticleOp, { p: 'fci' }>): void {
// Break text accumulation
this.accumulator._textFragmentIndex = null;
this.S._textFragmentIndex = null;
// Start FC accumulation
const fragment = create_FunctionCallInvocation_ContentFragment(
fci.id,
@@ -480,11 +496,11 @@ export class ContentReassembler {
);
// TODO: add _description from the Spec
// TODO: add _args_schema from the Spec
this.accumulator.fragments.push(fragment);
this._pushFragment(fragment);
}
private onAppendFunctionCallInvocationArgs(_fci: Extract<AixWire_Particles.PartParticleOp, { p: '_fci' }>): void {
const fragment = this.accumulator.fragments[this.accumulator.fragments.length - 1];
const fragment = this.S.fragments[this.S.fragments.length - 1];
if (fragment && isContentFragment(fragment) && fragment.part.pt === 'tool_invocation' && fragment.part.invocation.type === 'function_call') {
const updatedPart = {
...fragment.part,
@@ -493,25 +509,25 @@ export class ContentReassembler {
args: (fragment.part.invocation.args || '') + _fci._args,
},
};
this.accumulator.fragments[this.accumulator.fragments.length - 1] = { ...fragment, part: updatedPart };
this._replaceFragmentAt(this.S.fragments.length - 1, { ...fragment, part: updatedPart });
} else
this._appendReassemblyDevError('unexpected _fc particle without a preceding function-call');
}
private onAddCodeExecutionInvocation(cei: Extract<AixWire_Particles.PartParticleOp, { p: 'cei' }>): void {
this.accumulator.fragments.push(create_CodeExecutionInvocation_ContentFragment(cei.id, cei.language, cei.code, cei.author));
this.accumulator._textFragmentIndex = null;
this._pushFragment(create_CodeExecutionInvocation_ContentFragment(cei.id, cei.language, cei.code, cei.author));
this.S._textFragmentIndex = null;
}
private onAddCodeExecutionResponse(cer: Extract<AixWire_Particles.PartParticleOp, { p: 'cer' }>): void {
this.accumulator.fragments.push(create_CodeExecutionResponse_ContentFragment(cer.id, cer.error, cer.result, cer.executor, cer.environment));
this.accumulator._textFragmentIndex = null;
this._pushFragment(create_CodeExecutionResponse_ContentFragment(cer.id, cer.error, cer.result, cer.executor, cer.environment));
this.S._textFragmentIndex = null;
}
private async onAppendInlineAudio(particle: Extract<AixWire_Particles.PartParticleOp, { p: 'ia' }>): Promise<void> {
// Break text accumulation, as we have a full audio part in the middle
this.accumulator._textFragmentIndex = null;
this.S._textFragmentIndex = null;
const { mimeType, a_b64: base64Data, label, /*generator,*/ durationMs } = particle;
const safeLabel = label || 'Generated Audio';
@@ -522,7 +538,7 @@ export class ContentReassembler {
const audioBlob = await convert_Base64WithMimeType_To_Blob(base64Data, mimeType, 'ContentReassembler.onAppendInlineAudio');
// show a label in the message (audio fragment persistence deferred to future work)
this.accumulator.fragments.push(createTextContentFragment(`Generated audio ▶ \`${safeLabel}\`${durationMs ? ` (${Math.round(durationMs / 10) / 100}s)` : ''}`));
this._pushFragment(createTextContentFragment(`Generated audio ▶ \`${safeLabel}\`${durationMs ? ` (${Math.round(durationMs / 10) / 100}s)` : ''}`));
// Add the audio to the DBlobs DB
// const dblobAssetId = await addDBAudioAsset('global', 'app-chat', {
@@ -560,7 +576,7 @@ export class ContentReassembler {
// durationMs,
// );
// this.accumulator.fragments.push(audioContentFragment);
// this._pushFragment(audioContentFragment);
// notify caller for NorthBridge-coordinated playback
this.onInlineAudio?.({ blob: audioBlob, mimeType, label: safeLabel, durationMs });
@@ -575,7 +591,7 @@ export class ContentReassembler {
private async onAppendInlineImage(particle: Extract<AixWire_Particles.PartParticleOp, { p: 'ii' }>): Promise<void> {
// Break text accumulation, as we have a full image part in the middle
this.accumulator._textFragmentIndex = null;
this.S._textFragmentIndex = null;
let { i_b64: inputBase64, mimeType: inputType, label, generator, prompt, hintSkipResize } = particle;
const safeLabel = label || 'Generated Image';
@@ -627,7 +643,7 @@ export class ContentReassembler {
},
);
this.accumulator.fragments.push(zyncImageAssetFragmentWithLegacy);
this._pushFragment(zyncImageAssetFragmentWithLegacy);
} catch (error: any) {
console.warn('[DEV] Failed to add inline image to DBlobs:', { label, error, inputType, base64Length: inputBase64.length });
}
@@ -638,30 +654,47 @@ export class ContentReassembler {
const { title, url, num: refNumber, from: startIndex, to: endIndex, text: textSnippet, pubTs } = urlc;
// reuse existing annotations - single fragment per message
const existingFragment = this.accumulator.fragments.find(isVoidAnnotationsFragment);
if (existingFragment) {
const existingIdx = this.S.fragments.findIndex(isVoidAnnotationsFragment);
if (existingIdx >= 0) {
const existing = this.S.fragments[existingIdx];
if (!isVoidAnnotationsFragment(existing)) return; // type guard (unreachable)
// coalesce ranges if there are citations at the same URL
const sameUrlCitation = existingFragment.part.annotations.find(({ type, url: existingUrl }) => type === 'citation' && url === existingUrl);
if (!sameUrlCitation) {
existingFragment.part.annotations = [
...existingFragment.part.annotations,
createDVoidWebCitation(url, title, refNumber, startIndex, endIndex, textSnippet, pubTs),
];
} else {
if (startIndex !== undefined && endIndex !== undefined) {
sameUrlCitation.ranges = [
...sameUrlCitation.ranges,
{ startIndex, endIndex, ...(textSnippet ? { textSnippet } : {}) },
];
}
const sameUrlIdx = existing.part.annotations.findIndex(({ type, url: existingUrl }) => type === 'citation' && url === existingUrl);
if (sameUrlIdx < 0) {
// new citation URL
const updatedAnnotations = [...existing.part.annotations, createDVoidWebCitation(url, title, refNumber, startIndex, endIndex, textSnippet, pubTs)];
this._replaceFragmentAt(existingIdx, {
...existing,
part: {
...existing.part,
annotations: updatedAnnotations,
},
});
} else if (startIndex !== undefined && endIndex !== undefined) {
// add range to existing citation
const citation = existing.part.annotations[sameUrlIdx];
const updatedCitation = { ...citation, ranges: [...citation.ranges, { startIndex, endIndex, ...(textSnippet ? { textSnippet } : {}) }] };
const updatedAnnotations = existing.part.annotations.map((a, i) => i === sameUrlIdx ? updatedCitation : a);
this._replaceFragmentAt(existingIdx, {
...existing,
part: {
...existing.part,
annotations: updatedAnnotations,
},
});
}
} else {
// create the *only* annotations fragment in the message
const newCitation = createDVoidWebCitation(url, title, refNumber, startIndex, endIndex, textSnippet, pubTs);
this.accumulator.fragments.push(createAnnotationsVoidFragment([newCitation]));
this._pushFragment(createAnnotationsVoidFragment([
createDVoidWebCitation(url, title, refNumber, startIndex, endIndex, textSnippet, pubTs),
]));
}
@@ -676,197 +709,187 @@ export class ContentReassembler {
// destructure
const { text, mot, opId, state, parentOpId, iTexts, oTexts } = os;
const existingPh = this.accumulator.fragments.findLast(isVoidPlaceholderFragment);
if (!existingPh) {
const newEntry: DVoidPlaceholderMOp = {
opId,
text,
mot,
state: state ?? 'active',
...iTexts ? { iTexts } : undefined,
...oTexts ? { oTexts } : undefined,
...parentOpId ? { parentOpId } : undefined,
level: 0,
cts: Date.now(),
};
const phIdx = this.S.fragments.findLastIndex(isVoidPlaceholderFragment);
if (phIdx < 0) {
// New placeholder with initial opLog entry (root level = 0)
this.accumulator.fragments.push(createPlaceholderVoidFragment(text, undefined, undefined, [{
opId,
text,
mot,
state: state ?? 'active',
...iTexts ? { iTexts } : undefined,
...oTexts ? { oTexts } : undefined,
...parentOpId ? { parentOpId } : undefined,
level: 0,
cts: Date.now(),
}]));
this._pushFragment(createPlaceholderVoidFragment(text, undefined, undefined, [newEntry]));
// Placeholders don't affect text fragment indexing (push to end doesn't shift existing indices)
// NOTE: we could have placeholders breaking text accumulation into new fragments with `this.accumulator._textFragmentIndex = null;`, however
// NOTE: we could have placeholders breaking text accumulation into new fragments with `this.S._textFragmentIndex = null;`, however
// since placeholders are used a lot with hosted tool calls, this could lead to way too many fragments being created
return;
}
// Accumulate into existing placeholder
const part = existingPh.part;
const existingPh = this.S.fragments[phIdx];
if (!isVoidPlaceholderFragment(existingPh)) return; // type guard (unreachable)
const prevOpLog = existingPh.part.opLog ?? [];
// Takeover: operations supersede other placeholder types
delete part.pType;
delete part.aixControl;
// mutable cast: accumulator fragments are not from an immutable store
const opLog = (part.opLog ?? (part.opLog = [])) as DVoidPlaceholderMOp[];
// existing opId in opLog
const entry = opLog.find(e => e.opId === opId);
if (entry) {
// update existing operation in place
if (text) entry.text = text;
if (state) entry.state = state;
if (iTexts) entry.iTexts = iTexts;
if (oTexts) entry.oTexts = oTexts;
} else {
// append new operation - infer level from parent's level (or 0)
const level = !parentOpId ? 0 : 1 + (opLog.find(e => e.opId === parentOpId)?.level ?? 0);
opLog.push({
opId,
mot,
text,
state: state ?? 'active',
// update existing entry or append new one
const existingEntryIdx = prevOpLog.findIndex(e => e.opId === opId);
let updatedOpLog: readonly DVoidPlaceholderMOp[];
if (existingEntryIdx >= 0) {
const prev = prevOpLog[existingEntryIdx];
updatedOpLog = prevOpLog.map((e, i) => i !== existingEntryIdx ? e : {
...prev,
...text ? { text } : undefined,
...state ? { state } : undefined,
...iTexts ? { iTexts } : undefined,
...oTexts ? { oTexts } : undefined,
...parentOpId ? { parentOpId } : undefined,
level,
cts: Date.now(),
});
} else {
// infer level from parent
const level = !parentOpId ? 0 : 1 + (prevOpLog.find(e => e.opId === parentOpId)?.level ?? 0);
updatedOpLog = [...prevOpLog, { ...newEntry, level }];
}
// Top-level pText reflects latest active (or last if all done)
const latest = opLog.findLast(e => e.state === 'active') ?? opLog[opLog.length - 1];
part.pText = latest.text;
// top-level pText reflects latest active (or last if all done)
const latest = updatedOpLog.findLast(e => e.state === 'active') ?? updatedOpLog[updatedOpLog.length - 1];
const updatedPart = {
...existingPh.part,
pText: latest.text,
opLog: updatedOpLog,
};
delete updatedPart.pType; // operations supersede other placeholder types
delete updatedPart.aixControl; // operations supersede info/checkpoint markers
this._replaceFragmentAt(phIdx, { ...existingPh, part: updatedPart });
}
private onSetVendorState(vs: Extract<AixWire_Particles.PartParticleOp, { p: 'svs' }>): void {
// apply vendor state to the last created fragment
const lastFragment = this.accumulator.fragments[this.accumulator.fragments.length - 1];
const lastIdx = this.S.fragments.length - 1;
const lastFragment = this.S.fragments[lastIdx];
if (!lastFragment) {
console.warn('[ContentReassembler] Vendor state particle without preceding content fragment');
return;
}
// attach vendor state
const { vendor, state } = vs;
lastFragment.vendorState = {
...lastFragment.vendorState,
[vendor]: state,
};
this._replaceFragmentAt(lastIdx, {
...lastFragment,
vendorState: {
...lastFragment.vendorState,
[vs.vendor]: vs.state,
},
});
}
private _removeAllVoidPlaceholders(): void {
const fragments = this.accumulator.fragments;
for (let i = fragments.length - 1; i >= 0; i--)
if (isVoidPlaceholderFragment(fragments[i])) {
fragments.splice(i, 1);
if (this.accumulator._textFragmentIndex !== null && this.accumulator._textFragmentIndex > i)
this.accumulator._textFragmentIndex--;
}
this.S.fragments = this.S.fragments.filter(f => !isVoidPlaceholderFragment(f));
// _textFragmentIndex may now be invalid - null it since this runs at finalization only
this.S._textFragmentIndex = null;
}
private async _removeLastVoidPlaceholderDelayed(): Promise<boolean> {
const fragments = this.accumulator.fragments;
const idx = fragments.findLastIndex(isVoidPlaceholderFragment);
if (idx < 0) return false;
// skip if none
if (this.S.fragments.findLastIndex(isVoidPlaceholderFragment) < 0) return false;
// delay before removal
await new Promise(resolve => setTimeout(resolve, VP_PERSISTENCE_DELAY));
fragments.splice(idx, 1);
if (this.accumulator._textFragmentIndex !== null && this.accumulator._textFragmentIndex > idx)
this.accumulator._textFragmentIndex--;
// for stability, search the fragment Index again - this must not have changed, as any mutation would be queued to
// this awaited function, but better safe than sorry
const idx = this.S.fragments.findLastIndex(isVoidPlaceholderFragment);
if (idx < 0) return true; // already removed during the delay
this._spliceFragment(idx);
return true;
}
// private removeLastVoidPlaceholder(): boolean {
// const fragments = this.accumulator.fragments;
// const idx = fragments.findLastIndex(isVoidPlaceholderFragment);
// if (idx < 0) return false;
// fragments.splice(idx, 1);
// if (this.accumulator._textFragmentIndex !== null && this.accumulator._textFragmentIndex > idx)
// this.accumulator._textFragmentIndex--;
// return true;
// }
/// Rest of the data ///
/**
* Stores raw termination data from the wire - classification deferred to finalizeAccumulator()
* Stores raw termination data from the wire - classification deferred to finalizeReassembly()
*/
private onCGEnd({ terminationReason, tokenStopReason }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'end' }>): void {
this.accumulator._terminationReason = terminationReason;
this.accumulator._tokenStopReasonWire = tokenStopReason;
this.S.terminationReason = terminationReason;
this.S.dialectStopReason = tokenStopReason;
}
/**
* Cross-references both raw termination inputs to derive the DMessage-level tokenStopReason.
* Called once at finalization - the single place where wire-level → UI-level classification happens.
* Pure classification of termination state - no side effects.
* Cross-references terminationReason + dialectStopReason to derive:
* - outcome: definitive result of this LL call
* - tsr: tokenStopReason for the generator (UI detail, undefined = normal completion)
* - errorMessage: optional user/AI-facing message explaining what happened (appended by caller)
*/
private _deriveTokenStopReason(): DMessageGenerator['tokenStopReason'] | undefined {
const wire = this.accumulator._tokenStopReasonWire;
private _classifyTermination(): { outcome: AixChatGenerateTerminal_LL; tsr: DMessageGenerator['tokenStopReason']; errorMessage?: string; } {
const { terminationReason: endReason, dialectStopReason: dialectTokenStopReason } = this.S;
// First handle client terminations
if (this.accumulator._terminationReason === 'done-client-aborted')
return 'client-abort'; // client-side abort is a 'successful' termination with an incomplete message
if (this.accumulator._terminationReason === 'issue-client-rpc') {
// error fragment already appended
// issue on the client-side, such as interrupted server connection
return 'issue';
}
// -- Client-set terminations --
// if the dialect parser explicitly set a stop reason, map it to the DMessageGenerator tokenStopReason enum
if (wire) {
const mapAixStopToDmessageGeneratorStop: Record<AixWire_Particles.GCTokenStopReason, DMessageGenerator['tokenStopReason'] | undefined> = {
// normal completions
'ok': undefined,
'ok-tool_invocations': undefined,
// issues: dialect, dispatch, or client
'cg-issue': 'issue',
// interruptions
'out-of-tokens': 'out-of-tokens',
'filter-content': 'filter',
'filter-recitation': 'filter',
'filter-refusal': 'filter',
if (endReason === 'done-client-aborted')
return { outcome: 'aborted', tsr: 'client-abort' };
if (endReason === 'issue-client-rpc')
return { outcome: 'failed', tsr: 'issue' /* error fragment already appended by setClientExcepted() */ };
// -- Dialect-set dispatch terminations (model responded with an explicit stop reason at the end) --
if (dialectTokenStopReason) {
const classification: Record<AixWire_Particles.GCTokenStopReason, ReturnType<typeof this._classifyTermination>> = {
// normal completions - the model responded and stopped cleanly
'ok': { outcome: 'completed', tsr: undefined },
'ok-tool_invocations': { outcome: 'completed', tsr: undefined },
// issues from the dialect/dispatch layer
'cg-issue': { outcome: 'failed', tsr: 'issue' },
// model completed but with a specific stop condition
'out-of-tokens': { outcome: 'completed', tsr: 'out-of-tokens' },
'filter-content': { outcome: 'completed', tsr: 'filter' },
'filter-recitation': { outcome: 'completed', tsr: 'filter' },
'filter-refusal': { outcome: 'completed', tsr: 'filter' },
} as const;
if (wire in mapAixStopToDmessageGeneratorStop)
return mapAixStopToDmessageGeneratorStop[wire];
console.warn(`[ContentReassembler] Unmapped tokenStopReason from wire: ${wire}. Fallling back to terminationReason.`);
if (dialectTokenStopReason in classification)
return classification[dialectTokenStopReason];
console.warn(`[ContentReassembler] Unmapped dialectStopReason: ${dialectTokenStopReason}. Falling back to terminationReason.`);
}
// fall back to terminationReason
switch (this.accumulator._terminationReason) {
case undefined:
// SEVERE - AIX BUG: don't even know why we terminated
console.warn(`⚠️ [ContentReassembler] finished without 'terminationReason' - possible missing 'end' particle. No tokenStopReason can be derived.`);
this._appendErrorFragment('Message may be incomplete: missing completion signal.');
return undefined;
// -- Unexpected: no termination reason nor token stop reason --
if (endReason === undefined) {
// SEVERE - AIX BUG: either client terminations or an 'end' particle must be received
console.warn(`⚠️ [ContentReassembler] finished without 'terminationReason' - possible missing 'end' particle.`);
return { outcome: 'failed', tsr: 'issue', errorMessage: 'Response may be incomplete - missing completion signal.' };
}
// -- Dispatch-set terminations: AixWire_Particles.CGEndReason --
switch (endReason) {
case 'done-dialect':
// Normal completions: we DO expect a tokenStopReason
console.warn(`⚠️ [ContentReassembler] termination by dialect without 'tokenStopReason' - possible dialect parser issue. assuming ok`);
this._appendErrorFragment('Message may be incomplete: missing finish reason.');
return undefined;
// Acceptable - dialect said done but didn't provide a stop reason - likely a parser gap
console.warn(`⚠️ [ContentReassembler] termination by dialect without 'dialectStopReason' - possible dialect parser issue.`);
return { outcome: 'completed', tsr: undefined, errorMessage: 'Note: response may be incomplete - the finish reason was not provided by the model.' };
case 'done-dispatch-closed':
// Stream EOF before completion - provider closed the connection without sending a termination signal
console.warn(`⚠️ [ContentReassembler] done-dispatch-closed without tokenStopReason - possible truncation`);
this._appendErrorFragment('Message may be truncated: stream ended before completion.');
return 'issue';
case 'done-dispatch-closed': // (!) VERY COMMON
// BROKEN - Stream EOF before the dialect sent a termination signal - provider closed the connection early
console.warn(`⚠️ [ContentReassembler] done-dispatch-closed without dialectStopReason - possible truncation`);
return { outcome: 'failed', tsr: 'issue', errorMessage: 'Response may be truncated - stream ended before completion.' };
case 'done-dispatch-aborted':
// Dispatch connection may have been severed
console.warn(`⚠️ [ContentReassembler] done-dispatch-aborted - stream was aborted, likely due to connection issues. assuming client abort.`);
this._appendErrorFragment('Message may be incomplete: AI provider stream was aborted, likely due to connection issues.');
return 'client-abort';
// BROKEN - Infrastructure abort (not user-initiated) - dispatch connection severed
console.warn(`⚠️ [ContentReassembler] done-dispatch-aborted - connection lost, not user-initiated.`);
return { outcome: 'failed', tsr: 'issue', errorMessage: 'Response interrupted - the AI provider connection was lost.' };
case 'issue-dialect':
case 'issue-dispatch-rpc':
// error messages already added
return 'issue';
return { outcome: 'failed', tsr: 'issue' /* error fragments already added by upstream issue particles */};
default:
const _exhaustiveCheck: never = this.accumulator._terminationReason;
console.warn(`⚠️ [ContentReassembler] unmapped termination reason: ${this.accumulator._terminationReason} - no tokenStopReason can be derived.`);
return undefined;
const _exhaustiveCheck: never = endReason;
console.warn(`⚠️ [ContentReassembler] unmapped termination reason: ${endReason}`);
return { outcome: 'failed', tsr: undefined };
}
}
@@ -874,10 +897,14 @@ export class ContentReassembler {
// NOTE: not sure I like the flow at all here
// there seem to be some bad conditions when issues are raised while the active part is not text
if (MERGE_ISSUES_INTO_TEXT_PART_IF_OPEN) {
const currentTextFragment = this.accumulator._textFragmentIndex === null ? null
: this.accumulator.fragments[this.accumulator._textFragmentIndex];
const currentTextFragment = this.S._textFragmentIndex === null ? null
: this.S.fragments[this.S._textFragmentIndex];
if (currentTextFragment && isTextContentFragment(currentTextFragment)) {
currentTextFragment.part.text += (currentTextFragment.part.text ? '\n' : ' ') + issueText;
const idx = this.S._textFragmentIndex!;
this._replaceFragmentAt(idx, {
...currentTextFragment,
part: { ...currentTextFragment.part, text: currentTextFragment.part.text + (currentTextFragment.part.text ? '\n' : ' ') + issueText },
});
return;
}
}
@@ -886,14 +913,14 @@ export class ContentReassembler {
private onAixInfo({ ait, text }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'aix-info' }>): void {
// -> ph: show info
this.accumulator.fragments.push(createPlaceholderVoidFragment(text, undefined, {
this._pushFragment(createPlaceholderVoidFragment(text, undefined, {
ctl: 'ac-info',
ait: ait,
}));
}
private onAixRetryReset({ rScope, rClearStrategy, attempt, maxAttempts, delayMs, reason, causeHttp, causeConn }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'aix-retry-reset' }>): void {
const _prevFragments = DEBUG_FLOW ? this.accumulator.fragments.length : 0;
const _prevFragments = DEBUG_FLOW ? this.S.fragments.length : 0;
switch (rClearStrategy) {
case 'none':
// keep everything (e.g. L1 connection retries - no content streamed yet)
@@ -901,18 +928,18 @@ export class ContentReassembler {
break;
case 'since-checkpoint':
// atomic restore to checkpoint
if (!this.checkpointSnapshot)
// atomic restore to checkpoint (fall back to initial state if no checkpoint)
if (!this.checkpointState)
console.warn('[ContentReassembler] since-checkpoint restore with no checkpoint - falling back to full clear');
Object.assign(this.accumulator, structuredClone(this.checkpointSnapshot) ?? _createEmptyAccumulatorState());
Object.assign(this.S, structuredClone(this.checkpointState ?? this.initialState));
this.wireParticlesBacklog.length = 0; // should have been drained/completed already
if (DEBUG_FLOW) console.log(`[DEV] [flow] retry-reset ${rScope}: since-checkpoint (${_prevFragments} -> ${this.accumulator.fragments.length} fragments) - ${reason}`);
if (DEBUG_FLOW) console.log(`[DEV] [flow] retry-reset ${rScope}: since-checkpoint (${_prevFragments} -> ${this.S.fragments.length} fragments) - ${reason}`);
break;
case 'all':
// full wipe for reconnect scenarios (L4 client reconnect)
Object.assign(this.accumulator, _createEmptyAccumulatorState());
this.checkpointSnapshot = undefined;
Object.assign(this.S, structuredClone(this.initialState));
this.checkpointState = undefined;
this.wireParticlesBacklog.length = 0; // should have been drained/completed already
if (DEBUG_FLOW) console.log(`[DEV] [flow] retry-reset ${rScope}: all (${_prevFragments} -> 0 fragments, checkpoint discarded) - ${reason}`);
break;
@@ -925,7 +952,7 @@ export class ContentReassembler {
// -> ph: show retry status
const retryMessage = `Retrying [${attempt}/${maxAttempts}] in ${Math.round(delayMs / 100) / 10}s - ${reason}`;
this.accumulator.fragments.push(createPlaceholderVoidFragment(retryMessage, undefined, {
this._pushFragment(createPlaceholderVoidFragment(retryMessage, undefined, {
ctl: 'ec-retry',
rScope: rScope,
rAttempt: attempt,
@@ -936,16 +963,16 @@ export class ContentReassembler {
private onMetrics({ metrics }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'set-metrics' }>): void {
// type check point for AixWire_Particles.CGSelectMetrics -> DMetricsChatGenerate_Lg
this.accumulator.genMetricsLg = metrics;
metricsPendChatGenerateLg(this.accumulator.genMetricsLg);
this.S.cgMetricsLg = metrics;
metricsPendChatGenerateLg(this.S.cgMetricsLg);
}
private onModelName({ name }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'set-model' }>): void {
this.accumulator.genModelName = name;
this.S.generator = { ...this.S.generator, name };
}
private onProviderInfra({ label }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'set-provider-infra' }>): void {
this.accumulator.genProviderInfraLabel = label;
this.S.generator = { ...this.S.generator, providerInfraLabel: label };
}
private onResponseHandle({ handle }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'set-upstream-handle' }>): void {
@@ -955,7 +982,24 @@ export class ContentReassembler {
return;
}
// type check point for AixWire_Particles.ChatControlOp('set-upstream-handle') -> DUpstreamResponseHandle
this.accumulator.genUpstreamHandle = handle;
this.S.generator = { ...this.S.generator, upstreamHandle: handle };
}
// Fragment helpers - structural sharing: every mutation creates a new array reference
private _pushFragment(fragment: AixChatGenerateContent_LL['fragments'][number]): void {
this.S.fragments = [...this.S.fragments, fragment];
}
private _replaceFragmentAt(index: number, fragment: AixChatGenerateContent_LL['fragments'][number]): void {
this.S.fragments = this.S.fragments.map((f, i) => i === index ? fragment : f);
}
private _spliceFragment(index: number): void {
this.S.fragments = [...this.S.fragments.slice(0, index), ...this.S.fragments.slice(index + 1)];
if (this.S._textFragmentIndex !== null && this.S._textFragmentIndex > index)
this.S._textFragmentIndex--;
}
@@ -972,8 +1016,8 @@ export class ContentReassembler {
}
private _appendErrorFragment(errorText: string, errorHint?: DMessageErrorPart['hint']): void {
this.accumulator.fragments.push(createErrorContentFragment(errorText, errorHint));
this.accumulator._textFragmentIndex = null;
this._pushFragment(createErrorContentFragment(errorText, errorHint));
this.S._textFragmentIndex = null;
}
}
+205 -196
View File
@@ -3,8 +3,8 @@ import { findServiceAccessOrThrow } from '~/modules/llms/vendors/vendor.helpers'
import type { MaybePromise } from '~/common/types/useful.types';
import { AudioPlayer } from '~/common/util/audio/AudioPlayer';
import { DLLM, DLLMId, LLM_IF_HOTFIX_NoTemperature, LLM_IF_OAI_Responses, LLM_IF_Outputs_Audio, LLM_IF_Outputs_Image, LLM_IF_Outputs_NoText } from '~/common/stores/llms/llms.types';
import { DMessage, DMessageGenerator, messageSetGeneratorAIX_AutoLabel } from '~/common/stores/chat/chat.message';
import { DMetricsChatGenerate_Lg, metricsChatGenerateLgToMd, metricsComputeChatGenerateCostsMd } from '~/common/stores/metrics/metrics.chatgenerate';
import { DMessage, DMessageGenerator, createGeneratorAIX_AutoLabel } from '~/common/stores/chat/chat.message';
import { DMetricsChatGenerate_Lg, DMetricsChatGenerate_Md, metricsChatGenerateLgToMd, metricsComputeChatGenerateCostsMd, } from '~/common/stores/metrics/metrics.chatgenerate';
import { DModelParameterValues, getAllModelParameterValues } from '~/common/stores/llms/llms.parameters';
import { apiStream } from '~/common/util/trpc.client';
import { createErrorContentFragment, DMessageContentFragment, DMessageErrorPart, DMessageVoidFragment, isContentFragment, isErrorPart } from '~/common/stores/chat/chat.fragments';
@@ -165,24 +165,6 @@ export function aixCreateModelFromLLMOptions(
}
/**
* Accumulator for ChatGenerate output data, as it is being streamed.
* The object is modified in-place from the lower layers and passed to the callback for efficiency.
*/
export interface AixChatGenerateContent_DMessageGuts extends Pick<DMessage, 'fragments' | 'generator' | 'pendingIncomplete'> {
fragments: (DMessageContentFragment | DMessageVoidFragment /* no AttachmentFragments */)[];
// Since 'aixChatGenerateContent_DMessage_FromConversation' starts from named (before replacement from LL), we can't Extract
generator: DMessageGenerator; // Extract<DMessageGenerator, { mgt: 'aix' }>;
pendingIncomplete: boolean;
}
type StreamMessageStatus = {
outcome: 'success' | 'aborted' | 'errored',
lastDMessage: AixChatGenerateContent_DMessageGuts,
errorMessage?: string
};
interface AixClientOptions {
abortSignal: AbortSignal | 'NON_ABORTABLE'; // 'NON_ABORTABLE' is a special case for non-abortable operations
throttleParallelThreads?: number; // 0: disable, 1: default throttle (12Hz), 2+ reduce frequency with the square root
@@ -193,8 +175,19 @@ interface AixClientOptions {
}
// --- L3 - Conversation-level generation (builds chat request, error wrapping) ---
/** L3 return: final DMessage-compatible object + terminal outcome. errorMessage is only set for pre-LL errors. */
type _AixChatGenerateContent_FromConversation_Result = {
lastDMessage: AixChatGenerateContent_DMessageGuts,
outcome: AixChatGenerateTerminal_LL,
outcomeFailedMessage?: string,
};
/**
* Level 3 Generation from an LLM Id + Chat History.
* Updates use Zustand-style immutable references: .fragments and .generator are new objects on each update.
* Callers can pass the update directly to stores without deep cloning.
*/
export async function aixChatGenerateContent_DMessage_FromConversation(
// chat-inputs -> Partial<DMessage> outputs
@@ -207,9 +200,7 @@ export async function aixChatGenerateContent_DMessage_FromConversation(
// others
clientOptions: AixClientOptions,
onStreamingUpdate: (update: AixChatGenerateContent_DMessageGuts, isDone: boolean) => MaybePromise<void>,
): Promise<StreamMessageStatus> {
let errorMessage: string | undefined;
): Promise<_AixChatGenerateContent_FromConversation_Result> {
let lastDMessage: AixChatGenerateContent_DMessageGuts = {
fragments: [],
@@ -226,7 +217,7 @@ export async function aixChatGenerateContent_DMessage_FromConversation(
chatSequence: await aixCGR_ChatSequence_FromDMessagesOrThrow(chatHistoryWithoutSystemMessages),
};
await aixChatGenerateContent_DMessage_orThrow(
const { outcome, ...resultDMessage } = await aixChatGenerateContent_DMessage_orThrow(
llmId,
aixChatContentGenerateRequest,
aixCreateChatGenerateContext(aixContextName, aixContextRef),
@@ -238,47 +229,37 @@ export async function aixChatGenerateContent_DMessage_FromConversation(
},
);
return { outcome, lastDMessage: resultDMessage };
} catch (error: any) {
// this can only be a large, user-visible error, such as LLM not found
console.warn('[DEV] aixChatGenerateContentStreaming error:', { error });
// > error fragment
errorMessage = error.message || (typeof error === 'string' ? error : 'Chat stopped.');
lastDMessage.fragments.push(createErrorContentFragment(`Issue: ${errorMessage}`));
// .generator: 'issue', no pendingIncomplete
lastDMessage.generator = { ...lastDMessage.generator, tokenStopReason: 'issue' };
lastDMessage.pendingIncomplete = false;
// pre-LL error (e.g. LLM not found, service misconfigured, content assembly error) - the LL likly never ran
console.warn('[DEV] aixChatGenerateContent error:', { error });
const errorMessage = error.message || (typeof error === 'string' ? error : 'Chat stopped.');
lastDMessage = {
fragments: [...lastDMessage.fragments, createErrorContentFragment(`Issue: ${errorMessage}`)],
generator: { ...lastDMessage.generator, tokenStopReason: 'issue' },
pendingIncomplete: false,
}
return { outcome: 'failed', lastDMessage, outcomeFailedMessage: errorMessage };
}
// Derive outcome: client-abort wins (user intent), then errors/issues, then success
const tokenStopReason = lastDMessage.generator?.tokenStopReason;
const outcome: StreamMessageStatus['outcome'] =
tokenStopReason === 'client-abort' ? 'aborted'
: (errorMessage || tokenStopReason === 'issue') ? 'errored'
: 'success';
return {
outcome,
lastDMessage: lastDMessage,
errorMessage: errorMessage || undefined,
};
}
// --- L2-Simple - Text-only facade (resolves LLM, calls LL, returns string) ---
/**
* Accumulator for the simple text-only API
* L2-S - Accumulator for the simple text-only API
*/
interface AixChatGenerateText_Simple {
interface _AixChatGenerateText_Simple {
text: string | null;
generator: DMessageGenerator;
isDone: boolean;
}
/**
* Level 2 - Simpler facade to text-only inputs and text-only outputs - and nothing else. Old-school V1-like API.
* L2-S - Simpler facade to text-only inputs and text-only outputs - and nothing else. Old-school V1-like API.
*
* NOTE: this is a simplified version of the `aixChatGenerateContent_DMessage` function, with text-only inputs and outputs.
* NOTE: it's missing throttling; there's the chance we could abstract and consolidate the two functions, because they are
@@ -334,12 +315,11 @@ export async function aixChatGenerateText_Simple(
// Variable to store the final text
const state: AixChatGenerateText_Simple = {
const state: _AixChatGenerateText_Simple = {
text: null,
generator: { mgt: 'named', name: 'replace-me-ll' },
generator: createGeneratorAIX_AutoLabel(llm.vId, llm.id),
isDone: false,
};
messageSetGeneratorAIX_AutoLabel(state, llm.vId, llm.id);
// NO streaming initial notification - only notified past the first real characters
// await onTextStreamUpdate?.(dText.text, false);
@@ -353,17 +333,18 @@ export async function aixChatGenerateText_Simple(
: new AbortController().signal; // since this is a 'simple' low-stakes API, we can 'ignore' the abort signal and not enforce it with the caller
// Aix Low-Level Chat Generation - does not throw, but may return an error in the final text
const ll = await _aixChatGenerateContent_LL(
// Aix LL Chat Generation - does not throw, but may return an error in the final text
const { cgMetricsLg, outcome, ...llFinal } = await _aixChatGenerateContent_LL(
aixAccess,
aixModel,
aixChatGenerate,
aixContext,
aixStreaming,
state.generator,
abortSignal,
clientOptions?.throttleParallelThreads ?? 0,
!aixStreaming ? undefined : async (ll: AixChatGenerateContent_LL, _isDone: boolean /* we want to issue this, in case the next action is an exception */) => {
_llToText(ll, state);
_llToL2Simple(ll, state);
if (onTextStreamUpdate && state.text !== null)
await onTextStreamUpdate(state.text, false, state.generator);
},
@@ -373,8 +354,9 @@ export async function aixChatGenerateText_Simple(
state.isDone = true;
// LLM Cost computation & Aggregations
_llToText(ll, state);
_updateGeneratorCostsInPlace(state.generator, llm, `aix_chatgenerate_text-${aixContextName}`);
_llToL2Simple(llFinal, state);
const metrics = _finalizeLlmMetricsWithCosts(cgMetricsLg, llm, `aix_chatgenerate_text-${aixContextName}`);
if (metrics) state.generator = { ...state.generator, metrics };
// re-throw the user-initiated abort, as the former function catches it
@@ -386,12 +368,16 @@ export async function aixChatGenerateText_Simple(
throw new Error('AIX: Empty text response.');
// throw if there are error fragments
const errorMessage = ll.fragments
const errorMessage = llFinal.fragments
.filter(f => isContentFragment(f) && isErrorPart(f.part))
.map(f => (f.part as DMessageErrorPart).error).join('\n');
if (errorMessage)
throw new Error('AIX: Error in response: ' + errorMessage);
// throw if the outcome is failed
if (outcome === 'failed')
throw new Error('AIX: Generation failed.');
// final update
await onTextStreamUpdate?.(state.text, true, state.generator);
@@ -404,45 +390,59 @@ export async function aixChatGenerateText_Simple(
* - error -> inline error text: DO NOT THROW HERE, as the LL will catch it and add another error part with the same text
* - tool -> throw: the LL will catch it and add the error text. However when done outside the LL (secondary usage) this will throw freely
*/
function _llToText(src: AixChatGenerateContent_LL, dest: AixChatGenerateText_Simple) {
// copy over just the generator by using the accumulator -> DMessage-like copier
_llToDMessageGuts(src, {
generator: dest.generator, // target our dest's object
fragments: [], pendingIncomplete: false, // unused, mocked
});
function _llToL2Simple({ fragments, generator }: AixChatGenerateContent_LL, dest: _AixChatGenerateText_Simple) {
// transfer generator by reference - already structurally shared
dest.generator = generator;
// transform the fragments to plain text
if (src.fragments.length) {
dest.text = '';
for (let fragment of src.fragments) {
const pt = fragment.part.pt;
switch (pt) {
case 'text':
dest.text += fragment.part.text;
break;
case 'error':
dest.text += (dest.text ? '\n' : '') + fragment.part.error;
break;
case 'tool_invocation':
throw new Error(`AIX: Unexpected tool invocation ${fragment.part.invocation?.type === 'function_call' ? fragment.part.invocation.name : fragment.part.id} in the Text response.`);
case 'annotations': // citations - ignored
case 'ma': // model annotations (thinking tokens) - ignored
case 'ph': // placeholder - ignored
case 'reference': // impossible
case 'image_ref': // impossible
case 'tool_response': // impossible - stopped at the invocation already
case '_pt_sentinel': // impossible
break;
default:
const _exhaustiveCheck: never = pt;
}
// ll.fragments[] -> dest.text (with error handling)
// NOTE: similar to messageFragmentsReduceText, but with a more adapted behavior and throwing
dest.text = '';
for (let fragment of fragments) {
const pt = fragment.part.pt;
switch (pt) {
case 'text':
dest.text += fragment.part.text;
break;
case 'error':
dest.text += (dest.text ? '\n' : '') + fragment.part.error;
break;
case 'tool_invocation':
throw new Error(`AIX: Unexpected tool invocation ${fragment.part.invocation?.type === 'function_call' ? fragment.part.invocation.name : fragment.part.id} in the Text response.`);
case 'annotations': // citations - ignored
case 'ma': // model annotations (thinking tokens) - ignored
case 'ph': // placeholder - ignored
case 'reference': // impossible
case 'image_ref': // impossible
case 'tool_response': // impossible - stopped at the invocation already
case '_pt_sentinel': // impossible
break;
default:
const _exhaustiveCheck: never = pt;
}
}
}
// --- L2 - DMessage generation (resolves LLM, calls LL, finalizes costs, returns DMessageGuts + outcome) ---
/**
* Level 1 - Generates chat content using a specified LLM and ChatGenerateRequest (incl. Tools) and returns a DMessage-compatible object.
* L2 Accumulator for ChatGenerate DMessage output data, as it is being streamed.
* Uses Zustand-style immutable references: .fragments and .generator are replaced (not mutated) on each update.
*/
export interface AixChatGenerateContent_DMessageGuts extends Pick<DMessage, 'fragments' | 'generator' | 'pendingIncomplete'> {
fragments: (DMessageContentFragment | DMessageVoidFragment /* no AttachmentFragments */)[];
// Since 'aixChatGenerateContent_DMessage_FromConversation' starts from named (before replacement from LL), we can't Extract
generator: DMessageGenerator; // Extract<DMessageGenerator, { mgt: 'aix' }>;
pendingIncomplete: boolean;
}
/** L2 return type: DMessage-compatible guts + LL outcome (kept separate to prevent leaking into stores) */
type _AixChatGenerateContent_DMessageGuts_WithOutcome = AixChatGenerateContent_DMessageGuts & {
outcome: AixChatGenerateTerminal_LL;
};
/**
* Level 2 - Generates chat content using a specified LLM and ChatGenerateRequest (incl. Tools) and returns a DMessage-compatible object.
*
* Contract:
* - empty fragments means no content yet, and no error
@@ -476,7 +476,7 @@ export async function aixChatGenerateContent_DMessage_orThrow<TServiceSettings e
// others
clientOptions: AixClientOptions,
onStreamingUpdate?: (update: AixChatGenerateContent_DMessageGuts, isDone: boolean) => MaybePromise<void>,
): Promise<AixChatGenerateContent_DMessageGuts> {
): Promise<_AixChatGenerateContent_DMessageGuts_WithOutcome> {
// Aix Access
const llm = findLLMOrThrow(llmId);
@@ -493,16 +493,12 @@ export async function aixChatGenerateContent_DMessage_orThrow<TServiceSettings e
// Legacy Note: awaited OpenAI moderation check was removed (was only on this codepath)
// Aix Low-Level Chat Generation
// Aix LL Chat Generation
const dMessage: AixChatGenerateContent_DMessageGuts = {
fragments: [],
generator: { mgt: 'named', name: 'replace-me-ll' /* metrics: undefined, tokenStopReason: undefined */ },
generator: createGeneratorAIX_AutoLabel(llm.vId, llm.id), // using llm.id (not aixModel.id/ref) so we can re-select them in the UI (Beam)
pendingIncomplete: true,
};
// Note on the Generator. Besides the simple set below:
// - it will get replaced once, and then it's the same from that point on
// - using llm.id instead of aixModel.id (the ref) so we can re-select them in the UI (Beam)
messageSetGeneratorAIX_AutoLabel(dMessage, llm.vId, llm.id);
// streaming initial notification, for UI updates
await onStreamingUpdate?.(dMessage, false);
@@ -517,8 +513,16 @@ export async function aixChatGenerateContent_DMessage_orThrow<TServiceSettings e
clientOptions.abortSignal = new AbortController().signal;
}
// Aix Low-Level Chat Generation
const llAccumulator = await _aixChatGenerateContent_LL(aixAccess, aixModel, aixChatGenerate, aixContext, aixStreaming, clientOptions.abortSignal, clientOptions.throttleParallelThreads ?? 0,
// Aix LL Chat Generation
const { cgMetricsLg, outcome, ...llFinal } = await _aixChatGenerateContent_LL(
aixAccess,
aixModel,
aixChatGenerate,
aixContext,
aixStreaming,
dMessage.generator,
clientOptions.abortSignal,
clientOptions.throttleParallelThreads ?? 0,
async (ll: AixChatGenerateContent_LL, isDone: boolean) => {
if (isDone) return; // optimization, as there aren't branches between here and the final update below
if (onStreamingUpdate) {
@@ -528,77 +532,86 @@ export async function aixChatGenerateContent_DMessage_orThrow<TServiceSettings e
},
);
// Mark as complete
// Finalize DMessage
_llToDMessageGuts(llFinal, dMessage);
const metrics = _finalizeLlmMetricsWithCosts(cgMetricsLg, llm, `aix_chatgenerate_content-${aixContext.name}`);
if (metrics) dMessage.generator = { ...dMessage.generator, metrics };
dMessage.pendingIncomplete = false;
// LLM Cost computation & Aggregations
_llToDMessageGuts(llAccumulator, dMessage);
_updateGeneratorCostsInPlace(dMessage.generator, llm, `aix_chatgenerate_content-${aixContext.name}`);
// final update (could ignore and take the dMessage)
// final update
await onStreamingUpdate?.(dMessage, true);
return dMessage;
// return DMessageGuts spread + outcome
return { ...dMessage, outcome };
}
function _llToDMessageGuts(src: AixChatGenerateContent_LL, dest: AixChatGenerateContent_DMessageGuts) {
// replace the fragments if we have any
if (src.fragments.length)
dest.fragments = src.fragments; // Note: this gets replaced once, and then it's the same from that point on
// replace the generator pieces
if (src.genMetricsLg)
dest.generator.metrics = metricsChatGenerateLgToMd(src.genMetricsLg); // reduce the size to store in DMessage
if (src.genModelName)
dest.generator.name = src.genModelName;
if (src.genProviderInfraLabel)
dest.generator.providerInfraLabel = src.genProviderInfraLabel;
if (src.genUpstreamHandle)
dest.generator.upstreamHandle = src.genUpstreamHandle;
if (src.legacyGenTokenStopReason)
dest.generator.tokenStopReason = src.legacyGenTokenStopReason;
function _llToDMessageGuts(src: AixChatGenerateContent_LL, dest: AixChatGenerateContent_DMessageGuts) {
  // fragments transfer by reference - safe because the accumulator replaces (never mutates) its arrays
  // the cast crosses the readonly->mutable boundary on purpose: LL enforces ReadonlyArray (no .push on the
  // accumulator), while DMessage uses mutable arrays - this function is where that boundary lives
  dest.fragments = src.fragments as (DMessageContentFragment | DMessageVoidFragment)[];
  // generator transfers by reference - already structurally shared by the reassembler
  dest.generator = src.generator;
}
function _updateGeneratorCostsInPlace(generator: DMessageGenerator, llm: DLLM, debugCostSource: string) {
function _finalizeLlmMetricsWithCosts(cgMetricsLg: undefined | DMetricsChatGenerate_Lg, llm: DLLM, debugCostSource: string): undefined | DMetricsChatGenerate_Md {
// Compute the Md metrics from Lg
let metricsMd = cgMetricsLg ? metricsChatGenerateLgToMd(cgMetricsLg) : undefined;
// Compute costs
const logLlmRefId = getAllModelParameterValues(llm.initialParameters, llm.userParameters).llmRef || llm.id;
const adjChatPricing = llmChatPricing_adjusted(llm);
const costs = metricsComputeChatGenerateCostsMd(generator.metrics, adjChatPricing, logLlmRefId);
const costs = metricsComputeChatGenerateCostsMd(metricsMd, adjChatPricing, logLlmRefId);
if (!costs) {
// FIXME: we shall warn that the costs are missing, as the only way to get pricing is through surfacing missing prices
return;
return metricsMd;
}
// Add the costs to the generator.metrics object
if (generator.metrics)
Object.assign(generator.metrics, costs);
metricsMd = { ...metricsMd /* TIn, TOut, ... */, ...costs /* $c, ... $code */ };
// Run aggregations
const m = generator.metrics;
const m = metricsMd;
const inputTokens = (m?.TIn || 0) + (m?.TCacheRead || 0) + (m?.TCacheWrite || 0);
const outputTokens = (m?.TOut || 0) /* + (m?.TOutR || 0) THIS IS A BREAKDOWN, IT'S ALREADY IN */;
metricsStoreAddChatGenerate(costs, inputTokens, outputTokens, llm, debugCostSource);
// Merge costs into a new generator
return metricsMd;
}
// --- LL (Low-Level, Level 1) - Streaming loop with retry/reassembler ---
/**
* Accumulator for Lower Level ChatGenerate output data, as it is being streamed.
* The object is modified in-place and passed to the callback for efficiency.
* Streaming accumulator for LL ChatGenerate - the live view during streaming.
*
* Structural sharing contract (Zustand-style):
* - .fragments and .generator are REPLACED with new references on each LL update, never mutated in place
* - Callers receive stable snapshots by reference - safe to forward to stores, do not mutate
*/
export interface AixChatGenerateContent_LL {
// source of truth for any caller
// - empty array means no content yet, and no error
fragments: (DMessageContentFragment | DMessageVoidFragment)[];
// pieces of generator
genMetricsLg?: DMetricsChatGenerate_Lg;
genModelName?: string;
genProviderInfraLabel?: string;
genUpstreamHandle?: DMessageGenerator['upstreamHandle'];
legacyGenTokenStopReason?: DMessageGenerator['tokenStopReason'];
fragments: ReadonlyArray<DMessageContentFragment | DMessageVoidFragment>;
generator: Readonly<DMessageGenerator>;
}
/**
 * Finalized LL result - extends the streaming accumulator with fields that only
 * become available after finalization (termination classification and Lg metrics).
 */
export interface AixChatGenerateContent_LL_Result extends AixChatGenerateContent_LL {
  // terminal classification of the run: why the generation stopped
  outcome: AixChatGenerateTerminal_LL;
  // Lg metrics - kept separate from generator.metrics (Md) because Lg is richer and used for final summaries
  cgMetricsLg?: DMetricsChatGenerate_Lg;
}
/**
 * Terminal state of a single LL generation call - why it stopped.
 * - 'completed': the model responded normally (content may still carry a tokenStopReason detail such as out-of-tokens or filter)
 * - 'aborted': the user cancelled the request
 * - 'failed': an error occurred (error fragments carry the detail)
 */
export type AixChatGenerateTerminal_LL = 'completed' | 'aborted' | 'failed';
/**
* LL (Level 1) - Client-side ChatGenerateContent, with optional streaming.
*
* Contract:
* - empty fragments means no content yet, and no error
@@ -622,15 +635,15 @@ export interface AixChatGenerateContent_LL {
* - other special parts include the Anthropic Caching hints, on select message
* @param aixContext specifies the scope of the caller, such as what's the high level objective of this call
* @param aixStreaming requests the source to provide incremental updates
* @param initialGenerator generator initial value, which will be updated for every new piece of information received
* @param abortSignal allows the caller to stop the operation
* @param throttleParallelThreads allows the caller to limit the number of parallel threads
*
* The output is an accumulator object with the fragments, and the generator
* pieces (metrics, model name, token stop reason)
* The output is an accumulator object with the fragments and generator.
*
* @param onGenerateContentUpdate updated with the same accumulator at every step, and at the end (with isDone=true)
* @returns the final accumulator object
* @throws Error if there are rare low-level errors, or if [CSF] client-side fails to load
* @throws Error if there are rare LL errors, or if [CSF] client-side fails to load
*
*/
async function _aixChatGenerateContent_LL(
@@ -641,16 +654,17 @@ async function _aixChatGenerateContent_LL(
aixContext: AixAPI_Context_ChatGenerate,
aixStreaming: boolean,
// others
initialGenerator: DMessageGenerator,
abortSignal: AbortSignal,
throttleParallelThreads: number | undefined,
// optional streaming callback: not fired until the first piece of content
onGenerateContentUpdate?: (accumulator: AixChatGenerateContent_LL, isDone: boolean) => MaybePromise<void>,
): Promise<AixChatGenerateContent_LL> {
): Promise<AixChatGenerateContent_LL_Result> {
// Inspector support - can be requested by the client, but granted on the server side
const inspectorEnabled = getAixInspectorEnabled();
const inspectorTransport = inspectorEnabled ? aixAccess.clientSideFetch ? 'csf' as const : 'trpc' as const : undefined;
const inspectorContext = inspectorEnabled ? { contextName: aixContext.name, contextRef: aixContext.ref } : undefined;
const inspectorTransport = !inspectorEnabled ? undefined : aixAccess.clientSideFetch ? 'csf' : 'trpc';
const inspectorContext = !inspectorEnabled ? undefined : { contextName: aixContext.name, contextRef: aixContext.ref };
// [DEV] Inspector - request body override
const requestBodyOverrideJson = inspectorEnabled && aixClientDebuggerGetRBO();
@@ -683,6 +697,7 @@ async function _aixChatGenerateContent_LL(
// Particles Reassembler - owns the accumulator, reused across Client-side retries
const reassembler = new ContentReassembler(
initialGenerator,
inspectorTransport,
inspectorContext,
getLabsLosslessImages(),
@@ -694,10 +709,10 @@ async function _aixChatGenerateContent_LL(
.finally(() => URL.revokeObjectURL(audioUrl));
},
);
const accumulator_LL = reassembler.accumulator; // stable ref - readonly, same object throughout
const accumulator_LL = reassembler.S; // stable ref - readonly, same object throughout
// Retry/Reconnect - low-level state machine
// Retry/Reconnect - LL state machine
// - reconnect: for server overload/busy (429, 503, 502) and transient errors
// - resume: for network disconnects with OpenAI Responses API handle
const rsm = new AixStreamRetry(0, 0); // sensible: 3, 2
@@ -705,7 +720,7 @@ async function _aixChatGenerateContent_LL(
while (true) {
// fresh decimated callback per iteration (decimator has start/stop lifecycle)
const sendContentUpdate = !onGenerateContentUpdate ? undefined : withDecimator(throttleParallelThreads ?? 0, 'aicChatGenerateContent', async (accumulator: AixChatGenerateContent_LL, contentStarted: boolean) => {
const sendContentUpdate = !onGenerateContentUpdate ? undefined : withDecimator(throttleParallelThreads ?? 0, 'aixChatGenerateContent', async (accumulator: AixChatGenerateContent_LL, contentStarted: boolean) => {
/**
* We want the first caller's update to have actual content.
* However note that we won't be sending out the model name very fast this way,
@@ -725,7 +740,7 @@ async function _aixChatGenerateContent_LL(
let particleStream: AsyncIterable<AixWire_Particles.ChatGenerateOp, void>;
// AIX [CSM] Direct Execution
if (!accumulator_LL.genUpstreamHandle && clientSideChatGenerate)
if (!accumulator_LL.generator.upstreamHandle && clientSideChatGenerate)
particleStream = clientSideChatGenerate(
aixAccess,
aixModel,
@@ -737,7 +752,7 @@ async function _aixChatGenerateContent_LL(
);
// AIX tRPC Streaming Generation from Chat input
else if (!accumulator_LL.genUpstreamHandle)
else if (!accumulator_LL.generator.upstreamHandle)
particleStream = await apiStream.aix.chatGenerateContent.mutate({
access: aixAccess,
model: aixModel,
@@ -747,11 +762,11 @@ async function _aixChatGenerateContent_LL(
connectionOptions: aixConnectionOptions,
}, { signal: abortSignal });
// AIX tRPC Streaming re-attachment from handle - for low-level auto-resume
// AIX tRPC Streaming re-attachment from handle - for LL auto-resume
else
particleStream = await apiStream.aix.reattachContent.mutate({
access: aixAccess,
resumeHandle: accumulator_LL.genUpstreamHandle,
resumeHandle: accumulator_LL.generator.upstreamHandle,
context: aixContext,
streaming: true,
connectionOptions: aixConnectionOptions,
@@ -781,65 +796,59 @@ async function _aixChatGenerateContent_LL(
// synchronize any pending async tasks
await reassembler.waitForWireComplete();
break; // -> terminal: completed
} catch (error: any) {
// stop the deadline decimator, as we're into error handling mode now
sendContentUpdate?.stop?.();
// drain in-flight processing before any terminal/retry decision (prevents ghost fragments from mid-await particles)
await reassembler.waitForWireComplete().catch(() => {/* processing errors are already handled internally */});
// classify error
const { errorType, errorMessage } = aixClassifyStreamingError(error, abortSignal.aborted, !!accumulator_LL.fragments.length);
const maybeErrorStatusCode = error?.status || error?.response?.status || undefined;
// client-side-retry decision - resume handle from accumulator determines strategy (resume vs reconnect)
const shallRetry = rsm.shallRetry(errorType, maybeErrorStatusCode, !!accumulator_LL.genUpstreamHandle);
if (!shallRetry) {
const shallRetry = rsm.shallRetry(errorType, maybeErrorStatusCode, !!accumulator_LL.generator.upstreamHandle);
if (shallRetry) {
// NOT retryable: e.g. client-abort, or missing handle
if (errorType === 'client-aborted')
reassembler.setClientAborted();
else {
const errorHint: DMessageErrorPart['hint'] = `aix-${errorType}`; // MUST MATCH our `aixClassifyStreamingError` hints with 'aix-<type>' in DMessageErrorPart
reassembler.setClientExcepted(errorMessage, errorHint);
}
// ... fall through (traditional single path)
} else {
// drain in-flight processing before resetting state (prevents ghost fragments from mid-await particles)
try {
await reassembler.waitForWireComplete();
} catch (_) {
/* processing errors are already handled internally */
}
// fragment-notify of our ongoing retry attempt
// notify UI of our ongoing retry attempt
try {
await reassembler.setClientRetrying(shallRetry.strategy, errorMessage, shallRetry.attemptNumber, 0, shallRetry.delayMs, typeof maybeErrorStatusCode === 'number' ? maybeErrorStatusCode : undefined, errorType);
await onGenerateContentUpdate?.(accumulator_LL, false /* partial */);
} catch (e) {
// .. ignore the notification error
await onGenerateContentUpdate?.(accumulator_LL, false);
} catch (_) {
// ignore notification errors
}
// delay then RETRY
// delay then retry
const stepResult = await rsm.delayedStep(shallRetry.delayMs, abortSignal);
if (stepResult === 'completed')
continue; // -> Loop
// user-aborted during retry-backoff
reassembler.setClientAborted();
// ... fall through (aborted during backoff)
continue; // -> retry: loop
// user-aborted during retry backoff
// ...fall through to classify with the original error
}
// Terminal: not retryable, or user-aborted during retry backoff
if (errorType === 'client-aborted')
reassembler.setClientAborted();
else {
const errorHint: DMessageErrorPart['hint'] = `aix-${errorType}`; // MUST MATCH our `aixClassifyStreamingError` hints with 'aix-<type>' in DMessageErrorPart
reassembler.setClientExcepted(errorMessage, errorHint);
}
break; // -> terminal: failed or aborted
}
// NOTE: sooner or later we fall through on this code path, maybe looped or not, maybe with good data or maybe with reassembled errors...
// and we're done
reassembler.finalizeAccumulator();
// final update bypasses decimation entirely and contains complete content
await onGenerateContentUpdate?.(accumulator_LL, true /* Last message, done */);
// return the final accumulated message
return accumulator_LL;
}
// Finalize - classify termination, append error fragments, compute outcome
const llResult = reassembler.finalizeReassembly();
// final update bypasses decimation entirely and contains complete content
await onGenerateContentUpdate?.(llResult, true /* only true here */);
return llResult;
}
@@ -78,12 +78,12 @@ export async function executeGatherInstruction(_i: GatherInstruction, inputs: Ex
];
// update the UI
const onMessageUpdated = (update: AixChatGenerateContent_DMessageGuts, completed: boolean) => {
// in-place update of the intermediate message
const { fragments: incrementalFragments, ...incrementalRest } = update;
Object.assign(inputs.intermediateDMessage, incrementalRest);
if (incrementalFragments?.length) {
inputs.intermediateDMessage.fragments = incrementalFragments;
const onMessageUpdated = (messageOverwriteShallow: AixChatGenerateContent_DMessageGuts, completed: boolean) => {
// fragments and generator are already immutable (new refs per update) - no deep clone needed
const { fragments, ...rest } = messageOverwriteShallow;
Object.assign(inputs.intermediateDMessage, rest);
if (fragments?.length) {
inputs.intermediateDMessage.fragments = fragments;
inputs.intermediateDMessage.updated = Date.now();
}
if (completed)
@@ -95,7 +95,7 @@ export async function executeGatherInstruction(_i: GatherInstruction, inputs: Ex
case 'character-count':
inputs.updateInstructionComponent(
<Typography level='body-xs' sx={{ opacity: 0.5 }}>{messageFragmentsReduceText(incrementalFragments || []).length} characters</Typography>,
<Typography level='body-xs' sx={{ opacity: 0.5 }}>{messageFragmentsReduceText(fragments || []).length} characters</Typography>,
);
return;
@@ -136,8 +136,8 @@ export async function executeGatherInstruction(_i: GatherInstruction, inputs: Ex
// this message will be discarded as the abort status is checked in the next `catch`
throw new Error('Instruction Stopped.');
}
if (status.outcome === 'errored')
throw new Error(`Model execution error: ${status.errorMessage || 'Unknown error'}`);
if (status.outcome === 'failed')
throw new Error(`Model execution error: ${status.outcomeFailedMessage || 'Unknown error'}`);
// Proceed to the next step
return messageFragmentsReduceText(inputs.intermediateDMessage.fragments); // returns the PipedValueType
+10 -8
View File
@@ -61,15 +61,17 @@ function rayScatterStart(ray: BRay, llmId: DLLMId | null, inputHistory: DMessage
const abortController = new AbortController();
const onMessageUpdated = (incrementalMessage: AixChatGenerateContent_DMessageGuts, completed: boolean) => {
const { fragments: incrementalFragments, ...incrementalRest } = incrementalMessage;
const onMessageUpdated = (messageOverwriteShallow: AixChatGenerateContent_DMessageGuts, completed: boolean) => {
// fragments and generator are already immutable (new refs per update) - no deep clone needed
const { fragments, ...rest } = messageOverwriteShallow;
const hasFragments = !!fragments?.length;
_rayUpdate(ray.rayId, (ray) => ({
message: {
...ray.message,
...(incrementalFragments?.length ? { fragments: [...incrementalFragments] } : {}),
...incrementalRest,
...(hasFragments ? { fragments, updated: Date.now() } : {}),
...rest,
...(completed ? { pendingIncomplete: undefined } : {}), // clear the pending flag once the message is complete
...(incrementalFragments?.length ? { updated: Date.now() } : {}), // refresh the update timestamp once the content comes
},
}));
};
@@ -87,10 +89,10 @@ function rayScatterStart(ray: BRay, llmId: DLLMId | null, inputHistory: DMessage
const clearFragments = messageWasInterruptedAtStart(status.lastDMessage);
_rayUpdate(ray.rayId, {
...(clearFragments && { message: createDMessageEmpty('assistant') }),
status: (status.outcome === 'success') ? 'success'
status: (status.outcome === 'completed') ? 'success'
: (status.outcome === 'aborted') ? 'stopped'
: (status.outcome === 'errored') ? 'error' : 'empty',
scatterIssue: status.errorMessage || undefined,
: (status.outcome === 'failed') ? 'error' : 'empty',
scatterIssue: status.outcomeFailedMessage || undefined,
genAbortController: undefined,
});
})