BlockOpUpstreamResume: full recovery. Fixes #1088

This commit is contained in:
Enrico Ros
2026-05-05 04:14:00 -07:00
parent 0001f7392b
commit 26ae3545a7
3 changed files with 118 additions and 60 deletions
+50 -15
View File
@@ -124,6 +124,15 @@ export function ChatMessageList(props: {
}
}, [conversationHandler, conversationId, onConversationExecuteHistory]);
// Resume in-flight tracking - lives at this level (NOT inside BlockOpUpstreamResume) so it
// survives any remount of the message bubble during a long-running stream (e.g. Deep Research).
// - `resumeInFlight` (state) maps messageId -> reattach mode; it is passed down as a prop and
//   drives the loading/Detach UI on BlockOpUpstreamResume.
// - `resumeAbortersRef` (ref) maps messageId -> AbortController so Detach can abort even after a remount.
// Both containers are keyed by messageId so multiple messages could in principle resume concurrently;
// an entry is added when a resume starts and removed when that resume settles.
const [resumeInFlight, setResumeInFlight] = React.useState<Record<DMessageId, AixReattachMode>>({});
const resumeAbortersRef = React.useRef<Map<DMessageId, AbortController>>(new Map());
const handleMessageUpstreamResume = React.useCallback(async (generator: DMessageGenerator, messageId: DMessageId, mode: AixReattachMode) => {
if (!conversationId || !conversationHandler) return;
if (!generator.upstreamHandle) throw new Error('No upstream handle on generator');
@@ -132,21 +141,36 @@ export function ChatMessageList(props: {
const llmId = generator.mgt === 'aix' ? generator.aix.mId : undefined;
if (!llmId) throw new Error('No model id on generator');
const controller = new AbortController();
resumeAbortersRef.current.set(messageId, controller);
setResumeInFlight(prev => ({ ...prev, [messageId]: mode }));
const { aixCreateChatGenerateContext, aixReattachContent_DMessage_orThrow } = await import('~/modules/aix/client/aix.client');
const result = await aixReattachContent_DMessage_orThrow(
llmId,
generator,
aixCreateChatGenerateContext('conversation', conversationId),
mode,
{ abortSignal: 'NON_ABORTABLE', throttleParallelThreads: 0 },
async (update, isDone) => {
conversationHandler.messageEdit(messageId, {
fragments: update.fragments,
generator: update.generator,
pendingIncomplete: update.pendingIncomplete,
}, isDone, isDone); // remove the pending state and updte only when done
},
);
try {
  // Re-attach to the upstream run; every update is written back onto the message.
  await aixReattachContent_DMessage_orThrow(
    llmId,
    generator,
    aixCreateChatGenerateContext('conversation', conversationId),
    mode,
    { abortSignal: controller.signal, throttleParallelThreads: 0 }, // Detach: aborting kills the local fetch; upstream run keeps going.
    async (update, isDone) => {
      conversationHandler.messageEdit(messageId, {
        fragments: update.fragments,
        generator: update.generator,
        pendingIncomplete: update.pendingIncomplete,
      }, isDone, isDone); // remove the pending state and update only when done
    },
  );
} finally {
  // Clear local tracking only if this attempt is still the current one (avoid races on rapid retry).
  // Ownership is decided by controller identity, NOT by comparing modes: a retry in the same mode
  // registers a new controller for this messageId, and a stale attempt settling afterwards must not
  // wipe the newer attempt's in-flight entry (that would hide Detach while a fetch is still live).
  if (resumeAbortersRef.current.get(messageId) === controller) {
    resumeAbortersRef.current.delete(messageId);
    setResumeInFlight(prev => {
      if (!(messageId in prev)) return prev; // nothing to clear: keep the same object, no re-render
      const { [messageId]: _, ...rest } = prev;
      return rest;
    });
  }
}
// Manual reattach is one-shot: on failure (e.g. upstream 404 from expired or already-consumed handle),
// drop the upstreamHandle so the Resume button doesn't keep luring the user into the same error.
@@ -158,6 +182,11 @@ export function ChatMessageList(props: {
// }, false /* messageComplete */, true /* touch */);
}, [conversationHandler, conversationId]);
const handleMessageUpstreamDetach = React.useCallback((messageId: DMessageId) => {
  // Detach = abort the AbortController registered for this message, if there is one.
  // Reads the ref at call time, so the callback needs no dependencies.
  const inFlightController = resumeAbortersRef.current.get(messageId);
  inFlightController?.abort();
}, []);
const handleMessageUpstreamDelete = React.useCallback(async (generator: DMessageGenerator, messageId: DMessageId) => {
if (!conversationId || !conversationHandler) return;
if (!generator.upstreamHandle) throw new Error('No upstream handle on generator');
@@ -397,7 +426,11 @@ export function ChatMessageList(props: {
{filteredMessages.map((message, idx) => {
// Optimization: only memo complete components, or we'd be memoizing garbage
// Optimization: only memo complete components, or we'd be memoizing garbage (fragments
// change every chunk during streaming, so the equality check would always fail).
// CAVEAT: switching between memo and non-memo at the same position causes React to
// remount the subtree (different component types). Any state that must survive that
// boundary lives on this component (e.g. resumeInFlight, resumeAbortersRef).
const ChatMessageMemoOrNot = !message.pendingIncomplete ? ChatMessageMemo : ChatMessage;
return props.isMessageSelectionMode ? (
@@ -429,7 +462,9 @@ export function ChatMessageList(props: {
onMessageBranch={handleMessageBranch}
onMessageContinue={handleMessageContinue}
onMessageUpstreamResume={handleMessageUpstreamResume}
onMessageUpstreamDetach={handleMessageUpstreamDetach}
onMessageUpstreamDelete={handleMessageUpstreamDelete}
upstreamResumeMode={resumeInFlight[message.id]}
onMessageDelete={handleMessageDelete}
onMessageFragmentAppend={handleMessageAppendFragment}
onMessageFragmentDelete={handleMessageDeleteFragment}
@@ -3,6 +3,7 @@ import TimeAgo from 'react-timeago';
import { Box, Button, ButtonGroup, Tooltip, Typography } from '@mui/joy';
import DownloadIcon from '@mui/icons-material/Download';
import LinkOffRoundedIcon from '@mui/icons-material/LinkOffRounded';
import PlayArrowRoundedIcon from '@mui/icons-material/PlayArrowRounded';
import StopRoundedIcon from '@mui/icons-material/StopRounded';
@@ -17,52 +18,65 @@ const ARM_TIMEOUT_MS = 4000;
/**
* Resume controls for an upstream-stored run.
* - Resume: SSE replay (live deltas) - canonical path. Always offered when onResume exists.
* - Recover: one-shot JSON GET - shown only for vendors that benefit from it (see _NS_RECOVER_UHTS).
* - Cancel/Stop: terminate the upstream run (delete the resource).
* - Recover: one-shot JSON GET - shown only for vendors that benefit from it (Gemini Interactions).
* - Detach: abort the local fetch but leave the upstream run alive. Visible only when a resume
* is in-flight (`inFlightMode != null`). Resume/Recover stay available afterwards.
* - Stop: terminate the upstream run + delete the resource.
*
* Single callback `onResume(streaming: boolean)` covers both Resume (true) and Recover (false).
* IMPORTANT: in-flight state is owned by the parent (`inFlightMode` + `onDetach`) so it survives
* remounts that happen while a long-running stream is active (e.g. Deep Research).
*/
export function BlockOpUpstreamResume(props: {
upstreamHandle: Exclude<DMessageGenerator['upstreamHandle'], undefined>,
pending?: boolean; // true while the message is actively streaming; labels the Delete button as "Stop"
pending?: boolean; // true iff a local in-flight op (initial POST or resume); drives the state machine + hides the expiry footer
inFlightMode?: AixReattachMode; // set by the parent while a resume is in flight; drives the loading/Detach UI
onResume?: (mode: AixReattachMode) => void | Promise<void>;
onDetach?: () => void;
onDelete?: () => void | Promise<void>;
}) {
// state - separate flags so each button shows its own loading spinner
const [isReplaying, setIsReplaying] = React.useState(false);
const [isSnapshotting, setIsSnapshotting] = React.useState(false);
const [isCancelling, setIsCancelling] = React.useState(false);
// local state - only for short-lived ops the parent doesn't own
const [isDeleting, setIsDeleting] = React.useState(false);
const [deleteArmed, setDeleteArmed] = React.useState(false);
const [error, setError] = React.useState<string | null>(null);
// expiration: boolean is evaluated at render (may lag briefly if nothing re-renders past expiry).
// TimeAgo handles its own tick for the label; the button's disabled state is the only consumer of this flag.
const { expiresAt /*, runId = ''*/ } = props.upstreamHandle;
// const isExpired = expiresAt != null && Date.now() > expiresAt;
// self-gate: show "Recover" only for Gemini Interactions (SSE can hang while the JSON resource stays fetchable).
// Other vendors recover via SSE replay alone. See kb/modules/LLM-gemini-interactions.md.
const showRecover = !!props.onResume && props.upstreamHandle.uht === 'vnd.gem.interactions';
// State machine - four mutually exclusive states (idle | initial-POST | resume | recover):
// - Idle : !pending - run not active locally (incl. post-reload, since
// chats.converters.ts clears pendingIncomplete on hydrate).
// - Initial POST : pending && !inFlightMode - first generation streaming.
// - Resume replay : pending && mode='replay' - we own this resume cycle.
// - Recover snap : pending && mode='snapshot' - we own this snapshot fetch.
//
// Visibility matrix (see BlockOpUpstreamResume props doc):
// Resume Recover Detach Cancel
// Idle ✅ ✅¹ — ✅
// Initial POST — — — ✅
// Resume in flight — — ✅ ✅
// Recover in flight — ✅² — —
// ¹ only for Gemini Interactions ² with loading spinner
// Derived visibility flags (no local state: everything comes from props).
// NOTE: the parent marks a resume as in flight before the message itself can be observed as
// `pending`, so "idle" must also require that no resume/recover is in flight. Otherwise Resume
// and Recover would stay clickable next to Detach during that window and could fire twice.
const isReplaying = props.inFlightMode === 'replay';
const isSnapshotting = props.inFlightMode === 'snapshot';
const isIdle = !props.pending && props.inFlightMode == null;
const canRecoverVendor = props.upstreamHandle.uht === 'vnd.gem.interactions'; // Recover is offered for Gemini Interactions only
const showResume = isIdle && !!props.onResume;
const showRecover = (isIdle || isSnapshotting) && !!props.onResume && canRecoverVendor; // stays visible while snapshotting (button shows its loading state)
const showDetach = isReplaying && !!props.onDetach;
const showCancel = !isSnapshotting && !!props.onDelete;
// handlers
const handleResume = React.useCallback(async (mode: AixReattachMode) => {
const handleResume = React.useCallback((mode: AixReattachMode) => {
if (!props.onResume) return;
const setBusy = mode === 'replay' ? setIsReplaying : setIsSnapshotting;
setError(null);
setBusy(true);
try {
await props.onResume(mode);
} catch (err: any) {
setError(err?.message || (mode === 'replay' ? 'Resume failed' : 'Recover failed'));
} finally {
setBusy(false);
}
// fire-and-forget: parent owns the promise lifecycle and the abort controller.
// If it rejects, the parent surfaces the error via its own UI; we stay silent.
Promise.resolve(props.onResume(mode)).catch(() => { /* parent handles */ });
}, [props]);
// Two-click arm: first click arms (visible red "Confirm?"), second click (within ARM_TIMEOUT_MS) executes.
const handleDelete = React.useCallback(async () => {
if (!props.onDelete) return;
@@ -89,17 +103,6 @@ export function BlockOpUpstreamResume(props: {
return () => clearTimeout(t);
}, [deleteArmed]);
// Disabled-state policy:
// - Resume / Recover: disabled while ANY operation is in flight (mutually exclusive with each
// other AND with the short ops, to avoid double-firing).
// - Cancel: disabled only by the SHORT ops it would race with (not by Resume - cancelling a
// long-running stream from local is meaningful even if the stream isn't blocking).
// - Stop/Delete: only the short ops gate it. NOT gated by Resume - this is the user's escape
// hatch for hung Resume/Recover; locking it would defeat the entire "stuck stream" UX.
const inFlightAny = isReplaying || isSnapshotting || isCancelling || isDeleting;
const inFlightShort = isCancelling || isDeleting;
return (
<Box
sx={{
@@ -111,11 +114,10 @@ export function BlockOpUpstreamResume(props: {
}}
>
<ButtonGroup>
{props.onResume && (
{showResume && (
<Tooltip title='Resume by re-streaming from the upstream run'>
<Button
disabled={inFlightAny}
loading={isReplaying}
disabled={isDeleting}
startDecorator={<PlayArrowRoundedIcon color='success' />}
onClick={() => handleResume('replay')}
>
@@ -127,8 +129,9 @@ export function BlockOpUpstreamResume(props: {
{showRecover && (
<Tooltip title='Fetch the result without streaming - recovers stuck or hung runs'>
<Button
disabled={inFlightAny}
disabled={isDeleting}
loading={isSnapshotting}
loadingPosition='start'
startDecorator={<DownloadIcon />}
onClick={() => handleResume('snapshot')}
>
@@ -137,17 +140,29 @@ export function BlockOpUpstreamResume(props: {
</Tooltip>
)}
{props.onDelete && (
<Tooltip title={deleteArmed ? 'Click again to confirm - cancels the run upstream (no resume after)' : (props.pending ? 'Stop this response and cancel the upstream run' : 'Cancel the upstream run')}>
{showDetach && (
<Tooltip title='Close this connection only - the upstream run keeps going. Click Resume or Recover later to fetch results.'>
<Button
disabled={isDeleting}
startDecorator={<LinkOffRoundedIcon />}
onClick={props.onDetach}
>
Detach
</Button>
</Tooltip>
)}
{showCancel && (
<Tooltip title={deleteArmed ? 'Click again to confirm - cancels the upstream run and clears the handle' : 'Cancel the upstream run'}>
<Button
loading={isDeleting}
color={deleteArmed ? 'danger' : 'neutral'}
variant={deleteArmed ? 'solid' : 'outlined'}
startDecorator={<StopRoundedIcon />}
onClick={handleDelete}
disabled={inFlightShort}
disabled={isDeleting}
>
{deleteArmed ? 'Confirm?' : (props.pending ? 'Stop' : 'Cancel')}
{deleteArmed ? 'Confirm?' : 'Cancel'}
</Button>
</Tooltip>
)}
@@ -163,7 +163,9 @@ export function ChatMessage(props: {
onMessageBranch?: (messageId: string) => void,
onMessageContinue?: (messageId: string, continueText: null | string) => void,
onMessageUpstreamResume?: (generator: DMessageGenerator, messageId: string, mode: AixReattachMode) => Promise<void>,
onMessageUpstreamDetach?: (messageId: string) => void,
onMessageUpstreamDelete?: (generator: DMessageGenerator, messageId: string) => Promise<void>,
upstreamResumeMode?: AixReattachMode, // set by parent while a resume is in flight on this message
onMessageDelete?: (messageId: string) => void,
onMessageFragmentAppend?: (messageId: DMessageId, fragment: DMessageFragment) => void
onMessageFragmentDelete?: (messageId: DMessageId, fragmentId: DMessageFragmentId) => void,
@@ -248,7 +250,7 @@ export function ChatMessage(props: {
// const wordsDiff = useWordsDifference(textSubject, props.diffPreviousText, showDiff);
const { onMessageAssistantFrom, onMessageDelete, onMessageFragmentAppend, onMessageFragmentDelete, onMessageFragmentReplace, onMessageContinue, onMessageUpstreamResume, onMessageUpstreamDelete } = props;
const { onMessageAssistantFrom, onMessageDelete, onMessageFragmentAppend, onMessageFragmentDelete, onMessageFragmentReplace, onMessageContinue, onMessageUpstreamResume, onMessageUpstreamDetach, onMessageUpstreamDelete } = props;
const handleFragmentNew = React.useCallback(() => {
onMessageFragmentAppend?.(messageId, createTextContentFragment(''));
@@ -271,6 +273,10 @@ export function ChatMessage(props: {
return onMessageUpstreamResume?.(messageGenerator, messageId, mode);
}, [messageGenerator, messageId, onMessageUpstreamResume]);
const handleUpstreamDetach = React.useCallback(() => {
  // Forward the detach request for this message to the parent, when a handler was provided.
  if (onMessageUpstreamDetach)
    onMessageUpstreamDetach(messageId);
}, [messageId, onMessageUpstreamDetach]);
const handleUpstreamDelete = React.useCallback(() => {
if (!messageGenerator) return;
return onMessageUpstreamDelete?.(messageGenerator, messageId);
@@ -904,7 +910,9 @@ export function ChatMessage(props: {
<BlockOpUpstreamResume
upstreamHandle={messageGenerator.upstreamHandle}
pending={messagePendingIncomplete}
onResume={(!messagePendingIncomplete && onMessageUpstreamResume) ? handleUpstreamResume : undefined}
inFlightMode={props.upstreamResumeMode}
onResume={onMessageUpstreamResume ? handleUpstreamResume : undefined}
onDetach={onMessageUpstreamDetach ? handleUpstreamDetach : undefined}
onDelete={onMessageUpstreamDelete ? handleUpstreamDelete : undefined}
/>
)}