LLMs/AIX: Gemini: Agentic models: recovery mode (non-streaming). Fixes #1088

This commit is contained in:
Enrico Ros
2026-05-05 03:18:45 -07:00
parent 6858b0b94a
commit 901d93b5f0
6 changed files with 288 additions and 35 deletions
+3 -1
View File
@@ -6,6 +6,7 @@ import { Box, List } from '@mui/joy';
import type { SystemPurposeExample } from '../../../data';
import type { AixReattachMode } from '~/modules/aix/client/aix.client';
import type { DiagramConfig } from '~/modules/aifn/digrams/DiagramsModal';
import { speakText } from '~/modules/speex/speex.client';
@@ -123,7 +124,7 @@ export function ChatMessageList(props: {
}
}, [conversationHandler, conversationId, onConversationExecuteHistory]);
const handleMessageUpstreamResume = React.useCallback(async (generator: DMessageGenerator, messageId: DMessageId) => {
const handleMessageUpstreamResume = React.useCallback(async (generator: DMessageGenerator, messageId: DMessageId, mode: AixReattachMode) => {
if (!conversationId || !conversationHandler) return;
if (!generator.upstreamHandle) throw new Error('No upstream handle on generator');
@@ -136,6 +137,7 @@ export function ChatMessageList(props: {
llmId,
generator,
aixCreateChatGenerateContext('conversation', conversationId),
mode,
{ abortSignal: 'NON_ABORTABLE', throttleParallelThreads: 0 },
async (update, isDone) => {
conversationHandler.messageEdit(messageId, {
@@ -2,9 +2,12 @@ import * as React from 'react';
import TimeAgo from 'react-timeago';
import { Box, Button, ButtonGroup, Tooltip, Typography } from '@mui/joy';
import DownloadIcon from '@mui/icons-material/Download';
import PlayArrowRoundedIcon from '@mui/icons-material/PlayArrowRounded';
import StopRoundedIcon from '@mui/icons-material/StopRounded';
import type { AixReattachMode } from '~/modules/aix/client/aix.client';
import type { DMessageGenerator } from '~/common/stores/chat/chat.message';
@@ -12,18 +15,24 @@ const ARM_TIMEOUT_MS = 4000;
/**
 * Resume controls for an upstream-stored run.
* - Resume: SSE replay (live deltas) - canonical path. Always offered when onResume exists.
 * - Recover: one-shot JSON GET - shown only for vendors that benefit from it (see the `showRecover` gate below).
* - Cancel/Stop: terminate the upstream run (delete the resource).
*
 * Single callback `onResume(mode: AixReattachMode)` covers both Resume ('replay') and Recover ('snapshot').
*/
export function BlockOpUpstreamResume(props: {
upstreamHandle: Exclude<DMessageGenerator['upstreamHandle'], undefined>,
pending?: boolean; // true while the message is actively streaming; labels the Delete button as "Stop"
onResume?: () => void | Promise<void>;
onResume?: (mode: AixReattachMode) => void | Promise<void>;
onCancel?: () => void | Promise<void>;
onDelete?: () => void | Promise<void>;
}) {
// state
const [isResuming, setIsResuming] = React.useState(false);
// state - separate flags so each button shows its own loading spinner
const [isReplaying, setIsReplaying] = React.useState(false);
const [isSnapshotting, setIsSnapshotting] = React.useState(false);
const [isCancelling, setIsCancelling] = React.useState(false);
const [isDeleting, setIsDeleting] = React.useState(false);
const [deleteArmed, setDeleteArmed] = React.useState(false);
@@ -34,18 +43,23 @@ export function BlockOpUpstreamResume(props: {
const { expiresAt /*, runId = ''*/ } = props.upstreamHandle;
// const isExpired = expiresAt != null && Date.now() > expiresAt;
// self-gate: show "Recover" only for Gemini Interactions (SSE can hang while the JSON resource stays fetchable).
// Other vendors recover via SSE replay alone. See kb/modules/LLM-gemini-interactions.md.
const showRecover = !!props.onResume && props.upstreamHandle.uht === 'vnd.gem.interactions';
// handlers
const handleResume = React.useCallback(async () => {
const handleResume = React.useCallback(async (mode: AixReattachMode) => {
if (!props.onResume) return;
const setBusy = mode === 'replay' ? setIsReplaying : setIsSnapshotting;
setError(null);
setIsResuming(true);
setBusy(true);
try {
await props.onResume();
await props.onResume(mode);
} catch (err: any) {
setError(err?.message || 'Resume failed');
setError(err?.message || (mode === 'replay' ? 'Resume failed' : 'Recover failed'));
} finally {
setIsResuming(false);
setBusy(false);
}
}, [props]);
@@ -88,6 +102,16 @@ export function BlockOpUpstreamResume(props: {
return () => clearTimeout(t);
}, [deleteArmed]);
// Disabled-state policy:
// - Resume / Recover: disabled while ANY operation is in flight (mutually exclusive with each
// other AND with the short ops, to avoid double-firing).
// - Cancel: disabled only by the SHORT ops it would race with (not by Resume - cancelling a
// long-running stream from local is meaningful even if the stream isn't blocking).
// - Stop/Delete: only the short ops gate it. NOT gated by Resume - this is the user's escape
// hatch for hung Resume/Recover; locking it would defeat the entire "stuck stream" UX.
const inFlightAny = isReplaying || isSnapshotting || isCancelling || isDeleting;
const inFlightShort = isCancelling || isDeleting;
return (
<Box
@@ -101,22 +125,35 @@ export function BlockOpUpstreamResume(props: {
>
<ButtonGroup>
{props.onResume && (
<Tooltip title='Resume generation from last checkpoint'>
<Tooltip title='Resume by re-streaming from the upstream run'>
<Button
disabled={isResuming || isCancelling || isDeleting}
loading={isResuming}
disabled={inFlightAny}
loading={isReplaying}
startDecorator={<PlayArrowRoundedIcon color='success' />}
onClick={handleResume}
onClick={() => handleResume('replay')}
>
Resume
</Button>
</Tooltip>
)}
{showRecover && (
<Tooltip title='Fetch the result without streaming - recovers stuck or hung runs'>
<Button
disabled={inFlightAny}
loading={isSnapshotting}
startDecorator={<DownloadIcon />}
onClick={() => handleResume('snapshot')}
>
Recover
</Button>
</Tooltip>
)}
{props.onCancel && (
<Tooltip title='Cancel the response generation'>
<Button
disabled={isResuming || isCancelling || isDeleting}
disabled={inFlightShort}
loading={isCancelling}
// startDecorator={<CancelIcon />}
onClick={handleCancel}
@@ -134,7 +171,7 @@ export function BlockOpUpstreamResume(props: {
variant={deleteArmed ? 'solid' : 'outlined'}
startDecorator={<StopRoundedIcon />}
onClick={handleDelete}
disabled={isCancelling || isDeleting}
disabled={inFlightShort}
>
{deleteArmed ? 'Confirm?' : (props.pending ? 'Stop' : 'Cancel')}
</Button>
@@ -29,6 +29,7 @@ import VerticalAlignBottomIcon from '@mui/icons-material/VerticalAlignBottom';
import VisibilityIcon from '@mui/icons-material/Visibility';
import VisibilityOffIcon from '@mui/icons-material/VisibilityOff';
import type { AixReattachMode } from '~/modules/aix/client/aix.client';
import { ModelVendorAnthropic } from '~/modules/llms/vendors/anthropic/anthropic.vendor';
import { AnthropicIcon } from '~/common/components/icons/vendors/AnthropicIcon';
@@ -161,7 +162,7 @@ export function ChatMessage(props: {
onMessageBeam?: (messageId: string) => Promise<void>,
onMessageBranch?: (messageId: string) => void,
onMessageContinue?: (messageId: string, continueText: null | string) => void,
onMessageUpstreamResume?: (generator: DMessageGenerator, messageId: string) => Promise<void>,
onMessageUpstreamResume?: (generator: DMessageGenerator, messageId: string, mode: AixReattachMode) => Promise<void>,
onMessageUpstreamDelete?: (generator: DMessageGenerator, messageId: string) => Promise<void>,
onMessageDelete?: (messageId: string) => void,
onMessageFragmentAppend?: (messageId: DMessageId, fragment: DMessageFragment) => void
@@ -265,9 +266,9 @@ export function ChatMessage(props: {
onMessageContinue?.(messageId, continueText);
}, [messageId, onMessageContinue]);
const handleUpstreamResume = React.useCallback(() => {
const handleUpstreamResume = React.useCallback((mode: AixReattachMode) => {
if (!messageGenerator) return;
return onMessageUpstreamResume?.(messageGenerator, messageId);
return onMessageUpstreamResume?.(messageGenerator, messageId, mode);
}, [messageGenerator, messageId, onMessageUpstreamResume]);
const handleUpstreamDelete = React.useCallback(() => {
+17 -9
View File
@@ -645,22 +645,30 @@ function _finalizeLlmMetricsWithCosts(cgMetricsLg: undefined | DMetricsChatGener
// --- L2 - Content Generation reattachment as DMessage ---
/**
* Reattach mode selects how to reconstruct an in-progress upstream run:
* - 'replay' - canonical: SSE replays the event sequence from the start. Live deltas reach
* the UI as the run progresses (or as past content is replayed).
* - 'snapshot' - one-shot JSON GET returns the resource as-is right now. Used to recover when
* the SSE endpoint is broken upstream but the resource itself is still readable.
*
* Names describe what you get, not how. See `kb/modules/LLM-gemini-interactions.md` for failure modes.
*/
export type AixReattachMode = 'replay' | 'snapshot';
/**
* Reattach facade: wraps `aixChatGenerateContent_DMessage_orThrow` for the reattach-to-upstream flow.
* - Validates the generator carries an `upstreamHandle`
* - Stubs the unused chat-generate request, and
* - Seeds the base function so the LL's reattach branch fires.
*
* On an in-progress upstream run (Gemini Deep Research today, extensible to OAI Responses), the server
* just needs the handle to GET-poll; no chat-generate body is needed. This facade:
* - validates the generator carries an `upstreamHandle`,
* - stubs the chat-generate request (unused on the reattach path - the server uses the handle),
* - seeds the base function via `clientOptions.reattachGenerator` so the LL's reattach branch fires.
*
* The reassembler starts with empty fragments; since Gemini Interactions snapshots are cumulative,
* the stream will rebuild the complete content from scratch. Any partial content from the original run is replaced.
* The reassembler replaces content on reattach (Gemini Interactions snapshots are cumulative, so this rebuilds from scratch).
*/
export async function aixReattachContent_DMessage_orThrow(
llmId: DLLMId,
reattachGenerator: Readonly<DMessageGenerator>,
aixContext: AixAPI_Context_ChatGenerate,
mode: AixReattachMode,
clientOptions: Pick<AixClientOptions, 'abortSignal' | 'throttleParallelThreads'>,
onStreamingUpdate?: (update: AixChatGenerateContent_DMessageGuts, isDone: boolean) => MaybePromise<void>,
): Promise<_AixChatGenerateContent_DMessageGuts_WithOutcome> {
@@ -675,7 +683,7 @@ export async function aixReattachContent_DMessage_orThrow(
llmId,
stubChatGenerate,
aixContext,
true, // streaming
mode === 'replay', // wire-level: SSE demuxer (replay) vs one-shot JSON body (snapshot)
{ ...clientOptions, reattachGenerator: reattachGenerator as any /* guaranteed by the check */ },
onStreamingUpdate,
);
@@ -25,7 +25,7 @@ import { createAnthropicFileInlineTransform } from './parsers/anthropic.transfor
import { createAnthropicMessageParser, createAnthropicMessageParserNS } from './parsers/anthropic.parser';
import { createBedrockConverseParserNS, createBedrockConverseStreamParser } from './parsers/bedrock-converse.parser';
import { createGeminiGenerateContentResponseParser } from './parsers/gemini.parser';
import { createGeminiInteractionsParserSSE } from './parsers/gemini.interactions.parser';
import { createGeminiInteractionsParserNS, createGeminiInteractionsParserSSE } from './parsers/gemini.interactions.parser';
import { createOpenAIChatCompletionsChunkParser, createOpenAIChatCompletionsParserNS } from './parsers/openai.parser';
import { createOpenAIResponseParserNS, createOpenAIResponsesEventParser } from './parsers/openai.responses.parser';
@@ -329,16 +329,16 @@ export async function createChatGenerateResumeDispatch(access: AixAPI_Access, re
};
case 'gemini': {
// [Gemini Interactions] Reattach via SSE stream - GET /interactions/{id}?stream=true replays all events from the start (intentional - client's ContentReassembler replaces message content on reattach; partial resume via last_event_id is deliberately NOT used).
// [Gemini Interactions] Reattach: SSE replay (?stream=true) or JSON snapshot (no query). See kb/modules/LLM-gemini-interactions.md.
if (resumeHandle.uht !== 'vnd.gem.interactions')
throw new Error(`Resume handle mismatch for gemini: expected 'vnd.gem.interactions', got '${resumeHandle.uht}'`);
if (!streaming) console.warn(`[DEV] Gemini Interactions API - Resume only supported in SSE mode, ignoring streaming=false for ${resumeHandle.runId}`);
const { url: _baseUrl, headers: _headers } = geminiAccess(access, null, GeminiInteractionsWire_API_Interactions.getPath(resumeHandle.runId /* Gemini interaction.id */), false);
return {
request: { url: `${_baseUrl}${_baseUrl.includes('?') ? '&' : '?'}stream=true`, method: 'GET', headers: _headers },
/** Again, only support SSE here, for now (see comment in `createChatGenerateDispatch`) */
demuxerFormat: 'fast-sse',
chatGenerateParse: createGeminiInteractionsParserSSE(null /* model name unknown at resume time - caller's DMessage already has it */),
request: { url: streaming ? `${_baseUrl}${_baseUrl.includes('?') ? '&' : '?'}stream=true` : _baseUrl, method: 'GET', headers: _headers },
demuxerFormat: streaming ? 'fast-sse' : null,
chatGenerateParse: streaming
? createGeminiInteractionsParserSSE(null /* model name unknown at resume time - caller's DMessage already has it */)
: createGeminiInteractionsParserNS(null),
};
}
@@ -241,6 +241,192 @@ export function createGeminiInteractionsParserSSE(requestedModelName: string | n
}
/**
* Non-streaming parser: reads the GET /v1beta/interactions/{id} JSON body once and emits the same
* particles the SSE parser would, in a single batch.
*
* Used by the "Recover" path when SSE delivery is broken upstream (10-min cuts; see KB doc) but the
* resource is still fetchable. We always re-emit the upstream handle so failed/in_progress runs
* remain retryable; only `status: completed` clears it (via the reassembler's outcome=='completed' policy).
*
* See `kb/modules/LLM-gemini-interactions.md` for failure modes and recovery model.
*/
export function createGeminiInteractionsParserNS(requestedModelName: string | null): ChatGenerateParseFunction {
const parserCreationTimestamp = Date.now();
return function parse(pt: IParticleTransmitter, rawEventData: string, _eventName?: string): void {
// model name (preserved from caller's DMessage on resume; first-call only on fresh fetches)
if (requestedModelName != null)
pt.setModelName(requestedModelName);
// parse + validate against the Interaction resource schema (looseObject - tolerant to upstream additions)
let rawJson: unknown;
try {
rawJson = JSON.parse(rawEventData);
} catch (e: any) {
throw new Error(`malformed Interaction JSON: ${e?.message || String(e)}`);
}
const parsed = GeminiInteractionsWire_API_Interactions.Interaction_schema.safeParse(rawJson);
if (!parsed.success) {
console.warn('[GeminiInteractions-NS] unexpected Interaction shape:', rawJson);
throw new Error('Gemini Interactions: unexpected resource shape (no `id`/`status` fields)');
}
const interaction = parsed.data;
// upstream handle - preserve so user can retry / delete
pt.setUpstreamHandle(interaction.id, 'vnd.gem.interactions');
// Walk outputs in order. Each output is loose; we safeParse against KnownOutput_schema and
// silently skip INTERNAL_OUTPUT_TYPES (tool calls/results). Order matters - thoughts and
// text interleave in the report and the user reads them top-to-bottom.
const outputs = interaction.outputs ?? [];
let lastEmittedKind: 'thought' | 'text' | 'image' | 'audio' | null = null;
for (const rawOut of outputs) {
const outType = (rawOut as { type?: string })?.type;
// silent-skip internal tool-call outputs (matches SSE parser policy for INTERNAL_OUTPUT_TYPES)
if (outType && GeminiInteractionsWire_API_Interactions.INTERNAL_OUTPUT_TYPES.has(outType))
continue;
const knownOut = GeminiInteractionsWire_API_Interactions.KnownOutput_schema.safeParse(rawOut);
if (!knownOut.success) {
if (outType) console.warn('[GeminiInteractions-NS] unknown output type, skipping:', outType);
continue;
}
// emit a part boundary when switching kinds, mirrors SSE behavior on content.start across indices
if (lastEmittedKind !== null && lastEmittedKind !== knownOut.data.type)
pt.endMessagePart();
switch (knownOut.data.type) {
case 'thought': {
const summary = knownOut.data.summary;
if (typeof summary === 'string') {
if (summary) pt.appendReasoningText(summary);
} else if (Array.isArray(summary)) {
for (const item of summary)
if (item.text) pt.appendReasoningText(item.text);
}
if (knownOut.data.signature)
pt.setReasoningSignature(knownOut.data.signature);
lastEmittedKind = 'thought';
break;
}
case 'text': {
if (knownOut.data.text)
pt.appendText(knownOut.data.text);
// Citations: matches SSE policy - DISABLE_CITATIONS kill-switch dictates Deep Research drops them
if (!DISABLE_CITATIONS && knownOut.data.annotations) {
for (const annRaw of knownOut.data.annotations) {
const ann = GeminiInteractionsWire_API_Interactions.UrlCitationAnnotation_schema.safeParse(annRaw);
if (!ann.success) continue;
const a = ann.data;
pt.appendUrlCitation(a.title || a.url, a.url, undefined, a.start_index, a.end_index, undefined, undefined);
}
}
lastEmittedKind = 'text';
break;
}
case 'image': {
if (knownOut.data.data && knownOut.data.mime_type)
pt.appendImageInline(knownOut.data.mime_type, knownOut.data.data, 'Gemini Generated Image', 'Gemini', '', true);
else if (knownOut.data.uri)
pt.appendText(`\n[Image: ${knownOut.data.uri}]\n`);
lastEmittedKind = 'image';
break;
}
case 'audio': {
if (knownOut.data.data && knownOut.data.mime_type) {
const mime = knownOut.data.mime_type.toLowerCase();
const isPCM = mime.startsWith('audio/l16') || mime.includes('codec=pcm');
if (isPCM) {
try {
const wav = geminiConvertPCM2WAV(knownOut.data.mime_type, knownOut.data.data);
pt.appendAudioInline(wav.mimeType, wav.base64Data, 'Gemini Generated Audio', 'Gemini', wav.durationMs);
} catch (error) {
console.warn('[GeminiInteractions-NS] audio PCM convert failed:', error);
}
} else {
pt.appendAudioInline(knownOut.data.mime_type, knownOut.data.data, 'Gemini Generated Audio', 'Gemini', 0);
}
}
lastEmittedKind = 'audio';
break;
}
default: {
const _exhaustive: never = knownOut.data;
break;
}
}
}
// close out any open part before the terminal status emission
if (lastEmittedKind !== null) pt.endMessagePart();
// Terminal status -> stop reason + dialect end (mirrors _handleInteractionComplete)
switch (interaction.status) {
case 'completed':
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, undefined);
pt.setTokenStopReason('ok');
pt.setDialectEnded('done-dialect');
break;
case 'failed':
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, undefined);
pt.setDialectTerminatingIssue('Deep Research interaction failed', null, 'srv-warn');
break;
case 'cancelled':
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, undefined);
pt.setTokenStopReason('cg-issue');
pt.setDialectEnded('done-dialect');
break;
case 'incomplete':
pt.appendText('\n_Response incomplete (run stopped early)._\n');
_emitUsageMetrics(pt, interaction.usage, parserCreationTimestamp, undefined);
pt.setTokenStopReason('out-of-tokens');
pt.setDialectEnded('done-dialect');
break;
case 'requires_action':
pt.setDialectTerminatingIssue('Deep Research returned requires_action (not supported in this client)', null, 'srv-warn');
break;
case 'in_progress': {
// Two scenarios both surface as `in_progress`:
// 1) Run is genuinely live server-side (just slow) - polling later will yield content.
// 2) "Zombie": the generator crashed but the status never transitioned. Stays `in_progress`
// for days with no outputs. Not recoverable - the only remedy is delete + retry.
// We can't disambiguate from one frame, so we surface {created, updated, outputs.length}
// and let the user decide. `tokenStopReason='cg-issue'` keeps the upstream handle alive
// (vs 'ok' which would clear it via the reassembler's clean-completion policy).
// see kb/modules/LLM-gemini-interactions.md#failure-modes (C)
const elapsedMin = _minutesSince(interaction.created);
const updatedMin = _minutesSince(interaction.updated);
const outCount = (interaction.outputs ?? []).length;
const lines: string[] = ['\n_Deep Research run is **`in_progress`** server-side._\n'];
if (elapsedMin != null) lines.push(`- Started: **${_humanDuration(elapsedMin)} ago**`);
if (updatedMin != null && updatedMin !== elapsedMin) lines.push(`- Last server update: **${_humanDuration(updatedMin)} ago**`);
lines.push(`- Outputs so far: **${outCount === 0 ? 'none' : outCount}**`);
// Heuristic threshold: stale-and-empty for >60 min is almost certainly a zombie.
const looksStuck = outCount === 0 && elapsedMin != null && elapsedMin > 60;
if (looksStuck)
lines.push('\nThis run looks **stuck** (no content for over an hour). Click **Cancel** to delete it and try again.');
else
lines.push('\nTry **Recover** again in a few minutes; if it stays empty, click **Cancel** to delete and retry.');
pt.appendText(lines.join('\n') + '\n');
pt.setTokenStopReason('cg-issue');
pt.setDialectEnded('done-dialect');
break;
}
default: {
const _exhaustiveCheck: never = interaction.status;
console.warn('[GeminiInteractions-NS] unreachable status', interaction.status);
break;
}
}
};
}
// --- helpers ---
function _classifyContentKind(rawType: unknown): BlockState['kind'] {
@@ -370,3 +556,22 @@ function _emitUsageMetrics(
pt.updateMetrics(m);
}
/** Minutes elapsed between an upstream ISO 8601 timestamp and now; null when missing or unparseable. */
function _minutesSince(iso: string | undefined | null): number | null {
  if (!iso)
    return null;
  const parsedMs = Date.parse(iso);
  if (!Number.isFinite(parsedMs))
    return null;
  // clamp future timestamps (clock skew) to zero elapsed minutes
  const elapsedMinutes = (Date.now() - parsedMs) / 60_000;
  return elapsedMinutes > 0 ? elapsedMinutes : 0;
}
/**
 * Human-readable elapsed-time string for in_progress diagnostic messages.
 *
 * Buckets: <1 min -> 'less than a minute'; <60 min -> whole minutes; <24 h -> hours to one
 * decimal; otherwise days to one decimal. Singular units ('1 hour', '1 day') are handled -
 * the previous version emitted the ungrammatical '1 hours' / '1 days' at exact boundaries.
 *
 * @param minutes non-negative elapsed minutes (as produced by `_minutesSince`)
 */
function _humanDuration(minutes: number): string {
  if (minutes < 1) return 'less than a minute';
  if (minutes < 60) return `${Math.round(minutes)} min`;
  const hours = minutes / 60;
  if (hours < 24) {
    const h = Math.round(hours * 10) / 10;
    return `${h} ${h === 1 ? 'hour' : 'hours'}`;
  }
  const d = Math.round((hours / 24) * 10) / 10;
  return `${d} ${d === 1 ? 'day' : 'days'}`;
}