AIX: Anthropic: Containers: parsing support via 'svs'

This commit is contained in:
Enrico Ros
2026-04-07 00:20:49 -07:00
parent 9cfc8c513b
commit d395fa817d
8 changed files with 66 additions and 48 deletions
+11 -22
View File
@@ -27,21 +27,9 @@ export interface DMessage {
generator?: DMessageGenerator; // Assistant generator info, and metrics
/**
* Session metadata for multi-turn agentic sessions.
*
* Enables stateful time-monotonic multi-turn interactions in a stateless architecture:
* - Parsers accumulate session values (container IDs, response handles, etc.)
* - Request builders traverse history for latest non-expired values
* - Child messages inherit parent session, new values override
*
* Pattern:
* 1. Parser extracts vendor session data → stores in sessionMetadata
* 2. Request builder finds latest value per key → includes in next request
* 3. Vendor reuses session (e.g., Anthropic container for file access, OpenAI response for reconnection)
*
* Keys namespaced by vendor: 'anthropic.container.id', 'openai.response.id'
*/
// Session metadata for multi-turn agentic sessions was considered here (see commit 3cd38f47)
// but vendor session state (container IDs, response handles) is stored on DMessageGenerator
// fields instead (upstreamContainer, upstreamHandle) - simpler plumbing, no persistence migration.
// sessionMetadata?: DMessageSessionMetadata;
userFlags?: DMessageUserFlag[]; // (UI) user-set per-message flags
@@ -60,13 +48,7 @@ export type DMessageId = string;
export type DMessageRole = 'user' | 'assistant' | 'system';
/**
* Session metadata carrying vendor-specific state across multi-turn agentic sessions.
* Namespaced keys (e.g., 'anthropic.container.id'), child inherits parent, new values override.
*
* NOTE: may use some typescript module augmentation to plug new keys and value types here.
* NOTE2: may add references to the parent sessions/unique Ids, although they may be the message itself
*/
// Superseded by DMessageGenerator.upstreamContainer / .upstreamHandle - see note above.
// export type DMessageSessionMetadata = Record<string, string | number | boolean | null>;
@@ -143,6 +125,11 @@ export type DMessageGenerator = ({
}) & {
metrics?: DMetricsChatGenerate_Md; // medium-sized metrics stored in the message
providerInfraLabel?: string; // upstream provider that served the request (e.g., OpenRouter provider routing)
upstreamContainer?: {
uct: 'vnd.ant.container',
containerId: string,
expiresAt: string, // ISO 8601 UTC timestamp (e.g., "2026-04-07T05:59:32Z")
},
upstreamHandle?: {
uht: 'vnd.oai.responses',
responseId: string,
@@ -249,6 +236,7 @@ export function duplicateDMessageGenerator(generator: Readonly<DMessageGenerator
// ...(generator.xeOpCode ? { xeOpCode: generator.xeOpCode } : {}),
...(generator.metrics ? { metrics: { ...generator.metrics } } : {}),
...(generator.providerInfraLabel ? { providerInfraLabel: generator.providerInfraLabel } : {}),
...(generator.upstreamContainer ? { upstreamContainer: { ...generator.upstreamContainer } } : {}),
...(generator.upstreamHandle ? { upstreamHandle: { ...generator.upstreamHandle } } : {}),
...(generator.tokenStopReason ? { tokenStopReason: generator.tokenStopReason } : {}),
};
@@ -259,6 +247,7 @@ export function duplicateDMessageGenerator(generator: Readonly<DMessageGenerator
aix: { ...generator.aix },
...(generator.metrics ? { metrics: { ...generator.metrics } } : {}),
...(generator.providerInfraLabel ? { providerInfraLabel: generator.providerInfraLabel } : {}),
...(generator.upstreamContainer ? { upstreamContainer: { ...generator.upstreamContainer } } : {}),
...(generator.upstreamHandle ? { upstreamHandle: { ...generator.upstreamHandle } } : {}),
...(generator.tokenStopReason ? { tokenStopReason: generator.tokenStopReason } : {}),
};
+14 -1
View File
@@ -770,6 +770,19 @@ export class ContentReassembler {
}
private onSetVendorState(vs: Extract<AixWire_Particles.PartParticleOp, { p: 'svs' }>): void {
// Promote Anthropic container state -> Generator (message-scoped, for cross-turn reuse)
if (vs.vendor === 'anthropic' && 'container' in vs.state) {
const { id, expiresAt } = vs.state.container;
if (id && expiresAt)
this.S.generator = {
...this.S.generator,
upstreamContainer: { uct: 'vnd.ant.container', containerId: id, expiresAt },
};
return; // container is message-scoped, not fragment-scoped
}
// Fragment-scoped vendor states - attach to the last fragment (e.g. Gemini thoughtSignature)
const lastIdx = this.S.fragments.length - 1;
const lastFragment = this.S.fragments[lastIdx];
if (!lastFragment) {
@@ -777,7 +790,7 @@ export class ContentReassembler {
return;
}
// attach vendor state
// attach fragment-level vendor state
this._replaceFragmentAt(lastIdx, {
...lastFragment,
vendorState: {
+5 -1
View File
@@ -748,7 +748,11 @@ export namespace AixWire_Particles {
*/
| { p: /*'mo'*/ 'vp', opId: string, text: string, mot: 'search-web' | 'gen-image' | 'code-exec', state?: 'done' | 'error', parentOpId?: string, iTexts?: string[], oTexts?: string[] }
| { p: 'urlc', title: string, url: string, num?: number, from?: number, to?: number, text?: string, pubTs?: number } // url citation - pubTs: publication timestamp
| { p: 'svs', vendor: string, state: Record<string, unknown> } // set vendor state - applies to the last emitted part (opaque protocol state)
| { p: 'svs' } & ( // set vendor state - vendor-specific opaque protocol state
| { vendor: 'anthropic', state: { container: { id: string; expiresAt: string } } } // message-level
| { vendor: 'gemini', state: { thoughtSignature: string } } // fragment-level
// | { vendor: string, state: Record<string, unknown> } // disable catch-all becasue it forces casts in type discriminations
)
;
}
@@ -487,13 +487,9 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
* Sends vendor-specific state modifier for the last emitted part.
* This attaches opaque protocol state (e.g., Gemini thoughtSignature) without polluting core part schemas.
*/
sendSetVendorState(vendor: string, state: Record<string, unknown>) {
sendSetVendorState(svs: Extract<AixWire_Particles.PartParticleOp, { p: 'svs' }>) {
// queue vendor state particle immediately after the content part has been queued (and if text, it will be emitted sooner anyway)
this.transmissionQueue.push({
p: 'svs',
vendor,
state,
} satisfies Extract<AixWire_Particles.PartParticleOp, { p: 'svs' }>);
this.transmissionQueue.push(svs);
}
/** Communicates the model name to the client */
@@ -79,9 +79,9 @@ export interface IParticleTransmitter {
/**
* Sends vendor-specific state modifier for the last emitted part.
* Used to attach opaque protocol state (e.g., Gemini thoughtSignature) without polluting core part schemas.
* Used to attach opaque protocol state (e.g., Gemini thoughtSignature, Anthropic container) without polluting core part schemas.
*/
sendSetVendorState(vendor: string, state: unknown): void;
sendSetVendorState(svs: Extract<AixWire_Particles.PartParticleOp, { p: 'svs' }>): void;
// Non-parts data //
@@ -130,18 +130,10 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction {
if (isFirstMessage)
pt.setModelName(responseMessage.model);
// -> Container metadata (for Skills)
// -> Container metadata (for Skills) - propagate to client via svs for cross-turn reuse
if (responseMessage.container) {
// TODO: [PRIORITY] Accumulate in DMessage.sessionMetadata:
// pt.setSessionMetadata('anthropic.container.id', container.id)
// pt.setSessionMetadata('anthropic.container.expiresAt', Date.parse(container.expires_at))
// Request builder will find latest values and reuse container across turns for file access.
console.log('[Anthropic] Container active:', {
id: responseMessage.container.id,
expires_at: responseMessage.container.expires_at,
skills: responseMessage.container.skills,
});
_emitContainerState(pt, responseMessage.container);
if (ANTHROPIC_DEBUG_EVENT_SEQUENCE) console.log(`ant message_start: container=${responseMessage.container.id}`);
}
if (responseMessage.usage) {
@@ -405,6 +397,10 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction {
Object.assign(responseMessage, delta);
// -> Container state update - arrives here when container was created mid-stream
if (delta.container)
_emitContainerState(pt, delta.container);
// -> Token Stop Reason
const tokenStopReason = _fromAnthropicStopReason(delta.stop_reason, 'message_delta');
if (tokenStopReason !== null)
@@ -522,6 +518,10 @@ export function createAnthropicMessageParserNS(): ChatGenerateParseFunction {
if (model)
pt.setModelName(model);
// -> Container metadata (for Skills) - propagate to client via svs for cross-turn reuse
if (container)
_emitContainerState(pt, container);
// -> Content Blocks - Non-Streaming
for (let i = 0; i < content.length; i++) {
const contentBlock = content[i];
@@ -658,9 +658,7 @@ export function createAnthropicMessageParserNS(): ChatGenerateParseFunction {
}
// --- Shared server tool result handlers (used by both S and NS parsers) ---
type _ContentBlock = AnthropicWire_API_Message_Create.Response['content'][number];
// --- Shared helpers (used by both S and NS parsers) ---
/** Ellipsize long strings for iTexts/oTexts display (keeps start + end, shows byte count in the middle) */
function _ellipsizeContext(text: string, maxBytes = 512): string {
@@ -670,6 +668,24 @@ function _ellipsizeContext(text: string, maxBytes = 512): string {
return text.slice(0, half) + ellipsis + text.slice(-half);
}
/**
* Emit container state via svs particle - reassembler promotes this to generator.upstreamContainer.
* NOTE: DMessage.sessionMetadata was designed for this (see chat.message.ts) but we use generator
* fields instead - simpler plumbing, no new persistence/migration, and ephemeral containers fit well.
*/
function _emitContainerState(pt: IParticleTransmitter, container: { id: string; expires_at: string }): void {
pt.sendSetVendorState({
p: 'svs',
vendor: 'anthropic',
state: { container: { id: container.id, expiresAt: container.expires_at } },
});
}
// --- Shared server tool result handlers (used by both S and NS parsers) ---
type _ContentBlock = AnthropicWire_API_Message_Create.Response['content'][number];
function _handleCBS_ServerToolUse(pt: IParticleTransmitter, block: Extract<_ContentBlock, { type: 'server_tool_use' }>): void {
// Server-side tool execution (e.g., web_search, web_fetch, Skills API tools)
// NOTE: We don't create tool invocations for server tools - just show operation state
@@ -296,7 +296,7 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st
}
// Set the thought signature if available
thoughtSignature && pt.sendSetVendorState('gemini', { thoughtSignature: mPart.thoughtSignature });
thoughtSignature && pt.sendSetVendorState({ p: 'svs', vendor: 'gemini', state: { thoughtSignature } });
}
// -> Candidates[0] -> Safety Ratings
+1 -1
View File
@@ -333,7 +333,7 @@ class SweepCollectorTransmitter implements IParticleTransmitter {
// Special
sendCGControl(_cgCOp: AixWire_Particles.ChatControlOp, _flushQueue?: boolean): void { /* no-op */ }
sendOperationState(_mot: 'search-web' | 'gen-image' | 'code-exec', _text: string, _opts?: any): void { /* no-op */ }
sendSetVendorState(_vendor: string, _state: unknown): void { /* no-op */ }
sendSetVendorState(_svs: any): void { /* no-op */ }
// Non-parts data
setModelName(_modelName: string): void { /* no-op */ }