From 26ae3545a77547d2ceeeb669f6070ecd1c65b9fe Mon Sep 17 00:00:00 2001 From: Enrico Ros Date: Tue, 5 May 2026 04:14:00 -0700 Subject: [PATCH] BlockOpUpstreamResume: full recovery. Fixes #1088 --- src/apps/chat/components/ChatMessageList.tsx | 65 ++++++++--- .../message/BlockOpUpstreamResume.tsx | 101 ++++++++++-------- .../chat/components/message/ChatMessage.tsx | 12 ++- 3 files changed, 118 insertions(+), 60 deletions(-) diff --git a/src/apps/chat/components/ChatMessageList.tsx b/src/apps/chat/components/ChatMessageList.tsx index 020d4e594..33d8096f4 100644 --- a/src/apps/chat/components/ChatMessageList.tsx +++ b/src/apps/chat/components/ChatMessageList.tsx @@ -124,6 +124,15 @@ export function ChatMessageList(props: { } }, [conversationHandler, conversationId, onConversationExecuteHistory]); + + // Resume in-flight tracking - lives at this level (NOT inside BlockOpUpstreamResume) so it + // survives any remount of the message bubble during a long-running stream (e.g. Deep Research). + // - `resumeInFlight` (state) drives the loading/Detach UI on BlockOpUpstreamResume via props. + // - `resumeAbortersRef` (ref) holds the AbortController so Detach can abort even after a remount. + // Map keyed by messageId so multiple messages could in principle resume concurrently. + const [resumeInFlight, setResumeInFlight] = React.useState>({}); + const resumeAbortersRef = React.useRef>(new Map()); + const handleMessageUpstreamResume = React.useCallback(async (generator: DMessageGenerator, messageId: DMessageId, mode: AixReattachMode) => { if (!conversationId || !conversationHandler) return; if (!generator.upstreamHandle) throw new Error('No upstream handle on generator'); @@ -132,21 +141,36 @@ export function ChatMessageList(props: { const llmId = generator.mgt === 'aix' ? 
generator.aix.mId : undefined; if (!llmId) throw new Error('No model id on generator'); + const controller = new AbortController(); + resumeAbortersRef.current.set(messageId, controller); + setResumeInFlight(prev => ({ ...prev, [messageId]: mode })); + const { aixCreateChatGenerateContext, aixReattachContent_DMessage_orThrow } = await import('~/modules/aix/client/aix.client'); - const result = await aixReattachContent_DMessage_orThrow( - llmId, - generator, - aixCreateChatGenerateContext('conversation', conversationId), - mode, - { abortSignal: 'NON_ABORTABLE', throttleParallelThreads: 0 }, - async (update, isDone) => { - conversationHandler.messageEdit(messageId, { - fragments: update.fragments, - generator: update.generator, - pendingIncomplete: update.pendingIncomplete, - }, isDone, isDone); // remove the pending state and updte only when done - }, - ); + try { + await aixReattachContent_DMessage_orThrow( + llmId, + generator, + aixCreateChatGenerateContext('conversation', conversationId), + mode, + { abortSignal: controller.signal, throttleParallelThreads: 0 }, // Detach: aborting kills the local fetch; upstream run keeps going. + async (update, isDone) => { + conversationHandler.messageEdit(messageId, { + fragments: update.fragments, + generator: update.generator, + pendingIncomplete: update.pendingIncomplete, + }, isDone, isDone); // remove the pending state and update only when done + }, + ); + } finally { + // Clear local tracking only if this attempt is still the current one (avoid races on rapid retry) + if (resumeAbortersRef.current.get(messageId) === controller) + resumeAbortersRef.current.delete(messageId); + setResumeInFlight(prev => { + if (prev[messageId] !== mode) return prev; + const { [messageId]: _, ...rest } = prev; + return rest; + }); + } // Manual reattach is one-shot: on failure (e.g. upstream 404 from expired or already-consumed handle), // drop the upstreamHandle so the Resume button doesn't keep luring the user into the same error. 
@@ -158,6 +182,11 @@ export function ChatMessageList(props: { // }, false /* messageComplete */, true /* touch */); }, [conversationHandler, conversationId]); + const handleMessageUpstreamDetach = React.useCallback((messageId: DMessageId) => { + resumeAbortersRef.current.get(messageId)?.abort(); + }, []); + + const handleMessageUpstreamDelete = React.useCallback(async (generator: DMessageGenerator, messageId: DMessageId) => { if (!conversationId || !conversationHandler) return; if (!generator.upstreamHandle) throw new Error('No upstream handle on generator'); @@ -397,7 +426,11 @@ export function ChatMessageList(props: { {filteredMessages.map((message, idx) => { - // Optimization: only memo complete components, or we'd be memoizing garbage + // Optimization: only memo complete components, or we'd be memoizing garbage (fragments + // change every chunk during streaming, so the equality check would always fail). + // CAVEAT: switching between memo and non-memo at the same position causes React to + // remount the subtree (different component types). Any state that must survive that + // boundary lives on this component (e.g. resumeInFlight, resumeAbortersRef). const ChatMessageMemoOrNot = !message.pendingIncomplete ? ChatMessageMemo : ChatMessage; return props.isMessageSelectionMode ? 
( @@ -429,7 +462,9 @@ export function ChatMessageList(props: { onMessageBranch={handleMessageBranch} onMessageContinue={handleMessageContinue} onMessageUpstreamResume={handleMessageUpstreamResume} + onMessageUpstreamDetach={handleMessageUpstreamDetach} onMessageUpstreamDelete={handleMessageUpstreamDelete} + upstreamResumeMode={resumeInFlight[message.id]} onMessageDelete={handleMessageDelete} onMessageFragmentAppend={handleMessageAppendFragment} onMessageFragmentDelete={handleMessageDeleteFragment} diff --git a/src/apps/chat/components/message/BlockOpUpstreamResume.tsx b/src/apps/chat/components/message/BlockOpUpstreamResume.tsx index 8fd341622..7fa64742d 100644 --- a/src/apps/chat/components/message/BlockOpUpstreamResume.tsx +++ b/src/apps/chat/components/message/BlockOpUpstreamResume.tsx @@ -3,6 +3,7 @@ import TimeAgo from 'react-timeago'; import { Box, Button, ButtonGroup, Tooltip, Typography } from '@mui/joy'; import DownloadIcon from '@mui/icons-material/Download'; +import LinkOffRoundedIcon from '@mui/icons-material/LinkOffRounded'; import PlayArrowRoundedIcon from '@mui/icons-material/PlayArrowRounded'; import StopRoundedIcon from '@mui/icons-material/StopRounded'; @@ -17,52 +18,65 @@ const ARM_TIMEOUT_MS = 4000; /** * Resume controls for an upstream-stored run. * - Resume: SSE replay (live deltas) - canonical path. Always offered when onResume exists. - * - Recover: one-shot JSON GET - shown only for vendors that benefit from it (see _NS_RECOVER_UHTS). - * - Cancel/Stop: terminate the upstream run (delete the resource). + * - Recover: one-shot JSON GET - shown only for vendors that benefit from it (Gemini Interactions). + * - Detach: abort the local fetch but leave the upstream run alive. Visible only when a resume + * is in-flight (`inFlightMode != null`). Resume/Recover stay available afterwards. + * - Stop: terminate the upstream run + delete the resource. * - * Single callback `onResume(streaming: boolean)` covers both Resume (true) and Recover (false). 
+ * IMPORTANT: in-flight state is owned by the parent (`inFlightMode` + `onDetach`) so it survives + * remounts that happen while a long-running stream is active (e.g. Deep Research). */ export function BlockOpUpstreamResume(props: { upstreamHandle: Exclude, - pending?: boolean; // true while the message is actively streaming; labels the Delete button as "Stop" + pending?: boolean; // true iff a local in-flight op (initial POST or resume); drives the state machine + hides the expiry footer + inFlightMode?: AixReattachMode; // set by the parent while a resume is in flight; drives the loading/Detach UI onResume?: (mode: AixReattachMode) => void | Promise; + onDetach?: () => void; onDelete?: () => void | Promise; }) { - // state - separate flags so each button shows its own loading spinner - const [isReplaying, setIsReplaying] = React.useState(false); - const [isSnapshotting, setIsSnapshotting] = React.useState(false); - const [isCancelling, setIsCancelling] = React.useState(false); + // local state - only for short-lived ops the parent doesn't own const [isDeleting, setIsDeleting] = React.useState(false); const [deleteArmed, setDeleteArmed] = React.useState(false); const [error, setError] = React.useState(null); // expiration: boolean is evaluated at render (may lag briefly if nothing re-renders past expiry). - // TimeAgo handles its own tick for the label; the button's disabled state is the only consumer of this flag. const { expiresAt /*, runId = ''*/ } = props.upstreamHandle; - // const isExpired = expiresAt != null && Date.now() > expiresAt; - // self-gate: show "Recover" only for Gemini Interactions (SSE can hang while the JSON resource stays fetchable). - // Other vendors recover via SSE replay alone. See kb/modules/LLM-gemini-interactions.md. 
- const showRecover = !!props.onResume && props.upstreamHandle.uht === 'vnd.gem.interactions'; + // State machine - four mutually exclusive states (idle | initial-POST | resume | recover): + // - Idle : !pending - run not active locally (incl. post-reload, since + // chats.converters.ts clears pendingIncomplete on hydrate). + // - Initial POST : pending && !inFlightMode - first generation streaming. + // - Resume replay : pending && mode='replay' - we own this resume cycle. + // - Recover snap : pending && mode='snapshot' - we own this snapshot fetch. + // + // Visibility matrix (see BlockOpUpstreamResume props doc): + // Resume Recover Detach Cancel + // Idle ✅ ✅¹ — ✅ + // Initial POST — — — ✅ + // Resume in flight — — ✅ ✅ + // Recover in flight — ✅² — — + // ¹ only for Gemini Interactions ² with loading spinner + const isReplaying = props.inFlightMode === 'replay'; + const isSnapshotting = props.inFlightMode === 'snapshot'; + const isIdle = !props.pending; + + const canRecoverVendor = props.upstreamHandle.uht === 'vnd.gem.interactions'; + const showResume = isIdle && !!props.onResume; + const showRecover = (isIdle || isSnapshotting) && !!props.onResume && canRecoverVendor; + const showDetach = isReplaying && !!props.onDetach; + const showCancel = !isSnapshotting && !!props.onDelete; // handlers - const handleResume = React.useCallback(async (mode: AixReattachMode) => { + const handleResume = React.useCallback((mode: AixReattachMode) => { if (!props.onResume) return; - const setBusy = mode === 'replay' ? setIsReplaying : setIsSnapshotting; setError(null); - setBusy(true); - try { - await props.onResume(mode); - } catch (err: any) { - setError(err?.message || (mode === 'replay' ? 'Resume failed' : 'Recover failed')); - } finally { - setBusy(false); - } + // fire-and-forget: parent owns the promise lifecycle and the abort controller. + // If it rejects, the parent is expected to surface the error via its own UI; we stay silent. 
+ Promise.resolve(props.onResume(mode)).catch(() => { /* parent handles */ }); }, [props]); - // Two-click arm: first click arms (visible red "Confirm?"), second click (within ARM_TIMEOUT_MS) executes. const handleDelete = React.useCallback(async () => { if (!props.onDelete) return; @@ -89,17 +103,6 @@ export function BlockOpUpstreamResume(props: { return () => clearTimeout(t); }, [deleteArmed]); - // Disabled-state policy: - // - Resume / Recover: disabled while ANY operation is in flight (mutually exclusive with each - // other AND with the short ops, to avoid double-firing). - // - Cancel: disabled only by the SHORT ops it would race with (not by Resume - cancelling a - // long-running stream from local is meaningful even if the stream isn't blocking). - // - Stop/Delete: only the short ops gate it. NOT gated by Resume - this is the user's escape - // hatch for hung Resume/Recover; locking it would defeat the entire "stuck stream" UX. - const inFlightAny = isReplaying || isSnapshotting || isCancelling || isDeleting; - const inFlightShort = isCancelling || isDeleting; - - return ( - {props.onResume && ( + {showResume && ( + + )} + + {showCancel && ( + )} diff --git a/src/apps/chat/components/message/ChatMessage.tsx b/src/apps/chat/components/message/ChatMessage.tsx index 7d6def891..306298181 100644 --- a/src/apps/chat/components/message/ChatMessage.tsx +++ b/src/apps/chat/components/message/ChatMessage.tsx @@ -163,7 +163,9 @@ export function ChatMessage(props: { onMessageBranch?: (messageId: string) => void, onMessageContinue?: (messageId: string, continueText: null | string) => void, onMessageUpstreamResume?: (generator: DMessageGenerator, messageId: string, mode: AixReattachMode) => Promise, + onMessageUpstreamDetach?: (messageId: string) => void, onMessageUpstreamDelete?: (generator: DMessageGenerator, messageId: string) => Promise, + upstreamResumeMode?: AixReattachMode, // set by parent while a resume is in flight on this message onMessageDelete?: 
(messageId: string) => void, onMessageFragmentAppend?: (messageId: DMessageId, fragment: DMessageFragment) => void onMessageFragmentDelete?: (messageId: DMessageId, fragmentId: DMessageFragmentId) => void, @@ -248,7 +250,7 @@ export function ChatMessage(props: { // const wordsDiff = useWordsDifference(textSubject, props.diffPreviousText, showDiff); - const { onMessageAssistantFrom, onMessageDelete, onMessageFragmentAppend, onMessageFragmentDelete, onMessageFragmentReplace, onMessageContinue, onMessageUpstreamResume, onMessageUpstreamDelete } = props; + const { onMessageAssistantFrom, onMessageDelete, onMessageFragmentAppend, onMessageFragmentDelete, onMessageFragmentReplace, onMessageContinue, onMessageUpstreamResume, onMessageUpstreamDetach, onMessageUpstreamDelete } = props; const handleFragmentNew = React.useCallback(() => { onMessageFragmentAppend?.(messageId, createTextContentFragment('')); @@ -271,6 +273,10 @@ export function ChatMessage(props: { return onMessageUpstreamResume?.(messageGenerator, messageId, mode); }, [messageGenerator, messageId, onMessageUpstreamResume]); + const handleUpstreamDetach = React.useCallback(() => { + onMessageUpstreamDetach?.(messageId); + }, [messageId, onMessageUpstreamDetach]); + const handleUpstreamDelete = React.useCallback(() => { if (!messageGenerator) return; return onMessageUpstreamDelete?.(messageGenerator, messageId); @@ -904,7 +910,9 @@ export function ChatMessage(props: { )}