Compare commits

...

12 Commits

14 changed files with 934 additions and 39 deletions
+26 -1
@@ -15,7 +15,7 @@ import { DConversationId, excludeSystemMessages } from '~/common/stores/chat/cha
import { ShortcutKey, useGlobalShortcuts } from '~/common/components/shortcuts/useGlobalShortcuts';
import { clipboardInterceptCtrlCForCleanup } from '~/common/util/clipboardUtils';
import { convertFilesToDAttachmentFragments } from '~/common/attachment-drafts/attachment.pipeline';
import { createDMessageFromFragments, createDMessageTextContent, DMessage, DMessageId, DMessageUserFlag, DMetaReferenceItem, MESSAGE_FLAG_AIX_SKIP, messageHasUserFlag } from '~/common/stores/chat/chat.message';
import { createDMessageFromFragments, createDMessageTextContent, DMessage, DMessageGenerator, DMessageId, DMessageUserFlag, DMetaReferenceItem, MESSAGE_FLAG_AIX_SKIP, messageHasUserFlag } from '~/common/stores/chat/chat.message';
import { createTextContentFragment, DMessageFragment, DMessageFragmentId } from '~/common/stores/chat/chat.fragments';
import { openFileForAttaching } from '~/common/components/ButtonAttachFiles';
import { optimaOpenPreferences } from '~/common/layout/optima/useOptima';
@@ -123,6 +123,30 @@ export function ChatMessageList(props: {
}
}, [conversationHandler, conversationId, onConversationExecuteHistory]);
const handleMessageUpstreamResume = React.useCallback(async (generator: DMessageGenerator, messageId: DMessageId) => {
if (!conversationId || !conversationHandler) return;
if (!generator.upstreamHandle) throw new Error('No upstream handle on generator');
// For AIX generators the DLLMId is at .aix.mId
const llmId = generator.mgt === 'aix' ? generator.aix.mId : undefined;
if (!llmId) throw new Error('No model id on generator');
const { aixCreateChatGenerateContext, aixReattachContent_DMessage_orThrow } = await import('~/modules/aix/client/aix.client');
await aixReattachContent_DMessage_orThrow(
llmId,
generator,
aixCreateChatGenerateContext('conversation', conversationId),
{ abortSignal: 'NON_ABORTABLE', throttleParallelThreads: 0 },
async (update, isDone) => {
conversationHandler.messageEdit(messageId, {
fragments: update.fragments,
generator: update.generator,
pendingIncomplete: update.pendingIncomplete,
}, isDone /* messageComplete */, true /* touch */);
},
);
}, [conversationHandler, conversationId]);
// message menu methods proxy
@@ -371,6 +395,7 @@ export function ChatMessageList(props: {
onMessageBeam={handleMessageBeam}
onMessageBranch={handleMessageBranch}
onMessageContinue={handleMessageContinue}
onMessageUpstreamResume={handleMessageUpstreamResume}
onMessageDelete={handleMessageDelete}
onMessageFragmentAppend={handleMessageAppendFragment}
onMessageFragmentDelete={handleMessageDeleteFragment}
@@ -36,7 +36,7 @@ import { ModelVendorAnthropic } from '~/modules/llms/vendors/anthropic/anthropic
import { AnthropicIcon } from '~/common/components/icons/vendors/AnthropicIcon';
import { ChatBeamIcon } from '~/common/components/icons/ChatBeamIcon';
import { CloseablePopup } from '~/common/components/CloseablePopup';
import { DMessage, DMessageId, DMessageUserFlag, DMetaReferenceItem, MESSAGE_FLAG_AIX_SKIP, MESSAGE_FLAG_NOTIFY_COMPLETE, MESSAGE_FLAG_STARRED, MESSAGE_FLAG_VND_ANT_CACHE_AUTO, MESSAGE_FLAG_VND_ANT_CACHE_USER, messageFragmentsReduceText, messageHasUserFlag } from '~/common/stores/chat/chat.message';
import { DMessage, DMessageGenerator, DMessageId, DMessageUserFlag, DMetaReferenceItem, MESSAGE_FLAG_AIX_SKIP, MESSAGE_FLAG_NOTIFY_COMPLETE, MESSAGE_FLAG_STARRED, MESSAGE_FLAG_VND_ANT_CACHE_AUTO, MESSAGE_FLAG_VND_ANT_CACHE_USER, messageFragmentsReduceText, messageHasUserFlag } from '~/common/stores/chat/chat.message';
import { KeyStroke } from '~/common/components/KeyStroke';
import { MarkHighlightIcon } from '~/common/components/icons/MarkHighlightIcon';
import { PhTreeStructure } from '~/common/components/icons/phosphor/PhTreeStructure';
@@ -162,7 +162,7 @@ export function ChatMessage(props: {
onMessageBeam?: (messageId: string) => Promise<void>,
onMessageBranch?: (messageId: string) => void,
onMessageContinue?: (messageId: string, continueText: null | string) => void,
onMessageUpstreamResume?: (messageId: string) => Promise<void>,
onMessageUpstreamResume?: (generator: DMessageGenerator, messageId: string) => Promise<void>,
onMessageDelete?: (messageId: string) => void,
onMessageFragmentAppend?: (messageId: DMessageId, fragment: DMessageFragment) => void
onMessageFragmentDelete?: (messageId: DMessageId, fragmentId: DMessageFragmentId) => void,
@@ -266,8 +266,9 @@ export function ChatMessage(props: {
}, [messageId, onMessageContinue]);
const handleUpstreamResume = React.useCallback(() => {
return onMessageUpstreamResume?.(messageId);
}, [messageId, onMessageUpstreamResume]);
if (!messageGenerator) return;
return onMessageUpstreamResume?.(messageGenerator, messageId);
}, [messageGenerator, messageId, onMessageUpstreamResume]);
// Text Editing
@@ -893,10 +894,10 @@ export function ChatMessage(props: {
)}
{/* Upstream Resume - shows whenever there's a stored handle (incl. post-reload, where no error fragment is present) */}
{fromAssistant && messageGenerator?.upstreamHandle && (
{!messagePendingIncomplete && props.isBottom && fromAssistant && messageGenerator?.upstreamHandle && !!onMessageUpstreamResume && (
<BlockOpUpstreamResume
upstreamHandle={messageGenerator.upstreamHandle}
onResume={onMessageUpstreamResume ? handleUpstreamResume : undefined}
onResume={handleUpstreamResume}
/>
)}
@@ -24,7 +24,7 @@ import { animationSpinHalfPause } from '~/common/util/animUtils';
// configuration
const DATASTREAM_VISUALIZATION_DELAY = Math.round(2 * Math.PI * 1000);
const MODELOP_TIMEOUT_DELAY = 5; // seconds
const MODELOP_TIMEOUT_LIMIT = 300; // seconds
const MODELOP_TIMEOUT_LIMIT = 3600; // seconds - 1hr for long ops, such as Gemini Deep Research
const modelOperationConfig: Record<DVoidPlaceholderMOp['mot'], { Icon: React.ElementType, color: ColorPaletteProp }> = {
'search-web': { Icon: SearchRoundedIcon, color: 'neutral' },
@@ -32,6 +32,16 @@ const modelOperationConfig: Record<DVoidPlaceholderMOp['mot'], { Icon: React.Ele
'code-exec': { Icon: CodeIcon, color: 'primary' },
} as const;
function _formatElapsed(seconds: number): string {
if (seconds < 60) return `${seconds}s`;
const m = Math.floor(seconds / 60);
const s = seconds % 60;
if (m < 60) return s ? `${m}m ${s}s` : `${m}m`;
const h = Math.floor(m / 60);
const rm = m % 60;
return rm ? `${h}h ${rm}m` : `${h}h`;
}
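A quick sanity check of the new formatter (illustrative inputs and outputs):
_formatElapsed(45)   // '45s'
_formatElapsed(185)  // '3m 5s'
_formatElapsed(3600) // '1h'
_formatElapsed(3720) // '1h 2m'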
const _styles = {
followUpChip: {
@@ -301,7 +311,7 @@ function ModelOperationChip(props: {
{text}
{elapsedSeconds >= MODELOP_TIMEOUT_DELAY && (
<span style={{ opacity: 0.6 }}>
{' · '}<span style={{ display: 'inline-block', minWidth: elapsedSeconds >= 100 ? '4ch' : '3ch' }}>{elapsedSeconds}s</span>
{' · '}<span style={{ display: 'inline-block', minWidth: elapsedSeconds >= 60 ? '6ch' : '3ch' }}>{_formatElapsed(elapsedSeconds)}</span>
</span>
)}
</span>
+1 -1
@@ -23,7 +23,7 @@ export const Release = {
// this is here to trigger revalidation of data, e.g. models refresh
Monotonics: {
Aix: 66,
Aix: 67,
NewsVersion: 204,
},
+14 -3
@@ -154,6 +154,10 @@ export class ContentReassembler {
// termination -> User/AI issue message
if (errorMessage) this._appendErrorFragment(errorMessage);
// clean completion -> remove upstreamHandle (SET in onResponseHandle, CLEARED here on clean completion)
if (outcome === 'completed' && this.S.generator.upstreamHandle)
this._clearGeneratorUpstreamHandle();
// Fragment finalization heuristics:
@@ -913,9 +917,10 @@ export class ContentReassembler {
'cg-issue': { outcome: 'failed', tsr: 'issue' },
// model completed but with a specific stop condition
'out-of-tokens': { outcome: 'completed', tsr: 'out-of-tokens' },
'filter-content': { outcome: 'completed', tsr: 'filter' },
'filter-recitation': { outcome: 'completed', tsr: 'filter' },
'filter-refusal': { outcome: 'completed', tsr: 'filter' },
// filter-triggered terminations - treated as failures so all surfaces (chat, beam, etc.) render the error state uniformly
'filter-content': { outcome: 'failed', tsr: 'filter', errorMessage: 'Response blocked by the provider\'s content filter.' },
'filter-recitation': { outcome: 'failed', tsr: 'filter', errorMessage: 'Response blocked - potential copyrighted/recited content.' },
'filter-refusal': { outcome: 'failed', tsr: 'filter', errorMessage: 'Response refused by the provider\'s safety filter.' },
} as const;
if (dialectTokenStopReason in classification)
return classification[dialectTokenStopReason];
@@ -1052,6 +1057,12 @@ export class ContentReassembler {
this.S.generator = { ...this.S.generator, upstreamHandle: handle };
}
private _clearGeneratorUpstreamHandle(): void {
if (!this.S.generator?.upstreamHandle) return;
const { upstreamHandle, ...rest } = this.S.generator;
this.S.generator = rest;
}
// Fragment helpers - structural sharing: every mutation creates a new array reference
@@ -6,10 +6,10 @@
import { capitalizeFirstLetter } from '~/common/util/textUtils';
// IMPORTANT: client-side bundle imports server-side code including stubbed code
import type { AixAPI_Access, AixAPI_ConnectionOptions_ChatGenerate, AixAPI_Context_ChatGenerate, AixAPI_Model, AixAPIChatGenerate_Request, AixWire_Particles } from '../server/api/aix.wiretypes';
import type { AixAPI_Access, AixAPI_ConnectionOptions_ChatGenerate, AixAPI_Context_ChatGenerate, AixAPI_Model, AixAPI_ResumeHandle, AixAPIChatGenerate_Request, AixWire_Particles } from '../server/api/aix.wiretypes';
import type { AixDebugObject } from '../server/dispatch/chatGenerate/chatGenerate.debug';
import { AIX_INSPECTOR_ALLOWED_CONTEXTS, AIX_SECURITY_ONLY_IN_DEV_BUILDS } from '../server/api/aix.security';
import { createChatGenerateDispatch } from '../server/dispatch/chatGenerate/chatGenerate.dispatch';
import { createChatGenerateDispatch, createChatGenerateResumeDispatch } from '../server/dispatch/chatGenerate/chatGenerate.dispatch';
import { executeChatGenerateWithContinuation } from '../server/dispatch/chatGenerate/chatGenerate.continuation';
@@ -40,6 +40,24 @@ export async function* clientSideChatGenerate(
yield* executeChatGenerateWithContinuation(chatGenerateDispatchCreator, streaming, abortSignal, _d);
}
/**
* Client-side reattach - uses server's executeChatGenerateWithContinuation directly.
* Matches the `aixRouter.reattachContent` server-side procedure. Streaming-only.
*/
export async function* clientSideReattach(
access: AixAPI_Access,
resumeHandle: AixAPI_ResumeHandle,
context: AixAPI_Context_ChatGenerate,
connectionOptions: AixAPI_ConnectionOptions_ChatGenerate,
abortSignal: AbortSignal,
): AsyncGenerator<AixWire_Particles.ChatGenerateOp, void> {
// keep in sync with the `aixRouter.reattachContent` server-side procedure
const _d: AixDebugObject = _createClientDebugConfig(access, connectionOptions, context.name);
const resumeDispatchCreator = () => createChatGenerateResumeDispatch(access, resumeHandle, true /* streaming */);
yield* executeChatGenerateWithContinuation(resumeDispatchCreator, true /* streaming */, abortSignal, _d);
}
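A minimal consumption sketch, assuming `access`, `context` and `connectionOptions` are already in scope; the handle id is hypothetical, and the field names follow the resume dispatch (`uht`, `runId`):
const ac = new AbortController();
const handle = { uht: 'vnd.gem.interactions', runId: 'interactions/abc123' } as AixAPI_ResumeHandle; // hypothetical id
for await (const op of clientSideReattach(access, handle, context, connectionOptions, ac.signal)) {
  // each `op` is an AixWire_Particles.ChatGenerateOp, consumed upstream by the LL reassembler loop
}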
// CSF debug config - lighter than server-side
function _createClientDebugConfig(access: AixAPI_Access, options: undefined | { debugDispatchRequest?: boolean, debugProfilePerformance?: boolean, debugRequestBodyOverride?: Record<string, unknown> }, chatGenerateContextName: string): AixDebugObject {
const echoRequest = !!options?.debugDispatchRequest && (AIX_SECURITY_ONLY_IN_DEV_BUILDS || AIX_INSPECTOR_ALLOWED_CONTEXTS.includes(chatGenerateContextName));
+63 -7
@@ -190,6 +190,10 @@ interface AixClientOptions {
abortSignal: AbortSignal | 'NON_ABORTABLE'; // 'NON_ABORTABLE' is a special case for non-abortable operations
throttleParallelThreads?: number; // 0: disable, 1: default throttle (12Hz), 2+ reduce frequency with the square root
// [Reattach] Internal hook - set by `aixReattachContent_DMessage_orThrow`. When present, seeds the LL
// with this generator; the upstreamHandle on it triggers the LL reattach branch (Gemini Deep Research GET-poll).
reattachGenerator?: Readonly<DMessageGenerator> & Required<Pick<DMessageGenerator, 'upstreamHandle'>>;
// LLM parameter configuration layers: full replacement of user params and/or overrides of a set of individual params
llmUserParametersReplacement?: DModelParameterValues; // can replace the 'global' llm user configuration with an alternate config (e.g. persona, or per-chat)
llmOptionsOverride?: Omit<DModelParameterValues, 'llmRef'>; // overrides (sets/replaces) individual LLM parameters
@@ -494,7 +498,7 @@ type _AixChatGenerateContent_DMessageGuts_WithOutcome = AixChatGenerateContent_D
* @throws Error if the LLM is not found or other misconfigurations, but handles most other errors internally.
*
* Features:
* - Throttling if requrested (decimates the requests based on the square root of the number parllel hints)
* - Throttling if requested (decimates the requests based on the square root of the number parallel hints)
* - computes the costs and metrics for the chat generation
* - vendor-specific rate limit
* - 'pendingIncomplete' logic
@@ -543,7 +547,7 @@ export async function aixChatGenerateContent_DMessage_orThrow<TServiceSettings e
// Aix LL Chat Generation
const dMessage: AixChatGenerateContent_DMessageGuts = {
fragments: [],
generator: createGeneratorAIX_AutoLabel(llm.vId, llm.id), // using llm.id (not aixModel.id/ref) so we can re-select them in the UI (Beam)
generator: clientOptions.reattachGenerator ?? createGeneratorAIX_AutoLabel(llm.vId, llm.id), // using llm.id (not aixModel.id/ref) so we can re-select them in the UI (Beam)
pendingIncomplete: true,
};
@@ -626,6 +630,45 @@ function _finalizeLlmMetricsWithCosts(cgMetricsLg: undefined | DMetricsChatGener
}
// --- L2 - Content Generation reattachment as DMessage ---
/**
* Reattach facade: wraps `aixChatGenerateContent_DMessage_orThrow` for the reattach-to-upstream flow.
*
* On an in-progress upstream run (Gemini Deep Research today, extensible to OAI Responses), the server
* just needs the handle to GET-poll; no chat-generate body is needed. This facade:
* - validates the generator carries an `upstreamHandle`,
* - stubs the chat-generate request (unused on the reattach path - the server uses the handle),
* - seeds the base function via `clientOptions.reattachGenerator` so the LL's reattach branch fires.
*
* The reassembler starts with empty fragments; since Gemini Interactions snapshots are cumulative,
* the stream will rebuild the complete content from scratch. Any partial content from the original run is replaced.
*/
export async function aixReattachContent_DMessage_orThrow(
llmId: DLLMId,
reattachGenerator: Readonly<DMessageGenerator>,
aixContext: AixAPI_Context_ChatGenerate,
clientOptions: Pick<AixClientOptions, 'abortSignal' | 'throttleParallelThreads'>,
onStreamingUpdate?: (update: AixChatGenerateContent_DMessageGuts, isDone: boolean) => MaybePromise<void>,
): Promise<_AixChatGenerateContent_DMessageGuts_WithOutcome> {
if (!reattachGenerator.upstreamHandle)
throw new Error('aixReattachContent: generator must have an upstreamHandle');
// Stub chat-generate request - unused on reattach (server GET-polls by the handle on reattachGenerator)
const stubChatGenerate: AixAPIChatGenerate_Request = { systemMessage: null, chatSequence: [] };
return aixChatGenerateContent_DMessage_orThrow(
llmId,
stubChatGenerate,
aixContext,
true, // streaming
{ ...clientOptions, reattachGenerator: reattachGenerator as any /* guaranteed by the check */ },
onStreamingUpdate,
);
}
// --- LL Low-Level (Level 1) - Streaming loop with retry/reassembler ---
/**
@@ -734,9 +777,12 @@ async function _aixChatGenerateContent_LL(
// [CSF] Pre-load client-side executor if needed
let clientSideChatGenerate: typeof import('./aix.client.direct-chatGenerate').clientSideChatGenerate | undefined = undefined;
let clientSideReattach: typeof import('./aix.client.direct-chatGenerate').clientSideReattach | undefined = undefined;
if (aixAccess.clientSideFetch)
try {
clientSideChatGenerate = (await import('./aix.client.direct-chatGenerate')).clientSideChatGenerate;
const csfModule = await import('./aix.client.direct-chatGenerate');
clientSideChatGenerate = csfModule.clientSideChatGenerate;
clientSideReattach = csfModule.clientSideReattach;
} catch (error) {
throw new Error(`Direct connection unsuccessful: ${(error as any)?.message || 'unknown loading error'}`, { cause: error });
}
@@ -770,7 +816,7 @@ async function _aixChatGenerateContent_LL(
// Retry/Reconnect - LL state machine
// - reconnect: for server overload/busy (429, 503, 502) and transient errors
// - resume: for network disconnects with OpenAI Responses API handle
// - reattach: for network disconnects with upstream handles
const rsm = new AixStreamRetry(0, 0); // sensible: 3, 2
while (true) {
@@ -795,7 +841,7 @@ async function _aixChatGenerateContent_LL(
let particleStream: AsyncIterable<AixWire_Particles.ChatGenerateOp, void>;
// AIX [CSM] Direct Execution
// AIX [CSF] Direct Execution - fresh
if (!accumulator_LL.generator.upstreamHandle && clientSideChatGenerate)
particleStream = clientSideChatGenerate(
aixAccess,
@@ -818,7 +864,17 @@ async function _aixChatGenerateContent_LL(
connectionOptions: aixConnectionOptions,
}, { signal: abortSignal });
// AIX tRPC Streaming re-attachment from handle - for LL auto-resume
// AIX [CSF] Direct Reattach from handle - bypasses edge runtime, no 5-min timeout
else if (clientSideReattach)
particleStream = clientSideReattach(
aixAccess,
accumulator_LL.generator.upstreamHandle,
aixContext,
aixConnectionOptions,
abortSignal,
);
// AIX tRPC Streaming re-attachment from handle - for LL auto-reattach
else
particleStream = await apiStream.aix.reattachContent.mutate({
access: aixAccess,
@@ -867,7 +923,7 @@ async function _aixChatGenerateContent_LL(
const { errorType, errorMessage } = aixClassifyStreamingError(error, abortSignal.aborted, !!accumulator_LL.fragments.length);
const maybeErrorStatusCode = error?.status || error?.response?.status || undefined;
// client-side-retry decision - resume handle from accumulator determines strategy (resume vs reconnect)
// client-side-retry decision - reattach handle from accumulator determines strategy (reattach vs reconnect)
const shallRetry = rsm.shallRetry(errorType, maybeErrorStatusCode, !!accumulator_LL.generator.upstreamHandle);
if (shallRetry) {
+8 -8
@@ -28,28 +28,28 @@ export const aixRouter = createTRPCRouter({
}))
.mutation(async function* ({ input, ctx }) {
const _d = _createDebugConfig(input.access, input.connectionOptions, input.context.name);
const chatGenerateDispatchCreator = () => createChatGenerateDispatch(input.access, input.model, input.chatGenerate, input.streaming, !!input.connectionOptions?.enableResumability);
const dispatchCreator = () => createChatGenerateDispatch(input.access, input.model, input.chatGenerate, input.streaming, !!input.connectionOptions?.enableResumability);
yield* executeChatGenerateWithContinuation(chatGenerateDispatchCreator, input.streaming, ctx.reqSignal, _d);
yield* executeChatGenerateWithContinuation(dispatchCreator, input.streaming, ctx.reqSignal, _d);
}),
/**
* Chat content generation RESUME, streaming only.
* Reconnects to an in-progress response by its ID - OpenAI Responses API only.
* Chat content generation - reattach to an in-progress upstream run by handle, streaming only.
* Today: OpenAI Responses API (network-disconnect recovery) and Gemini Interactions (Deep Research across reloads).
*/
reattachContent: edgeProcedure
.input(z.object({
access: AixWire_API.Access_schema,
resumeHandle: AixWire_API.ResumeHandle_schema, // resume has a handle instead of 'model + chatGenerate'
resumeHandle: AixWire_API.ResumeHandle_schema, // reattach uses a handle instead of 'model + chatGenerate'
context: AixWire_API.ContextChatGenerate_schema,
streaming: z.literal(true), // resume is always streaming
streaming: z.literal(true), // reattach is always streaming
connectionOptions: AixWire_API.ConnectionOptionsChatGenerate_schema.pick({ debugDispatchRequest: true }).optional(), // debugDispatchRequest
}))
.mutation(async function* ({ input, ctx }) {
const _d = _createDebugConfig(input.access, input.connectionOptions, input.context.name);
const resumeDispatchCreator = () => createChatGenerateResumeDispatch(input.access, input.resumeHandle, input.streaming);
const dispatchCreator = () => createChatGenerateResumeDispatch(input.access, input.resumeHandle, input.streaming);
yield* executeChatGenerateWithContinuation(resumeDispatchCreator, input.streaming, ctx.reqSignal, _d);
yield* executeChatGenerateWithContinuation(dispatchCreator, input.streaming, ctx.reqSignal, _d);
}),
});
@@ -0,0 +1,144 @@
import type * as z from 'zod/v4';
import type { AixAPI_Model, AixAPIChatGenerate_Request } from '../../../api/aix.wiretypes';
import { GeminiInteractionsWire_API_Interactions } from '../../wiretypes/gemini.interactions.wiretypes';
import { approxDocPart_To_String, approxInReferenceTo_To_XMLString, aixSpillSystemToUser } from './adapters.common';
type TRequestBody = z.infer<typeof GeminiInteractionsWire_API_Interactions.RequestBody_schema>;
type TTurn = z.infer<typeof GeminiInteractionsWire_API_Interactions.Turn_schema>;
/**
* MINIMAL - Build the POST /v1beta/interactions body for Deep Research agents.
*
* Scope:
* - Stateless multi-turn: the full `chatSequence` is flattened to role-tagged turns and sent as `input`.
* - `systemMessage` text (if any) is prepended to the first user turn, since the Interactions API for
* background agents does not accept a dedicated `system_instruction`.
* - Text-only content: doc parts are rendered via `approxDocPart_To_String`; in-reference-to XML is prepended to the user turn.
* - Model messages containing only tool invocations/responses/aux (no text) are dropped.
* - Non-text user parts (images, audio, cache-control) are silently dropped.
*/
export function aixToGeminiInteractionsCreate(model: AixAPI_Model, chatGenerateRaw: AixAPIChatGenerate_Request): TRequestBody {
// Normalize: move any 'spillable' system parts (e.g. images) into a synthetic user message up front
const chatGenerate = aixSpillSystemToUser(chatGenerateRaw);
// Extract leftover system text (to be prepended to the first user turn)
const systemPrefix = _collectSystemText(chatGenerate.systemMessage);
// Walk chatSequence -> turns
const turns: TTurn[] = [];
for (const msg of chatGenerate.chatSequence) {
if (msg.role === 'user') {
const content = _flattenUserParts(msg.parts);
if (content) turns.push({ role: 'user', content });
} else if (msg.role === 'model') {
const content = _flattenModelParts(msg.parts);
if (content) turns.push({ role: 'model', content });
}
}
if (!turns.length)
throw new Error('Gemini Interactions: no usable turns (Deep Research agents require at least one user message)');
// Prepend system prefix to the FIRST user turn (skip if none exists)
if (systemPrefix) {
const firstUserIdx = turns.findIndex(t => t.role === 'user');
if (firstUserIdx >= 0)
turns[firstUserIdx] = { role: 'user', content: `${systemPrefix}\n\n${turns[firstUserIdx].content}` };
}
// Sanity: the API expects the last turn to be 'user' (we're asking the model to respond)
if (turns[turns.length - 1].role !== 'user')
throw new Error('Gemini Interactions: last turn must be from user (chat sequence ended with a model message)');
// Simplify single-turn to string form (matches the Python/JS SDK convenience shape)
const input: TRequestBody['input'] = (turns.length === 1 && turns[0].role === 'user')
? turns[0].content
: turns;
// The API expects a bare agent id (no 'models/' prefix)
const agent = model.id.startsWith('models/') ? model.id.slice('models/'.length) : model.id;
return {
agent,
input,
background: true,
store: true, // API rejects store=false with background=true; the poller issues DELETE after terminal status
};
}
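For intuition, a sketch of the body this builds from a hypothetical history (system message plus three turns; all content invented for illustration):
// input:  system 'Be thorough.', user 'Research X', model 'Draft...', user 'Go deeper'
// output body:
// {
//   agent: 'deep-research-pro-preview-12-2025',
//   input: [
//     { role: 'user', content: 'Be thorough.\n\nResearch X' },  // system prefix folded into first user turn
//     { role: 'model', content: 'Draft...' },
//     { role: 'user', content: 'Go deeper' },
//   ],
//   background: true,
//   store: true,
// }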
// -- part flattening --
function _collectSystemText(systemMessage: AixAPIChatGenerate_Request['systemMessage']): string {
if (!systemMessage?.parts?.length) return '';
const chunks: string[] = [];
for (const part of systemMessage.parts) {
switch (part.pt) {
case 'text':
chunks.push(part.text);
break;
case 'doc':
chunks.push(approxDocPart_To_String(part));
break;
case 'inline_image':
case 'meta_cache_control':
break; // silently drop
default:
const _exhaustive: never = part;
}
}
return chunks.join('\n').trim();
}
function _flattenUserParts(parts: Extract<AixAPIChatGenerate_Request['chatSequence'][number], { role: 'user' }>['parts']): string {
const chunks: string[] = [];
const prefixChunks: string[] = []; // in-reference-to goes before body
for (const part of parts) {
switch (part.pt) {
case 'text':
chunks.push(part.text);
break;
case 'doc':
chunks.push(approxDocPart_To_String(part));
break;
case 'meta_in_reference_to':
const irt = approxInReferenceTo_To_XMLString(part);
if (irt) prefixChunks.push(irt);
break;
case 'inline_image':
case 'meta_cache_control':
break; // unsupported here; dropped
default:
const _exhaustive: never = part;
}
}
return [...prefixChunks, ...chunks].join('\n\n').trim();
}
function _flattenModelParts(parts: Extract<AixAPIChatGenerate_Request['chatSequence'][number], { role: 'model' }>['parts']): string {
const chunks: string[] = [];
for (const part of parts) {
switch (part.pt) {
case 'text':
chunks.push(part.text);
break;
case 'inline_audio':
case 'inline_image':
case 'tool_invocation':
case 'tool_response':
case 'ma': // model aux (reasoning, etc.)
case 'meta_cache_control':
break; // drop non-text model output for Deep Research replays
default:
const _exhaustive: never = part;
}
}
return chunks.join('\n\n').trim();
}
@@ -9,9 +9,12 @@ import type { AixDemuxers } from '../stream.demuxers';
import { GeminiWire_API_Generate_Content } from '../wiretypes/gemini.wiretypes';
import { createGeminiInteractionsConnect, createGeminiInteractionsResumeConnect } from './connectors/gemini.interactionsPoller';
import { aixAnthropicHostedFeatures, aixToAnthropicMessageCreate } from './adapters/anthropic.messageCreate';
import { aixToBedrockConverse } from './adapters/bedrock.converse';
import { aixToGeminiGenerateContent } from './adapters/gemini.generateContent';
import { aixToGeminiInteractionsCreate } from './adapters/gemini.interactionsCreate';
import { aixToOpenAIChatCompletions } from './adapters/openai.chatCompletions';
import { aixToOpenAIResponses } from './adapters/openai.responsesCreate';
import { aixToXAIResponses } from './adapters/xai.responsesCreate';
@@ -21,6 +24,7 @@ import { createAnthropicFileInlineTransform } from './parsers/anthropic.transfor
import { createAnthropicMessageParser, createAnthropicMessageParserNS } from './parsers/anthropic.parser';
import { createBedrockConverseParserNS, createBedrockConverseStreamParser } from './parsers/bedrock-converse.parser';
import { createGeminiGenerateContentResponseParser } from './parsers/gemini.parser';
import { createGeminiInteractionsParser } from './parsers/gemini.interactions.parser';
import { createOpenAIChatCompletionsChunkParser, createOpenAIChatCompletionsParserNS } from './parsers/openai.parser';
import { createOpenAIResponseParserNS, createOpenAIResponsesEventParser } from './parsers/openai.responses.parser';
@@ -29,6 +33,8 @@ import { createOpenAIResponseParserNS, createOpenAIResponsesEventParser } from '
export type ChatGenerateDispatch = {
request: ChatGenerateDispatchRequest;
/** Used by dialects that need multi-step I/O. The returned response is consumed normally via demuxerFormat/chatGenerateParse */
customConnect?: (signal: AbortSignal) => Promise<Response>;
bodyTransform?: AixDemuxers.StreamBodyTransform;
demuxerFormat: AixDemuxers.StreamDemuxerFormat;
chatGenerateParse: ChatGenerateParseFunction;
@@ -162,9 +168,17 @@ export async function createChatGenerateDispatch(access: AixAPI_Access, model: A
}
case 'gemini':
/**
* [Gemini, 2025-04-17] For newer thinking parameters, use v1alpha (we only see statistically better results)
*/
const requestedModelName = model.id.replace('models/', '');
// [Gemini Interactions API - ALPHA TEST]
if (model.vndGeminiAPI === 'interactions-agent')
return {
// { request, customConnect }
...createGeminiInteractionsConnect(access, aixToGeminiInteractionsCreate(model, chatGenerate)),
demuxerFormat: 'fast-sse',
chatGenerateParse: createGeminiInteractionsParser(requestedModelName),
};
const useV1Alpha = false; // !!model.vndGeminiShowThoughts || model.vndGeminiThinkingBudget !== undefined;
return {
request: {
@@ -174,7 +188,7 @@ export async function createChatGenerateDispatch(access: AixAPI_Access, model: A
},
// we verified that 'fast-sse' works well with Gemini
demuxerFormat: streaming ? 'fast-sse' : null,
chatGenerateParse: createGeminiGenerateContentResponseParser(model.id.replace('models/', ''), streaming),
chatGenerateParse: createGeminiGenerateContentResponseParser(requestedModelName, streaming),
};
/**
@@ -265,8 +279,9 @@ export async function createChatGenerateDispatch(access: AixAPI_Access, model: A
/**
* Specializes to the correct vendor a request for resuming chat generation (OpenAI Responses API only).
* Constructs a GET request to retrieve and stream a response by its ID.
* Specializes to the correct vendor a request for reattaching to an in-progress upstream run.
* - OpenAI Responses API: GET /v1/responses/{id} to resume streaming from a response id
* - Gemini Interactions API: GET-poll /v1beta/interactions/{id} to resume a Deep Research run
*/
export async function createChatGenerateResumeDispatch(access: AixAPI_Access, resumeHandle: AixAPI_ResumeHandle, streaming: boolean): Promise<ChatGenerateDispatch> {
@@ -292,6 +307,17 @@ export async function createChatGenerateResumeDispatch(access: AixAPI_Access, re
chatGenerateParse: streaming ? createOpenAIResponsesEventParser() : createOpenAIResponseParserNS(),
};
case 'gemini':
// [Gemini Interactions] Reattach to an in-progress Deep Research interaction by polling GET.
if (resumeHandle.uht !== 'vnd.gem.interactions')
throw new Error(`Resume handle mismatch for gemini: expected 'vnd.gem.interactions', got '${resumeHandle.uht}'`);
return {
// { request, customConnect }
...createGeminiInteractionsResumeConnect(access, resumeHandle.runId /* Gemini interaction.id */),
demuxerFormat: 'fast-sse',
chatGenerateParse: createGeminiInteractionsParser(null /* model name unknown at resume time - caller's DMessage already has it */),
};
default:
const _exhaustiveCheck: never = dialect;
// fallthrough
@@ -299,7 +325,6 @@ export async function createChatGenerateResumeDispatch(access: AixAPI_Access, re
case 'anthropic':
case 'bedrock':
case 'deepseek':
case 'gemini':
case 'groq':
case 'lmstudio':
case 'localai':
@@ -51,7 +51,7 @@ export async function* executeChatGenerateDispatch(
dispatch.request.body = { ...dispatch.request.body, ..._d.requestBodyOverride };
// Connect to the dispatch (yields debug echo particles directly, bypasses particle transforms)
const dispatchResponse = yield* _connectToDispatch(dispatch.request, intakeAbortSignal, chatGenerateTx, _d);
const dispatchResponse = yield* _connectToDispatch(dispatch.request, dispatch.customConnect, intakeAbortSignal, chatGenerateTx, _d);
if (!dispatchResponse)
return; // exit: error already handled
@@ -103,6 +103,7 @@ export async function* executeChatGenerateDispatch(
*/
async function* _connectToDispatch(
request: ChatGenerateDispatchRequest,
customConnect: ((signal: AbortSignal) => Promise<Response>) | undefined,
intakeAbortSignal: AbortSignal,
chatGenerateTx: ChatGenerateTransmitter,
_d: AixDebugObject,
@@ -122,7 +123,8 @@ async function* _connectToDispatch(
// Blocking fetch with heartbeats - combats timeouts, for instance with long Anthropic requests (>25s on large requests for Opus 3 models)
_d.profiler?.measureStart('connect');
const connectionOperationCreator = () => fetchResponseOrTRPCThrow({
// [customConnect] dialects that need multi-step I/O (e.g. Gemini Interactions poll loop) own the connection
const connectionOperationCreator = customConnect ? () => customConnect(intakeAbortSignal) : () => fetchResponseOrTRPCThrow({
...request,
signal: intakeAbortSignal,
name: `Aix.${_d.prettyDialect}`,
@@ -0,0 +1,245 @@
import type * as z from 'zod/v4';
import { fetchJsonOrTRPCThrow, fetchResponseOrTRPCThrow } from '~/server/trpc/trpc.router.fetchers';
import type { GeminiAccessSchema } from '~/modules/llms/server/gemini/gemini.access';
import { geminiAccess } from '~/modules/llms/server/gemini/gemini.access';
import { ChatGenerateDispatch } from '../chatGenerate.dispatch';
import { GeminiInteractionsWire_API_Interactions } from '../../wiretypes/gemini.interactions.wiretypes';
// configuration
const INITIAL_POLL_DELAY_MS = 3_000; // first poll happens this long after the POST accepted the job
const STEADY_POLL_INTERVAL_MS = 10_000; // subsequent polls
const MAX_SLEEP_CHUNK_MS = 1_000; // wake often to honor abort promptly
type TRequestBody = z.infer<typeof GeminiInteractionsWire_API_Interactions.RequestBody_schema>;
type TInteraction = z.infer<typeof GeminiInteractionsWire_API_Interactions.Interaction_schema>;
/**
* Gemini Interactions API - Poll-to-Stream bridge (CREATE flow).
*
* Returns a `customConnect` function suitable for `ChatGenerateDispatch.customConnect`.
*
* Flow:
* 1. POST /v1beta/interactions with `background: true` - returns the interaction id (usually with status 'in_progress')
* 2. Return a `Response` whose body is a ReadableStream of SSE frames
* 3. Background producer polls GET /v1beta/interactions/{id} every few seconds until status is terminal
* 4. Each poll writes one SSE frame (the full Interaction JSON) - the parser diffs vs. prior state
* 5. DELETE only on natural terminal status (completed/failed/cancelled). On client abort or stream
* error we leave the interaction ALIVE upstream so the client can reattach across reloads via
* `createGeminiInteractionsResumeConnect`. Gemini auto-cleans after its retention window (1d free / 55d paid).
* WARNING: this trades off orphan risk for reattach viability - if the client never reattaches,
* the upstream agent keeps running and consuming tokens until the retention window expires.
*/
export function createGeminiInteractionsConnect(access: GeminiAccessSchema, body: TRequestBody): Pick<ChatGenerateDispatch, 'request' | 'customConnect'> {
// compute URL/headers once - exposed as `request` for debug echo, and reused by the POST
const { url, headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.postPath, false);
const request = { url, headers, method: 'POST' as const, body };
const connect = async function (signal: AbortSignal): Promise<Response> {
// 1. Initial POST - reuses the same url/headers we expose via `request`
const initial = await _postInteractionAt(url, headers, body, signal);
// 2. Build the streaming response seeded with the initial snapshot
return _buildStreamingResponse(access, initial.id, initial, signal);
};
return { request, customConnect: connect };
}
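For reference, every frame the bridge emits is a single SSE `data:` line carrying the full Interaction snapshot JSON, e.g. (hypothetical values):
// data: {"id":"interactions/abc123","status":"in_progress","outputs":[{"type":"thought","summary":"Scanning sources..."}]}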
/**
* Gemini Interactions API - Poll-to-Stream bridge (RESUME flow).
*
* Same shape as `createGeminiInteractionsConnect`, but skips the POST step and starts
* directly from GET-polling an existing interaction id. Used by `createChatGenerateResumeDispatch`
* when reattaching to an in-progress Deep Research run after a page reload.
*/
export function createGeminiInteractionsResumeConnect(access: GeminiAccessSchema, interactionId: string): Pick<ChatGenerateDispatch, 'request' | 'customConnect'> {
// compute URL/headers for debug echo - real I/O happens in customConnect below (GET-poll loop)
const requestDummy = {
...geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(interactionId), false),
method: 'GET' as const,
};
const connect = async function (signal: AbortSignal): Promise<Response> {
// no POST - jump straight into polling this known interaction id
return _buildStreamingResponse(access, interactionId, null, signal);
};
return { request: requestDummy, customConnect: connect };
}
// --- Shared stream-building logic ---
function _buildStreamingResponse(
access: GeminiAccessSchema,
interactionId: string,
initial: TInteraction | null,
signal: AbortSignal,
): Response {
const encoder = new TextEncoder();
const stream = new ReadableStream<Uint8Array>({
start(controller) {
const emitSseFrame = (interaction: TInteraction) => {
const data = JSON.stringify(interaction);
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
};
void (async () => {
// `store: true` is required by the API when background=true, so Gemini retains the interaction.
// WARNING: orphan risk - we only DELETE on natural terminal status. On abort/error we leave the
// interaction alive upstream so the client can reattach across reloads. If the client never
// comes back, the agent keeps running and consuming tokens until Gemini's retention window
// (1d free / 55d paid) auto-cleans it.
let shouldDelete = false;
try {
// First frame (create flow only): seed with the POST response
if (initial) {
emitSseFrame(initial);
if (_isTerminalStatus(initial.status)) {
shouldDelete = true;
controller.close();
return;
}
await _sleepOrAbort(INITIAL_POLL_DELAY_MS, signal);
}
// Polling loop
while (!signal.aborted) {
const snapshot = await _getInteraction(access, interactionId, signal);
emitSseFrame(snapshot);
if (_isTerminalStatus(snapshot.status)) {
shouldDelete = true;
controller.close();
return;
}
await _sleepOrAbort(STEADY_POLL_INTERVAL_MS, signal);
}
// aborted by client (reload or user cancel): close cleanly, do NOT delete so the client can reattach
controller.close();
} catch (err: any) {
// surfaces to the stream consumer; executor treats as 'dispatch-read' issue
// do NOT delete on error - the interaction may still be running upstream and the client may reattach
if (signal.aborted)
controller.close();
else
controller.error(err);
} finally {
if (shouldDelete)
void _deleteInteraction(access, interactionId).catch(() => { /* ignore */ });
}
})();
},
cancel(_reason) {
// the consumer cancelled - upstream abort will have fired too
},
});
return new Response(stream, {
status: 200,
headers: { 'content-type': 'text/event-stream; charset=utf-8' },
});
}
// --- HTTP helpers ---
//
// We use the unified `fetchJsonOrTRPCThrow` / `fetchResponseOrTRPCThrow` helpers from
// `~/server/trpc/trpc.router.fetchers`. They are isomorphic (CSF-safe), auto-check
// `response.ok`, and throw a well-shaped `TRPCFetcherError` with category + httpStatus.
//
// !! RETRY LANDMINE !! The outer executor wraps `customConnect` in
// `fetchWithAbortableConnectionRetry`, which retries on any TRPCFetcherError of
// category 'connection' or 'http' (502/503/429). Letting such an error propagate
// out of `customConnect` would re-execute the initial POST, creating a DUPLICATE
// Deep Research interaction server-side. So `_postInteractionAt` catches and rewraps
// TRPCFetcherError as plain Error (retrier ignores unknown errors).
//
// `_getInteraction` and `_deleteInteraction` run INSIDE the ReadableStream producer,
// which is beyond the retry boundary - their TRPCFetcherErrors are surfaced via
// `controller.error(err)` -> executor's `dispatch-read` path, and `safeErrorString`
// renders the .message nicely. So those can let TRPCFetcherError through.
async function _postInteractionAt(url: string, headers: HeadersInit, body: TRequestBody, signal: AbortSignal): Promise<TInteraction> {
let rawJson: Record<string, unknown>;
try {
rawJson = await fetchJsonOrTRPCThrow<Record<string, unknown>, TRequestBody>({
url, method: 'POST', headers, body,
signal, name: 'Gemini.Interactions.create', throwWithoutName: true,
});
} catch (error: any) {
// Preserve abort identity (name='TRPCFetcherError') so the executor's abort classifier
// can route this to the clean 'done-dispatch-aborted' path. The retrier already
// short-circuits on signal.aborted, so the duplicate-POST concern below doesn't apply.
if (signal.aborted) throw error;
// Rewrap TRPCFetcherError -> plain Error so the outer retry wrapper does NOT
// re-execute this customConnect (which would duplicate the Interaction).
throw new Error(`Gemini Interactions POST: ${error?.message || 'upstream error'}`);
}
return _validateInteraction(rawJson, 'POST');
}
async function _getInteraction(access: GeminiAccessSchema, interactionId: string, signal: AbortSignal): Promise<TInteraction> {
const { url, headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(interactionId), false);
const rawJson = await fetchJsonOrTRPCThrow<Record<string, unknown>>({
url, method: 'GET', headers,
signal, name: 'Gemini.Interactions.get', throwWithoutName: true,
});
return _validateInteraction(rawJson, 'GET');
}
async function _deleteInteraction(access: GeminiAccessSchema, interactionId: string): Promise<void> {
// Best-effort: own short timeout, caller swallows any throw
const ac = new AbortController();
const timer = setTimeout(() => ac.abort(), 3_000);
try {
const { url, headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.deletePath(interactionId), false);
await fetchResponseOrTRPCThrow({
url, method: 'DELETE', headers,
signal: ac.signal, name: 'Gemini.Interactions.delete', throwWithoutName: true,
});
} finally {
clearTimeout(timer);
}
}
function _validateInteraction(rawJson: unknown, method: 'POST' | 'GET'): TInteraction {
const parsed = GeminiInteractionsWire_API_Interactions.Interaction_schema.safeParse(rawJson);
if (!parsed.success)
throw new Error(`Gemini Interactions ${method}: unexpected response shape (${parsed.error.message})`);
return parsed.data;
}
// --- Small utils ---
function _isTerminalStatus(status: TInteraction['status']): boolean {
return status === 'completed' || status === 'failed' || status === 'cancelled';
}
async function _sleepOrAbort(totalMs: number, signal: AbortSignal): Promise<void> {
let remaining = totalMs;
while (remaining > 0 && !signal.aborted) {
const chunk = Math.min(remaining, MAX_SLEEP_CHUNK_MS);
await new Promise<void>(resolve => {
const t = setTimeout(() => { signal.removeEventListener('abort', onAbort); resolve(); }, chunk);
const onAbort = () => { clearTimeout(t); resolve(); };
signal.addEventListener('abort', onAbort, { once: true });
});
remaining -= chunk;
}
}
@@ -0,0 +1,233 @@
import type * as z from 'zod/v4';
import type { AixWire_Particles } from '../../../api/aix.wiretypes';
import type { ChatGenerateParseFunction } from '../chatGenerate.dispatch';
import type { IParticleTransmitter } from './IParticleTransmitter';
import { GeminiInteractionsWire_API_Interactions } from '../../wiretypes/gemini.interactions.wiretypes';
// Kill-switch: drop url_citation annotations - Deep Research ships opaque grounding-redirect URLs with no titles, and the text already contains a numbered source list.
const DISABLE_CITATIONS = true;
type TInteraction = z.infer<typeof GeminiInteractionsWire_API_Interactions.Interaction_schema>;
type TUsage = NonNullable<TInteraction['usage']>;
/**
* Gemini Interactions API parser (for Deep Research agents).
*
* Each SSE frame carries a *full* Interaction snapshot (from POST or from a GET poll).
* The parser diffs against prior state and emits only new content.
*
* Emission rules per output type:
* - `text` -> `pt.appendText(newSuffix)`. New url_citation annotations are emitted once.
* - `thought` -> `pt.appendReasoningText(newSuffix)`; signatures recorded via `setReasoningSignature`.
* - any other type -> ignored (Deep Research primarily emits text + thought).
*
* Part boundaries: when the output type at a given index changes kind (e.g. thought -> text),
* we call `endMessagePart()` so the transmitter flushes the previous part cleanly.
*/
export function createGeminiInteractionsParser(requestedModelName: string | null): ChatGenerateParseFunction {
const parserCreationTimestamp = Date.now();
let timeToFirstEvent: number | undefined;
// on resume we don't know the model name (the DMessage already has it) - skip emission
let modelNameSent = requestedModelName == null;
let upstreamHandleSent = false;
let operationOpId: string | null = null; // interaction id, set once; used to pair in-progress/done operation state
let operationOpenEmitted = false;
// per-index emission state (array index in `outputs[]`)
type EmittedState = {
kind: 'text' | 'thought' | 'other';
emittedTextLen: number;
emittedCitationKeys: Set<string>; // `${url}@${start}-${end}` to de-dupe
signatureSent: boolean;
};
const emitted: EmittedState[] = [];
let lastOpenIdx = -1; // index of the most recently opened part; -1 = none
return function parse(pt: IParticleTransmitter, rawEventData: string): void {
// model name is announced once (agents don't populate modelVersion the same way)
if (!modelNameSent && requestedModelName != null) {
pt.setModelName(requestedModelName);
modelNameSent = true;
}
// parse + validate
const parsed = GeminiInteractionsWire_API_Interactions.Interaction_schema.safeParse(JSON.parse(rawEventData));
if (!parsed.success)
throw new Error(`malformed interaction snapshot: ${parsed.error.message}`);
const interaction: TInteraction = parsed.data;
// emit the upstream handle on the first frame that has an id (enables reattach across reloads)
if (!upstreamHandleSent && interaction.id) {
pt.setUpstreamHandle(interaction.id, 'vnd.gem.interactions');
upstreamHandleSent = true;
}
// Operation state: give the UI a live progress indicator while the background agent runs.
// Pinned to the interaction id so the terminal 'done'/'error' replaces the same entry.
if (interaction.id && !operationOpId)
operationOpId = interaction.id;
if (operationOpId && !operationOpenEmitted && interaction.status === 'in_progress') {
pt.sendOperationState('search-web', 'Deep Research in progress...', { opId: operationOpId });
operationOpenEmitted = true;
}
// record time-to-first-content (first frame that carries outputs)
if (timeToFirstEvent === undefined && interaction.outputs && interaction.outputs.length > 0)
timeToFirstEvent = Date.now() - parserCreationTimestamp;
// process outputs (may be absent on early in_progress frames).
// Each raw output is classified via Zod safeParse against a discriminated union; unknown
// shapes fall through to `kind: 'other'` and are silently ignored.
const outputs = interaction.outputs ?? [];
for (let i = 0; i < outputs.length; i++) {
const classified = GeminiInteractionsWire_API_Interactions.KnownOutput_schema.safeParse(outputs[i]);
const kind: EmittedState['kind'] = !classified.success ? 'other' : classified.data.type;
// first time we see this index: initialize + flush previous part if switching kinds
let state = emitted[i];
if (!state) {
state = { kind, emittedTextLen: 0, emittedCitationKeys: new Set(), signatureSent: false };
emitted[i] = state;
// close previous part if we're opening a new index (natural part boundary)
if (lastOpenIdx !== -1 && lastOpenIdx !== i)
pt.endMessagePart();
lastOpenIdx = i;
}
if (!classified.success) continue; // 'other': ignored for now
const out = classified.data;
if (out.type === 'text') {
if (out.text.length > state.emittedTextLen) {
pt.appendText(out.text.slice(state.emittedTextLen));
state.emittedTextLen = out.text.length;
}
// url_citation annotations: loose-typed in Output_schema, validated per-item here
if (!DISABLE_CITATIONS && out.annotations) {
for (const annRaw of out.annotations) {
const annParse = GeminiInteractionsWire_API_Interactions.UrlCitationAnnotation_schema.safeParse(annRaw);
if (!annParse.success) continue; // not a url_citation (place_citation, file_citation, ...)
const ann = annParse.data;
const key = `${ann.url}@${ann.start_index ?? ''}-${ann.end_index ?? ''}`;
if (state.emittedCitationKeys.has(key)) continue;
state.emittedCitationKeys.add(key);
pt.appendUrlCitation(ann.title || ann.url, ann.url, undefined, ann.start_index, ann.end_index, undefined, undefined);
}
}
} else /* out.type === 'thought' */ {
const summary = out.summary ?? '';
if (summary.length > state.emittedTextLen) {
pt.appendReasoningText(summary.slice(state.emittedTextLen));
state.emittedTextLen = summary.length;
}
if (!state.signatureSent && out.signature) {
pt.setReasoningSignature(out.signature);
state.signatureSent = true;
}
}
}
// terminal states: flush current part and signal end
switch (interaction.status) {
case 'completed':
if (lastOpenIdx !== -1) pt.endMessagePart();
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research complete', { opId: operationOpId, state: 'done' });
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
pt.setTokenStopReason('ok');
pt.setDialectEnded('done-dialect');
break;
case 'failed':
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research failed', { opId: operationOpId, state: 'error' });
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
pt.setDialectTerminatingIssue('Deep Research interaction failed', null, 'srv-warn');
break;
case 'cancelled':
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research cancelled', { opId: operationOpId, state: 'done' });
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, timeToFirstEvent);
pt.setTokenStopReason('cg-issue');
pt.setDialectEnded('done-dialect');
break;
case 'requires_action':
// Not expected for Deep Research agents - fail loudly so we notice
if (operationOpId)
pt.sendOperationState('search-web', 'Deep Research needs action', { opId: operationOpId, state: 'error' });
pt.setDialectTerminatingIssue('Deep Research returned requires_action (not supported in this client)', null, 'srv-warn');
break;
case 'in_progress':
default:
// keep polling
break;
}
};
}
/**
* Map Gemini Interactions `usage` to `CGSelectMetrics`.
*
* Notes on the token model (per the Interactions API):
* - `total_input_tokens` counts the user-visible prompt input.
* - `total_cached_tokens` is a *subset* of input that was served from cache (we subtract to get "new" input).
* - `total_tool_use_tokens` is distinct from input/output/thought and accounts for internal tool calls
* (web search, code exec, etc.). For Deep Research this dominates - we fold it into TIn so displayed
* input reflects true model consumption; there is no dedicated slot in CGSelectMetrics today.
* - `total_output_tokens` excludes thought tokens; `gemini.parser.ts` (line 110) already adds TOutR into TOut
* for consistency, and we follow the same convention here.
*/
function _emitUsageMetrics(
pt: IParticleTransmitter,
usage: TUsage | undefined,
parserCreationTimestamp: number,
timeToFirstEvent: number | undefined,
): void {
if (!usage) return;
const m: AixWire_Particles.CGSelectMetrics = {};
const inputTokens = usage.total_input_tokens ?? 0;
const cachedTokens = usage.total_cached_tokens ?? 0;
const toolUseTokens = usage.total_tool_use_tokens ?? 0;
const outputTokens = usage.total_output_tokens ?? 0;
const thoughtTokens = usage.total_thought_tokens ?? 0;
// TIn = "new" input, i.e. prompt tokens beyond cache, plus tool-use tokens (folded in - no dedicated slot)
const newInput = Math.max(0, inputTokens - cachedTokens) + toolUseTokens;
if (newInput > 0) m.TIn = newInput;
if (cachedTokens > 0) m.TCacheRead = cachedTokens;
// TOut = output + thought (match gemini.parser.ts convention: candidatesTokenCount excludes thoughts)
const totalOut = outputTokens + thoughtTokens;
if (totalOut > 0) m.TOut = totalOut;
if (thoughtTokens > 0) m.TOutR = thoughtTokens;
// timing
const dtAll = Date.now() - parserCreationTimestamp;
m.dtAll = dtAll;
if (timeToFirstEvent !== undefined) {
m.dtStart = timeToFirstEvent;
const dtInner = dtAll - timeToFirstEvent;
if (dtInner > 0) {
m.dtInner = dtInner;
if (totalOut > 0)
m.vTOutInner = Math.round(100 * 1000 /*ms/s*/ * totalOut / dtInner) / 100;
}
}
pt.updateMetrics(m);
}
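Worked example of the folding above, with hypothetical token counts:
// usage = { total_input_tokens: 12000, total_cached_tokens: 4000, total_tool_use_tokens: 250000,
//           total_output_tokens: 9000, total_thought_tokens: 3000 }
// TIn        = (12000 - 4000) + 250000 = 258000  // 'new' input plus folded-in tool-use tokens
// TCacheRead = 4000
// TOut       = 9000 + 3000 = 12000               // output + thought, per the gemini.parser.ts convention
// TOutR      = 3000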
@@ -0,0 +1,125 @@
import * as z from 'zod/v4';
/**
* Gemini Interactions API wiretypes (Beta)
*
* 2026-04-21: NOTE - MINIMAL IMPL for DEEP RESEARCH AGENT
* Scope: only what the Deep Research agents need.
* - Stateless (no `previous_interaction_id`)
* - `store: true` - required by the API when `background: true` (Deep Research agents are background-only).
* We best-effort DELETE on natural terminal status only (never on abort, so the client can reattach) to minimize server-side retention.
* - Multi-turn: the chat sequence is flattened to role-tagged turns (a single user turn collapses to a plain-string `input`)
* - No tools, no system_instruction, no thinking config
*
* Docs: https://ai.google.dev/gemini-api/docs/interactions
* https://ai.google.dev/api/interactions-api
*/
export namespace GeminiInteractionsWire_API_Interactions {
export const postPath = '/v1beta/interactions';
export const getPath = (id: string) => `/v1beta/interactions/${encodeURIComponent(id)}`;
export const deletePath = (id: string) => `/v1beta/interactions/${encodeURIComponent(id)}`;
// -- Request Body (POST /v1beta/interactions) --
// A turn in a stateless multi-turn conversation (when `input` is an array).
// Content is kept as a plain string for now; the API also accepts a list of content objects for multimodal.
export const Turn_schema = z.object({
role: z.enum(['user', 'model']),
content: z.string(),
});
export const RequestBody_schema = z.object({
agent: z.string(), // e.g. 'deep-research-pro-preview-12-2025' (note: we send bare id, without 'models/' prefix)
input: z.union([
z.string(), // single-turn convenience
z.array(Turn_schema), // stateless multi-turn history
]),
background: z.literal(true), // required for agents
store: z.literal(true), // required when background=true; we DELETE after completion to minimize retention
});
// -- Output blocks --
//
// The top-level Output_schema is fully permissive because Deep Research (in preview) sometimes
// emits blocks without a `type` field or in shapes not yet documented. Validating strictly on
// ingestion would blow up the entire stream on a single unknown variant.
//
// The *known* shapes are defined as sub-schemas below. The parser `safeParse`s each output
// against `KnownOutput_schema` and skips anything that doesn't match - no casts, no duck-typing.
export const Output_schema = z.looseObject({});
// -- Known output variants (parser uses these via safeParse) --
export const UrlCitationAnnotation_schema = z.looseObject({
type: z.literal('url_citation'),
url: z.string(),
title: z.string().optional(),
start_index: z.number().optional(),
end_index: z.number().optional(),
});
const TextOutput_schema = z.object({
type: z.literal('text'),
text: z.string(),
// annotations is a heterogeneous array (url_citation, place_citation, file_citation, ...) - we
// filter it later via `UrlCitationAnnotation_schema.safeParse` per annotation.
annotations: z.array(z.looseObject({ type: z.string() })).optional(),
});
const ThoughtOutput_schema = z.object({
type: z.literal('thought'),
summary: z.string().optional(), // may be absent on simple/minimal thinking
signature: z.string().optional(),
});
/** Discriminated union of output shapes we act on. Anything else: safeParse fails -> parser skips. */
export const KnownOutput_schema = z.discriminatedUnion('type', [
TextOutput_schema,
ThoughtOutput_schema,
]);
// -- Response: Create / Get --
export const Status_enum = z.enum([
'in_progress',
'completed',
'failed',
'cancelled',
'requires_action',
]);
// -- Usage (populated in the terminal frame) --
const UsageByModality_schema = z.object({
modality: z.string(), // 'text' | 'image' | 'audio' | ...
tokens: z.number(),
});
const Usage_schema = z.object({
total_tokens: z.number().optional(),
total_input_tokens: z.number().optional(),
total_cached_tokens: z.number().optional(),
total_output_tokens: z.number().optional(),
total_thought_tokens: z.number().optional(),
total_tool_use_tokens: z.number().optional(), // Deep Research: tokens consumed by internal tool calls (web search, etc.)
input_tokens_by_modality: z.array(UsageByModality_schema).optional(),
output_tokens_by_modality: z.array(UsageByModality_schema).optional(),
});
export const Interaction_schema = z.object({
id: z.string(),
status: Status_enum,
outputs: z.array(Output_schema).optional(), // absent until first content arrives
usage: Usage_schema.optional(), // populated in terminal frames (completed/failed/cancelled)
// We ignore model/agent echo for now
});
}
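For orientation, a terminal snapshot that validates against `Interaction_schema` could look like this (all values hypothetical):
// {
//   "id": "interactions/abc123",
//   "status": "completed",
//   "outputs": [
//     { "type": "thought", "summary": "Planned the research passes...", "signature": "..." },
//     { "type": "text", "text": "## Findings\n...", "annotations": [{ "type": "url_citation", "url": "https://example.com", "start_index": 10, "end_index": 42 }] }
//   ],
//   "usage": { "total_input_tokens": 12000, "total_tool_use_tokens": 250000, "total_output_tokens": 9000, "total_thought_tokens": 3000 }
// }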