AIX: require clean connection ends. #980

This commit is contained in:
Enrico Ros
2026-02-12 16:30:38 -08:00
parent dd5b7cb8c2
commit facffbc6c8
9 changed files with 128 additions and 85 deletions
+4 -4
View File
@@ -96,7 +96,7 @@ export class ContentReassembler {
console.log('-> aix.p: abort-client'); console.log('-> aix.p: abort-client');
// NOTE: this doesn't go to the debugger anymore - as we only publish external particles to the debugger // NOTE: this doesn't go to the debugger anymore - as we only publish external particles to the debugger
await this.#reassembleParticle({ cg: 'end', reason: 'abort-client', tokenStopReason: 'client-abort-signal' }); await this.#reassembleParticle({ cg: 'end', terminationReason: 'abort-client', tokenStopReason: 'client-abort-signal' });
} }
async setClientExcepted(errorAsText: string, errorHint?: DMessageErrorPart['hint']): Promise<void> { async setClientExcepted(errorAsText: string, errorHint?: DMessageErrorPart['hint']): Promise<void> {
@@ -106,7 +106,7 @@ export class ContentReassembler {
this.onCGIssue({ cg: 'issue', issueId: 'client-read', issueText: errorAsText, issueHint: errorHint }); this.onCGIssue({ cg: 'issue', issueId: 'client-read', issueText: errorAsText, issueHint: errorHint });
// NOTE: this doesn't go to the debugger anymore - as we only publish external particles to the debugger // NOTE: this doesn't go to the debugger anymore - as we only publish external particles to the debugger
await this.#reassembleParticle({ cg: 'end', reason: 'issue-rpc', tokenStopReason: 'cg-issue' }); await this.#reassembleParticle({ cg: 'end', terminationReason: 'issue-rpc', tokenStopReason: 'cg-issue' });
} }
async setClientRetrying(strategy: 'reconnect' | 'resume', errorMessage: string, attempt: number, maxAttempts: number, delayMs: number, causeHttp?: number, causeConn?: string) { async setClientRetrying(strategy: 'reconnect' | 'resume', errorMessage: string, attempt: number, maxAttempts: number, delayMs: number, causeHttp?: number, causeConn?: string) {
@@ -410,7 +410,7 @@ export class ContentReassembler {
// Break text accumulation, as we have a full audio part in the middle // Break text accumulation, as we have a full audio part in the middle
this.currentTextFragmentIndex = null; this.currentTextFragmentIndex = null;
const { mimeType, a_b64: base64Data, label, generator, durationMs } = particle; const { mimeType, a_b64: base64Data, label, /*generator,*/ durationMs } = particle;
const safeLabel = label || 'Generated Audio'; const safeLabel = label || 'Generated Audio';
try { try {
@@ -638,7 +638,7 @@ export class ContentReassembler {
/// Rest of the data /// /// Rest of the data ///
private onCGEnd({ reason: _reason /* Redundant: no information */, tokenStopReason }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'end' }>): void { private onCGEnd({ terminationReason, tokenStopReason }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'end' }>): void {
// NOTE: no new info in particle.reason // NOTE: no new info in particle.reason
// - abort-client: user abort (already captured in the stop reason) // - abort-client: user abort (already captured in the stop reason)
+2 -2
View File
@@ -639,7 +639,7 @@ export namespace AixWire_Particles {
export type ChatControlOp = export type ChatControlOp =
// | { cg: 'start' } // not really used for now // | { cg: 'start' } // not really used for now
| { cg: 'end', reason: CGEndReason, tokenStopReason: GCTokenStopReason } | { cg: 'end', terminationReason: CGEndReason, tokenStopReason: GCTokenStopReason }
| { cg: 'issue', issueId: CGIssueId, issueText: string } | { cg: 'issue', issueId: CGIssueId, issueText: string }
| { cg: 'retry-reset', rScope: 'srv-dispatch' | 'srv-op' | 'cli-ll', rShallClear: boolean, reason: string, attempt: number, maxAttempts: number, delayMs: number, causeHttp?: number, causeConn?: string } | { cg: 'retry-reset', rScope: 'srv-dispatch' | 'srv-op' | 'cli-ll', rShallClear: boolean, reason: string, attempt: number, maxAttempts: number, delayMs: number, causeHttp?: number, causeConn?: string }
| { cg: 'set-metrics', metrics: CGSelectMetrics } | { cg: 'set-metrics', metrics: CGSelectMetrics }
@@ -649,7 +649,7 @@ export namespace AixWire_Particles {
| { cg: '_debugProfiler', measurements: Record<string, number | string>[] }; | { cg: '_debugProfiler', measurements: Record<string, number | string>[] };
export type CGEndReason = // the reason for the end of the chat generation export type CGEndReason = // the reason for the end of the chat generation
| 'abort-client' // user aborted before the end of stream | 'abort-client' // (set by the Reassembler on the client side) user aborted before the end of stream
| 'done-dialect' // OpenAI signals the '[DONE]' event, or Anthropic sends the 'message_stop' event | 'done-dialect' // OpenAI signals the '[DONE]' event, or Anthropic sends the 'message_stop' event
| 'done-dispatch-aborted' // this shall never see the light of day, as it was a reaction to the intake being aborted first | 'done-dispatch-aborted' // this shall never see the light of day, as it was a reaction to the intake being aborted first
| 'done-dispatch-closed' // dispatch connection closed | 'done-dispatch-closed' // dispatch connection closed
@@ -5,7 +5,7 @@ import { objectDeepCloneWithStringLimit, objectEstimateJsonSize } from '~/common
import type { AixWire_Particles } from '../../api/aix.wiretypes'; import type { AixWire_Particles } from '../../api/aix.wiretypes';
import type { IParticleTransmitter, ParticleServerLogLevel } from './parsers/IParticleTransmitter'; import type { IParticleTransmitter, ParticleCGDialectEndReason, ParticleServerLogLevel } from './parsers/IParticleTransmitter';
// configuration // configuration
@@ -102,7 +102,7 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
const dispatchOrDialectIssue = this.terminationReason === 'issue-dialect' || this.terminationReason === 'issue-rpc'; const dispatchOrDialectIssue = this.terminationReason === 'issue-dialect' || this.terminationReason === 'issue-rpc';
this.transmissionQueue.push({ this.transmissionQueue.push({
cg: 'end', cg: 'end',
reason: this.terminationReason, terminationReason: this.terminationReason,
tokenStopReason: this.tokenStopReason || (dispatchOrDialectIssue ? 'cg-issue' : 'ok'), tokenStopReason: this.tokenStopReason || (dispatchOrDialectIssue ? 'cg-issue' : 'ok'),
}); });
// Keep this in a terminated state, so that every subsequent call will yield errors (not implemented) // Keep this in a terminated state, so that every subsequent call will yield errors (not implemented)
@@ -129,10 +129,6 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
return this.tokenStopReason !== undefined; return this.tokenStopReason !== undefined;
} }
setRpcTerminatingIssue(issueId: AixWire_Particles.CGIssueId, issueText: string, serverLog: ParticleServerLogLevel) {
this._addIssue(issueId, issueText, serverLog);
this.setEnded('issue-rpc');
}
addDebugRequest(hideSensitiveData: boolean, url: string, headers: HeadersInit, body?: object) { addDebugRequest(hideSensitiveData: boolean, url: string, headers: HeadersInit, body?: object) {
// Ellipsize individual strings in the body object (e.g., base64 images) to reduce debug packet size // Ellipsize individual strings in the body object (e.g., base64 images) to reduce debug packet size
@@ -159,19 +155,40 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
} }
/// IPartTransmitter /// Dispatch termination
/** Set the end reason (NOTE: more comprehensive than just the IPartTransmitter.setEnded['reason'])*/ /** Set the end reason (NOTE: does not overlap with dialect-initiated end: IParticleTransmitter.setDialectEnded['reason']) */
setEnded(reason: AixWire_Particles.CGEndReason) { setDispatchEnded(reason: Extract<AixWire_Particles.CGEndReason,
| 'done-dispatch-closed' // stream ended
| 'done-dispatch-aborted' // stream aborted (abort signal)
| 'issue-rpc' // issues in one of 4 dispatch stages: prepare, fetch, read, parse - see below
>) {
if (SERVER_DEBUG_WIRE) if (SERVER_DEBUG_WIRE)
console.log('|terminate|', reason, this.terminationReason ? `(WARNING: already terminated ${this.terminationReason})` : ''); console.log('|terminate-dispatch|', reason, this.terminationReason ? `(WARNING: already terminated ${this.terminationReason})` : '');
if (this.terminationReason)
console.warn(`[AIX] setDispatchEnded('${reason}'): already terminated with reason '${this.terminationReason}' (overriding)`);
this.terminationReason = reason; this.terminationReason = reason;
} }
setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason) { setDispatchRpcTerminatingIssue(issueId: Extract<AixWire_Particles.CGIssueId,
| 'dispatch-prepare'
| 'dispatch-fetch'
| 'dispatch-read'
| 'dispatch-parse'
>, issueText: string, serverLog: ParticleServerLogLevel) {
this._addIssue(issueId, issueText, serverLog);
this.setDispatchEnded('issue-rpc');
}
/// IPartTransmitter
setDialectEnded(reason: ParticleCGDialectEndReason) {
if (SERVER_DEBUG_WIRE) if (SERVER_DEBUG_WIRE)
console.log('|token-stop|', reason); console.log('|terminate-dialect|', reason, this.terminationReason ? `(WARNING: already terminated ${this.terminationReason})` : '');
this.tokenStopReason = reason; if (this.terminationReason)
console.warn(`[AIX] setDialectEnded('${reason}'): already terminated with reason '${this.terminationReason}' (overriding)`);
this.terminationReason = reason;
} }
/** /**
@@ -180,9 +197,18 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
*/ */
setDialectTerminatingIssue(dialectText: string, symbol: string | null, _serverLog: ParticleServerLogLevel = false) { setDialectTerminatingIssue(dialectText: string, symbol: string | null, _serverLog: ParticleServerLogLevel = false) {
this._addIssue('dialect-issue', ` ${symbol || ''} **[${this.prettyDialect} Issue]:** ${dialectText}`, _serverLog); this._addIssue('dialect-issue', ` ${symbol || ''} **[${this.prettyDialect} Issue]:** ${dialectText}`, _serverLog);
this.setEnded('issue-dialect'); this.setDialectEnded('issue-dialect');
} }
setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason) {
if (SERVER_DEBUG_WIRE)
console.log('|token-stop|', reason);
if (this.tokenStopReason)
console.warn(`[AIX] setTokenStopReason('${reason}'): already has token stop reason '${this.tokenStopReason}' (overriding)`);
this.tokenStopReason = reason;
}
/** Closes the current part, also flushing it out */ /** Closes the current part, also flushing it out */
endMessagePart() { endMessagePart() {
// signals that the part has ended and should be transmitted // signals that the part has ended and should be transmitted
@@ -40,7 +40,7 @@ export async function* executeChatGenerate(
dispatch = dispatchCreatorFn(); dispatch = dispatchCreatorFn();
} catch (error: any) { } catch (error: any) {
// log but don't warn on the server console, this is typically a service configuration issue (e.g. a missing password will throw here) // log but don't warn on the server console, this is typically a service configuration issue (e.g. a missing password will throw here)
chatGenerateTx.setRpcTerminatingIssue('dispatch-prepare', `**[AIX Configuration Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown service preparation error'}`, 'srv-log'); chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-prepare', `**[AIX Configuration Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown service preparation error'}`, 'srv-log');
yield* chatGenerateTx.flushParticles(); yield* chatGenerateTx.flushParticles();
return; // exit return; // exit
} }
@@ -121,7 +121,7 @@ async function* _connectToDispatch(
} catch (error: any) { } catch (error: any) {
// Handle expected dispatch abortion while the first fetch hasn't even completed - both TRPCError (for RPC conversion) and TRPCFetcherError (for direct fetch) // Handle expected dispatch abortion while the first fetch hasn't even completed - both TRPCError (for RPC conversion) and TRPCFetcherError (for direct fetch)
if (error && (error?.name === 'TRPCError' /* tRPC */ || error?.name === 'TRPCFetcherError') && intakeAbortSignal.aborted) { if (error && (error?.name === 'TRPCError' /* tRPC */ || error?.name === 'TRPCFetcherError') && intakeAbortSignal.aborted) {
chatGenerateTx.setEnded('done-dispatch-aborted'); chatGenerateTx.setDispatchEnded('done-dispatch-aborted');
yield* chatGenerateTx.flushParticles(); yield* chatGenerateTx.flushParticles();
return null; // signal caller to exit return null; // signal caller to exit
} }
@@ -130,7 +130,7 @@ async function* _connectToDispatch(
const dispatchFetchError = safeErrorString(error) + (error?.cause ? ' · ' + JSON.stringify(error.cause) : ''); const dispatchFetchError = safeErrorString(error) + (error?.cause ? ' · ' + JSON.stringify(error.cause) : '');
const extraDevMessage = AIX_SECURITY_ONLY_IN_DEV_BUILDS ? ` - [DEV_URL: ${request.url}]` : ''; const extraDevMessage = AIX_SECURITY_ONLY_IN_DEV_BUILDS ? ` - [DEV_URL: ${request.url}]` : '';
chatGenerateTx.setRpcTerminatingIssue('dispatch-fetch', `**[Service Issue] ${_d.prettyDialect}**: ${dispatchFetchError}${extraDevMessage}`, _d.consoleLogErrors); chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-fetch', `**[Service Issue] ${_d.prettyDialect}**: ${dispatchFetchError}${extraDevMessage}`, _d.consoleLogErrors);
yield* chatGenerateTx.flushParticles(); yield* chatGenerateTx.flushParticles();
return null; // signal caller to exit return null; // signal caller to exit
} }
@@ -162,14 +162,21 @@ async function* _consumeDispatchUnified(
dispatchParserNS(chatGenerateTx, dispatchBody, undefined, parseContext); dispatchParserNS(chatGenerateTx, dispatchBody, undefined, parseContext);
_d.profiler?.measureEnd('parse-full'); _d.profiler?.measureEnd('parse-full');
// Normal termination with no more data // Handle the case where the Dialect hansn't signaled the end of generation
chatGenerateTx.setEnded('done-dispatch-closed'); if (!chatGenerateTx.isEnded) {
// dialects shall send a token stop reason before 'normal' stream close - otherwise it's a protocol 'bug' or an unexpected truncation (which needs investigation)
if (!chatGenerateTx.hasExplicitTokenStopReason)
console.warn(`[AIX] _consumeDispatchUnified: ${_d.prettyDialect}: stream closed (done-dispatch-closed) without provider termination signal - response may be truncated`);
chatGenerateTx.setDispatchEnded('done-dispatch-closed');
}
} catch (error: any) { } catch (error: any) {
if (dispatchBody === undefined) if (dispatchBody === undefined)
chatGenerateTx.setRpcTerminatingIssue('dispatch-read', `**[Reading Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn'); chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-read', `**[Reading Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn');
else else
chatGenerateTx.setRpcTerminatingIssue('dispatch-parse', ` **[Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(dispatchBody, 'aix.parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`, 'srv-warn'); chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-parse', ` **[Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(dispatchBody, 'aix.parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`, 'srv-warn');
} }
} }
@@ -207,8 +214,7 @@ async function* _consumeDispatchStream(
// Handle normal dispatch stream closure (no more data, AI Service closed the stream) // Handle normal dispatch stream closure (no more data, AI Service closed the stream)
if (done) { if (done) {
// we used to `chatGenerateTx.setEnded('done-dispatch-closed');` here and break out of the processing loop, // mark as ended,
// but there may be recoverable events in the demuxer's buffer
isFinalIteration = true; isFinalIteration = true;
// 2. Decode nothing new // 2. Decode nothing new
@@ -239,12 +245,12 @@ async function* _consumeDispatchStream(
// Handle expected dispatch stream abortion - nothing to do, as the intake is already closed // Handle expected dispatch stream abortion - nothing to do, as the intake is already closed
// TODO: check if 'AbortError' is also a cause. Seems like ResponseAborted is NextJS vs signal driven. // TODO: check if 'AbortError' is also a cause. Seems like ResponseAborted is NextJS vs signal driven.
if (error && (error?.name === 'ResponseAborted' /* tRPC */ || error?.name === 'AbortError' /* CSF */)) { if (error && (error?.name === 'ResponseAborted' /* tRPC */ || error?.name === 'AbortError' /* CSF */)) {
chatGenerateTx.setEnded('done-dispatch-aborted'); chatGenerateTx.setDispatchEnded('done-dispatch-aborted');
break; // outer do {} break; // outer do {}
} }
// Handle abnormal stream termination; print to the server console as well (important to debug) // Handle abnormal stream termination; print to the server console as well (important to debug)
chatGenerateTx.setRpcTerminatingIssue('dispatch-read', `**[Streaming Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn'); chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-read', `**[Streaming Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn');
break; // outer do {} break; // outer do {}
} }
@@ -267,7 +273,7 @@ async function* _consumeDispatchStream(
// [OpenAI] Special: stream termination marker // [OpenAI] Special: stream termination marker
if (demuxedItem.data === '[DONE]') { if (demuxedItem.data === '[DONE]') {
chatGenerateTx.setEnded('done-dialect'); chatGenerateTx.setDialectEnded('done-dialect'); // OpenAI ChatCompletions
break; // inner for {}, then outer do break; // inner for {}, then outer do
} }
@@ -287,20 +293,19 @@ async function* _consumeDispatchStream(
if (error instanceof RequestRetryError) throw error; if (error instanceof RequestRetryError) throw error;
// Handle parsing issue (likely a schema break); print it to the server console as well // Handle parsing issue (likely a schema break); print it to the server console as well
chatGenerateTx.setRpcTerminatingIssue('dispatch-parse', ` **[Service Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(demuxedItem.data, 'aix.service-parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`, 'srv-warn'); chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-parse', ` **[Service Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(demuxedItem.data, 'aix.service-parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`, 'srv-warn');
break; // inner for {}, then outer do break; // inner for {}, then outer do
} }
} }
// 6. Normal stream end - if didn't end in a connection or event parsing error // 6. Stream end - Handle the case where the Dialect hansn't signaled the end of generation
if (isFinalIteration && !chatGenerateTx.isEnded) { if (isFinalIteration && !chatGenerateTx.isEnded) {
// Log when the dispatch closed the stream but the dialect parser never set a stop reason // dialects shall send a token stop reason before 'normal' stream close - otherwise it's a protocol 'bug' or an unexpected truncation (which needs investigation)
// Either the provider does not close with a reason (protocol 'bug') or the response may be truncated (cause we're investigating)
if (!chatGenerateTx.hasExplicitTokenStopReason) if (!chatGenerateTx.hasExplicitTokenStopReason)
console.warn(`[AIX] _consumeDispatchStream: ${_d.prettyDialect}: stream closed (done-dispatch-closed) without provider termination signal - response may be truncated`); console.warn(`[AIX] _consumeDispatchStream: ${_d.prettyDialect}: stream closed (done-dispatch-closed) without provider termination signal - response may be truncated`);
chatGenerateTx.setEnded('done-dispatch-closed'); chatGenerateTx.setDispatchEnded('done-dispatch-closed');
break; // outer do {} break; // outer do {}
} }
@@ -3,16 +3,21 @@ import type { AixWire_Particles } from '~/modules/aix/server/api/aix.wiretypes';
export type ParticleServerLogLevel = false | 'srv-log' | 'srv-warn'; export type ParticleServerLogLevel = false | 'srv-log' | 'srv-warn';
export type ParticleCGDialectEndReason = Extract<AixWire_Particles.CGEndReason, 'done-dialect' | 'issue-dialect'>;
export interface IParticleTransmitter { export interface IParticleTransmitter {
// Parser-initiated Control // // Parser-initiated Control //
/** Set the end reason - only use for 'done-dialect' to signal a dialect-close */ /** Set the end reason - only use for 'done-dialect' to signal a dialect-close */
setEnded(reason: Extract<AixWire_Particles.CGEndReason, 'done-dialect' | 'issue-dialect'>): void; setDialectEnded(reason: ParticleCGDialectEndReason): void;
/** End the current part and flush it */ /** End the current part and flush it, which also calls `setDialectEnded('issue-dialct')` */
setDialectTerminatingIssue(dialectText: string, symbol: string | null, serverLog: ParticleServerLogLevel): void; setDialectTerminatingIssue(dialectText: string, symbol: string | null, serverLog: ParticleServerLogLevel): void;
/** Communicates the finish reason to the client - Data only, this does not do Control, like the above */
setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason): void;
// Parts data // // Parts data //
@@ -83,9 +88,6 @@ export interface IParticleTransmitter {
/** Communicates the upstream response handle, for remote control/resumability */ /** Communicates the upstream response handle, for remote control/resumability */
setUpstreamHandle(handle: string, type: 'oai-responses'): void; setUpstreamHandle(handle: string, type: 'oai-responses'): void;
/** Communicates the finish reason to the client */
setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason): void;
/** Update the metrics, sent twice (after the first call, and then at the end of the transmission) */ /** Update the metrics, sent twice (after the first call, and then at the end of the transmission) */
updateMetrics(update: Partial<AixWire_Particles.CGSelectMetrics>): void; updateMetrics(update: Partial<AixWire_Particles.CGSelectMetrics>): void;
@@ -570,7 +570,7 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction {
case 'message_stop': case 'message_stop':
AnthropicWire_API_Message_Create.event_MessageStop_schema.parse(JSON.parse(eventData)); AnthropicWire_API_Message_Create.event_MessageStop_schema.parse(JSON.parse(eventData));
if (ANTHROPIC_DEBUG_EVENT_SEQUENCE) console.log('ant message_stop'); if (ANTHROPIC_DEBUG_EVENT_SEQUENCE) console.log('ant message_stop');
return pt.setEnded('done-dialect'); return pt.setDialectEnded('done-dialect'); // Anthropic: stop message
// UNDOCUMENTED - Occasionally, the server will send errors, such as {'type': 'error', 'error': {'type': 'overloaded_error', 'message': 'Overloaded'}} // UNDOCUMENTED - Occasionally, the server will send errors, such as {'type': 'error', 'error': {'type': 'overloaded_error', 'message': 'Overloaded'}}
case 'error': case 'error':
@@ -93,6 +93,43 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st
sentRequestedModelName = true; sentRequestedModelName = true;
} }
// -> Stats - before candidates to endings won't interfere/block
if (generationChunk.usageMetadata) {
const metricsUpdate: AixWire_Particles.CGSelectMetrics = {
TIn: generationChunk.usageMetadata.promptTokenCount,
TOut: generationChunk.usageMetadata.candidatesTokenCount,
};
// Add reasoning tokens if available
if (generationChunk.usageMetadata.thoughtsTokenCount) {
metricsUpdate.TOutR = generationChunk.usageMetadata.thoughtsTokenCount;
metricsUpdate.TOut = (metricsUpdate.TOut ?? 0) + metricsUpdate.TOutR; // in gemini candidatesTokenCount does not include reasoning tokens
}
// Subtract auto-cached (read) input tokens
if (generationChunk.usageMetadata.cachedContentTokenCount) {
metricsUpdate.TCacheRead = generationChunk.usageMetadata.cachedContentTokenCount;
if ((metricsUpdate.TIn ?? 0) > metricsUpdate.TCacheRead)
metricsUpdate.TIn = (metricsUpdate.TIn ?? 0) - metricsUpdate.TCacheRead;
}
if (isStreaming && timeToFirstEvent !== undefined)
metricsUpdate.dtStart = timeToFirstEvent;
// the first end-1 packet will be skipped (when streaming)
if (!skipComputingTotalsOnce) {
metricsUpdate.dtAll = Date.now() - parserCreationTimestamp;
if (!isStreaming && metricsUpdate.dtAll > timeToFirstEvent)
metricsUpdate.dtInner = metricsUpdate.dtAll - timeToFirstEvent;
if (isStreaming && metricsUpdate.TOut)
metricsUpdate.vTOutInner = Math.round(100 * 1000 /*ms/s*/ * metricsUpdate.TOut / (metricsUpdate.dtInner || metricsUpdate.dtAll)) / 100;
}
// the second (end) packet will be sent
skipComputingTotalsOnce = false;
pt.updateMetrics(metricsUpdate);
}
// -> Prompt Safety Blocking // -> Prompt Safety Blocking
if (generationChunk.promptFeedback?.blockReason) { if (generationChunk.promptFeedback?.blockReason) {
const { blockReason, safetyRatings } = generationChunk.promptFeedback; const { blockReason, safetyRatings } = generationChunk.promptFeedback;
@@ -262,15 +299,22 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st
switch (candidate0.finishReason) { switch (candidate0.finishReason) {
case 'STOP': case 'STOP':
// FORMER NOTE:
// this is expected for every fragment up to the end, when it may switch to one of the reasons below in the last packet // this is expected for every fragment up to the end, when it may switch to one of the reasons below in the last packet
// we cannot assume this signals a good ending, however it will be `pt` to set it to 'ok' if not set to an issue by the end // we cannot assume this signals a good ending, however it will be `pt` to set it to 'ok' if not set to an issue by the end
// NEW NOTE:
// 'STOP' seems to only be sent at the end now
pt.setTokenStopReason('ok')
pt.setDialectEnded('done-dialect'); // Gemini: generation finished successfully
break; break;
case 'MAX_TOKENS': case 'MAX_TOKENS':
pt.setTokenStopReason('out-of-tokens'); pt.setTokenStopReason('out-of-tokens');
// NOTE: we call setEnded instead of setDialectTerminatingIssue, because we don't want an extra message appended, // NOTE: we call setDialectEnded instead of setDialectTerminatingIssue, because we don't want an extra message appended,
// as we know that 'out-of-tokens' will likely append a brick wall (simple/universal enough). // as we know that 'out-of-tokens' will likely append a brick wall (simple/universal enough).
return pt.setEnded('issue-dialect'); pt.setDialectEnded('issue-dialect'); // Gemini: max tokens reached
break;
// will set both TokenStop and TerminatingIssue // will set both TokenStop and TerminatingIssue
case 'SAFETY': case 'SAFETY':
@@ -323,44 +367,7 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st
return pt.setDialectTerminatingIssue(`unexpected Gemini finish reason: ${candidate0?.finishReason})`, null, 'srv-warn'); return pt.setDialectTerminatingIssue(`unexpected Gemini finish reason: ${candidate0?.finishReason})`, null, 'srv-warn');
} }
} }
} /* end of .candidates */ } /* end of .candidates (single candidate is ensured) */
// -> Stats
if (generationChunk.usageMetadata) {
const metricsUpdate: AixWire_Particles.CGSelectMetrics = {
TIn: generationChunk.usageMetadata.promptTokenCount,
TOut: generationChunk.usageMetadata.candidatesTokenCount,
};
// Add reasoning tokens if available
if (generationChunk.usageMetadata.thoughtsTokenCount) {
metricsUpdate.TOutR = generationChunk.usageMetadata.thoughtsTokenCount;
metricsUpdate.TOut = (metricsUpdate.TOut ?? 0) + metricsUpdate.TOutR; // in gemini candidatesTokenCount does not include reasoning tokens
}
// Subtract auto-cached (read) input tokens
if (generationChunk.usageMetadata.cachedContentTokenCount) {
metricsUpdate.TCacheRead = generationChunk.usageMetadata.cachedContentTokenCount;
if ((metricsUpdate.TIn ?? 0) > metricsUpdate.TCacheRead)
metricsUpdate.TIn = (metricsUpdate.TIn ?? 0) - metricsUpdate.TCacheRead;
}
if (isStreaming && timeToFirstEvent !== undefined)
metricsUpdate.dtStart = timeToFirstEvent;
// the first end-1 packet will be skipped (when streaming)
if (!skipComputingTotalsOnce) {
metricsUpdate.dtAll = Date.now() - parserCreationTimestamp;
if (!isStreaming && metricsUpdate.dtAll > timeToFirstEvent)
metricsUpdate.dtInner = metricsUpdate.dtAll - timeToFirstEvent;
if (isStreaming && metricsUpdate.TOut)
metricsUpdate.vTOutInner = Math.round(100 * 1000 /*ms/s*/ * metricsUpdate.TOut / (metricsUpdate.dtInner || metricsUpdate.dtAll)) / 100;
}
// the second (end) packet will be sent
skipComputingTotalsOnce = false;
pt.updateMetrics(metricsUpdate);
}
}; };
} }
@@ -323,6 +323,9 @@ export function createOpenAIResponsesEventParser(): ChatGenerateParseFunction {
if (metrics) if (metrics)
pt.updateMetrics(metrics); pt.updateMetrics(metrics);
} }
// -> End of the response
pt.setDialectEnded('done-dialect'); // OpenAI Responses: 'response.completed'
break; break;
case 'response.failed': case 'response.failed':
+2 -2
View File
@@ -16,7 +16,7 @@ import * as fs from 'node:fs';
import * as path from 'node:path'; import * as path from 'node:path';
import type { AixAPI_Access, AixAPI_Model, AixAPIChatGenerate_Request, AixWire_Particles } from '~/modules/aix/server/api/aix.wiretypes'; import type { AixAPI_Access, AixAPI_Model, AixAPIChatGenerate_Request, AixWire_Particles } from '~/modules/aix/server/api/aix.wiretypes';
import type { IParticleTransmitter, ParticleServerLogLevel } from '~/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter'; import type { IParticleTransmitter, ParticleCGDialectEndReason, ParticleServerLogLevel } from '~/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter';
import type { ModelDescriptionSchema } from '~/modules/llms/server/llm.server.types'; import type { ModelDescriptionSchema } from '~/modules/llms/server/llm.server.types';
import { listModelsRunDispatch } from '~/modules/llms/server/listModels.dispatch'; import { listModelsRunDispatch } from '~/modules/llms/server/listModels.dispatch';
import { createChatGenerateDispatch } from '~/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch'; import { createChatGenerateDispatch } from '~/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch';
@@ -287,7 +287,7 @@ class SweepCollectorTransmitter implements IParticleTransmitter {
get hasError(): boolean { return this.dialectIssue !== null; } get hasError(): boolean { return this.dialectIssue !== null; }
// Parser-initiated Control // Parser-initiated Control
setEnded(reason: 'done-dialect' | 'issue-dialect'): void { setDialectEnded(reason: ParticleCGDialectEndReason): void {
this.endReason = reason; this.endReason = reason;
} }