From d395fa817d910e1793048032b47031c79533f8e6 Mon Sep 17 00:00:00 2001 From: Enrico Ros Date: Tue, 7 Apr 2026 00:20:49 -0700 Subject: [PATCH] AIX: Anthropic: Containers: parsing support via 'svs' --- src/common/stores/chat/chat.message.ts | 33 +++++--------- src/modules/aix/client/ContentReassembler.ts | 15 ++++++- src/modules/aix/server/api/aix.wiretypes.ts | 6 ++- .../chatGenerate/ChatGenerateTransmitter.ts | 8 +--- .../parsers/IParticleTransmitter.ts | 4 +- .../chatGenerate/parsers/anthropic.parser.ts | 44 +++++++++++++------ .../chatGenerate/parsers/gemini.parser.ts | 2 +- tools/develop/llm-parameter-sweep/sweep.ts | 2 +- 8 files changed, 66 insertions(+), 48 deletions(-) diff --git a/src/common/stores/chat/chat.message.ts b/src/common/stores/chat/chat.message.ts index 6e13ffff0..1c25bfa98 100644 --- a/src/common/stores/chat/chat.message.ts +++ b/src/common/stores/chat/chat.message.ts @@ -27,21 +27,9 @@ export interface DMessage { generator?: DMessageGenerator; // Assistant generator info, and metrics - /** - * Session metadata for multi-turn agentic sessions. - * - * Enables stateful time-monotonic multi-turn interactions in a stateless architecture: - * - Parsers accumulate session values (container IDs, response handles, etc.) - * - Request builders traverse history for latest non-expired values - * - Child messages inherit parent session, new values override - * - * Pattern: - * 1. Parser extracts vendor session data → stores in sessionMetadata - * 2. Request builder finds latest value per key → includes in next request - * 3. Vendor reuses session (e.g., Anthropic container for file access, OpenAI response for reconnection) - * - * Keys namespaced by vendor: 'anthropic.container.id', 'openai.response.id' - */ + // Session metadata for multi-turn agentic sessions was considered here (see commit 3cd38f47) + // but vendor session state (container IDs, response handles) is stored on DMessageGenerator + // fields instead (upstreamContainer, upstreamHandle) - simpler plumbing, no persistence migration. // sessionMetadata?: DMessageSessionMetadata; userFlags?: DMessageUserFlag[]; // (UI) user-set per-message flags @@ -60,13 +48,7 @@ export type DMessageId = string; export type DMessageRole = 'user' | 'assistant' | 'system'; -/** - * Session metadata carrying vendor-specific state across multi-turn agentic sessions. - * Namespaced keys (e.g., 'anthropic.container.id'), child inherits parent, new values override. - * - * NOTE: may use some typescript module augmentation to plug new keys and value types here. - * NOTE2: may add references to the parent sessions/unique Ids, although they may be the message itself - */ +// Superseded by DMessageGenerator.upstreamContainer / .upstreamHandle - see note above. // export type DMessageSessionMetadata = Record; @@ -143,6 +125,11 @@ export type DMessageGenerator = ({ }) & { metrics?: DMetricsChatGenerate_Md; // medium-sized metrics stored in the message providerInfraLabel?: string; // upstream provider that served the request (e.g., OpenRouter provider routing) + upstreamContainer?: { + uct: 'vnd.ant.container', + containerId: string, + expiresAt: string, // ISO 8601 UTC timestamp (e.g., "2026-04-07T05:59:32Z") + }, upstreamHandle?: { uht: 'vnd.oai.responses', responseId: string, @@ -249,6 +236,7 @@ export function duplicateDMessageGenerator(generator: Readonly): void { + + // Promote Anthropic container state -> Generator (message-scoped, for cross-turn reuse) + if (vs.vendor === 'anthropic' && 'container' in vs.state) { + const { id, expiresAt } = vs.state.container; + if (id && expiresAt) + this.S.generator = { + ...this.S.generator, + upstreamContainer: { uct: 'vnd.ant.container', containerId: id, expiresAt }, + }; + return; // container is message-scoped, not fragment-scoped + } + + // Fragment-scoped vendor states - attach to the last fragment (e.g. Gemini thoughtSignature) const lastIdx = this.S.fragments.length - 1; const lastFragment = this.S.fragments[lastIdx]; if (!lastFragment) { @@ -777,7 +790,7 @@ export class ContentReassembler { return; } - // attach vendor state + // attach fragment-level vendor state this._replaceFragmentAt(lastIdx, { ...lastFragment, vendorState: { diff --git a/src/modules/aix/server/api/aix.wiretypes.ts b/src/modules/aix/server/api/aix.wiretypes.ts index b98833957..d1aaa0389 100644 --- a/src/modules/aix/server/api/aix.wiretypes.ts +++ b/src/modules/aix/server/api/aix.wiretypes.ts @@ -748,7 +748,11 @@ export namespace AixWire_Particles { */ | { p: /*'mo'*/ 'vp', opId: string, text: string, mot: 'search-web' | 'gen-image' | 'code-exec', state?: 'done' | 'error', parentOpId?: string, iTexts?: string[], oTexts?: string[] } | { p: 'urlc', title: string, url: string, num?: number, from?: number, to?: number, text?: string, pubTs?: number } // url citation - pubTs: publication timestamp - | { p: 'svs', vendor: string, state: Record } // set vendor state - applies to the last emitted part (opaque protocol state) + | { p: 'svs' } & ( // set vendor state - vendor-specific opaque protocol state + | { vendor: 'anthropic', state: { container: { id: string; expiresAt: string } } } // message-level + | { vendor: 'gemini', state: { thoughtSignature: string } } // fragment-level + // | { vendor: string, state: Record } // disable catch-all becasue it forces casts in type discriminations + ) ; } diff --git a/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts b/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts index ed3d8659b..edeb21ba9 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts @@ -487,13 +487,9 @@ export class ChatGenerateTransmitter implements IParticleTransmitter { * Sends vendor-specific state modifier for the last emitted part. * This attaches opaque protocol state (e.g., Gemini thoughtSignature) without polluting core part schemas. */ - sendSetVendorState(vendor: string, state: Record) { + sendSetVendorState(svs: Extract) { // queue vendor state particle immediately after the content part has been queued (and if text, it will be emitted sooner anyway) - this.transmissionQueue.push({ - p: 'svs', - vendor, - state, - } satisfies Extract); + this.transmissionQueue.push(svs); } /** Communicates the model name to the client */ diff --git a/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts b/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts index ee4ac6ffd..75e35910a 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts @@ -79,9 +79,9 @@ export interface IParticleTransmitter { /** * Sends vendor-specific state modifier for the last emitted part. - * Used to attach opaque protocol state (e.g., Gemini thoughtSignature) without polluting core part schemas. + * Used to attach opaque protocol state (e.g., Gemini thoughtSignature, Anthropic container) without polluting core part schemas. */ - sendSetVendorState(vendor: string, state: unknown): void; + sendSetVendorState(svs: Extract): void; // Non-parts data // diff --git a/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts b/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts index 0e8bdf567..db42f8d2a 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts @@ -130,18 +130,10 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction { if (isFirstMessage) pt.setModelName(responseMessage.model); - // -> Container metadata (for Skills) + // -> Container metadata (for Skills) - propagate to client via svs for cross-turn reuse if (responseMessage.container) { - // TODO: [PRIORITY] Accumulate in DMessage.sessionMetadata: - // pt.setSessionMetadata('anthropic.container.id', container.id) - // pt.setSessionMetadata('anthropic.container.expiresAt', Date.parse(container.expires_at)) - // Request builder will find latest values and reuse container across turns for file access. - - console.log('[Anthropic] Container active:', { - id: responseMessage.container.id, - expires_at: responseMessage.container.expires_at, - skills: responseMessage.container.skills, - }); + _emitContainerState(pt, responseMessage.container); + if (ANTHROPIC_DEBUG_EVENT_SEQUENCE) console.log(`ant message_start: container=${responseMessage.container.id}`); } if (responseMessage.usage) { @@ -405,6 +397,10 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction { Object.assign(responseMessage, delta); + // -> Container state update - arrives here when container was created mid-stream + if (delta.container) + _emitContainerState(pt, delta.container); + // -> Token Stop Reason const tokenStopReason = _fromAnthropicStopReason(delta.stop_reason, 'message_delta'); if (tokenStopReason !== null) @@ -522,6 +518,10 @@ export function createAnthropicMessageParserNS(): ChatGenerateParseFunction { if (model) pt.setModelName(model); + // -> Container metadata (for Skills) - propagate to client via svs for cross-turn reuse + if (container) + _emitContainerState(pt, container); + // -> Content Blocks - Non-Streaming for (let i = 0; i < content.length; i++) { const contentBlock = content[i]; @@ -658,9 +658,7 @@ export function createAnthropicMessageParserNS(): ChatGenerateParseFunction { } -// --- Shared server tool result handlers (used by both S and NS parsers) --- - -type _ContentBlock = AnthropicWire_API_Message_Create.Response['content'][number]; +// --- Shared helpers (used by both S and NS parsers) --- /** Ellipsize long strings for iTexts/oTexts display (keeps start + end, shows byte count in the middle) */ function _ellipsizeContext(text: string, maxBytes = 512): string { @@ -670,6 +668,24 @@ function _ellipsizeContext(text: string, maxBytes = 512): string { return text.slice(0, half) + ellipsis + text.slice(-half); } +/** + * Emit container state via svs particle - reassembler promotes this to generator.upstreamContainer. + * NOTE: DMessage.sessionMetadata was designed for this (see chat.message.ts) but we use generator + * fields instead - simpler plumbing, no new persistence/migration, and ephemeral containers fit well. + */ +function _emitContainerState(pt: IParticleTransmitter, container: { id: string; expires_at: string }): void { + pt.sendSetVendorState({ + p: 'svs', + vendor: 'anthropic', + state: { container: { id: container.id, expiresAt: container.expires_at } }, + }); +} + + +// --- Shared server tool result handlers (used by both S and NS parsers) --- + +type _ContentBlock = AnthropicWire_API_Message_Create.Response['content'][number]; + function _handleCBS_ServerToolUse(pt: IParticleTransmitter, block: Extract<_ContentBlock, { type: 'server_tool_use' }>): void { // Server-side tool execution (e.g., web_search, web_fetch, Skills API tools) // NOTE: We don't create tool invocations for server tools - just show operation state diff --git a/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts b/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts index df8c4a919..bf327e645 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts @@ -296,7 +296,7 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st } // Set the thought signature if available - thoughtSignature && pt.sendSetVendorState('gemini', { thoughtSignature: mPart.thoughtSignature }); + thoughtSignature && pt.sendSetVendorState({ p: 'svs', vendor: 'gemini', state: { thoughtSignature } }); } // -> Candidates[0] -> Safety Ratings diff --git a/tools/develop/llm-parameter-sweep/sweep.ts b/tools/develop/llm-parameter-sweep/sweep.ts index 9c3520155..60947b262 100644 --- a/tools/develop/llm-parameter-sweep/sweep.ts +++ b/tools/develop/llm-parameter-sweep/sweep.ts @@ -333,7 +333,7 @@ class SweepCollectorTransmitter implements IParticleTransmitter { // Special sendCGControl(_cgCOp: AixWire_Particles.ChatControlOp, _flushQueue?: boolean): void { /* no-op */ } sendOperationState(_mot: 'search-web' | 'gen-image' | 'code-exec', _text: string, _opts?: any): void { /* no-op */ } - sendSetVendorState(_vendor: string, _state: unknown): void { /* no-op */ } + sendSetVendorState(_svs: any): void { /* no-op */ } // Non-parts data setModelName(_modelName: string): void { /* no-op */ }