Compare commits

...

8 Commits

13 changed files with 23761 additions and 499 deletions
@@ -24,7 +24,7 @@ import { animationSpinHalfPause } from '~/common/util/animUtils';
// configuration
const DATASTREAM_VISUALIZATION_DELAY = Math.round(2 * Math.PI * 1000);
const MODELOP_TIMEOUT_DELAY = 5; // seconds
const MODELOP_TIMEOUT_LIMIT = 3600; // seconds - 1hr for long ops, such as Gemini Deep Research
const MODELOP_TIMEOUT_LIMIT = 6 * 60 * 60; // seconds - 6hr for long ops, such as Gemini Deep Research
const modelOperationConfig: Record<DVoidPlaceholderMOp['mot'], { Icon: React.ElementType, color: ColorPaletteProp }> = {
'search-web': { Icon: SearchRoundedIcon, color: 'neutral' },
+6 -1
View File
@@ -336,7 +336,7 @@ export class ContentReassembler {
// PartParticleOp
case 'p' in op:
// heuristics to remove the placeholder if real user-destined content arrives
if (op.p !== '❤' && op.p !== 'vp' && op.p !== 'urlc' && op.p !== 'svs')
if (op.p !== '❤' && op.p !== 'vp' && op.p !== 'urlc' && op.p !== 'hres' && op.p !== 'svs' && op.p !== 'tr_' && op.p !== 'trs')
await this._removeLastVoidPlaceholderDelayed();
switch (op.p) {
case '❤':
@@ -880,6 +880,11 @@ export class ContentReassembler {
}
private async _removeLastVoidPlaceholderDelayed(): Promise<boolean> {
// NOTE: disabled because it also introduces visual bugs - instead added tr_ and trs exceptions on a caller
// function _isVPNotOp(f: DMessageFragment) {
// // make 'vp' fragments eligible for removal only if they have no opLog entries, like tool invocations (the explicit counterpart)
// return isVoidPlaceholderFragment(f) && !f.part.opLog?.length;
// }
// skip if none
if (this.S.fragments.findLastIndex(isVoidPlaceholderFragment) < 0) return false;
@@ -17,8 +17,13 @@ type TInputPart = z.infer<typeof GeminiInteractionsWire_API_Interactions.InputCo
*
* Scope:
* - Stateless multi-turn: `chatSequence` is flattened to role-tagged turns and sent as `input`.
* - `systemMessage` text (if any) is prepended to the first user turn; background agents do not
* accept a dedicated `system_instruction`.
* - `systemMessage` text: routing differs by agent type
* - deep-research agents REJECT the top-level `system_instruction` field (tested 2026-04-23,
* API error: "not supported for the deep-research-* agent. Please include any specific
* instructions in the input prompt instead"), so we prepend the system text into the first
* user turn.
* - non-DR agents (future MCP/Computer Use) use the native `system_instruction` field, matching
* the gemini.generateContent.ts convention for clean separation.
* - Multimodal: user and model turns carry images as content-part arrays when any image is present,
* otherwise stay as plain strings (preserves the API's convenience shape).
* - Doc parts render as text via `approxDocPart_To_String`; in-reference-to XML is prepended to the user turn.
@@ -30,8 +35,14 @@ export function aixToGeminiInteractionsCreate(model: AixAPI_Model, chatGenerateR
// Normalize: move any 'spillable' system parts (e.g. images) into a synthetic user message up front
const chatGenerate = aixSpillSystemToUser(chatGenerateRaw);
// Extract leftover system text (to be prepended to the first user turn)
const systemPrefix = _collectSystemText(chatGenerate.systemMessage);
// The API expects a bare agent id (no 'models/' prefix)
const agent = model.id.startsWith('models/') ? model.id.slice('models/'.length) : model.id;
// Deep Research agents reject `system_instruction` at the top level - we prepend to input instead
const isDeepResearch = agent.includes('deep-research');
// Extract flattened system text (consumed below - DR: prepend to first user turn; else: native field)
const systemText = _collectSystemText(chatGenerate.systemMessage);
// Walk chatSequence -> turns
const turns: TTurn[] = [];
@@ -48,11 +59,11 @@ export function aixToGeminiInteractionsCreate(model: AixAPI_Model, chatGenerateR
if (!turns.length)
throw new Error('Gemini Interactions: no usable turns (Deep Research agents require at least one user message)');
// Prepend system prefix to the FIRST user turn (skip if none exists)
if (systemPrefix) {
// DR only: prepend system text into the first user turn (native `system_instruction` rejected)
if (isDeepResearch && systemText) {
const firstUserIdx = turns.findIndex(t => t.role === 'user');
if (firstUserIdx >= 0)
turns[firstUserIdx] = { role: 'user', content: _prependSystemText(turns[firstUserIdx].content, systemPrefix) };
turns[firstUserIdx] = { role: 'user', content: _prependSystemText(turns[firstUserIdx].content, systemText) };
}
// Sanity: the API expects the last turn to be 'user' (we're asking the model to respond)
@@ -64,14 +75,22 @@ export function aixToGeminiInteractionsCreate(model: AixAPI_Model, chatGenerateR
? turns[0].content
: turns;
// The API expects a bare agent id (no 'models/' prefix)
const agent = model.id.startsWith('models/') ? model.id.slice('models/'.length) : model.id;
return {
agent,
input,
background: true,
store: true, // API rejects store=false with background=true; the poller issues DELETE after terminal status
stream: true, // SSE streaming - upstream returns event-stream (interaction.start, content.start/delta/stop, interaction.complete, done). Required for live thought_summary deltas.
// FIXME: we only support SSE streaming parsing - we used to support parsing of the final answer (with the GET) but not anymore
background: true, // required by agents; also required alongside stream=true
store: true, // keep the interaction alive so clients can reattach via SSE replay within Gemini's retention window (1d free / 55d paid)
...(isDeepResearch && {
agent_config: {
type: 'deep-research',
thinking_summaries: 'auto', // Enable thought_summary blocks - without this the API would not emit summaries during streaming
// visualization defaults to 'auto' upstream; leave unset to keep the default (agent may generate charts/images).
},
}),
// non-DR agents: use native system_instruction field (matches gemini.generateContent.ts convention)
...(!isDeepResearch && systemText && { system_instruction: systemText }),
};
}
@@ -4,12 +4,13 @@ import { bedrockAccessAsync, bedrockResolveRegion, bedrockURLMantle, bedrockURLR
import { geminiAccess } from '~/modules/llms/server/gemini/gemini.access';
import { ollamaAccess } from '~/modules/llms/server/ollama/ollama.access';
import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
import type { AixAPI_Access, AixAPI_Model, AixAPI_ResumeHandle, AixAPIChatGenerate_Request, AixWire_Particles } from '../../api/aix.wiretypes';
import type { AixDemuxers } from '../stream.demuxers';
import { GeminiWire_API_Generate_Content } from '../wiretypes/gemini.wiretypes';
import { createGeminiInteractionsConnect, createGeminiInteractionsResumeConnect } from './connectors/gemini.interactionsPoller';
import { GeminiInteractionsWire_API_Interactions } from '../wiretypes/gemini.interactions.wiretypes';
import { aixAnthropicHostedFeatures, aixToAnthropicMessageCreate } from './adapters/anthropic.messageCreate';
import { aixToBedrockConverse } from './adapters/bedrock.converse';
@@ -170,14 +171,25 @@ export async function createChatGenerateDispatch(access: AixAPI_Access, model: A
case 'gemini':
const requestedModelName = model.id.replace('models/', '');
// [Gemini Interactions API - ALPHA TEST]
if (model.vndGeminiAPI === 'interactions-agent')
// [Gemini Interactions API - ALPHA TEST] SSE-native: POST with stream=true, upstream returns event-stream we pipe through the fast-sse demuxer.
if (model.vndGeminiAPI === 'interactions-agent') {
const request: ChatGenerateDispatchRequest = {
...geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.postPath, false),
method: 'POST',
body: aixToGeminiInteractionsCreate(model, chatGenerate),
};
return {
// { request, customConnect }
...createGeminiInteractionsConnect(access, aixToGeminiInteractionsCreate(model, chatGenerate)),
request,
// Custom-connect so we can neutralize the outer retrier on HTTP errors: a retried POST would create a second (billable) Deep Research interaction upstream
customConnect: (signal) => fetchResponseOrTRPCThrow({ ...request, signal, name: 'Aix.Gemini.Interactions.create', throwWithoutName: true })
.catch((error: any) => {
if (signal.aborted) throw error; // preserve abort identity for the executor's abort classifier
throw new Error(`Gemini Interactions POST: ${error?.message || 'upstream error'}`); // rewrapping TRPCFetcherError as plain Error makes the retrier treat it as non-retryable
}),
demuxerFormat: 'fast-sse',
chatGenerateParse: createGeminiInteractionsParser(requestedModelName),
};
}
const useV1Alpha = false; // !!model.vndGeminiShowThoughts || model.vndGeminiThinkingBudget !== undefined;
return {
@@ -307,16 +319,17 @@ export async function createChatGenerateResumeDispatch(access: AixAPI_Access, re
chatGenerateParse: streaming ? createOpenAIResponsesEventParser() : createOpenAIResponseParserNS(),
};
case 'gemini':
// [Gemini Interactions] Reattach to an in-progress Deep Research interaction by polling GET.
case 'gemini': {
// [Gemini Interactions] Reattach via SSE stream - GET /interactions/{id}?stream=true replays all events from the start (intentional - client's ContentReassembler replaces message content on reattach; partial resume via last_event_id is deliberately NOT used).
if (resumeHandle.uht !== 'vnd.gem.interactions')
throw new Error(`Resume handle mismatch for gemini: expected 'vnd.gem.interactions', got '${resumeHandle.uht}'`);
const { url: _baseUrl, headers: _headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(resumeHandle.runId /* Gemini interaction.id */), false);
return {
// { request, customConnect }
...createGeminiInteractionsResumeConnect(access, resumeHandle.runId /* Gemini interaction.id */),
request: { url: `${_baseUrl}${_baseUrl.includes('?') ? '&' : '?'}stream=true`, method: 'GET', headers: _headers },
demuxerFormat: 'fast-sse',
chatGenerateParse: createGeminiInteractionsParser(null /* model name unknown at resume time - caller's DMessage already has it */),
};
}
default:
const _exhaustiveCheck: never = dialect;
@@ -1,245 +0,0 @@
import type * as z from 'zod/v4';
import { fetchJsonOrTRPCThrow, fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
import type { GeminiAccessSchema } from '~/modules/llms/server/gemini/gemini.access';
import { geminiAccess } from '~/modules/llms/server/gemini/gemini.access';
import { ChatGenerateDispatch } from '../chatGenerate.dispatch';
import { GeminiInteractionsWire_API_Interactions } from '../../wiretypes/gemini.interactions.wiretypes';
// configuration
const INITIAL_POLL_DELAY_MS = 3_000; // first poll happens this long after the POST accepted the job
const STEADY_POLL_INTERVAL_MS = 10_000; // subsequent polls
const MAX_SLEEP_CHUNK_MS = 1_000; // wake often to honor abort promptly

// wire-shape aliases derived from the zod schemas: the create-request body and the Interaction snapshot
type TRequestBody = z.infer<typeof GeminiInteractionsWire_API_Interactions.RequestBody_schema>;
type TInteraction = z.infer<typeof GeminiInteractionsWire_API_Interactions.Interaction_schema>;
/**
 * Gemini Interactions API - Poll-to-Stream bridge (CREATE flow).
 *
 * Produces the `request`/`customConnect` pair for `ChatGenerateDispatch.customConnect`:
 * 1. POST /v1beta/interactions with `background: true` - returns the interaction id (usually status 'in_progress')
 * 2. The returned `Response` body is a ReadableStream of SSE frames
 * 3. A background producer polls GET /v1beta/interactions/{id} every few seconds until a terminal status
 * 4. Each poll writes one SSE frame (the full Interaction JSON) - the parser diffs vs. prior state
 * 5. DELETE only fires on natural terminal status (completed/failed/cancelled). On client abort or
 *    stream error the interaction is left ALIVE upstream so the client can reattach across reloads
 *    via `createGeminiInteractionsResumeConnect`; Gemini auto-cleans after its retention window
 *    (1d free / 55d paid).
 *    WARNING: this trades off orphan risk for reattach viability - if the client never reattaches,
 *    the upstream agent keeps running and consuming tokens until the retention window expires.
 */
export function createGeminiInteractionsConnect(access: GeminiAccessSchema, body: TRequestBody): Pick<ChatGenerateDispatch, 'request' | 'customConnect'> {
  // resolve the endpoint once: echoed back via `request` for debugging, and reused by the POST below
  const { url, headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.postPath, false);
  return {
    request: { url, headers, method: 'POST' as const, body },
    customConnect: async (signal: AbortSignal): Promise<Response> => {
      // 1. create the background interaction (same url/headers we expose via `request`)
      const created = await _postInteractionAt(url, headers, body, signal);
      // 2. wrap the GET-poll loop in an SSE Response, seeded with the POST snapshot
      return _buildStreamingResponse(access, created.id, created, signal);
    },
  };
}
/**
 * Gemini Interactions API - Poll-to-Stream bridge (RESUME flow).
 *
 * Same `request`/`customConnect` shape as `createGeminiInteractionsConnect`, but with no POST:
 * it jumps straight into GET-polling an existing interaction id. Used by
 * `createChatGenerateResumeDispatch` to reattach to an in-progress Deep Research run
 * after a page reload.
 */
export function createGeminiInteractionsResumeConnect(access: GeminiAccessSchema, interactionId: string): Pick<ChatGenerateDispatch, 'request' | 'customConnect'> {
  return {
    // debug-echo only: the real I/O happens inside customConnect (the GET-poll loop)
    request: {
      ...geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(interactionId), false),
      method: 'GET' as const,
    },
    // no initial snapshot (`null`): the first SSE frame comes from the first GET poll
    customConnect: async (signal: AbortSignal): Promise<Response> =>
      _buildStreamingResponse(access, interactionId, null, signal),
  };
}
// --- Shared stream-building logic ---
/**
 * Wraps the poll loop in a synthetic SSE `Response` (status 200, `text/event-stream`).
 *
 * Shared by the CREATE flow (seeded with the POST snapshot via `initial`) and the
 * RESUME flow (`initial === null`, first frame comes from the first GET poll).
 *
 * Deletion policy: `shouldDelete` flips to true ONLY on a natural terminal status;
 * on abort or error the interaction is deliberately left alive upstream so the
 * client can reattach across reloads.
 *
 * @param access       Gemini access config, used to build per-call GET/DELETE URLs
 * @param interactionId  the upstream interaction to poll (and maybe delete)
 * @param initial      POST snapshot to emit as the first frame, or null on resume
 * @param signal       client abort signal - honored between polls via `_sleepOrAbort`
 */
function _buildStreamingResponse(
  access: GeminiAccessSchema,
  interactionId: string,
  initial: TInteraction | null,
  signal: AbortSignal,
): Response {
  const encoder = new TextEncoder();
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      // one SSE frame = the full Interaction JSON; the parser diffs against prior state
      const emitSseFrame = (interaction: TInteraction) => {
        const data = JSON.stringify(interaction);
        controller.enqueue(encoder.encode(`data: ${data}\n\n`));
      };
      // fire-and-forget producer; `void` signals the intentionally-unawaited promise
      void (async () => {
        // `store: true` is required by the API when background=true, so Gemini retains the interaction.
        // WARNING: orphan risk - we only DELETE on natural terminal status. On abort/error we leave the
        // interaction alive upstream so the client can reattach across reloads. If the client never
        // comes back, the agent keeps running and consuming tokens until Gemini's retention window
        // (1d free / 55d paid) auto-cleans it.
        let shouldDelete = false;
        try {
          // First frame (create flow only): seed with the POST response
          if (initial) {
            emitSseFrame(initial);
            if (_isTerminalStatus(initial.status)) {
              shouldDelete = true;
              controller.close();
              return;
            }
            await _sleepOrAbort(INITIAL_POLL_DELAY_MS, signal);
          }
          // Polling loop - each iteration: GET snapshot, emit, stop on terminal, sleep
          while (!signal.aborted) {
            const snapshot = await _getInteraction(access, interactionId, signal);
            emitSseFrame(snapshot);
            if (_isTerminalStatus(snapshot.status)) {
              shouldDelete = true;
              controller.close();
              return;
            }
            await _sleepOrAbort(STEADY_POLL_INTERVAL_MS, signal);
          }
          // aborted by client (reload or user cancel): close cleanly, do NOT delete so the client can reattach
          controller.close();
        } catch (err: any) {
          // surfaces to the stream consumer; executor treats as 'dispatch-read' issue
          // do NOT delete on error - the interaction may still be running upstream and the client may reattach
          if (signal.aborted)
            controller.close();
          else
            controller.error(err);
        } finally {
          // best-effort cleanup; `_deleteInteraction` has its own timeout and we swallow its rejection
          if (shouldDelete)
            void _deleteInteraction(access, interactionId).catch(() => { /* ignore */ });
        }
      })();
    },
    cancel(_reason) {
      // the consumer cancelled - upstream abort will have fired too
    },
  });
  return new Response(stream, {
    status: 200,
    headers: { 'content-type': 'text/event-stream; charset=utf-8' },
  });
}
// --- HTTP helpers ---
//
// We use the unified `fetchJsonOrTRPCThrow` / `fetchResponseOrTRPCThrow` helpers from
// `~/server/trpc/trpc.router.fetchers`. They are isomorphic (CSF-safe), auto-check
// `response.ok`, and throw a well-shaped `TRPCFetcherError` with category + httpStatus.
//
// !! RETRY LANDMINE !! The outer executor wraps `customConnect` in
// `fetchWithAbortableConnectionRetry`, which retries on any TRPCFetcherError of
// category 'connection' or 'http' (502/503/429). Letting such an error propagate
// out of `customConnect` would re-execute the initial POST, creating a DUPLICATE
// Deep Research interaction server-side. So `_postInteraction` catches and rewraps
// TRPCFetcherError as plain Error (retrier ignores unknown errors).
//
// `_getInteraction` and `_deleteInteraction` run INSIDE the ReadableStream producer,
// which is beyond the retry boundary - their TRPCFetcherErrors are surfaced via
// `controller.error(err)` -> executor's `dispatch-read` path, and `safeErrorString`
// renders the .message nicely. So those can let TRPCFetcherError through.
/**
 * POST the new interaction and validate the returned snapshot.
 *
 * Error policy: a failed POST must NOT look retryable to the outer retry wrapper -
 * a re-executed POST would create a DUPLICATE Deep Research interaction upstream.
 * TRPCFetcherErrors are therefore rewrapped as plain Errors (the retrier ignores
 * unknown error types). Aborts keep their original identity (name='TRPCFetcherError')
 * so the executor's abort classifier can still route them to the clean
 * 'done-dispatch-aborted' path; the retrier already short-circuits on signal.aborted,
 * so the duplicate-POST concern does not apply to that branch.
 */
async function _postInteractionAt(url: string, headers: HeadersInit, body: TRequestBody, signal: AbortSignal): Promise<TInteraction> {
  let snapshotJson: Record<string, unknown>;
  try {
    snapshotJson = await fetchJsonOrTRPCThrow<Record<string, unknown>, TRequestBody>({
      url, method: 'POST', headers, body,
      signal, name: 'Gemini.Interactions.create', throwWithoutName: true,
    });
  } catch (error: any) {
    // not aborted: rewrap so the retrier treats this as non-retryable
    if (!signal.aborted)
      throw new Error(`Gemini Interactions POST: ${error?.message || 'upstream error'}`);
    // aborted: rethrow untouched to preserve abort identity
    throw error;
  }
  return _validateInteraction(snapshotJson, 'POST');
}
/**
 * GET the current Interaction snapshot and validate its shape.
 * Runs inside the stream producer (beyond the retry boundary), so TRPCFetcherErrors
 * may propagate - they surface via `controller.error(err)` on the caller's side.
 */
async function _getInteraction(access: GeminiAccessSchema, interactionId: string, signal: AbortSignal): Promise<TInteraction> {
  // the endpoint is per-call: the interaction id is part of the path
  const endpoint = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(interactionId), false);
  return _validateInteraction(
    await fetchJsonOrTRPCThrow<Record<string, unknown>>({
      url: endpoint.url, method: 'GET', headers: endpoint.headers,
      signal, name: 'Gemini.Interactions.get', throwWithoutName: true,
    }),
    'GET',
  );
}
/**
 * Best-effort DELETE of a terminally-finished interaction.
 * Carries its own 3s deadline via a local AbortController; the caller swallows any throw.
 */
async function _deleteInteraction(access: GeminiAccessSchema, interactionId: string): Promise<void> {
  const deadline = new AbortController();
  const timer = setTimeout(() => deadline.abort(), 3_000);
  try {
    const endpoint = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.deletePath(interactionId), false);
    await fetchResponseOrTRPCThrow({
      url: endpoint.url, method: 'DELETE', headers: endpoint.headers,
      signal: deadline.signal, name: 'Gemini.Interactions.delete', throwWithoutName: true,
    });
  } finally {
    // always disarm the deadline timer, success or failure
    clearTimeout(timer);
  }
}
/**
 * Zod-validates a raw JSON payload as an Interaction snapshot.
 * @throws Error with the zod issue summary when the shape is unexpected
 */
function _validateInteraction(rawJson: unknown, method: 'POST' | 'GET'): TInteraction {
  const result = GeminiInteractionsWire_API_Interactions.Interaction_schema.safeParse(rawJson);
  if (result.success)
    return result.data;
  throw new Error(`Gemini Interactions ${method}: unexpected response shape (${result.error.message})`);
}
// --- Small utils ---
/** True when the interaction has reached a final state (no further polls needed). */
function _isTerminalStatus(status: TInteraction['status']): boolean {
  switch (status) {
    case 'completed':
    case 'failed':
    case 'cancelled':
      return true;
    default:
      return false;
  }
}
/**
 * Sleeps for `totalMs` in chunks of at most MAX_SLEEP_CHUNK_MS, waking early
 * (and returning promptly) when `signal` aborts. Never throws on abort -
 * callers re-check `signal.aborted` themselves.
 */
async function _sleepOrAbort(totalMs: number, signal: AbortSignal): Promise<void> {
  for (let remainingMs = totalMs; remainingMs > 0 && !signal.aborted; ) {
    const chunkMs = Math.min(MAX_SLEEP_CHUNK_MS, remainingMs);
    await new Promise<void>((resolve) => {
      // abort path: cancel the pending timer and resolve right away
      const onAbort = () => { clearTimeout(timer); resolve(); };
      // timer path: detach the abort listener before resolving to avoid leaks
      const timer = setTimeout(() => { signal.removeEventListener('abort', onAbort); resolve(); }, chunkMs);
      signal.addEventListener('abort', onAbort, { once: true });
    });
    remainingMs -= chunkMs;
  }
}
@@ -15,267 +15,219 @@ const DISABLE_CITATIONS = true;
type TInteraction = z.infer<typeof GeminiInteractionsWire_API_Interactions.Interaction_schema>;
type TUsage = NonNullable<TInteraction['usage']>;
type BlockState = {
kind: 'thought' | 'text' | 'image' | 'other';
emittedCitationKeys: Set<string>; // `${url}@${start}-${end}` for de-dupe (url_citations can repeat)
};
/**
* Gemini Interactions API parser (for Deep Research and future multimodal agents).
* Gemini Interactions API SSE parser (for Deep Research and future agents).
*
* Each SSE frame carries a *full* Interaction snapshot (from POST or from a GET poll).
* The parser diffs against prior state and emits only new content.
* The upstream request is sent with `stream=true` and returns `text/event-stream` with events:
* - interaction.start / interaction.status_update / interaction.complete (lifecycle)
* - content.start / content.delta / content.stop (per-index content blocks)
* - error (non-fatal, empty payload observed mid-stream)
* - done (terminator, data: [DONE])
*
* Emission rules per output type:
* - `text` -> `pt.appendText(newSuffix)`. New url_citation annotations are emitted once.
* - `thought` -> `pt.appendReasoningText(newSuffix)`; signatures recorded via `setReasoningSignature`.
* - `image` -> `pt.appendImageInline(...)` once per index (images are whole, not incremental).
* URI-only variants emit a visible note + `console.warn` (not yet wired as fetches).
* - `audio` -> PCM -> WAV via `geminiConvertPCM2WAV`, then `pt.appendAudioInline(...)` once per index.
* - unknown types -> `console.warn` + inline `_Unsupported content block: <type>_` note, once per index.
* Non-terminating: Deep Research streams are long-lived and must not blow up on new blocks.
* The SSE demuxer (`fast-sse`) invokes this parser once per frame with `eventData` (the JSON body)
* and `eventName` (the `event:` line). We dispatch on the JSON payload's `event_type` field which
* mirrors the SSE event name - staying resilient if the demuxer drops the event name.
*
* Part boundaries: when the output type at a given index changes kind (e.g. thought -> text),
* we call `endMessagePart()` so the transmitter flushes the previous part cleanly.
* Delta variants (content.delta's `delta` payload):
* - thought_summary -> `pt.appendReasoningText(delta.content.text)`
* - text -> `pt.appendText(delta.text)`
* - text_annotation -> `pt.appendUrlCitation(...)` for each url_citation (if `DISABLE_CITATIONS` is false)
* - image -> `pt.appendImageInline(...)` (forward-looking; not observed in captures yet)
*
* Resume: GET /v1beta/interactions/{id}?stream=true[&last_event_id=<cursor>] - Gemini replays from
* the cursor (or from start if omitted). Our parser is position-idempotent within a single run
* because the transmitter's state carries across events.
*/
export function createGeminiInteractionsParser(requestedModelName: string | null): ChatGenerateParseFunction {
const parserCreationTimestamp = Date.now();
let timeToFirstEvent: number | undefined;
let timeToFirstContent: number | undefined;
// on resume we don't know the model name (the DMessage already has it) - skip emission
let modelNameSent = requestedModelName == null;
let modelNameSent = requestedModelName == null; // on resume, DMessage already has the model name
let upstreamHandleSent = false;
let operationOpId: string | null = null; // interaction id, set once; used to pair in-progress/done operation state
let operationOpId: string | null = null; // interaction id; used to pair in-progress / done operation-state updates
let operationOpenEmitted = false;
let interactionIdCache: string | null = null; // cached for the `operation-state done` emission on interaction.complete
// per-index emission state (array index in `outputs[]`)
type EmittedState = {
kind: 'text' | 'thought' | 'image' | 'audio' | 'other';
emittedTextLen: number;
emittedCitationKeys: Set<string>; // `${url}@${start}-${end}` to de-dupe
signatureSent: boolean;
mediaEmitted: boolean; // image/audio: emit only once (whole, not incremental)
otherWarned: boolean; // unknown type: warn only once per index
};
const emitted: EmittedState[] = [];
let lastOpenIdx = -1; // index of the most recently opened part; -1 = none
// per-index content-block state (mirrors what content.start declared; persists across delta/stop)
const blocks: Record<number, BlockState> = Object.create(null);
let lastOpenIdx = -1; // most recently opened content-block index; -1 = none open
return function parse(pt: IParticleTransmitter, rawEventData: string): void {
return function parse(pt: IParticleTransmitter, rawEventData: string, _eventName?: string): void {
// model name is announced once (agents don't populate modelVersion the same way)
// model name announced once (agents don't populate modelVersion the way generateContent does)
if (!modelNameSent && requestedModelName != null) {
pt.setModelName(requestedModelName);
modelNameSent = true;
}
// parse + validate
const parsed = GeminiInteractionsWire_API_Interactions.Interaction_schema.safeParse(JSON.parse(rawEventData));
if (!parsed.success)
throw new Error(`malformed interaction snapshot: ${parsed.error.message}`);
const interaction: TInteraction = parsed.data;
// `event: done` carries the literal string `[DONE]` - terminate cleanly without a JSON parse
if (rawEventData === '[DONE]')
return;
// emit the upstream handle on the first frame that has an id (enables reattach across reloads)
if (!upstreamHandleSent && interaction.id) {
pt.setUpstreamHandle(interaction.id, 'vnd.gem.interactions');
upstreamHandleSent = true;
// parse the JSON body + validate against the event-union
let rawJson: unknown;
try {
rawJson = JSON.parse(rawEventData);
} catch (e: any) {
throw new Error(`malformed SSE frame (not JSON): ${e?.message || String(e)}`);
}
// Operation state: give the UI a live progress indicator while the background agent runs.
// Pinned to the interaction id so the terminal 'done'/'error' replaces the same entry.
if (interaction.id && !operationOpId)
operationOpId = interaction.id;
if (operationOpId && !operationOpenEmitted && interaction.status === 'in_progress') {
pt.sendOperationState('search-web', 'Deep Research in progress...', { opId: operationOpId });
operationOpenEmitted = true;
const parsed = GeminiInteractionsWire_API_Interactions.StreamEvent_schema.safeParse(rawJson);
if (!parsed.success) {
// tolerate future/unknown event types rather than failing the whole stream
console.warn('[GeminiInteractions] unknown SSE event shape:', rawJson);
return;
}
const event = parsed.data;
// record time-to-first-content (first frame that carries outputs)
if (timeToFirstEvent === undefined && interaction.outputs && interaction.outputs.length > 0)
timeToFirstEvent = Date.now() - parserCreationTimestamp;
switch (event.event_type) {
// process outputs (may be absent on early in_progress frames).
// Each raw output is classified via Zod safeParse against a discriminated union.
// - Untyped/empty placeholders (`{}`, no `type` field) are skipped silently without creating
// state, so a later snapshot that populates them can classify cleanly.
// - Typed-but-unknown shapes warn once per index with a visible note (non-terminating).
const outputs = interaction.outputs ?? [];
for (let i = 0; i < outputs.length; i++) {
const raw = outputs[i] as { type?: unknown };
const rawType = typeof raw?.type === 'string' ? raw.type : null;
// --- Lifecycle ---
// skip not-yet-populated placeholder blocks silently (Deep Research pre-allocates slots)
if (rawType === null) continue;
case 'interaction.start':
interactionIdCache = event.interaction.id;
if (!upstreamHandleSent) {
pt.setUpstreamHandle(event.interaction.id, 'vnd.gem.interactions');
upstreamHandleSent = true;
}
break;
// silent-skip Deep Research internal tool call/result blocks. These are streamed as content
// alongside text/thought but shouldn't surface to the user - the top-level "Deep Research in
// progress" operation state already signals activity.
if (GeminiInteractionsWire_API_Interactions.INTERNAL_OUTPUT_TYPES.has(rawType)) continue;
case 'interaction.status_update':
interactionIdCache = event.interaction_id;
if (!upstreamHandleSent) {
pt.setUpstreamHandle(event.interaction_id, 'vnd.gem.interactions');
upstreamHandleSent = true;
}
// Surface the in-progress label the first time we see it. Pinned to the interaction id so
// the terminal done/error (emitted from interaction.complete) replaces the same entry.
if (event.status === 'in_progress' && !operationOpenEmitted) {
operationOpId = event.interaction_id;
pt.sendOperationState('search-web', 'Deep Research in progress...', { opId: operationOpId });
operationOpenEmitted = true;
}
break;
const classified = GeminiInteractionsWire_API_Interactions.KnownOutput_schema.safeParse(raw);
const kind: EmittedState['kind'] = !classified.success ? 'other' : classified.data.type;
case 'interaction.complete':
_handleInteractionComplete(pt, event.interaction, operationOpId ?? interactionIdCache, lastOpenIdx, parserCreationTimestamp, timeToFirstContent);
break;
// first time we see this index: initialize + flush previous part if switching kinds
let state = emitted[i];
if (!state) {
state = { kind, emittedTextLen: 0, emittedCitationKeys: new Set(), signatureSent: false, mediaEmitted: false, otherWarned: false };
emitted[i] = state;
// --- Content-block lifecycle ---
// close previous part if we're opening a new index (natural part boundary)
if (lastOpenIdx !== -1 && lastOpenIdx !== i)
case 'content.start': {
const kind = _classifyContentKind(event.content?.type);
blocks[event.index] = { kind, emittedCitationKeys: new Set() };
// natural part boundary: close any previously open part when switching indices
if (lastOpenIdx !== -1 && lastOpenIdx !== event.index)
pt.endMessagePart();
lastOpenIdx = i;
lastOpenIdx = event.index;
break;
}
// 'other': warn once per index with visible note, then continue
if (!classified.success) {
if (!state.otherWarned) {
console.warn(`[GeminiInteractions] unsupported output type: ${rawType}`, raw);
pt.appendText(`\n_Unsupported content block: ${rawType}_\n`);
state.otherWarned = true;
}
continue;
}
case 'content.stop':
// the final `endMessagePart` is emitted by `interaction.complete` after status evaluation,
// so we don't auto-close here - lets multi-block streams flow naturally
break;
const out = classified.data;
switch (out.type) {
case 'text': {
if (out.text.length > state.emittedTextLen) {
pt.appendText(out.text.slice(state.emittedTextLen));
state.emittedTextLen = out.text.length;
}
// url_citation annotations: loose-typed in Output_schema, validated per-item here
if (!DISABLE_CITATIONS && out.annotations) {
for (const annRaw of out.annotations) {
const annParse = GeminiInteractionsWire_API_Interactions.UrlCitationAnnotation_schema.safeParse(annRaw);
if (!annParse.success) continue; // not a url_citation (place_citation, file_citation, ...)
const ann = annParse.data;
const key = `${ann.url}@${ann.start_index ?? ''}-${ann.end_index ?? ''}`;
if (state.emittedCitationKeys.has(key)) continue;
state.emittedCitationKeys.add(key);
pt.appendUrlCitation(ann.title || ann.url, ann.url, undefined, ann.start_index, ann.end_index, undefined, undefined);
// --- Delta routing ---
case 'content.delta': {
if (timeToFirstContent === undefined)
timeToFirstContent = Date.now() - parserCreationTimestamp;
// Ensure state exists even if we missed content.start (tolerant)
if (!blocks[event.index])
blocks[event.index] = { kind: 'other', emittedCitationKeys: new Set() };
const state = blocks[event.index];
// Classify the delta payload - unknown types warn once and are dropped
const deltaParse = GeminiInteractionsWire_API_Interactions.StreamDelta_schema.safeParse(event.delta);
if (!deltaParse.success) {
// Empty deltas ({}) appear alongside placeholder blocks (e.g. internal tool slots) - silent skip
if (event.delta && Object.keys(event.delta).length === 0) break;
console.warn('[GeminiInteractions] unknown content.delta shape at index', event.index, event.delta);
break;
}
const delta = deltaParse.data;
switch (delta.type) {
case 'thought_summary':
if (delta.content?.text) pt.appendReasoningText(delta.content.text);
// Intentionally NOT re-emitting sendOperationState here. The chip is created ONCE at
// interaction.status_update and left alone - its `cts` is anchored to the run's
// createdAt via the upstream handle (reassembler line 775), so the chip's timer shows
// the true elapsed run time and survives reattach cleanly. Re-emitting would pollute
// the opLog without adding user value.
break;
case 'thought_signature':
if (delta.signature) pt.setReasoningSignature(delta.signature);
break;
case 'text':
pt.appendText(delta.text);
break;
case 'text_annotation':
if (!DISABLE_CITATIONS && delta.annotations) {
for (const annRaw of delta.annotations) {
const ann = GeminiInteractionsWire_API_Interactions.UrlCitationAnnotation_schema.safeParse(annRaw);
if (!ann.success) continue; // place_citation, file_citation, etc. - not surfaced here
const a = ann.data;
const key = `${a.url}@${a.start_index ?? ''}-${a.end_index ?? ''}`;
if (state.emittedCitationKeys.has(key)) continue;
state.emittedCitationKeys.add(key);
pt.appendUrlCitation(a.title || a.url, a.url, undefined, a.start_index, a.end_index, undefined, undefined);
}
}
}
break;
}
case 'thought': {
// summary may be a string (preview) or an array of {type:'text', text} blocks (documented shape)
const summary = typeof out.summary === 'string'
? out.summary
: Array.isArray(out.summary)
? out.summary.map(s => s.text).join('\n\n')
: '';
if (summary.length > state.emittedTextLen) {
pt.appendReasoningText(summary.slice(state.emittedTextLen));
state.emittedTextLen = summary.length;
}
if (!state.signatureSent && out.signature) {
pt.setReasoningSignature(out.signature);
state.signatureSent = true;
}
break;
}
case 'image': {
if (!state.mediaEmitted) {
if (out.data) {
// hintSkipResize=true: Deep Research images are typically figures/charts where
// PNG->jpeg recompression would degrade text legibility and fine detail.
pt.appendImageInline(out.mime_type, out.data, 'Gemini Generated Image', 'Gemini', '', true);
} else if (out.uri) {
// URI-hosted images aren't fetched here (yet); surface the link inline
console.warn('[GeminiInteractions] image output via URI is not yet fetched inline:', out.uri);
pt.appendText(`\n[Image: ${out.uri}]\n`);
} else {
console.warn('[GeminiInteractions] image output with neither data nor uri:', out);
pt.appendText(`\n_Image block without payload_\n`);
break;
case 'image':
// Forward-looking: inline bytes flow through appendImageInline; URI-only gets a visible note.
if (delta.data && delta.mime_type) {
pt.appendImageInline(delta.mime_type, delta.data, 'Gemini Generated Image', 'Gemini', '', true);
} else if (delta.uri) {
console.warn('[GeminiInteractions] image delta via URI not fetched:', delta.uri);
pt.appendText(`\n[Image: ${delta.uri}]\n`);
}
state.mediaEmitted = true;
}
break;
}
case 'audio': {
if (!state.mediaEmitted) {
if (out.data) {
const mime = out.mime_type.toLowerCase();
break;
case 'audio':
// Forward-looking: audio deltas per spec (not yet observed in DR streams). PCM needs WAV conversion; packaged formats pass through.
if (delta.data && delta.mime_type) {
const mime = delta.mime_type.toLowerCase();
const isPCM = mime.startsWith('audio/l16') || mime.includes('codec=pcm');
if (isPCM) {
try {
const wav = geminiConvertPCM2WAV(out.mime_type, out.data);
const wav = geminiConvertPCM2WAV(delta.mime_type, delta.data);
pt.appendAudioInline(wav.mimeType, wav.base64Data, 'Gemini Generated Audio', 'Gemini', wav.durationMs);
} catch (error) {
console.warn('[GeminiInteractions] audio PCM convert failed:', error);
pt.appendText(`\n_Audio conversion failed: ${String(error)}_\n`);
}
} else {
// already a packaged format (audio/wav, audio/mp3, audio/aac, ...) - pass through
pt.appendAudioInline(out.mime_type, out.data, 'Gemini Generated Audio', 'Gemini', 0);
pt.appendAudioInline(delta.mime_type, delta.data, 'Gemini Generated Audio', 'Gemini', 0);
}
} else if (out.uri) {
console.warn('[GeminiInteractions] audio output via URI is not yet fetched inline:', out.uri);
pt.appendText(`\n[Audio: ${out.uri}]\n`);
} else {
console.warn('[GeminiInteractions] audio output with neither data nor uri:', out);
pt.appendText(`\n_Audio block without payload_\n`);
}
state.mediaEmitted = true;
break;
default: {
const _exhaustive: never = delta;
break;
}
break;
}
default: {
const _exhaustiveCheck: never = out;
console.warn('[GeminiInteractions] unreachable: unhandled emittable type', { out });
break;
}
break;
}
}
// terminal states: flush current part and signal end
switch (interaction.status) {
case 'completed':
if (lastOpenIdx !== -1) pt.endMessagePart();
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research complete', { opId: operationOpId, state: 'done' });
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
pt.setTokenStopReason('ok');
pt.setDialectEnded('done-dialect');
break;
case 'failed':
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research failed', { opId: operationOpId, state: 'error' });
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
pt.setDialectTerminatingIssue('Deep Research interaction failed', null, 'srv-warn');
break;
case 'cancelled':
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research cancelled', { opId: operationOpId, state: 'done' });
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
pt.setTokenStopReason('cg-issue');
pt.setDialectEnded('done-dialect');
break;
case 'requires_action':
// Not expected for Deep Research agents - fail loudly so we notice
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research needs action', { opId: operationOpId, state: 'error' });
pt.setDialectTerminatingIssue('Deep Research returned requires_action (not supported in this client)', null, 'srv-warn');
break;
case 'incomplete':
// Run stopped early (token limit, etc.). Terminate gracefully with a visible note; we keep any content already emitted.
if (lastOpenIdx !== -1) pt.endMessagePart();
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research incomplete', { opId: operationOpId, state: 'done' });
pt.appendText('\n_Response incomplete (run stopped early)._\n');
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
pt.setTokenStopReason('out-of-tokens');
pt.setDialectEnded('done-dialect');
break;
case 'in_progress':
// keep polling
case 'error':
// Observed mid-stream with an empty payload between content blocks - non-fatal, the stream
// continues with further events and eventually an interaction.complete. Silent-skip empty
// payloads (Beta noise); warn only when actual error info is present.
if (event.error?.message || event.error?.code)
console.warn('[GeminiInteractions] SSE error event:', event.error);
break;
default: {
const _exhaustiveCheck: never = interaction.status;
console.warn('[GeminiInteractions] unreachable: unhandled status', { status: interaction.status });
const _exhaustiveCheck: never = event;
console.warn('[GeminiInteractions] unreachable: unhandled emittable type', { event });
break;
}
}
@@ -283,6 +235,82 @@ export function createGeminiInteractionsParser(requestedModelName: string | null
}
// --- helpers ---
/**
 * Maps a raw `content.start` block type to the local BlockState kind.
 * Only the three kinds we render ('thought' | 'text' | 'image') pass through;
 * anything else collapses to 'other'.
 */
function _classifyContentKind(rawType: unknown): BlockState['kind'] {
  switch (rawType) {
    case 'thought':
      return 'thought';
    case 'text':
      return 'text';
    case 'image':
      return 'image';
    default:
      return 'other';
  }
}
/**
 * Terminal handler for the `interaction.complete` SSE frame: flushes any open content part,
 * then maps the interaction's final status to chip state, usage metrics, stop reason, and
 * dialect termination on the transmitter.
 */
function _handleInteractionComplete(
  pt: IParticleTransmitter,
  interaction: TInteraction,
  operationOpId: string | null,
  lastOpenIdx: number,
  parserCreationTimestamp: number,
  timeToFirstContent: number | undefined,
): void {

  // Close whatever content part was still open when the final frame arrived
  if (lastOpenIdx !== -1) pt.endMessagePart();

  // Local shorthands: the chip update only applies when a chip was created for this run,
  // and the usage flush repeats the same four arguments in most terminal branches.
  const updateOpChip = (text: string, state: 'done' | 'error') => {
    if (operationOpId)
      pt.sendOperationState('search-web', text, { opId: operationOpId, state });
  };
  const flushUsage = () =>
    _emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstContent);

  switch (interaction.status) {

    case 'completed':
      updateOpChip('Deep Research complete', 'done');
      flushUsage();
      pt.setTokenStopReason('ok');
      pt.setDialectEnded('done-dialect');
      break;

    case 'failed':
      updateOpChip('Deep Research failed', 'error');
      flushUsage();
      pt.setDialectTerminatingIssue('Deep Research interaction failed', null, 'srv-warn');
      break;

    case 'cancelled':
      updateOpChip('Deep Research cancelled', 'done');
      flushUsage();
      pt.setTokenStopReason('cg-issue');
      pt.setDialectEnded('done-dialect');
      break;

    case 'requires_action':
      // Not expected for Deep Research agents - fail loudly so we notice
      updateOpChip('Deep Research needs action', 'error');
      pt.setDialectTerminatingIssue('Deep Research returned requires_action (not supported in this client)', null, 'srv-warn');
      break;

    case 'incomplete':
      // Run stopped early (token limit, etc.). Terminate gracefully with a visible note; we keep any content already emitted.
      updateOpChip('Deep Research incomplete', 'done');
      pt.appendText('\n_Response incomplete (run stopped early)._\n');
      flushUsage();
      pt.setTokenStopReason('out-of-tokens');
      pt.setDialectEnded('done-dialect');
      break;

    case 'in_progress':
      // interaction.complete with in_progress shouldn't happen per the spec - log and keep the stream open
      console.warn('[GeminiInteractions] interaction.complete with status=in_progress; ignoring');
      break;

    default: {
      const _exhaustiveCheck: never = interaction.status;
      console.warn('[GeminiInteractions] unreachable status', interaction.status);
      break;
    }
  }
}
/**
* Map Gemini Interactions `usage` to `CGSelectMetrics`.
*
@@ -292,14 +320,14 @@ export function createGeminiInteractionsParser(requestedModelName: string | null
* - `total_tool_use_tokens` is distinct from input/output/thought and accounts for internal tool calls
* (web search, code exec, etc.). For Deep Research this dominates - we fold it into TIn so displayed
* input reflects true model consumption; there is no dedicated slot in CGSelectMetrics today.
* - `total_output_tokens` excludes thought tokens; `gemini.parser.ts` (line 110) already adds TOutR into TOut
* - `total_output_tokens` excludes thought tokens; `gemini.parser.ts` already adds TOutR into TOut
* for consistency, and we follow the same convention here.
*/
function _emitUsageMetrics(
pt: IParticleTransmitter,
usage: TUsage | undefined,
parserCreationTimestamp: number,
timeToFirstEvent: number | undefined,
timeToFirstContent: number | undefined,
): void {
if (!usage) return;
@@ -324,9 +352,9 @@ function _emitUsageMetrics(
// timing
const dtAll = Date.now() - parserCreationTimestamp;
m.dtAll = dtAll;
if (timeToFirstEvent !== undefined) {
m.dtStart = timeToFirstEvent;
const dtInner = dtAll - timeToFirstEvent;
if (timeToFirstContent !== undefined) {
m.dtStart = timeToFirstContent;
const dtInner = dtAll - timeToFirstContent;
if (dtInner > 0) {
m.dtInner = dtInner;
if (totalOut > 0)
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Re-sync all upstream snapshots in this directory.
# For each *.md file: fetch the URL declared in the `Source:` header line,
# preserve the header block (everything through the closing `-->`), bump the
# `Synced:` date, and rewrite the body in place. Add a new snapshot with the
# same header convention and it gets picked up automatically.
set -euo pipefail

DIR="$(cd "$(dirname "$0")" && pwd)"
TODAY=$(date -u +%Y-%m-%d)

for f in "$DIR"/*.md; do
  # header line format: ` Source: <url>` (note the leading space, inside the top comment block)
  url=$(grep -m1 '^ Source: ' "$f" | sed 's/^ Source: //' || true)
  if [ -z "${url:-}" ]; then
    echo "skip (no Source: header): $(basename "$f")"
    continue
  fi
  echo "$(basename "$f") <- $url"
  # keep everything through the closing `-->` as the header, refreshing the Synced: date
  header=$(awk '{print} /^-->$/{exit}' "$f" | sed -E "s|^ Synced: .*| Synced: $TODAY|")
  # --fail: HTTP errors (404/500) exit non-zero instead of returning an error page as the body,
  # so a bad fetch can never overwrite a good snapshot; skip this file and keep going.
  if ! body=$(curl -sSL --fail --max-time 30 "$url"); then
    echo "warn: fetch failed, keeping existing snapshot: $(basename "$f")" >&2
    continue
  fi
  # an empty body would truncate the snapshot to header-only - also keep the existing file
  if [ -z "$body" ]; then
    echo "warn: empty response, keeping existing snapshot: $(basename "$f")" >&2
    continue
  fi
  printf '%s\n\n%s\n' "$header" "$body" > "$f"
done
echo "Done. Run \`git diff $DIR\` to see upstream changes."
@@ -6,14 +6,16 @@ import * as z from 'zod/v4';
*
* 2026-04-21: NOTE - MINIMAL IMPL for DEEP RESEARCH AGENT
* Scope: only what the Deep Research agents need.
* - Stateless (no `previous_interaction_id`)
* - `store: true` - required by the API when `background: true` (Deep Research agents are background-only).
* - Single-turn by default (we don't yet send `previous_interaction_id` for multi-turn state reuse)
* - `store: true` is sent (spec says optional; DR guide recommends it for background runs).
* We best-effort DELETE on completion/abort to minimize server-side retention.
* - Single-turn: last user text becomes `input`
* - No tools, no system_instruction, no thinking config
* - `system_instruction` is accepted for non-DR agents; DR agents reject it and we prepend to `input` instead.
* - No `tools` (yet), no model-side `generation_config`.
*
* Docs: https://ai.google.dev/gemini-api/docs/interactions
* https://ai.google.dev/api/interactions-api
* Source-of-truth snapshots (for diffing across upstream changes, see ./_upstream/sync.sh):
* ./_upstream/gemini.interactions.spec.md - the formal API reference
* ./_upstream/gemini.interactions.guide.md - the prose guide
* ./_upstream/gemini.deep-research.guide.md - the Deep Research agent guide
*/
export namespace GeminiInteractionsWire_API_Interactions {
@@ -51,15 +53,66 @@ export namespace GeminiInteractionsWire_API_Interactions {
]),
});
// agent_config: polymorphic discriminated union on `type` (formal spec: AgentConfig).
// Only applicable when `agent` is set (mutually exclusive with `generation_config`, which is the
// model-path equivalent). See ./_upstream/gemini.interactions.spec.md#agent_config.
//
// Variants:
// - DynamicAgentConfig { type: 'dynamic' }
//   Dynamic agents - no tunable config documented beyond the discriminator.
// - DeepResearchAgentConfig { type: 'deep-research', ... }
//   See ./_upstream/gemini.deep-research.guide.md#agent-configuration for defaults and semantics.

const _DynamicAgentConfig_schema = z.object({
  type: z.literal('dynamic'), // the discriminator is the entire documented surface for dynamic agents
});

const _DeepResearchAgentConfig_schema = z.object({
  type: z.literal('deep-research'),
  // All knobs optional - omitting a field defers to the server-side default noted per line.
  thinking_summaries: z.enum(['auto', 'none']).optional(), // default 'none'. 'auto' emits intermediate reasoning events during streaming.
  visualization: z.enum(['auto', 'off']).optional(), // default 'auto'. 'auto' lets the agent generate charts/images as part of the output.
  collaborative_planning: z.boolean().optional(), // default false. Plan-then-execute: the agent returns a research plan that the user confirms in a follow-up interaction.
});

// Discriminated on `type`; each request carries exactly one variant.
export const AgentConfig_schema = z.discriminatedUnion('type', [
  _DynamicAgentConfig_schema,
  _DeepResearchAgentConfig_schema,
]);
// RequestBody_schema: POST /v1beta/interactions body.
//
// Cross-field constraints (from the formal spec):
// - `agent` XOR `model` is REQUIRED. We only model the agent path here.
// - `agent_config` XOR `generation_config` - config object is picked by path:
// `agent`+`agent_config` OR `model`+`generation_config`. Never both.
// - `system_instruction` is top-level (not inside config) but rejected by deep-research agents.
// - `previous_interaction_id` carries conversation history but NOT per-interaction knobs:
// `tools`, `system_instruction`, and `generation_config` are interaction-scoped and must be
// re-sent each turn.
export const RequestBody_schema = z.object({
agent: z.string(), // e.g. 'deep-research-pro-preview-12-2025' (note: we send bare id, without 'models/' prefix)
// --- Target: what to call ---
agent: z.string(), // Spec: agent is AgentOption (optional, required if `model` not provided). Send the BARE id; no 'models/' prefix.
// model: z.string(), // alternative path - not used here; would require generation_config instead of agent_config
// --- Inputs ---
input: z.union([
z.string(), // single-turn text convenience
z.array(InputContentPart_schema), // single-turn multimodal
z.array(Turn_schema), // stateless multi-turn history
z.string(), // single-turn text convenience (string shortcut for a user-text turn)
z.array(InputContentPart_schema), // single-turn multimodal (text + image parts for one user turn)
z.array(Turn_schema), // stateless multi-turn history (role-tagged turns)
]),
background: z.literal(true), // required for agents
store: z.literal(true), // required when background=true; we DELETE after completion to minimize retention
system_instruction: z.string().optional(), // NOT supported by deep-research agents (tested 2026-04-23, API: 'not supported for the deep-research-* agent') - for those, prepend to `input` instead.
// --- Config (picks the agent or model path) ---
agent_config: AgentConfig_schema.optional(), // Polymorphic on `type`: 'deep-research' | 'dynamic'. MUTUALLY EXCLUSIVE with `generation_config` (model path). Enables thought-summary streaming, visualizations, collaborative planning.
// generation_config: GenerationConfig_schema.optional(), // model path - not modeled here yet
// --- Runtime flags (literals below force correct behavior at the adapter layer) ---
stream: z.boolean().optional(), // SSE streaming - when true, POST returns an event-stream (interaction.start, content.start/delta/stop, interaction.complete, done). On reattach, GET ?stream=true replays the full event sequence (we do not send `last_event_id` - full replay is the intentional semantic; see poller comment).
store: z.literal(true), // spec-optional; we lock to `true` so the interaction is retrievable post-run (replay via GET stream, resume via `last_event_id`). Required alongside `background=true` for agents per the DR guide.
background: z.literal(true), // spec-optional; DR agents REQUIRE `true` ('Agents are required to use background=true'). Locked to true to prevent accidental sync-mode sends.
// --- Multi-turn continuation ---
previous_interaction_id: z.string().optional(), // reuses prior interaction's stored inputs/outputs. Per-turn knobs (tools, system_instruction, generation_config) are NOT carried and must be re-sent.
});
@@ -164,8 +217,10 @@ export namespace GeminiInteractionsWire_API_Interactions {
// -- Usage (populated in the terminal frame) --
// Modality enum: per spec ResponseModality - ISO 8601 in descriptions clarifies this is the
// runtime modality, not the model's response_modalities request field.
const UsageByModality_schema = z.object({
modality: z.string(), // 'text' | 'image' | 'audio' | ...
modality: z.enum(['text', 'image', 'audio', 'video', 'document']).or(z.string()), // permissive for future modalities
tokens: z.number(),
});
@@ -175,17 +230,189 @@ export namespace GeminiInteractionsWire_API_Interactions {
total_cached_tokens: z.number().optional(),
total_output_tokens: z.number().optional(),
total_thought_tokens: z.number().optional(),
total_tool_use_tokens: z.number().optional(), // Deep Research: tokens consumed by internal tool calls (web search, etc.)
total_tool_use_tokens: z.number().optional(), // Deep Research: tokens consumed by internal tool calls (web search, etc.)
input_tokens_by_modality: z.array(UsageByModality_schema).optional(),
cached_tokens_by_modality: z.array(UsageByModality_schema).optional(), // spec: cached-tokens breakdown (input subset)
output_tokens_by_modality: z.array(UsageByModality_schema).optional(),
tool_use_tokens_by_modality: z.array(UsageByModality_schema).optional(), // spec: tool-use breakdown - DR search/urlcontext/code-exec consumption per modality
});
export const Interaction_schema = z.object({
// Full Interaction resource (spec: Resource:Interaction). We model ALL required fields and the
// ones we observe in responses; remaining optional echo fields are left as `.passthrough()` - see
// `Output_schema` comment for rationale.
export const Interaction_schema = z.looseObject({
// required (all marked "Required. Output only." in spec)
id: z.string(),
status: Status_enum,
created: z.string().optional(), // ISO 8601 - spec says Required but marked optional-with-default upstream; we keep optional for forward-compat
updated: z.string().optional(), // ISO 8601
// commonly-observed fields
role: z.string().optional(), // 'agent' | 'user' | ... - output only
object: z.string().optional(), // 'interaction' literal observed in responses
agent: z.string().optional(), // echoed back on agent-path interactions
model: z.string().optional(), // echoed back on model-path interactions
// content + metrics
outputs: z.array(Output_schema).optional(), // absent until first content arrives
usage: Usage_schema.optional(), // populated in terminal frames (completed/failed/cancelled)
// We ignore model/agent echo for now
usage: Usage_schema.optional(), // populated in terminal frames (completed/failed/cancelled/incomplete)
// (remaining echo fields - system_instruction, tools, agent_config, previous_interaction_id,
// input, response_modalities, response_format, etc. - pass through via looseObject for now)
});
// -- SSE Stream Events --
//
// When the POST (or resume GET) is sent with `stream=true`, the API returns `text/event-stream`
// with the following frames (captured empirically 2026-04-23, see _upstream/gemini.deep-research.guide.md#streaming):
//
// event: interaction.start one at the top; carries { interaction: {id, status, agent, ...} }
// event: interaction.status_update status transitions (in_progress, completed, ...)
// event: content.start opens a content block at {index, content:{type:'thought'|'text'|...}}
// event: content.delta incremental data for index; polymorphic delta (see below); some carry `event_id` for resume
// event: content.stop closes a content block at index
// event: error spec shape: { error?: { code, message } }; observed with EMPTY payload in Beta - non-fatal, continue
// event: interaction.complete final snapshot carrying the full Interaction incl. usage
// event: done terminator with data: [DONE] (OpenAI-style)
//
// Resume: GET /v1beta/interactions/{id}?stream=true
// Spec also allows `&last_event_id=<event_id>` for incremental resume, but we do NOT use it.
// Full replay from the beginning is the intentional semantic - the client's ContentReassembler
// REPLACES message content on reattach, so partial resume would be a mismatch. Works identically
// on in-progress, completed, failed, and cancelled interactions (within Gemini's retention window).
// --- ContentDeltaData variants (spec: polymorphic on `type`) ---
//
// Spec defines: text, image, audio, document, video, thought_summary, thought_signature,
// text_annotation, function_call, + tool-call/result variants (code_execution_*, url_context_*,
// google_search_*, google_maps_*, file_search_*, mcp_server_tool_*). We model variants we emit
// to the UI; unknown ones fail safeParse at the parser and are silently dropped (mirrors the
// INTERNAL_OUTPUT_TYPES policy on the non-streaming path).
// Incremental characters for a 'text' content block - the parser appends `text` as-is
// (chunks are deltas, not cumulative snapshots).
const TextDelta_schema = z.object({
  type: z.literal('text'),
  text: z.string(),
});

// Intermediate reasoning text - streamed when agent_config.thinking_summaries is 'auto'.
const ThoughtSummaryDelta_schema = z.object({
  type: z.literal('thought_summary'),
  // Spec: ThoughtSummaryContent - polymorphic (only `text` variant documented). Optional per spec.
  content: z.object({
    type: z.literal('text'),
    text: z.string(),
  }).optional(),
});

// Backend validation hash - routed to `pt.setReasoningSignature` when present.
const ThoughtSignatureDelta_schema = z.object({
  type: z.literal('thought_signature'),
  signature: z.string().optional(),
});

// text_annotation arrives as its own delta on the same index as a text block, carrying citation metadata for the text already emitted.
const TextAnnotationDelta_schema = z.object({
  type: z.literal('text_annotation'),
  // Spec: Annotation - polymorphic on `type` (url_citation, file_citation, place_citation). Optional per spec.
  annotations: z.array(z.looseObject({ type: z.string() })).optional(), // validated per-item via UrlCitationAnnotation_schema
});

// Image payload - `data` (inline base64) and `uri` are alternative carriers; both optional per spec.
const ImageDelta_schema = z.object({
  type: z.literal('image'),
  data: z.string().optional(), // base64
  uri: z.string().optional(),
  mime_type: z.string().optional(), // spec enum: image/png, image/jpeg, image/webp, image/heic, image/heif, image/gif, image/bmp, image/tiff
  resolution: z.enum(['low', 'medium', 'high', 'ultra_high']).optional(), // spec: MediaResolution
});

// Audio payload - PCM variants (audio/l16, codec=pcm) are converted to WAV client-side;
// packaged formats pass straight through to `pt.appendAudioInline`.
const AudioDelta_schema = z.object({
  type: z.literal('audio'),
  data: z.string().optional(),
  uri: z.string().optional(),
  mime_type: z.string().optional(), // spec enum: audio/wav, audio/mp3, audio/aiff, audio/aac, audio/ogg, audio/flac, audio/mpeg, audio/m4a, audio/l16, audio/opus, audio/alaw, audio/mulaw
  rate: z.number().optional(),
  channels: z.number().optional(),
});

// Delta discriminated union - covers variants we emit to the UI. Unknown variants (document,
// video, function_call, + tool-call/result) fail safeParse in the parser and are silently dropped.
export const StreamDelta_schema = z.discriminatedUnion('type', [
  TextDelta_schema,
  ImageDelta_schema,
  AudioDelta_schema,
  ThoughtSummaryDelta_schema,
  ThoughtSignatureDelta_schema,
  TextAnnotationDelta_schema,
]);
// --- SSE event data payloads (spec: InteractionSseEvent - polymorphic on `event_type`) ---
//
// Per spec, EVERY variant carries an OPTIONAL `event_id` resume cursor. At runtime only a subset
// of events actually include one, so the schema accepts it on all but our parser uses whichever
// is present to advance the cursor.
// Opening frame - the embedded interaction may be sparse (hence .partial()); only `id` is
// guaranteed, `status` is kept optional on top of the partial for the same reason.
const InteractionStart_event_schema = z.object({
  event_type: z.literal('interaction.start'),
  interaction: Interaction_schema.partial().extend({ id: z.string(), status: Status_enum.optional() }),
  event_id: z.string().optional(),
});

// Status transition (in_progress, completed, ...) - carries the id + status only, no content.
const InteractionStatusUpdate_event_schema = z.object({
  event_type: z.literal('interaction.status_update'),
  interaction_id: z.string(),
  status: Status_enum,
  event_id: z.string().optional(),
});

// Opens a content block at `index`; `content.type` drives block classification downstream.
const ContentStart_event_schema = z.object({
  event_type: z.literal('content.start'),
  index: z.number(),
  content: z.looseObject({ type: z.string() }), // spec: Content (polymorphic)
  event_id: z.string().optional(),
});

const ContentDelta_event_schema = z.object({
  event_type: z.literal('content.delta'),
  index: z.number(),
  delta: z.looseObject({}), // spec: ContentDeltaData - tolerant at ingest; parsed later via StreamDelta_schema.safeParse
  event_id: z.string().optional(),
});

// Closes the content block at `index` - no payload beyond the index itself.
const ContentStop_event_schema = z.object({
  event_type: z.literal('content.stop'),
  index: z.number(),
  event_id: z.string().optional(),
});

const Error_event_schema = z.object({
  event_type: z.literal('error'),
  // Spec: Error (optional) - { code?: string (URI), message?: string }. Observed empty in Beta.
  error: z.object({
    code: z.string().optional(),
    message: z.string().optional(),
  }).optional(),
  event_id: z.string().optional(),
});

const InteractionComplete_event_schema = z.object({
  event_type: z.literal('interaction.complete'),
  // Spec note: "The completed interaction with EMPTY OUTPUTS to reduce the payload size. Use the
  // preceding ContentDelta events for the actual output." We rely on `status` + `usage` here.
  interaction: Interaction_schema,
  event_id: z.string().optional(),
});

// `event: done` carries the literal string `[DONE]` instead of JSON; handled specially in the parser

/** Discriminated union of JSON-bodied SSE events. The `done` terminator is handled as a string-valued special case in the parser. */
export const StreamEvent_schema = z.discriminatedUnion('event_type', [
  InteractionStart_event_schema,
  InteractionStatusUpdate_event_schema,
  ContentStart_event_schema,
  ContentDelta_event_schema,
  ContentStop_event_schema,
  Error_event_schema,
  InteractionComplete_event_schema,
]);
}