mirror of
https://github.com/enricoros/big-AGI.git
synced 2026-05-10 21:50:14 -07:00
Compare commits
8 Commits
0e587c4889
...
08506abaee
| Author | SHA1 | Date | |
|---|---|---|---|
| 08506abaee | |||
| 078c80d572 | |||
| b1c9f6be45 | |||
| fc497e9beb | |||
| 6ad01fd981 | |||
| 44ed8664c8 | |||
| 4cb16ee715 | |||
| 2dc9b87cda |
@@ -24,7 +24,7 @@ import { animationSpinHalfPause } from '~/common/util/animUtils';
|
||||
// configuration
|
||||
const DATASTREAM_VISUALIZATION_DELAY = Math.round(2 * Math.PI * 1000);
|
||||
const MODELOP_TIMEOUT_DELAY = 5; // seconds
|
||||
const MODELOP_TIMEOUT_LIMIT = 3600; // seconds - 1hr for long ops, such as Gemini Deep Research
|
||||
const MODELOP_TIMEOUT_LIMIT = 6 * 60 * 60; // seconds - 6hr for long ops, such as Gemini Deep Research
|
||||
|
||||
const modelOperationConfig: Record<DVoidPlaceholderMOp['mot'], { Icon: React.ElementType, color: ColorPaletteProp }> = {
|
||||
'search-web': { Icon: SearchRoundedIcon, color: 'neutral' },
|
||||
|
||||
@@ -336,7 +336,7 @@ export class ContentReassembler {
|
||||
// PartParticleOp
|
||||
case 'p' in op:
|
||||
// heuristics to remove the placeholder if real user-destined content arrives
|
||||
if (op.p !== '❤' && op.p !== 'vp' && op.p !== 'urlc' && op.p !== 'svs')
|
||||
if (op.p !== '❤' && op.p !== 'vp' && op.p !== 'urlc' && op.p !== 'hres' && op.p !== 'svs' && op.p !== 'tr_' && op.p !== 'trs')
|
||||
await this._removeLastVoidPlaceholderDelayed();
|
||||
switch (op.p) {
|
||||
case '❤':
|
||||
@@ -880,6 +880,11 @@ export class ContentReassembler {
|
||||
}
|
||||
|
||||
private async _removeLastVoidPlaceholderDelayed(): Promise<boolean> {
|
||||
// NOTE: disabled because it also introduces visual bugs - instead added tr_ and trs exceptions on a caller
|
||||
// function _isVPNotOp(f: DMessageFragment) {
|
||||
// // make 'vp' fragments eligible for removal only if they have no opLog entries, like tool invocations (the explicit counterpart)
|
||||
// return isVoidPlaceholderFragment(f) && !f.part.opLog?.length;
|
||||
// }
|
||||
// skip if none
|
||||
if (this.S.fragments.findLastIndex(isVoidPlaceholderFragment) < 0) return false;
|
||||
|
||||
|
||||
+31
-12
@@ -17,8 +17,13 @@ type TInputPart = z.infer<typeof GeminiInteractionsWire_API_Interactions.InputCo
|
||||
*
|
||||
* Scope:
|
||||
* - Stateless multi-turn: `chatSequence` is flattened to role-tagged turns and sent as `input`.
|
||||
* - `systemMessage` text (if any) is prepended to the first user turn; background agents do not
|
||||
* accept a dedicated `system_instruction`.
|
||||
* - `systemMessage` text: routing differs by agent type
|
||||
* - deep-research agents REJECT the top-level `system_instruction` field (tested 2026-04-23,
|
||||
* API error: "not supported for the deep-research-* agent. Please include any specific
|
||||
* instructions in the input prompt instead"), so we prepend the system text into the first
|
||||
* user turn.
|
||||
* - non-DR agents (future MCP/Computer Use) use the native `system_instruction` field, matching
|
||||
* the gemini.generateContent.ts convention for clean separation.
|
||||
* - Multimodal: user and model turns carry images as content-part arrays when any image is present,
|
||||
* otherwise stay as plain strings (preserves the API's convenience shape).
|
||||
* - Doc parts render as text via `approxDocPart_To_String`; in-reference-to XML is prepended to the user turn.
|
||||
@@ -30,8 +35,14 @@ export function aixToGeminiInteractionsCreate(model: AixAPI_Model, chatGenerateR
|
||||
// Normalize: move any 'spillable' system parts (e.g. images) into a synthetic user message up front
|
||||
const chatGenerate = aixSpillSystemToUser(chatGenerateRaw);
|
||||
|
||||
// Extract leftover system text (to be prepended to the first user turn)
|
||||
const systemPrefix = _collectSystemText(chatGenerate.systemMessage);
|
||||
// The API expects a bare agent id (no 'models/' prefix)
|
||||
const agent = model.id.startsWith('models/') ? model.id.slice('models/'.length) : model.id;
|
||||
|
||||
// Deep Research agents reject `system_instruction` at the top level - we prepend to input instead
|
||||
const isDeepResearch = agent.includes('deep-research');
|
||||
|
||||
// Extract flattened system text (consumed below - DR: prepend to first user turn; else: native field)
|
||||
const systemText = _collectSystemText(chatGenerate.systemMessage);
|
||||
|
||||
// Walk chatSequence -> turns
|
||||
const turns: TTurn[] = [];
|
||||
@@ -48,11 +59,11 @@ export function aixToGeminiInteractionsCreate(model: AixAPI_Model, chatGenerateR
|
||||
if (!turns.length)
|
||||
throw new Error('Gemini Interactions: no usable turns (Deep Research agents require at least one user message)');
|
||||
|
||||
// Prepend system prefix to the FIRST user turn (skip if none exists)
|
||||
if (systemPrefix) {
|
||||
// DR only: prepend system text into the first user turn (native `system_instruction` rejected)
|
||||
if (isDeepResearch && systemText) {
|
||||
const firstUserIdx = turns.findIndex(t => t.role === 'user');
|
||||
if (firstUserIdx >= 0)
|
||||
turns[firstUserIdx] = { role: 'user', content: _prependSystemText(turns[firstUserIdx].content, systemPrefix) };
|
||||
turns[firstUserIdx] = { role: 'user', content: _prependSystemText(turns[firstUserIdx].content, systemText) };
|
||||
}
|
||||
|
||||
// Sanity: the API expects the last turn to be 'user' (we're asking the model to respond)
|
||||
@@ -64,14 +75,22 @@ export function aixToGeminiInteractionsCreate(model: AixAPI_Model, chatGenerateR
|
||||
? turns[0].content
|
||||
: turns;
|
||||
|
||||
// The API expects a bare agent id (no 'models/' prefix)
|
||||
const agent = model.id.startsWith('models/') ? model.id.slice('models/'.length) : model.id;
|
||||
|
||||
return {
|
||||
agent,
|
||||
input,
|
||||
background: true,
|
||||
store: true, // API rejects store=false with background=true; the poller issues DELETE after terminal status
|
||||
stream: true, // SSE streaming - upstream returns event-stream (interaction.start, content.start/delta/stop, interaction.complete, done). Required for live thought_summary deltas.
|
||||
// FIXME: we only support SSE streaming parsing - we used to support parsing of the final answer (with the GET) but not anymore
|
||||
background: true, // required by agents; also required alongside stream=true
|
||||
store: true, // keep the interaction alive so clients can reattach via SSE replay within Gemini's retention window (1d free / 55d paid)
|
||||
...(isDeepResearch && {
|
||||
agent_config: {
|
||||
type: 'deep-research',
|
||||
thinking_summaries: 'auto', // Enable thought_summary blocks - without this the API would not emit summaries during streaming
|
||||
// visualization defaults to 'auto' upstream; leave unset to keep the default (agent may generate charts/images).
|
||||
},
|
||||
}),
|
||||
// non-DR agents: use native system_instruction field (matches gemini.generateContent.ts convention)
|
||||
...(!isDeepResearch && systemText && { system_instruction: systemText }),
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -4,12 +4,13 @@ import { bedrockAccessAsync, bedrockResolveRegion, bedrockURLMantle, bedrockURLR
|
||||
import { geminiAccess } from '~/modules/llms/server/gemini/gemini.access';
|
||||
import { ollamaAccess } from '~/modules/llms/server/ollama/ollama.access';
|
||||
|
||||
import { fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
|
||||
|
||||
import type { AixAPI_Access, AixAPI_Model, AixAPI_ResumeHandle, AixAPIChatGenerate_Request, AixWire_Particles } from '../../api/aix.wiretypes';
|
||||
import type { AixDemuxers } from '../stream.demuxers';
|
||||
|
||||
import { GeminiWire_API_Generate_Content } from '../wiretypes/gemini.wiretypes';
|
||||
|
||||
import { createGeminiInteractionsConnect, createGeminiInteractionsResumeConnect } from './connectors/gemini.interactionsPoller';
|
||||
import { GeminiInteractionsWire_API_Interactions } from '../wiretypes/gemini.interactions.wiretypes';
|
||||
|
||||
import { aixAnthropicHostedFeatures, aixToAnthropicMessageCreate } from './adapters/anthropic.messageCreate';
|
||||
import { aixToBedrockConverse } from './adapters/bedrock.converse';
|
||||
@@ -170,14 +171,25 @@ export async function createChatGenerateDispatch(access: AixAPI_Access, model: A
|
||||
case 'gemini':
|
||||
const requestedModelName = model.id.replace('models/', '');
|
||||
|
||||
// [Gemini Interactions API - ALPHA TEST]
|
||||
if (model.vndGeminiAPI === 'interactions-agent')
|
||||
// [Gemini Interactions API - ALPHA TEST] SSE-native: POST with stream=true, upstream returns event-stream we pipe through the fast-sse demuxer.
|
||||
if (model.vndGeminiAPI === 'interactions-agent') {
|
||||
const request: ChatGenerateDispatchRequest = {
|
||||
...geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.postPath, false),
|
||||
method: 'POST',
|
||||
body: aixToGeminiInteractionsCreate(model, chatGenerate),
|
||||
};
|
||||
return {
|
||||
// { request, customConnect }
|
||||
...createGeminiInteractionsConnect(access, aixToGeminiInteractionsCreate(model, chatGenerate)),
|
||||
request,
|
||||
// Custom-connect so we can neutralize the outer retrier on HTTP errors: a retried POST would create a second (billable) Deep Research interaction upstream
|
||||
customConnect: (signal) => fetchResponseOrTRPCThrow({ ...request, signal, name: 'Aix.Gemini.Interactions.create', throwWithoutName: true })
|
||||
.catch((error: any) => {
|
||||
if (signal.aborted) throw error; // preserve abort identity for the executor's abort classifier
|
||||
throw new Error(`Gemini Interactions POST: ${error?.message || 'upstream error'}`); // rewrapping TRPCFetcherError as plain Error makes the retrier treat it as non-retryable
|
||||
}),
|
||||
demuxerFormat: 'fast-sse',
|
||||
chatGenerateParse: createGeminiInteractionsParser(requestedModelName),
|
||||
};
|
||||
}
|
||||
|
||||
const useV1Alpha = false; // !!model.vndGeminiShowThoughts || model.vndGeminiThinkingBudget !== undefined;
|
||||
return {
|
||||
@@ -307,16 +319,17 @@ export async function createChatGenerateResumeDispatch(access: AixAPI_Access, re
|
||||
chatGenerateParse: streaming ? createOpenAIResponsesEventParser() : createOpenAIResponseParserNS(),
|
||||
};
|
||||
|
||||
case 'gemini':
|
||||
// [Gemini Interactions] Reattach to an in-progress Deep Research interaction by polling GET.
|
||||
case 'gemini': {
|
||||
// [Gemini Interactions] Reattach via SSE stream - GET /interactions/{id}?stream=true replays all events from the start (intentional - client's ContentReassembler replaces message content on reattach; partial resume via last_event_id is deliberately NOT used).
|
||||
if (resumeHandle.uht !== 'vnd.gem.interactions')
|
||||
throw new Error(`Resume handle mismatch for gemini: expected 'vnd.gem.interactions', got '${resumeHandle.uht}'`);
|
||||
const { url: _baseUrl, headers: _headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(resumeHandle.runId /* Gemini interaction.id */), false);
|
||||
return {
|
||||
// { request, customConnect }
|
||||
...createGeminiInteractionsResumeConnect(access, resumeHandle.runId /* Gemini interaction.id */),
|
||||
request: { url: `${_baseUrl}${_baseUrl.includes('?') ? '&' : '?'}stream=true`, method: 'GET', headers: _headers },
|
||||
demuxerFormat: 'fast-sse',
|
||||
chatGenerateParse: createGeminiInteractionsParser(null /* model name unknown at resume time - caller's DMessage already has it */),
|
||||
};
|
||||
}
|
||||
|
||||
default:
|
||||
const _exhaustiveCheck: never = dialect;
|
||||
|
||||
@@ -1,245 +0,0 @@
|
||||
import type * as z from 'zod/v4';
|
||||
|
||||
import { fetchJsonOrTRPCThrow, fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
|
||||
|
||||
import type { GeminiAccessSchema } from '~/modules/llms/server/gemini/gemini.access';
|
||||
import { geminiAccess } from '~/modules/llms/server/gemini/gemini.access';
|
||||
|
||||
import { ChatGenerateDispatch } from '../chatGenerate.dispatch';
|
||||
import { GeminiInteractionsWire_API_Interactions } from '../../wiretypes/gemini.interactions.wiretypes';
|
||||
|
||||
|
||||
// configuration
|
||||
const INITIAL_POLL_DELAY_MS = 3_000; // first poll happens this long after the POST accepted the job
|
||||
const STEADY_POLL_INTERVAL_MS = 10_000; // subsequent polls
|
||||
const MAX_SLEEP_CHUNK_MS = 1_000; // wake often to honor abort promptly
|
||||
|
||||
|
||||
type TRequestBody = z.infer<typeof GeminiInteractionsWire_API_Interactions.RequestBody_schema>;
|
||||
type TInteraction = z.infer<typeof GeminiInteractionsWire_API_Interactions.Interaction_schema>;
|
||||
|
||||
|
||||
/**
|
||||
* Gemini Interactions API - Poll-to-Stream bridge (CREATE flow).
|
||||
*
|
||||
* Returns a `customConnect` function suitable for `ChatGenerateDispatch.customConnect`.
|
||||
*
|
||||
* Flow:
|
||||
* 1. POST /v1beta/interactions with `background: true` - returns the interaction id (usually with status 'in_progress')
|
||||
* 2. Return a `Response` whose body is a ReadableStream of SSE frames
|
||||
* 3. Background producer polls GET /v1beta/interactions/{id} every few seconds until status is terminal
|
||||
* 4. Each poll writes one SSE frame (the full Interaction JSON) - the parser diffs vs. prior state
|
||||
* 5. DELETE only on natural terminal status (completed/failed/cancelled). On client abort or stream
|
||||
* error we leave the interaction ALIVE upstream so the client can reattach across reloads via
|
||||
* `createGeminiInteractionsResumeConnect`. Gemini auto-cleans after its retention window (1d free / 55d paid).
|
||||
* WARNING: this trades off orphan risk for reattach viability - if the client never reattaches,
|
||||
* the upstream agent keeps running and consuming tokens until the retention window expires.
|
||||
*/
|
||||
export function createGeminiInteractionsConnect(access: GeminiAccessSchema, body: TRequestBody): Pick<ChatGenerateDispatch, 'request' | 'customConnect'> {
|
||||
|
||||
// compute URL/headers once - exposed as `request` for debug echo, and reused by the POST
|
||||
const { url, headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.postPath, false);
|
||||
const request = { url, headers, method: 'POST' as const, body };
|
||||
|
||||
const connect = async function (signal: AbortSignal): Promise<Response> {
|
||||
|
||||
// 1. Initial POST - reuses the same url/headers we expose via `request`
|
||||
const initial = await _postInteractionAt(url, headers, body, signal);
|
||||
|
||||
// 2. Build the streaming response seeded with the initial snapshot
|
||||
return _buildStreamingResponse(access, initial.id, initial, signal);
|
||||
};
|
||||
|
||||
return { request, customConnect: connect };
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gemini Interactions API - Poll-to-Stream bridge (RESUME flow).
|
||||
*
|
||||
* Same shape as `createGeminiInteractionsConnect`, but skips the POST step and starts
|
||||
* directly from GET-polling an existing interaction id. Used by `createChatGenerateResumeDispatch`
|
||||
* when reattaching to an in-progress Deep Research run after a page reload.
|
||||
*/
|
||||
export function createGeminiInteractionsResumeConnect(access: GeminiAccessSchema, interactionId: string): Pick<ChatGenerateDispatch, 'request' | 'customConnect'> {
|
||||
|
||||
// compute URL/headers for debug echo - real I/O happens in customConnect below (GET-poll loop)
|
||||
const requestDummy = {
|
||||
...geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(interactionId), false),
|
||||
method: 'GET' as const,
|
||||
};
|
||||
|
||||
const connect = async function (signal: AbortSignal): Promise<Response> {
|
||||
|
||||
// no POST - jump straight into polling this known interaction id
|
||||
return _buildStreamingResponse(access, interactionId, null, signal);
|
||||
};
|
||||
|
||||
return { request: requestDummy, customConnect: connect };
|
||||
}
|
||||
|
||||
|
||||
// --- Shared stream-building logic ---
|
||||
|
||||
function _buildStreamingResponse(
|
||||
access: GeminiAccessSchema,
|
||||
interactionId: string,
|
||||
initial: TInteraction | null,
|
||||
signal: AbortSignal,
|
||||
): Response {
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
|
||||
const emitSseFrame = (interaction: TInteraction) => {
|
||||
const data = JSON.stringify(interaction);
|
||||
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
|
||||
};
|
||||
|
||||
void (async () => {
|
||||
// `store: true` is required by the API when background=true, so Gemini retains the interaction.
|
||||
// WARNING: orphan risk - we only DELETE on natural terminal status. On abort/error we leave the
|
||||
// interaction alive upstream so the client can reattach across reloads. If the client never
|
||||
// comes back, the agent keeps running and consuming tokens until Gemini's retention window
|
||||
// (1d free / 55d paid) auto-cleans it.
|
||||
let shouldDelete = false;
|
||||
try {
|
||||
|
||||
// First frame (create flow only): seed with the POST response
|
||||
if (initial) {
|
||||
emitSseFrame(initial);
|
||||
if (_isTerminalStatus(initial.status)) {
|
||||
shouldDelete = true;
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
await _sleepOrAbort(INITIAL_POLL_DELAY_MS, signal);
|
||||
}
|
||||
|
||||
// Polling loop
|
||||
while (!signal.aborted) {
|
||||
const snapshot = await _getInteraction(access, interactionId, signal);
|
||||
emitSseFrame(snapshot);
|
||||
if (_isTerminalStatus(snapshot.status)) {
|
||||
shouldDelete = true;
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
await _sleepOrAbort(STEADY_POLL_INTERVAL_MS, signal);
|
||||
}
|
||||
// aborted by client (reload or user cancel): close cleanly, do NOT delete so the client can reattach
|
||||
controller.close();
|
||||
|
||||
} catch (err: any) {
|
||||
// surfaces to the stream consumer; executor treats as 'dispatch-read' issue
|
||||
// do NOT delete on error - the interaction may still be running upstream and the client may reattach
|
||||
if (signal.aborted)
|
||||
controller.close();
|
||||
else
|
||||
controller.error(err);
|
||||
} finally {
|
||||
if (shouldDelete)
|
||||
void _deleteInteraction(access, interactionId).catch(() => { /* ignore */ });
|
||||
}
|
||||
})();
|
||||
},
|
||||
cancel(_reason) {
|
||||
// the consumer cancelled - upstream abort will have fired too
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
status: 200,
|
||||
headers: { 'content-type': 'text/event-stream; charset=utf-8' },
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// --- HTTP helpers ---
|
||||
//
|
||||
// We use the unified `fetchJsonOrTRPCThrow` / `fetchResponseOrTRPCThrow` helpers from
|
||||
// `~/server/trpc/trpc.router.fetchers`. They are isomorphic (CSF-safe), auto-check
|
||||
// `response.ok`, and throw a well-shaped `TRPCFetcherError` with category + httpStatus.
|
||||
//
|
||||
// !! RETRY LANDMINE !! The outer executor wraps `customConnect` in
|
||||
// `fetchWithAbortableConnectionRetry`, which retries on any TRPCFetcherError of
|
||||
// category 'connection' or 'http' (502/503/429). Letting such an error propagate
|
||||
// out of `customConnect` would re-execute the initial POST, creating a DUPLICATE
|
||||
// Deep Research interaction server-side. So `_postInteraction` catches and rewraps
|
||||
// TRPCFetcherError as plain Error (retrier ignores unknown errors).
|
||||
//
|
||||
// `_getInteraction` and `_deleteInteraction` run INSIDE the ReadableStream producer,
|
||||
// which is beyond the retry boundary - their TRPCFetcherErrors are surfaced via
|
||||
// `controller.error(err)` -> executor's `dispatch-read` path, and `safeErrorString`
|
||||
// renders the .message nicely. So those can let TRPCFetcherError through.
|
||||
|
||||
async function _postInteractionAt(url: string, headers: HeadersInit, body: TRequestBody, signal: AbortSignal): Promise<TInteraction> {
|
||||
let rawJson: Record<string, unknown>;
|
||||
try {
|
||||
rawJson = await fetchJsonOrTRPCThrow<Record<string, unknown>, TRequestBody>({
|
||||
url, method: 'POST', headers, body,
|
||||
signal, name: 'Gemini.Interactions.create', throwWithoutName: true,
|
||||
});
|
||||
} catch (error: any) {
|
||||
// Preserve abort identity (name='TRPCFetcherError') so the executor's abort classifier
|
||||
// can route this to the clean 'done-dispatch-aborted' path. The retrier already
|
||||
// short-circuits on signal.aborted, so the duplicate-POST concern below doesn't apply.
|
||||
if (signal.aborted) throw error;
|
||||
// Rewrap TRPCFetcherError -> plain Error so the outer retry wrapper does NOT
|
||||
// re-execute this customConnect (which would duplicate the Interaction).
|
||||
throw new Error(`Gemini Interactions POST: ${error?.message || 'upstream error'}`);
|
||||
}
|
||||
return _validateInteraction(rawJson, 'POST');
|
||||
}
|
||||
|
||||
async function _getInteraction(access: GeminiAccessSchema, interactionId: string, signal: AbortSignal): Promise<TInteraction> {
|
||||
const { url, headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(interactionId), false);
|
||||
const rawJson = await fetchJsonOrTRPCThrow<Record<string, unknown>>({
|
||||
url, method: 'GET', headers,
|
||||
signal, name: 'Gemini.Interactions.get', throwWithoutName: true,
|
||||
});
|
||||
return _validateInteraction(rawJson, 'GET');
|
||||
}
|
||||
|
||||
async function _deleteInteraction(access: GeminiAccessSchema, interactionId: string): Promise<void> {
|
||||
// Best-effort: own short timeout, caller swallows any throw
|
||||
const ac = new AbortController();
|
||||
const timer = setTimeout(() => ac.abort(), 3_000);
|
||||
try {
|
||||
const { url, headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.deletePath(interactionId), false);
|
||||
await fetchResponseOrTRPCThrow({
|
||||
url, method: 'DELETE', headers,
|
||||
signal: ac.signal, name: 'Gemini.Interactions.delete', throwWithoutName: true,
|
||||
});
|
||||
} finally {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
|
||||
function _validateInteraction(rawJson: unknown, method: 'POST' | 'GET'): TInteraction {
|
||||
const parsed = GeminiInteractionsWire_API_Interactions.Interaction_schema.safeParse(rawJson);
|
||||
if (!parsed.success)
|
||||
throw new Error(`Gemini Interactions ${method}: unexpected response shape (${parsed.error.message})`);
|
||||
return parsed.data;
|
||||
}
|
||||
|
||||
|
||||
// --- Small utils ---
|
||||
|
||||
function _isTerminalStatus(status: TInteraction['status']): boolean {
|
||||
return status === 'completed' || status === 'failed' || status === 'cancelled';
|
||||
}
|
||||
|
||||
async function _sleepOrAbort(totalMs: number, signal: AbortSignal): Promise<void> {
|
||||
let remaining = totalMs;
|
||||
while (remaining > 0 && !signal.aborted) {
|
||||
const chunk = Math.min(remaining, MAX_SLEEP_CHUNK_MS);
|
||||
await new Promise<void>(resolve => {
|
||||
const t = setTimeout(() => { signal.removeEventListener('abort', onAbort); resolve(); }, chunk);
|
||||
const onAbort = () => { clearTimeout(t); resolve(); };
|
||||
signal.addEventListener('abort', onAbort, { once: true });
|
||||
});
|
||||
remaining -= chunk;
|
||||
}
|
||||
}
|
||||
+241
-213
@@ -15,267 +15,219 @@ const DISABLE_CITATIONS = true;
|
||||
type TInteraction = z.infer<typeof GeminiInteractionsWire_API_Interactions.Interaction_schema>;
|
||||
type TUsage = NonNullable<TInteraction['usage']>;
|
||||
|
||||
type BlockState = {
|
||||
kind: 'thought' | 'text' | 'image' | 'other';
|
||||
emittedCitationKeys: Set<string>; // `${url}@${start}-${end}` for de-dupe (url_citations can repeat)
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Gemini Interactions API parser (for Deep Research and future multimodal agents).
|
||||
* Gemini Interactions API SSE parser (for Deep Research and future agents).
|
||||
*
|
||||
* Each SSE frame carries a *full* Interaction snapshot (from POST or from a GET poll).
|
||||
* The parser diffs against prior state and emits only new content.
|
||||
* The upstream request is sent with `stream=true` and returns `text/event-stream` with events:
|
||||
* - interaction.start / interaction.status_update / interaction.complete (lifecycle)
|
||||
* - content.start / content.delta / content.stop (per-index content blocks)
|
||||
* - error (non-fatal, empty payload observed mid-stream)
|
||||
* - done (terminator, data: [DONE])
|
||||
*
|
||||
* Emission rules per output type:
|
||||
* - `text` -> `pt.appendText(newSuffix)`. New url_citation annotations are emitted once.
|
||||
* - `thought` -> `pt.appendReasoningText(newSuffix)`; signatures recorded via `setReasoningSignature`.
|
||||
* - `image` -> `pt.appendImageInline(...)` once per index (images are whole, not incremental).
|
||||
* URI-only variants emit a visible note + `console.warn` (not yet wired as fetches).
|
||||
* - `audio` -> PCM -> WAV via `geminiConvertPCM2WAV`, then `pt.appendAudioInline(...)` once per index.
|
||||
* - unknown types -> `console.warn` + inline `_Unsupported content block: <type>_` note, once per index.
|
||||
* Non-terminating: Deep Research streams are long-lived and must not blow up on new blocks.
|
||||
* The SSE demuxer (`fast-sse`) invokes this parser once per frame with `eventData` (the JSON body)
|
||||
* and `eventName` (the `event:` line). We dispatch on the JSON payload's `event_type` field which
|
||||
* mirrors the SSE event name - staying resilient if the demuxer drops the event name.
|
||||
*
|
||||
* Part boundaries: when the output type at a given index changes kind (e.g. thought -> text),
|
||||
* we call `endMessagePart()` so the transmitter flushes the previous part cleanly.
|
||||
* Delta variants (content.delta's `delta` payload):
|
||||
* - thought_summary -> `pt.appendReasoningText(delta.content.text)`
|
||||
* - text -> `pt.appendText(delta.text)`
|
||||
* - text_annotation -> `pt.appendUrlCitation(...)` for each url_citation (if `DISABLE_CITATIONS` is false)
|
||||
* - image -> `pt.appendImageInline(...)` (forward-looking; not observed in captures yet)
|
||||
*
|
||||
* Resume: GET /v1beta/interactions/{id}?stream=true[&last_event_id=<cursor>] - Gemini replays from
|
||||
* the cursor (or from start if omitted). Our parser is position-idempotent within a single run
|
||||
* because the transmitter's state carries across events.
|
||||
*/
|
||||
export function createGeminiInteractionsParser(requestedModelName: string | null): ChatGenerateParseFunction {
|
||||
|
||||
const parserCreationTimestamp = Date.now();
|
||||
let timeToFirstEvent: number | undefined;
|
||||
let timeToFirstContent: number | undefined;
|
||||
|
||||
// on resume we don't know the model name (the DMessage already has it) - skip emission
|
||||
let modelNameSent = requestedModelName == null;
|
||||
let modelNameSent = requestedModelName == null; // on resume, DMessage already has the model name
|
||||
let upstreamHandleSent = false;
|
||||
let operationOpId: string | null = null; // interaction id, set once; used to pair in-progress/done operation state
|
||||
let operationOpId: string | null = null; // interaction id; used to pair in-progress / done operation-state updates
|
||||
let operationOpenEmitted = false;
|
||||
let interactionIdCache: string | null = null; // cached for the `operation-state done` emission on interaction.complete
|
||||
|
||||
// per-index emission state (array index in `outputs[]`)
|
||||
type EmittedState = {
|
||||
kind: 'text' | 'thought' | 'image' | 'audio' | 'other';
|
||||
emittedTextLen: number;
|
||||
emittedCitationKeys: Set<string>; // `${url}@${start}-${end}` to de-dupe
|
||||
signatureSent: boolean;
|
||||
mediaEmitted: boolean; // image/audio: emit only once (whole, not incremental)
|
||||
otherWarned: boolean; // unknown type: warn only once per index
|
||||
};
|
||||
const emitted: EmittedState[] = [];
|
||||
let lastOpenIdx = -1; // index of the most recently opened part; -1 = none
|
||||
// per-index content-block state (mirrors what content.start declared; persists across delta/stop)
|
||||
const blocks: Record<number, BlockState> = Object.create(null);
|
||||
let lastOpenIdx = -1; // most recently opened content-block index; -1 = none open
|
||||
|
||||
return function parse(pt: IParticleTransmitter, rawEventData: string): void {
|
||||
return function parse(pt: IParticleTransmitter, rawEventData: string, _eventName?: string): void {
|
||||
|
||||
// model name is announced once (agents don't populate modelVersion the same way)
|
||||
// model name announced once (agents don't populate modelVersion the way generateContent does)
|
||||
if (!modelNameSent && requestedModelName != null) {
|
||||
pt.setModelName(requestedModelName);
|
||||
modelNameSent = true;
|
||||
}
|
||||
|
||||
// parse + validate
|
||||
const parsed = GeminiInteractionsWire_API_Interactions.Interaction_schema.safeParse(JSON.parse(rawEventData));
|
||||
if (!parsed.success)
|
||||
throw new Error(`malformed interaction snapshot: ${parsed.error.message}`);
|
||||
const interaction: TInteraction = parsed.data;
|
||||
// `event: done` carries the literal string `[DONE]` - terminate cleanly without a JSON parse
|
||||
if (rawEventData === '[DONE]')
|
||||
return;
|
||||
|
||||
// emit the upstream handle on the first frame that has an id (enables reattach across reloads)
|
||||
if (!upstreamHandleSent && interaction.id) {
|
||||
pt.setUpstreamHandle(interaction.id, 'vnd.gem.interactions');
|
||||
upstreamHandleSent = true;
|
||||
// parse the JSON body + validate against the event-union
|
||||
let rawJson: unknown;
|
||||
try {
|
||||
rawJson = JSON.parse(rawEventData);
|
||||
} catch (e: any) {
|
||||
throw new Error(`malformed SSE frame (not JSON): ${e?.message || String(e)}`);
|
||||
}
|
||||
|
||||
// Operation state: give the UI a live progress indicator while the background agent runs.
|
||||
// Pinned to the interaction id so the terminal 'done'/'error' replaces the same entry.
|
||||
if (interaction.id && !operationOpId)
|
||||
operationOpId = interaction.id;
|
||||
if (operationOpId && !operationOpenEmitted && interaction.status === 'in_progress') {
|
||||
pt.sendOperationState('search-web', 'Deep Research in progress...', { opId: operationOpId });
|
||||
operationOpenEmitted = true;
|
||||
const parsed = GeminiInteractionsWire_API_Interactions.StreamEvent_schema.safeParse(rawJson);
|
||||
if (!parsed.success) {
|
||||
// tolerate future/unknown event types rather than failing the whole stream
|
||||
console.warn('[GeminiInteractions] unknown SSE event shape:', rawJson);
|
||||
return;
|
||||
}
|
||||
const event = parsed.data;
|
||||
|
||||
// record time-to-first-content (first frame that carries outputs)
|
||||
if (timeToFirstEvent === undefined && interaction.outputs && interaction.outputs.length > 0)
|
||||
timeToFirstEvent = Date.now() - parserCreationTimestamp;
|
||||
switch (event.event_type) {
|
||||
|
||||
// process outputs (may be absent on early in_progress frames).
|
||||
// Each raw output is classified via Zod safeParse against a discriminated union.
|
||||
// - Untyped/empty placeholders (`{}`, no `type` field) are skipped silently without creating
|
||||
// state, so a later snapshot that populates them can classify cleanly.
|
||||
// - Typed-but-unknown shapes warn once per index with a visible note (non-terminating).
|
||||
const outputs = interaction.outputs ?? [];
|
||||
for (let i = 0; i < outputs.length; i++) {
|
||||
const raw = outputs[i] as { type?: unknown };
|
||||
const rawType = typeof raw?.type === 'string' ? raw.type : null;
|
||||
// --- Lifecycle ---
|
||||
|
||||
// skip not-yet-populated placeholder blocks silently (Deep Research pre-allocates slots)
|
||||
if (rawType === null) continue;
|
||||
case 'interaction.start':
|
||||
interactionIdCache = event.interaction.id;
|
||||
if (!upstreamHandleSent) {
|
||||
pt.setUpstreamHandle(event.interaction.id, 'vnd.gem.interactions');
|
||||
upstreamHandleSent = true;
|
||||
}
|
||||
break;
|
||||
|
||||
// silent-skip Deep Research internal tool call/result blocks. These are streamed as content
|
||||
// alongside text/thought but shouldn't surface to the user - the top-level "Deep Research in
|
||||
// progress" operation state already signals activity.
|
||||
if (GeminiInteractionsWire_API_Interactions.INTERNAL_OUTPUT_TYPES.has(rawType)) continue;
|
||||
case 'interaction.status_update':
|
||||
interactionIdCache = event.interaction_id;
|
||||
if (!upstreamHandleSent) {
|
||||
pt.setUpstreamHandle(event.interaction_id, 'vnd.gem.interactions');
|
||||
upstreamHandleSent = true;
|
||||
}
|
||||
// Surface the in-progress label the first time we see it. Pinned to the interaction id so
|
||||
// the terminal done/error (emitted from interaction.complete) replaces the same entry.
|
||||
if (event.status === 'in_progress' && !operationOpenEmitted) {
|
||||
operationOpId = event.interaction_id;
|
||||
pt.sendOperationState('search-web', 'Deep Research in progress...', { opId: operationOpId });
|
||||
operationOpenEmitted = true;
|
||||
}
|
||||
break;
|
||||
|
||||
const classified = GeminiInteractionsWire_API_Interactions.KnownOutput_schema.safeParse(raw);
|
||||
const kind: EmittedState['kind'] = !classified.success ? 'other' : classified.data.type;
|
||||
case 'interaction.complete':
|
||||
_handleInteractionComplete(pt, event.interaction, operationOpId ?? interactionIdCache, lastOpenIdx, parserCreationTimestamp, timeToFirstContent);
|
||||
break;
|
||||
|
||||
// first time we see this index: initialize + flush previous part if switching kinds
|
||||
let state = emitted[i];
|
||||
if (!state) {
|
||||
state = { kind, emittedTextLen: 0, emittedCitationKeys: new Set(), signatureSent: false, mediaEmitted: false, otherWarned: false };
|
||||
emitted[i] = state;
|
||||
// --- Content-block lifecycle ---
|
||||
|
||||
// close previous part if we're opening a new index (natural part boundary)
|
||||
if (lastOpenIdx !== -1 && lastOpenIdx !== i)
|
||||
case 'content.start': {
|
||||
const kind = _classifyContentKind(event.content?.type);
|
||||
blocks[event.index] = { kind, emittedCitationKeys: new Set() };
|
||||
// natural part boundary: close any previously open part when switching indices
|
||||
if (lastOpenIdx !== -1 && lastOpenIdx !== event.index)
|
||||
pt.endMessagePart();
|
||||
lastOpenIdx = i;
|
||||
lastOpenIdx = event.index;
|
||||
break;
|
||||
}
|
||||
|
||||
// 'other': warn once per index with visible note, then continue
|
||||
if (!classified.success) {
|
||||
if (!state.otherWarned) {
|
||||
console.warn(`[GeminiInteractions] unsupported output type: ${rawType}`, raw);
|
||||
pt.appendText(`\n_Unsupported content block: ${rawType}_\n`);
|
||||
state.otherWarned = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
case 'content.stop':
|
||||
// the final `endMessagePart` is emitted by `interaction.complete` after status evaluation,
|
||||
// so we don't auto-close here - lets multi-block streams flow naturally
|
||||
break;
|
||||
|
||||
const out = classified.data;
|
||||
switch (out.type) {
|
||||
case 'text': {
|
||||
if (out.text.length > state.emittedTextLen) {
|
||||
pt.appendText(out.text.slice(state.emittedTextLen));
|
||||
state.emittedTextLen = out.text.length;
|
||||
}
|
||||
// url_citation annotations: loose-typed in Output_schema, validated per-item here
|
||||
if (!DISABLE_CITATIONS && out.annotations) {
|
||||
for (const annRaw of out.annotations) {
|
||||
const annParse = GeminiInteractionsWire_API_Interactions.UrlCitationAnnotation_schema.safeParse(annRaw);
|
||||
if (!annParse.success) continue; // not a url_citation (place_citation, file_citation, ...)
|
||||
const ann = annParse.data;
|
||||
const key = `${ann.url}@${ann.start_index ?? ''}-${ann.end_index ?? ''}`;
|
||||
if (state.emittedCitationKeys.has(key)) continue;
|
||||
state.emittedCitationKeys.add(key);
|
||||
pt.appendUrlCitation(ann.title || ann.url, ann.url, undefined, ann.start_index, ann.end_index, undefined, undefined);
|
||||
// --- Delta routing ---
|
||||
|
||||
case 'content.delta': {
|
||||
if (timeToFirstContent === undefined)
|
||||
timeToFirstContent = Date.now() - parserCreationTimestamp;
|
||||
|
||||
// Ensure state exists even if we missed content.start (tolerant)
|
||||
if (!blocks[event.index])
|
||||
blocks[event.index] = { kind: 'other', emittedCitationKeys: new Set() };
|
||||
const state = blocks[event.index];
|
||||
|
||||
// Classify the delta payload - unknown types warn once and are dropped
|
||||
const deltaParse = GeminiInteractionsWire_API_Interactions.StreamDelta_schema.safeParse(event.delta);
|
||||
if (!deltaParse.success) {
|
||||
// Empty deltas ({}) appear alongside placeholder blocks (e.g. internal tool slots) - silent skip
|
||||
if (event.delta && Object.keys(event.delta).length === 0) break;
|
||||
console.warn('[GeminiInteractions] unknown content.delta shape at index', event.index, event.delta);
|
||||
break;
|
||||
}
|
||||
const delta = deltaParse.data;
|
||||
|
||||
switch (delta.type) {
|
||||
case 'thought_summary':
|
||||
if (delta.content?.text) pt.appendReasoningText(delta.content.text);
|
||||
// Intentionally NOT re-emitting sendOperationState here. The chip is created ONCE at
|
||||
// interaction.status_update and left alone - its `cts` is anchored to the run's
|
||||
// createdAt via the upstream handle (reassembler line 775), so the chip's timer shows
|
||||
// the true elapsed run time and survives reattach cleanly. Re-emitting would pollute
|
||||
// the opLog without adding user value.
|
||||
break;
|
||||
case 'thought_signature':
|
||||
if (delta.signature) pt.setReasoningSignature(delta.signature);
|
||||
break;
|
||||
case 'text':
|
||||
pt.appendText(delta.text);
|
||||
break;
|
||||
case 'text_annotation':
|
||||
if (!DISABLE_CITATIONS && delta.annotations) {
|
||||
for (const annRaw of delta.annotations) {
|
||||
const ann = GeminiInteractionsWire_API_Interactions.UrlCitationAnnotation_schema.safeParse(annRaw);
|
||||
if (!ann.success) continue; // place_citation, file_citation, etc. - not surfaced here
|
||||
const a = ann.data;
|
||||
const key = `${a.url}@${a.start_index ?? ''}-${a.end_index ?? ''}`;
|
||||
if (state.emittedCitationKeys.has(key)) continue;
|
||||
state.emittedCitationKeys.add(key);
|
||||
pt.appendUrlCitation(a.title || a.url, a.url, undefined, a.start_index, a.end_index, undefined, undefined);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'thought': {
|
||||
// summary may be a string (preview) or an array of {type:'text', text} blocks (documented shape)
|
||||
const summary = typeof out.summary === 'string'
|
||||
? out.summary
|
||||
: Array.isArray(out.summary)
|
||||
? out.summary.map(s => s.text).join('\n\n')
|
||||
: '';
|
||||
if (summary.length > state.emittedTextLen) {
|
||||
pt.appendReasoningText(summary.slice(state.emittedTextLen));
|
||||
state.emittedTextLen = summary.length;
|
||||
}
|
||||
if (!state.signatureSent && out.signature) {
|
||||
pt.setReasoningSignature(out.signature);
|
||||
state.signatureSent = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'image': {
|
||||
if (!state.mediaEmitted) {
|
||||
if (out.data) {
|
||||
// hintSkipResize=true: Deep Research images are typically figures/charts where
|
||||
// PNG->jpeg recompression would degrade text legibility and fine detail.
|
||||
pt.appendImageInline(out.mime_type, out.data, 'Gemini Generated Image', 'Gemini', '', true);
|
||||
} else if (out.uri) {
|
||||
// URI-hosted images aren't fetched here (yet); surface the link inline
|
||||
console.warn('[GeminiInteractions] image output via URI is not yet fetched inline:', out.uri);
|
||||
pt.appendText(`\n[Image: ${out.uri}]\n`);
|
||||
} else {
|
||||
console.warn('[GeminiInteractions] image output with neither data nor uri:', out);
|
||||
pt.appendText(`\n_Image block without payload_\n`);
|
||||
break;
|
||||
case 'image':
|
||||
// Forward-looking: inline bytes flow through appendImageInline; URI-only gets a visible note.
|
||||
if (delta.data && delta.mime_type) {
|
||||
pt.appendImageInline(delta.mime_type, delta.data, 'Gemini Generated Image', 'Gemini', '', true);
|
||||
} else if (delta.uri) {
|
||||
console.warn('[GeminiInteractions] image delta via URI not fetched:', delta.uri);
|
||||
pt.appendText(`\n[Image: ${delta.uri}]\n`);
|
||||
}
|
||||
state.mediaEmitted = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'audio': {
|
||||
if (!state.mediaEmitted) {
|
||||
if (out.data) {
|
||||
const mime = out.mime_type.toLowerCase();
|
||||
break;
|
||||
case 'audio':
|
||||
// Forward-looking: audio deltas per spec (not yet observed in DR streams). PCM needs WAV conversion; packaged formats pass through.
|
||||
if (delta.data && delta.mime_type) {
|
||||
const mime = delta.mime_type.toLowerCase();
|
||||
const isPCM = mime.startsWith('audio/l16') || mime.includes('codec=pcm');
|
||||
if (isPCM) {
|
||||
try {
|
||||
const wav = geminiConvertPCM2WAV(out.mime_type, out.data);
|
||||
const wav = geminiConvertPCM2WAV(delta.mime_type, delta.data);
|
||||
pt.appendAudioInline(wav.mimeType, wav.base64Data, 'Gemini Generated Audio', 'Gemini', wav.durationMs);
|
||||
} catch (error) {
|
||||
console.warn('[GeminiInteractions] audio PCM convert failed:', error);
|
||||
pt.appendText(`\n_Audio conversion failed: ${String(error)}_\n`);
|
||||
}
|
||||
} else {
|
||||
// already a packaged format (audio/wav, audio/mp3, audio/aac, ...) - pass through
|
||||
pt.appendAudioInline(out.mime_type, out.data, 'Gemini Generated Audio', 'Gemini', 0);
|
||||
pt.appendAudioInline(delta.mime_type, delta.data, 'Gemini Generated Audio', 'Gemini', 0);
|
||||
}
|
||||
} else if (out.uri) {
|
||||
console.warn('[GeminiInteractions] audio output via URI is not yet fetched inline:', out.uri);
|
||||
pt.appendText(`\n[Audio: ${out.uri}]\n`);
|
||||
} else {
|
||||
console.warn('[GeminiInteractions] audio output with neither data nor uri:', out);
|
||||
pt.appendText(`\n_Audio block without payload_\n`);
|
||||
}
|
||||
state.mediaEmitted = true;
|
||||
break;
|
||||
default: {
|
||||
const _exhaustive: never = delta;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
const _exhaustiveCheck: never = out;
|
||||
console.warn('[GeminiInteractions] unreachable: unhandled emittable type', { out });
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// terminal states: flush current part and signal end
|
||||
switch (interaction.status) {
|
||||
case 'completed':
|
||||
if (lastOpenIdx !== -1) pt.endMessagePart();
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research complete', { opId: operationOpId, state: 'done' });
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
|
||||
pt.setTokenStopReason('ok');
|
||||
pt.setDialectEnded('done-dialect');
|
||||
break;
|
||||
|
||||
case 'failed':
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research failed', { opId: operationOpId, state: 'error' });
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
|
||||
pt.setDialectTerminatingIssue('Deep Research interaction failed', null, 'srv-warn');
|
||||
break;
|
||||
|
||||
case 'cancelled':
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research cancelled', { opId: operationOpId, state: 'done' });
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
|
||||
pt.setTokenStopReason('cg-issue');
|
||||
pt.setDialectEnded('done-dialect');
|
||||
break;
|
||||
|
||||
case 'requires_action':
|
||||
// Not expected for Deep Research agents - fail loudly so we notice
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research needs action', { opId: operationOpId, state: 'error' });
|
||||
pt.setDialectTerminatingIssue('Deep Research returned requires_action (not supported in this client)', null, 'srv-warn');
|
||||
break;
|
||||
|
||||
case 'incomplete':
|
||||
// Run stopped early (token limit, etc.). Terminate gracefully with a visible note; we keep any content already emitted.
|
||||
if (lastOpenIdx !== -1) pt.endMessagePart();
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research incomplete', { opId: operationOpId, state: 'done' });
|
||||
pt.appendText('\n_Response incomplete (run stopped early)._\n');
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
|
||||
pt.setTokenStopReason('out-of-tokens');
|
||||
pt.setDialectEnded('done-dialect');
|
||||
break;
|
||||
|
||||
case 'in_progress':
|
||||
// keep polling
|
||||
case 'error':
|
||||
// Observed mid-stream with an empty payload between content blocks - non-fatal, the stream
|
||||
// continues with further events and eventually an interaction.complete. Silent-skip empty
|
||||
// payloads (Beta noise); warn only when actual error info is present.
|
||||
if (event.error?.message || event.error?.code)
|
||||
console.warn('[GeminiInteractions] SSE error event:', event.error);
|
||||
break;
|
||||
|
||||
default: {
|
||||
const _exhaustiveCheck: never = interaction.status;
|
||||
console.warn('[GeminiInteractions] unreachable: unhandled status', { status: interaction.status });
|
||||
const _exhaustiveCheck: never = event;
|
||||
console.warn('[GeminiInteractions] unreachable: unhandled emittable type', { event });
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -283,6 +235,82 @@ export function createGeminiInteractionsParser(requestedModelName: string | null
|
||||
}
|
||||
|
||||
|
||||
// --- helpers ---
|
||||
|
||||
function _classifyContentKind(rawType: unknown): BlockState['kind'] {
|
||||
if (rawType === 'thought') return 'thought';
|
||||
if (rawType === 'text') return 'text';
|
||||
if (rawType === 'image') return 'image';
|
||||
return 'other';
|
||||
}
|
||||
|
||||
function _handleInteractionComplete(
|
||||
pt: IParticleTransmitter,
|
||||
interaction: TInteraction,
|
||||
operationOpId: string | null,
|
||||
lastOpenIdx: number,
|
||||
parserCreationTimestamp: number,
|
||||
timeToFirstContent: number | undefined,
|
||||
): void {
|
||||
|
||||
// Flush any content parts that were open when the final block arrived
|
||||
if (lastOpenIdx !== -1) pt.endMessagePart();
|
||||
|
||||
switch (interaction.status) {
|
||||
case 'completed':
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research complete', { opId: operationOpId, state: 'done' });
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstContent);
|
||||
pt.setTokenStopReason('ok');
|
||||
pt.setDialectEnded('done-dialect');
|
||||
break;
|
||||
|
||||
case 'failed':
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research failed', { opId: operationOpId, state: 'error' });
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstContent);
|
||||
pt.setDialectTerminatingIssue('Deep Research interaction failed', null, 'srv-warn');
|
||||
break;
|
||||
|
||||
case 'cancelled':
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research cancelled', { opId: operationOpId, state: 'done' });
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstContent);
|
||||
pt.setTokenStopReason('cg-issue');
|
||||
pt.setDialectEnded('done-dialect');
|
||||
break;
|
||||
|
||||
case 'requires_action':
|
||||
// Not expected for Deep Research agents - fail loudly so we notice
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research needs action', { opId: operationOpId, state: 'error' });
|
||||
pt.setDialectTerminatingIssue('Deep Research returned requires_action (not supported in this client)', null, 'srv-warn');
|
||||
break;
|
||||
|
||||
case 'incomplete':
|
||||
// Run stopped early (token limit, etc.). Terminate gracefully with a visible note; we keep any content already emitted.
|
||||
if (operationOpId)
|
||||
pt.sendOperationState('search-web', 'Deep Research incomplete', { opId: operationOpId, state: 'done' });
|
||||
pt.appendText('\n_Response incomplete (run stopped early)._\n');
|
||||
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstContent);
|
||||
pt.setTokenStopReason('out-of-tokens');
|
||||
pt.setDialectEnded('done-dialect');
|
||||
break;
|
||||
|
||||
case 'in_progress':
|
||||
// interaction.complete with in_progress shouldn't happen per the spec - log and keep the stream open
|
||||
console.warn('[GeminiInteractions] interaction.complete with status=in_progress; ignoring');
|
||||
break;
|
||||
|
||||
default: {
|
||||
const _exhaustiveCheck: never = interaction.status;
|
||||
console.warn('[GeminiInteractions] unreachable status', interaction.status);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Map Gemini Interactions `usage` to `CGSelectMetrics`.
|
||||
*
|
||||
@@ -292,14 +320,14 @@ export function createGeminiInteractionsParser(requestedModelName: string | null
|
||||
* - `total_tool_use_tokens` is distinct from input/output/thought and accounts for internal tool calls
|
||||
* (web search, code exec, etc.). For Deep Research this dominates - we fold it into TIn so displayed
|
||||
* input reflects true model consumption; there is no dedicated slot in CGSelectMetrics today.
|
||||
* - `total_output_tokens` excludes thought tokens; `gemini.parser.ts` (line 110) already adds TOutR into TOut
|
||||
* - `total_output_tokens` excludes thought tokens; `gemini.parser.ts` already adds TOutR into TOut
|
||||
* for consistency, and we follow the same convention here.
|
||||
*/
|
||||
function _emitUsageMetrics(
|
||||
pt: IParticleTransmitter,
|
||||
usage: TUsage | undefined,
|
||||
parserCreationTimestamp: number,
|
||||
timeToFirstEvent: number | undefined,
|
||||
timeToFirstContent: number | undefined,
|
||||
): void {
|
||||
if (!usage) return;
|
||||
|
||||
@@ -324,9 +352,9 @@ function _emitUsageMetrics(
|
||||
// timing
|
||||
const dtAll = Date.now() - parserCreationTimestamp;
|
||||
m.dtAll = dtAll;
|
||||
if (timeToFirstEvent !== undefined) {
|
||||
m.dtStart = timeToFirstEvent;
|
||||
const dtInner = dtAll - timeToFirstEvent;
|
||||
if (timeToFirstContent !== undefined) {
|
||||
m.dtStart = timeToFirstContent;
|
||||
const dtInner = dtAll - timeToFirstContent;
|
||||
if (dtInner > 0) {
|
||||
m.dtInner = dtInner;
|
||||
if (totalOut > 0)
|
||||
|
||||
+4781
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
+11529
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,21 @@
|
||||
#!/usr/bin/env bash
|
||||
# Re-sync all upstream snapshots in this directory.
|
||||
# For each *.md file: fetch the URL declared in the `Source:` header line,
|
||||
# preserve the header block (everything through the closing `-->`), bump the
|
||||
# `Synced:` date, and rewrite the body in place. Add a new snapshot with the
|
||||
# same header convention and it gets picked up automatically.
|
||||
set -euo pipefail
|
||||
DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
TODAY=$(date -u +%Y-%m-%d)
|
||||
for f in "$DIR"/*.md; do
|
||||
url=$(grep -m1 '^ Source: ' "$f" | sed 's/^ Source: //' || true)
|
||||
if [ -z "${url:-}" ]; then
|
||||
echo "skip (no Source: header): $(basename "$f")"
|
||||
continue
|
||||
fi
|
||||
echo "→ $(basename "$f") ← $url"
|
||||
header=$(awk '{print} /^-->$/{exit}' "$f" | sed -E "s|^ Synced: .*| Synced: $TODAY|")
|
||||
body=$(curl -sSL --max-time 30 "$url")
|
||||
printf '%s\n\n%s\n' "$header" "$body" > "$f"
|
||||
done
|
||||
echo "Done. Run \`git diff $DIR\` to see upstream changes."
|
||||
@@ -6,14 +6,16 @@ import * as z from 'zod/v4';
|
||||
*
|
||||
* 2026-04-21: NOTE - MINIMAL IMPL for DEEP RESEARCH AGENT
|
||||
* Scope: only what the Deep Research agents need.
|
||||
* - Stateless (no `previous_interaction_id`)
|
||||
* - `store: true` - required by the API when `background: true` (Deep Research agents are background-only).
|
||||
* - Single-turn by default (we don't yet send `previous_interaction_id` for multi-turn state reuse)
|
||||
* - `store: true` is sent (spec says optional; DR guide recommends it for background runs).
|
||||
* We best-effort DELETE on completion/abort to minimize server-side retention.
|
||||
* - Single-turn: last user text becomes `input`
|
||||
* - No tools, no system_instruction, no thinking config
|
||||
* - `system_instruction` is accepted for non-DR agents; DR agents reject it and we prepend to `input` instead.
|
||||
* - No `tools` (yet), no model-side `generation_config`.
|
||||
*
|
||||
* Docs: https://ai.google.dev/gemini-api/docs/interactions
|
||||
* https://ai.google.dev/api/interactions-api
|
||||
* Source-of-truth snapshots (for diffing across upstream changes, see ./_upstream/sync.sh):
|
||||
* ./_upstream/gemini.interactions.spec.md - the formal API reference
|
||||
* ./_upstream/gemini.interactions.guide.md - the prose guide
|
||||
* ./_upstream/gemini.deep-research.guide.md - the Deep Research agent guide
|
||||
*/
|
||||
export namespace GeminiInteractionsWire_API_Interactions {
|
||||
|
||||
@@ -51,15 +53,66 @@ export namespace GeminiInteractionsWire_API_Interactions {
|
||||
]),
|
||||
});
|
||||
|
||||
// agent_config: polymorphic discriminated union on `type` (formal spec: AgentConfig).
|
||||
// Only applicable when `agent` is set (mutually exclusive with `generation_config`, which is the
|
||||
// model-path equivalent). See ./_upstream/gemini.interactions.spec.md#agent_config.
|
||||
//
|
||||
// Variants:
|
||||
// - DynamicAgentConfig { type: 'dynamic' }
|
||||
// Dynamic agents - no tunable config documented beyond the discriminator.
|
||||
// - DeepResearchAgentConfig { type: 'deep-research', ... }
|
||||
// See ./_upstream/gemini.deep-research.guide.md#agent-configuration for defaults and semantics.
|
||||
|
||||
const _DynamicAgentConfig_schema = z.object({
|
||||
type: z.literal('dynamic'),
|
||||
});
|
||||
|
||||
const _DeepResearchAgentConfig_schema = z.object({
|
||||
type: z.literal('deep-research'),
|
||||
thinking_summaries: z.enum(['auto', 'none']).optional(), // default 'none'. 'auto' emits intermediate reasoning events during streaming.
|
||||
visualization: z.enum(['auto', 'off']).optional(), // default 'auto'. 'auto' lets the agent generate charts/images as part of the output.
|
||||
collaborative_planning: z.boolean().optional(), // default false. Plan-then-execute: the agent returns a research plan that the user confirms in a follow-up interaction.
|
||||
});
|
||||
|
||||
export const AgentConfig_schema = z.discriminatedUnion('type', [
|
||||
_DynamicAgentConfig_schema,
|
||||
_DeepResearchAgentConfig_schema,
|
||||
]);
|
||||
|
||||
// RequestBody_schema: POST /v1beta/interactions body.
|
||||
//
|
||||
// Cross-field constraints (from the formal spec):
|
||||
// - `agent` XOR `model` is REQUIRED. We only model the agent path here.
|
||||
// - `agent_config` XOR `generation_config` - config object is picked by path:
|
||||
// `agent`+`agent_config` OR `model`+`generation_config`. Never both.
|
||||
// - `system_instruction` is top-level (not inside config) but rejected by deep-research agents.
|
||||
// - `previous_interaction_id` carries conversation history but NOT per-interaction knobs:
|
||||
// `tools`, `system_instruction`, and `generation_config` are interaction-scoped and must be
|
||||
// re-sent each turn.
|
||||
export const RequestBody_schema = z.object({
|
||||
agent: z.string(), // e.g. 'deep-research-pro-preview-12-2025' (note: we send bare id, without 'models/' prefix)
|
||||
// --- Target: what to call ---
|
||||
agent: z.string(), // Spec: agent is AgentOption (optional, required if `model` not provided). Send the BARE id; no 'models/' prefix.
|
||||
// model: z.string(), // alternative path - not used here; would require generation_config instead of agent_config
|
||||
|
||||
// --- Inputs ---
|
||||
input: z.union([
|
||||
z.string(), // single-turn text convenience
|
||||
z.array(InputContentPart_schema), // single-turn multimodal
|
||||
z.array(Turn_schema), // stateless multi-turn history
|
||||
z.string(), // single-turn text convenience (string shortcut for a user-text turn)
|
||||
z.array(InputContentPart_schema), // single-turn multimodal (text + image parts for one user turn)
|
||||
z.array(Turn_schema), // stateless multi-turn history (role-tagged turns)
|
||||
]),
|
||||
background: z.literal(true), // required for agents
|
||||
store: z.literal(true), // required when background=true; we DELETE after completion to minimize retention
|
||||
system_instruction: z.string().optional(), // NOT supported by deep-research agents (tested 2026-04-23, API: 'not supported for the deep-research-* agent') - for those, prepend to `input` instead.
|
||||
|
||||
// --- Config (picks the agent or model path) ---
|
||||
agent_config: AgentConfig_schema.optional(), // Polymorphic on `type`: 'deep-research' | 'dynamic'. MUTUALLY EXCLUSIVE with `generation_config` (model path). Enables thought-summary streaming, visualizations, collaborative planning.
|
||||
// generation_config: GenerationConfig_schema.optional(), // model path - not modeled here yet
|
||||
|
||||
// --- Runtime flags (literals below force correct behavior at the adapter layer) ---
|
||||
stream: z.boolean().optional(), // SSE streaming - when true, POST returns an event-stream (interaction.start, content.start/delta/stop, interaction.complete, done). On reattach, GET ?stream=true replays the full event sequence (we do not send `last_event_id` - full replay is the intentional semantic; see poller comment).
|
||||
store: z.literal(true), // spec-optional; we lock to `true` so the interaction is retrievable post-run (replay via GET stream, resume via `last_event_id`). Required alongside `background=true` for agents per the DR guide.
|
||||
background: z.literal(true), // spec-optional; DR agents REQUIRE `true` ('Agents are required to use background=true'). Locked to true to prevent accidental sync-mode sends.
|
||||
|
||||
// --- Multi-turn continuation ---
|
||||
previous_interaction_id: z.string().optional(), // reuses prior interaction's stored inputs/outputs. Per-turn knobs (tools, system_instruction, generation_config) are NOT carried and must be re-sent.
|
||||
});
|
||||
|
||||
|
||||
@@ -164,8 +217,10 @@ export namespace GeminiInteractionsWire_API_Interactions {
|
||||
|
||||
// -- Usage (populated in the terminal frame) --
|
||||
|
||||
// Modality enum: per spec ResponseModality - ISO 8601 in descriptions clarifies this is the
|
||||
// runtime modality, not the model's response_modalities request field.
|
||||
const UsageByModality_schema = z.object({
|
||||
modality: z.string(), // 'text' | 'image' | 'audio' | ...
|
||||
modality: z.enum(['text', 'image', 'audio', 'video', 'document']).or(z.string()), // permissive for future modalities
|
||||
tokens: z.number(),
|
||||
});
|
||||
|
||||
@@ -175,17 +230,189 @@ export namespace GeminiInteractionsWire_API_Interactions {
|
||||
total_cached_tokens: z.number().optional(),
|
||||
total_output_tokens: z.number().optional(),
|
||||
total_thought_tokens: z.number().optional(),
|
||||
total_tool_use_tokens: z.number().optional(), // Deep Research: tokens consumed by internal tool calls (web search, etc.)
|
||||
total_tool_use_tokens: z.number().optional(), // Deep Research: tokens consumed by internal tool calls (web search, etc.)
|
||||
input_tokens_by_modality: z.array(UsageByModality_schema).optional(),
|
||||
cached_tokens_by_modality: z.array(UsageByModality_schema).optional(), // spec: cached-tokens breakdown (input subset)
|
||||
output_tokens_by_modality: z.array(UsageByModality_schema).optional(),
|
||||
tool_use_tokens_by_modality: z.array(UsageByModality_schema).optional(), // spec: tool-use breakdown - DR search/urlcontext/code-exec consumption per modality
|
||||
});
|
||||
|
||||
export const Interaction_schema = z.object({
|
||||
// Full Interaction resource (spec: Resource:Interaction). We model ALL required fields and the
|
||||
// ones we observe in responses; remaining optional echo fields are left as `.passthrough()` - see
|
||||
// `Output_schema` comment for rationale.
|
||||
export const Interaction_schema = z.looseObject({
|
||||
// required (all marked "Required. Output only." in spec)
|
||||
id: z.string(),
|
||||
status: Status_enum,
|
||||
created: z.string().optional(), // ISO 8601 - spec says Required but marked optional-with-default upstream; we keep optional for forward-compat
|
||||
updated: z.string().optional(), // ISO 8601
|
||||
|
||||
// commonly-observed fields
|
||||
role: z.string().optional(), // 'agent' | 'user' | ... - output only
|
||||
object: z.string().optional(), // 'interaction' literal observed in responses
|
||||
agent: z.string().optional(), // echoed back on agent-path interactions
|
||||
model: z.string().optional(), // echoed back on model-path interactions
|
||||
|
||||
// content + metrics
|
||||
outputs: z.array(Output_schema).optional(), // absent until first content arrives
|
||||
usage: Usage_schema.optional(), // populated in terminal frames (completed/failed/cancelled)
|
||||
// We ignore model/agent echo for now
|
||||
usage: Usage_schema.optional(), // populated in terminal frames (completed/failed/cancelled/incomplete)
|
||||
|
||||
// (remaining echo fields - system_instruction, tools, agent_config, previous_interaction_id,
|
||||
// input, response_modalities, response_format, etc. - pass through via looseObject for now)
|
||||
});
|
||||
|
||||
|
||||
// -- SSE Stream Events --
|
||||
//
|
||||
// When the POST (or resume GET) is sent with `stream=true`, the API returns `text/event-stream`
|
||||
// with the following frames (captured empirically 2026-04-23, see _upstream/gemini.deep-research.guide.md#streaming):
|
||||
//
|
||||
// event: interaction.start one at the top; carries { interaction: {id, status, agent, ...} }
|
||||
// event: interaction.status_update status transitions (in_progress, completed, ...)
|
||||
// event: content.start opens a content block at {index, content:{type:'thought'|'text'|...}}
|
||||
// event: content.delta incremental data for index; polymorphic delta (see below); some carry `event_id` for resume
|
||||
// event: content.stop closes a content block at index
|
||||
// event: error spec shape: { error?: { code, message } }; observed with EMPTY payload in Beta - non-fatal, continue
|
||||
// event: interaction.complete final snapshot carrying the full Interaction incl. usage
|
||||
// event: done terminator with data: [DONE] (OpenAI-style)
|
||||
//
|
||||
// Resume: GET /v1beta/interactions/{id}?stream=true
|
||||
// Spec also allows `&last_event_id=<event_id>` for incremental resume, but we do NOT use it.
|
||||
// Full replay from the beginning is the intentional semantic - the client's ContentReassembler
|
||||
// REPLACES message content on reattach, so partial resume would be a mismatch. Works identically
|
||||
// on in-progress, completed, failed, and cancelled interactions (within Gemini's retention window).
|
||||
|
||||
// --- ContentDeltaData variants (spec: polymorphic on `type`) ---
|
||||
//
|
||||
// Spec defines: text, image, audio, document, video, thought_summary, thought_signature,
|
||||
// text_annotation, function_call, + tool-call/result variants (code_execution_*, url_context_*,
|
||||
// google_search_*, google_maps_*, file_search_*, mcp_server_tool_*). We model variants we emit
|
||||
// to the UI; unknown ones fail safeParse at the parser and are silently dropped (mirrors the
|
||||
// INTERNAL_OUTPUT_TYPES policy on the non-streaming path).
|
||||
|
||||
const TextDelta_schema = z.object({
|
||||
type: z.literal('text'),
|
||||
text: z.string(),
|
||||
});
|
||||
|
||||
const ThoughtSummaryDelta_schema = z.object({
|
||||
type: z.literal('thought_summary'),
|
||||
// Spec: ThoughtSummaryContent - polymorphic (only `text` variant documented). Optional per spec.
|
||||
content: z.object({
|
||||
type: z.literal('text'),
|
||||
text: z.string(),
|
||||
}).optional(),
|
||||
});
|
||||
|
||||
// Backend validation hash - routed to `pt.setReasoningSignature` when present.
|
||||
const ThoughtSignatureDelta_schema = z.object({
|
||||
type: z.literal('thought_signature'),
|
||||
signature: z.string().optional(),
|
||||
});
|
||||
|
||||
// text_annotation arrives as its own delta on the same index as a text block, carrying citation metadata for the text already emitted.
|
||||
const TextAnnotationDelta_schema = z.object({
|
||||
type: z.literal('text_annotation'),
|
||||
// Spec: Annotation - polymorphic on `type` (url_citation, file_citation, place_citation). Optional per spec.
|
||||
annotations: z.array(z.looseObject({ type: z.string() })).optional(), // validated per-item via UrlCitationAnnotation_schema
|
||||
});
|
||||
|
||||
const ImageDelta_schema = z.object({
|
||||
type: z.literal('image'),
|
||||
data: z.string().optional(), // base64
|
||||
uri: z.string().optional(),
|
||||
mime_type: z.string().optional(), // spec enum: image/png, image/jpeg, image/webp, image/heic, image/heif, image/gif, image/bmp, image/tiff
|
||||
resolution: z.enum(['low', 'medium', 'high', 'ultra_high']).optional(), // spec: MediaResolution
|
||||
});
|
||||
|
||||
const AudioDelta_schema = z.object({
|
||||
type: z.literal('audio'),
|
||||
data: z.string().optional(),
|
||||
uri: z.string().optional(),
|
||||
mime_type: z.string().optional(), // spec enum: audio/wav, audio/mp3, audio/aiff, audio/aac, audio/ogg, audio/flac, audio/mpeg, audio/m4a, audio/l16, audio/opus, audio/alaw, audio/mulaw
|
||||
rate: z.number().optional(),
|
||||
channels: z.number().optional(),
|
||||
});
|
||||
|
||||
// Delta discriminated union - covers variants we emit to the UI. Unknown variants (document,
|
||||
// video, function_call, + tool-call/result) fail safeParse in the parser and are silently dropped.
|
||||
export const StreamDelta_schema = z.discriminatedUnion('type', [
|
||||
TextDelta_schema,
|
||||
ImageDelta_schema,
|
||||
AudioDelta_schema,
|
||||
ThoughtSummaryDelta_schema,
|
||||
ThoughtSignatureDelta_schema,
|
||||
TextAnnotationDelta_schema,
|
||||
]);
|
||||
|
||||
// --- SSE event data payloads (spec: InteractionSseEvent - polymorphic on `event_type`) ---
|
||||
//
|
||||
// Per spec, EVERY variant carries an OPTIONAL `event_id` resume cursor. At runtime only a subset
|
||||
// of events actually include one, so the schema accepts it on all but our parser uses whichever
|
||||
// is present to advance the cursor.
|
||||
|
||||
const InteractionStart_event_schema = z.object({
|
||||
event_type: z.literal('interaction.start'),
|
||||
interaction: Interaction_schema.partial().extend({ id: z.string(), status: Status_enum.optional() }),
|
||||
event_id: z.string().optional(),
|
||||
});
|
||||
|
||||
const InteractionStatusUpdate_event_schema = z.object({
|
||||
event_type: z.literal('interaction.status_update'),
|
||||
interaction_id: z.string(),
|
||||
status: Status_enum,
|
||||
event_id: z.string().optional(),
|
||||
});
|
||||
|
||||
const ContentStart_event_schema = z.object({
|
||||
event_type: z.literal('content.start'),
|
||||
index: z.number(),
|
||||
content: z.looseObject({ type: z.string() }), // spec: Content (polymorphic)
|
||||
event_id: z.string().optional(),
|
||||
});
|
||||
|
||||
const ContentDelta_event_schema = z.object({
|
||||
event_type: z.literal('content.delta'),
|
||||
index: z.number(),
|
||||
delta: z.looseObject({}), // spec: ContentDeltaData - tolerant at ingest; parsed later via StreamDelta_schema.safeParse
|
||||
event_id: z.string().optional(),
|
||||
});
|
||||
|
||||
const ContentStop_event_schema = z.object({
|
||||
event_type: z.literal('content.stop'),
|
||||
index: z.number(),
|
||||
event_id: z.string().optional(),
|
||||
});
|
||||
|
||||
const Error_event_schema = z.object({
|
||||
event_type: z.literal('error'),
|
||||
// Spec: Error (optional) - { code?: string (URI), message?: string }. Observed empty in Beta.
|
||||
error: z.object({
|
||||
code: z.string().optional(),
|
||||
message: z.string().optional(),
|
||||
}).optional(),
|
||||
event_id: z.string().optional(),
|
||||
});
|
||||
|
||||
const InteractionComplete_event_schema = z.object({
|
||||
event_type: z.literal('interaction.complete'),
|
||||
// Spec note: "The completed interaction with EMPTY OUTPUTS to reduce the payload size. Use the
|
||||
// preceding ContentDelta events for the actual output." We rely on `status` + `usage` here.
|
||||
interaction: Interaction_schema,
|
||||
event_id: z.string().optional(),
|
||||
});
|
||||
|
||||
// `event: done` carries the literal string `[DONE]` instead of JSON; handled specially in the parser
|
||||
|
||||
/** Discriminated union of JSON-bodied SSE events. The `done` terminator is handled as a string-valued special case in the parser. */
|
||||
export const StreamEvent_schema = z.discriminatedUnion('event_type', [
|
||||
InteractionStart_event_schema,
|
||||
InteractionStatusUpdate_event_schema,
|
||||
ContentStart_event_schema,
|
||||
ContentDelta_event_schema,
|
||||
ContentStop_event_schema,
|
||||
Error_event_schema,
|
||||
InteractionComplete_event_schema,
|
||||
]);
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user