AIX: Demuxers: add recovery of incomplete buffers and server-side logging

This commit is contained in:
Enrico Ros
2026-01-30 14:23:29 -08:00
parent 12203daa22
commit 207e257778
3 changed files with 192 additions and 16 deletions
@@ -195,6 +195,7 @@ async function* _consumeDispatchStream(
// Stream... -> Events[] (& yield heartbeats)
let demuxedEvents: AixDemuxers.DemuxedEvent[] = [];
let isFinalIteration = false;
try {
// 1. Blocking read with heartbeats
@@ -205,20 +206,32 @@ async function* _consumeDispatchStream(
// Handle normal dispatch stream closure (no more data, AI Service closed the stream)
if (done) {
chatGenerateTx.setEnded('done-dispatch-closed');
break; // outer do {}
// we used to `chatGenerateTx.setEnded('done-dispatch-closed');` here and break out of the processing loop,
// but there may be recoverable events in the demuxer's buffer
isFinalIteration = true;
// 2. Decode nothing new
// 3. Recover leftover events
_d.profiler?.measureStart('demux');
demuxedEvents = dispatchDemuxer.flushRemaining();
_d.profiler?.measureEnd('demux');
} else {
// 2. Decode the chunk - does Not throw (see the constructor for why)
_d.profiler?.measureStart('decode');
const chunk = dispatchDecoder.decode(value, { stream: true });
_d.profiler?.measureEnd('decode');
// 3. Demux the chunk into 0 or more events
_d.profiler?.measureStart('demux');
demuxedEvents = dispatchDemuxer.demux(chunk);
_d.profiler?.measureEnd('demux');
}
// 2. Decode the chunk - does Not throw (see the constructor for why)
_d.profiler?.measureStart('decode');
const chunk = dispatchDecoder.decode(value, { stream: true });
_d.profiler?.measureEnd('decode');
// 3. Demux the chunk into 0 or more events
_d.profiler?.measureStart('demux');
demuxedEvents = dispatchDemuxer.demux(chunk);
_d.profiler?.measureEnd('demux');
} catch (error: any) {
// Handle expected dispatch stream abortion - nothing to do, as the intake is already closed
// TODO: check if 'AbortError' is also a cause. Seems like ResponseAborted is NextJS vs signal driven.
@@ -276,5 +289,11 @@ async function* _consumeDispatchStream(
}
}
// 6. Normal stream end - if didn't end in a connection or event parsing error
if (isFinalIteration && !chatGenerateTx.isEnded) {
chatGenerateTx.setEnded('done-dispatch-closed');
break; // outer do {}
}
} while (!chatGenerateTx.isEnded);
}
@@ -192,9 +192,134 @@ export function createFastEventSourceDemuxer(): AixDemuxers.StreamDemuxer {
return events;
},
remaining: () => buffer,
/**
 * Recover events from unflushed buffer at stream end.
 * Some servers don't send proper \n\n terminators between SSE events, causing
 * data to accumulate without dispatching. This applies lenient parsing to
 * extract JSON payloads from the remaining buffer.
 */
flushRemaining: (): AixDemuxers.DemuxedEvent[] => {

  // snapshot the pending event name and the unflushed text before resetting state
  const pendingEventType = eventType || undefined;
  const leftover = (eventData ? eventData + '\n' : '') + buffer; // eventData (prefix-free) first, then raw SSE buffer

  // reset all demuxer state
  buffer = '';
  eventData = '';
  eventType = '';
  eolSearchIndex = 0;

  const combined = leftover.trim();
  if (!combined)
    return [];

  // lenient pass: pull complete JSON payloads out of the remnant
  const { events, skippedParts } = _extractJsonEventsFromSSE(combined, pendingEventType);

  // server-side log so protocol issues with upstream servers are visible
  if (events.length > 0 || skippedParts.length > 0)
    console.warn(`[AIX] SSE demuxer: recovered ${events.length} event(s) from unterminated stream`, {
      skippedParts: skippedParts.length,
      combinedLen: combined.length,
      combinedSample: combined.length <= 300 ? combined : combined.slice(0, 300) + '...',
      ...(skippedParts.length > 0 && { skipped: skippedParts }),
    });

  return events;
},
reconnectInterval: () => streamReconnectTime,
lastEventId: () => streamLastEventId,
};
}
/**
 * Extract complete JSON objects from SSE buffer remnants.
 * Handles `event:` and `data:` prefixes, tracks brace depth and string context.
 *
 * @param input - trimmed remnant of an SSE stream (possibly unterminated)
 * @param initialEventName - pending SSE event name captured before the flush, if any
 * @returns recovered events, plus any non-JSON fragments that were skipped (clipped to 100 chars)
 */
function _extractJsonEventsFromSSE(input: string, initialEventName: string | undefined): {
  events: AixDemuxers.DemuxedEvent[];
  skippedParts: string[];
} {
  const events: AixDemuxers.DemuxedEvent[] = [];
  const skippedParts: string[] = [];
  const clip = (text: string) => text.length > 100 ? text.slice(0, 100) + '...' : text;

  let eventName = initialEventName;
  let pos = 0;
  while (pos < input.length) {

    // advance past inter-event whitespace
    while (pos < input.length && /\s/.test(input[pos])) pos++;
    if (pos >= input.length) break;

    // `event:` line (SSE event name, used by the Anthropic format): remember it for the next payload
    if (input.startsWith('event:', pos)) {
      pos += 6;
      while (input[pos] === ' ') pos++;
      const nl = input.indexOf('\n', pos);
      eventName = (nl === -1 ? input.slice(pos) : input.slice(pos, nl)).trim();
      pos = nl === -1 ? input.length : nl + 1;
      continue;
    }

    // `data:` prefix: strip it, the payload follows on the same line
    if (input.startsWith('data:', pos)) {
      pos += 5;
      while (input[pos] === ' ') pos++;
      // OpenAI-style termination marker: consume silently
      if (input.startsWith('[DONE]', pos)) {
        pos += 6;
        continue;
      }
    }

    // anything that is not the start of a JSON object: record the line and move past it
    if (input[pos] !== '{') {
      const nl = input.indexOf('\n', pos);
      const fragment = (nl === -1 ? input.slice(pos) : input.slice(pos, nl)).trim();
      if (fragment && fragment !== '[DONE]')
        skippedParts.push(clip(fragment));
      pos = nl === -1 ? input.length : nl + 1;
      continue;
    }

    // balanced-brace scan of a single JSON object, honoring string literals and escapes
    const objStart = pos;
    let braceDepth = 0;
    let insideString = false;
    let afterBackslash = false;
    while (pos < input.length) {
      const ch = input[pos];
      if (afterBackslash)
        afterBackslash = false;
      else if (ch === '\\' && insideString)
        afterBackslash = true;
      else if (ch === '"')
        insideString = !insideString;
      else if (!insideString && ch === '{')
        braceDepth++;
      else if (!insideString && ch === '}') {
        braceDepth--;
        if (braceDepth === 0) {
          events.push({ type: 'event', name: eventName, data: input.slice(objStart, pos + 1) });
          eventName = undefined; // a name applies to a single payload
          pos++;
          break;
        }
      }
      pos++;
    }

    // truncated object at the tail of the buffer: remember it and stop scanning
    if (braceDepth !== 0) {
      skippedParts.push(clip(input.slice(objStart)));
      break;
    }
  }
  return { events, skippedParts };
}
@@ -36,7 +36,11 @@ export namespace AixDemuxers {
export type StreamDemuxer = {
demux: (chunk: string) => DemuxedEvent[];
remaining: () => string;
/**
* Attempt to recover events from unflushed buffer data at stream end.
* @returns Recovered events, or empty array if nothing to recover
*/
flushRemaining: () => DemuxedEvent[];
// unused, but may be provided by some demuxers
lastEventId?: () => string | undefined; // not used for now - SSE defines it for the stream
@@ -67,7 +71,35 @@ function _createJsonNlDemuxer(): AixDemuxers.StreamDemuxer {
}));
},
remaining: () => buffer,
/**
 * Recover events from unflushed buffer data at stream end.
 * Lenient line-wise pass: keeps lines that look like complete JSON objects.
 */
flushRemaining: (): AixDemuxers.DemuxedEvent[] => {
  // drain and reset the accumulator
  const leftover = buffer.trim();
  buffer = '';
  if (!leftover)
    return [];

  const recovered: AixDemuxers.DemuxedEvent[] = [];
  const skippedLines: string[] = [];
  leftover.split('\n').forEach((rawLine) => {
    const line = rawLine.trim();
    if (!line) return;
    if (line.startsWith('{') && line.endsWith('}'))
      recovered.push({ type: 'event', data: line });
    else
      skippedLines.push(line.length > 100 ? line.slice(0, 100) + '...' : line);
  });

  // server-side log so protocol issues with upstream servers are visible
  if (recovered.length > 0 || skippedLines.length > 0)
    console.warn(`[AIX] JSON-NL demuxer: recovered ${recovered.length} event(s) from unterminated stream`, {
      skippedLines: skippedLines.length,
      bufferLen: leftover.length,
      bufferSample: leftover.length <= 200 ? leftover : leftover.slice(0, 200) + '...',
      ...(skippedLines.length > 0 && { skipped: skippedLines }),
    });

  return recovered;
},
};
}
@@ -77,5 +109,5 @@ const _nullStreamDemuxerWarn: AixDemuxers.StreamDemuxer = {
console.warn('Null demuxer called - shall not happen, as it is only created in non-streaming');
return [];
},
remaining: () => '',
flushRemaining: () => [],
};