diff --git a/src/modules/aix/client/ContentReassembler.ts b/src/modules/aix/client/ContentReassembler.ts index c45138ab8..9b0ec64ab 100644 --- a/src/modules/aix/client/ContentReassembler.ts +++ b/src/modules/aix/client/ContentReassembler.ts @@ -96,7 +96,7 @@ export class ContentReassembler { console.log('-> aix.p: abort-client'); // NOTE: this doesn't go to the debugger anymore - as we only publish external particles to the debugger - await this.#reassembleParticle({ cg: 'end', reason: 'abort-client', tokenStopReason: 'client-abort-signal' }); + await this.#reassembleParticle({ cg: 'end', terminationReason: 'abort-client', tokenStopReason: 'client-abort-signal' }); } async setClientExcepted(errorAsText: string, errorHint?: DMessageErrorPart['hint']): Promise { @@ -106,7 +106,7 @@ export class ContentReassembler { this.onCGIssue({ cg: 'issue', issueId: 'client-read', issueText: errorAsText, issueHint: errorHint }); // NOTE: this doesn't go to the debugger anymore - as we only publish external particles to the debugger - await this.#reassembleParticle({ cg: 'end', reason: 'issue-rpc', tokenStopReason: 'cg-issue' }); + await this.#reassembleParticle({ cg: 'end', terminationReason: 'issue-rpc', tokenStopReason: 'cg-issue' }); } async setClientRetrying(strategy: 'reconnect' | 'resume', errorMessage: string, attempt: number, maxAttempts: number, delayMs: number, causeHttp?: number, causeConn?: string) { @@ -410,7 +410,7 @@ export class ContentReassembler { // Break text accumulation, as we have a full audio part in the middle this.currentTextFragmentIndex = null; - const { mimeType, a_b64: base64Data, label, generator, durationMs } = particle; + const { mimeType, a_b64: base64Data, label, /*generator,*/ durationMs } = particle; const safeLabel = label || 'Generated Audio'; try { @@ -638,7 +638,7 @@ export class ContentReassembler { /// Rest of the data /// - private onCGEnd({ reason: _reason /* Redundant: no information */, tokenStopReason }: Extract): void { + private 
onCGEnd({ terminationReason, tokenStopReason }: Extract): void { // NOTE: no new info in particle.reason // - abort-client: user abort (already captured in the stop reason) diff --git a/src/modules/aix/server/api/aix.wiretypes.ts b/src/modules/aix/server/api/aix.wiretypes.ts index e68a8c2c4..8feee2e75 100644 --- a/src/modules/aix/server/api/aix.wiretypes.ts +++ b/src/modules/aix/server/api/aix.wiretypes.ts @@ -639,7 +639,7 @@ export namespace AixWire_Particles { export type ChatControlOp = // | { cg: 'start' } // not really used for now - | { cg: 'end', reason: CGEndReason, tokenStopReason: GCTokenStopReason } + | { cg: 'end', terminationReason: CGEndReason, tokenStopReason: GCTokenStopReason } | { cg: 'issue', issueId: CGIssueId, issueText: string } | { cg: 'retry-reset', rScope: 'srv-dispatch' | 'srv-op' | 'cli-ll', rShallClear: boolean, reason: string, attempt: number, maxAttempts: number, delayMs: number, causeHttp?: number, causeConn?: string } | { cg: 'set-metrics', metrics: CGSelectMetrics } @@ -649,7 +649,7 @@ export namespace AixWire_Particles { | { cg: '_debugProfiler', measurements: Record[] }; export type CGEndReason = // the reason for the end of the chat generation - | 'abort-client' // user aborted before the end of stream + | 'abort-client' // (set by the Reassembler on the client side) user aborted before the end of stream | 'done-dialect' // OpenAI signals the '[DONE]' event, or Anthropic sends the 'message_stop' event | 'done-dispatch-aborted' // this shall never see the light of day, as it was a reaction to the intake being aborted first | 'done-dispatch-closed' // dispatch connection closed diff --git a/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts b/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts index a2afbd567..3e2ea94cb 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/ChatGenerateTransmitter.ts @@ -5,7 
+5,7 @@ import { objectDeepCloneWithStringLimit, objectEstimateJsonSize } from '~/common import type { AixWire_Particles } from '../../api/aix.wiretypes'; -import type { IParticleTransmitter, ParticleServerLogLevel } from './parsers/IParticleTransmitter'; +import type { IParticleTransmitter, ParticleCGDialectEndReason, ParticleServerLogLevel } from './parsers/IParticleTransmitter'; // configuration @@ -102,7 +102,7 @@ export class ChatGenerateTransmitter implements IParticleTransmitter { const dispatchOrDialectIssue = this.terminationReason === 'issue-dialect' || this.terminationReason === 'issue-rpc'; this.transmissionQueue.push({ cg: 'end', - reason: this.terminationReason, + terminationReason: this.terminationReason, tokenStopReason: this.tokenStopReason || (dispatchOrDialectIssue ? 'cg-issue' : 'ok'), }); // Keep this in a terminated state, so that every subsequent call will yield errors (not implemented) @@ -129,10 +129,6 @@ export class ChatGenerateTransmitter implements IParticleTransmitter { return this.tokenStopReason !== undefined; } - setRpcTerminatingIssue(issueId: AixWire_Particles.CGIssueId, issueText: string, serverLog: ParticleServerLogLevel) { - this._addIssue(issueId, issueText, serverLog); - this.setEnded('issue-rpc'); - } addDebugRequest(hideSensitiveData: boolean, url: string, headers: HeadersInit, body?: object) { // Ellipsize individual strings in the body object (e.g., base64 images) to reduce debug packet size @@ -159,19 +155,40 @@ export class ChatGenerateTransmitter implements IParticleTransmitter { } - /// IPartTransmitter + /// Dispatch termination - /** Set the end reason (NOTE: more comprehensive than just the IPartTransmitter.setEnded['reason'])*/ - setEnded(reason: AixWire_Particles.CGEndReason) { + /** Set the end reason (NOTE: does not overlap with dialect-initiated end: IParticleTransmitter.setDialectEnded['reason']) */ + setDispatchEnded(reason: Extract) { if (SERVER_DEBUG_WIRE) - console.log('|terminate|', reason, 
this.terminationReason ? `(WARNING: already terminated ${this.terminationReason})` : ''); + console.log('|terminate-dispatch|', reason, this.terminationReason ? `(WARNING: already terminated ${this.terminationReason})` : ''); + if (this.terminationReason) + console.warn(`[AIX] setDispatchEnded('${reason}'): already terminated with reason '${this.terminationReason}' (overriding)`); this.terminationReason = reason; } - setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason) { + setDispatchRpcTerminatingIssue(issueId: Extract, issueText: string, serverLog: ParticleServerLogLevel) { + this._addIssue(issueId, issueText, serverLog); + this.setDispatchEnded('issue-rpc'); + } + + + /// IPartTransmitter + + setDialectEnded(reason: ParticleCGDialectEndReason) { if (SERVER_DEBUG_WIRE) - console.log('|token-stop|', reason); - this.tokenStopReason = reason; + console.log('|terminate-dialect|', reason, this.terminationReason ? `(WARNING: already terminated ${this.terminationReason})` : ''); + if (this.terminationReason) + console.warn(`[AIX] setDialectEnded('${reason}'): already terminated with reason '${this.terminationReason}' (overriding)`); + this.terminationReason = reason; } /** @@ -180,9 +197,18 @@ export class ChatGenerateTransmitter implements IParticleTransmitter { */ setDialectTerminatingIssue(dialectText: string, symbol: string | null, _serverLog: ParticleServerLogLevel = false) { this._addIssue('dialect-issue', ` ${symbol || ''} **[${this.prettyDialect} Issue]:** ${dialectText}`, _serverLog); - this.setEnded('issue-dialect'); + this.setDialectEnded('issue-dialect'); } + setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason) { + if (SERVER_DEBUG_WIRE) + console.log('|token-stop|', reason); + if (this.tokenStopReason) + console.warn(`[AIX] setTokenStopReason('${reason}'): already has token stop reason '${this.tokenStopReason}' (overriding)`); + this.tokenStopReason = reason; + } + + /** Closes the current part, also flushing it out */ endMessagePart() 
{ // signals that the part has ended and should be transmitted diff --git a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.executor.ts b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.executor.ts index 7af26443a..1a78c6fb8 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.executor.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/chatGenerate.executor.ts @@ -40,7 +40,7 @@ export async function* executeChatGenerate( dispatch = dispatchCreatorFn(); } catch (error: any) { // log but don't warn on the server console, this is typically a service configuration issue (e.g. a missing password will throw here) - chatGenerateTx.setRpcTerminatingIssue('dispatch-prepare', `**[AIX Configuration Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown service preparation error'}`, 'srv-log'); + chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-prepare', `**[AIX Configuration Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown service preparation error'}`, 'srv-log'); yield* chatGenerateTx.flushParticles(); return; // exit } @@ -121,7 +121,7 @@ async function* _connectToDispatch( } catch (error: any) { // Handle expected dispatch abortion while the first fetch hasn't even completed - both TRPCError (for RPC conversion) and TRPCFetcherError (for direct fetch) if (error && (error?.name === 'TRPCError' /* tRPC */ || error?.name === 'TRPCFetcherError') && intakeAbortSignal.aborted) { - chatGenerateTx.setEnded('done-dispatch-aborted'); + chatGenerateTx.setDispatchEnded('done-dispatch-aborted'); yield* chatGenerateTx.flushParticles(); return null; // signal caller to exit } @@ -130,7 +130,7 @@ async function* _connectToDispatch( const dispatchFetchError = safeErrorString(error) + (error?.cause ? ' · ' + JSON.stringify(error.cause) : ''); const extraDevMessage = AIX_SECURITY_ONLY_IN_DEV_BUILDS ?
` - [DEV_URL: ${request.url}]` : ''; - chatGenerateTx.setRpcTerminatingIssue('dispatch-fetch', `**[Service Issue] ${_d.prettyDialect}**: ${dispatchFetchError}${extraDevMessage}`, _d.consoleLogErrors); + chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-fetch', `**[Service Issue] ${_d.prettyDialect}**: ${dispatchFetchError}${extraDevMessage}`, _d.consoleLogErrors); yield* chatGenerateTx.flushParticles(); return null; // signal caller to exit } @@ -162,14 +162,21 @@ async function* _consumeDispatchUnified( dispatchParserNS(chatGenerateTx, dispatchBody, undefined, parseContext); _d.profiler?.measureEnd('parse-full'); - // Normal termination with no more data - chatGenerateTx.setEnded('done-dispatch-closed'); + // Handle the case where the Dialect hasn't signaled the end of generation + if (!chatGenerateTx.isEnded) { + + // dialects shall send a token stop reason before 'normal' stream close - otherwise it's a protocol 'bug' or an unexpected truncation (which needs investigation) + if (!chatGenerateTx.hasExplicitTokenStopReason) + console.warn(`[AIX] _consumeDispatchUnified: ${_d.prettyDialect}: stream closed (done-dispatch-closed) without provider termination signal - response may be truncated`); + + chatGenerateTx.setDispatchEnded('done-dispatch-closed'); + } } catch (error: any) { if (dispatchBody === undefined) - chatGenerateTx.setRpcTerminatingIssue('dispatch-read', `**[Reading Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn'); + chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-read', `**[Reading Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn'); else - chatGenerateTx.setRpcTerminatingIssue('dispatch-parse', ` **[Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(dispatchBody, 'aix.parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`,
'srv-warn'); + chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-parse', ` **[Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(dispatchBody, 'aix.parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`, 'srv-warn'); } } @@ -207,8 +214,7 @@ async function* _consumeDispatchStream( // Handle normal dispatch stream closure (no more data, AI Service closed the stream) if (done) { - // we used to `chatGenerateTx.setEnded('done-dispatch-closed');` here and break out of the processing loop, - // but there may be recoverable events in the demuxer's buffer + // mark as ended, isFinalIteration = true; // 2. Decode nothing new @@ -239,12 +245,12 @@ async function* _consumeDispatchStream( // Handle expected dispatch stream abortion - nothing to do, as the intake is already closed // TODO: check if 'AbortError' is also a cause. Seems like ResponseAborted is NextJS vs signal driven. if (error && (error?.name === 'ResponseAborted' /* tRPC */ || error?.name === 'AbortError' /* CSF */)) { - chatGenerateTx.setEnded('done-dispatch-aborted'); + chatGenerateTx.setDispatchEnded('done-dispatch-aborted'); break; // outer do {} } // Handle abnormal stream termination; print to the server console as well (important to debug) - chatGenerateTx.setRpcTerminatingIssue('dispatch-read', `**[Streaming Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn'); + chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-read', `**[Streaming Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream reading error'}`, 'srv-warn'); break; // outer do {} } @@ -267,7 +273,7 @@ async function* _consumeDispatchStream( // [OpenAI] Special: stream termination marker if (demuxedItem.data === '[DONE]') { - chatGenerateTx.setEnded('done-dialect'); + chatGenerateTx.setDialectEnded('done-dialect'); // OpenAI ChatCompletions break; // inner 
for {}, then outer do } @@ -287,20 +293,19 @@ async function* _consumeDispatchStream( if (error instanceof RequestRetryError) throw error; // Handle parsing issue (likely a schema break); print it to the server console as well - chatGenerateTx.setRpcTerminatingIssue('dispatch-parse', ` **[Service Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(demuxedItem.data, 'aix.service-parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`, 'srv-warn'); + chatGenerateTx.setDispatchRpcTerminatingIssue('dispatch-parse', ` **[Service Parsing Issue] ${_d.prettyDialect}**: ${safeErrorString(error) || 'Unknown stream parsing error'}.\n\nInput data: ${objectDeepCloneWithStringLimit(demuxedItem.data, 'aix.service-parsing-issue', 2048)}.\n\nPlease open a support ticket on GitHub.`, 'srv-warn'); break; // inner for {}, then outer do } } - // 6. Normal stream end - if didn't end in a connection or event parsing error + // 6. 
Stream end - Handle the case where the Dialect hasn't signaled the end of generation if (isFinalIteration && !chatGenerateTx.isEnded) { - // Log when the dispatch closed the stream but the dialect parser never set a stop reason - // Either the provider does not close with a reason (protocol 'bug') or the response may be truncated (cause we're investigating) + // dialects shall send a token stop reason before 'normal' stream close - otherwise it's a protocol 'bug' or an unexpected truncation (which needs investigation) if (!chatGenerateTx.hasExplicitTokenStopReason) console.warn(`[AIX] _consumeDispatchStream: ${_d.prettyDialect}: stream closed (done-dispatch-closed) without provider termination signal - response may be truncated`); - chatGenerateTx.setEnded('done-dispatch-closed'); + chatGenerateTx.setDispatchEnded('done-dispatch-closed'); break; // outer do {} } diff --git a/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts b/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts index fc5a27024..4e7368fb4 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter.ts @@ -3,16 +3,21 @@ import type { AixWire_Particles } from '~/modules/aix/server/api/aix.wiretypes'; export type ParticleServerLogLevel = false | 'srv-log' | 'srv-warn'; +export type ParticleCGDialectEndReason = Extract<AixWire_Particles.CGEndReason, 'done-dialect' | 'issue-dialect'>; + export interface IParticleTransmitter { // Parser-initiated Control // /** Set the end reason - only use for 'done-dialect' to signal a dialect-close */ - setEnded(reason: Extract<AixWire_Particles.CGEndReason, 'done-dialect' | 'issue-dialect'>): void; + setDialectEnded(reason: ParticleCGDialectEndReason): void; - /** End the current part and flush it */ + /** End the current part and flush it, which also calls `setDialectEnded('issue-dialect')` */ setDialectTerminatingIssue(dialectText: string, symbol: string | null, serverLog: ParticleServerLogLevel): void; + /** Communicates the finish
reason to the client - Data only, this does not do Control, like the above */ + setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason): void; + // Parts data // @@ -83,9 +88,6 @@ export interface IParticleTransmitter { /** Communicates the upstream response handle, for remote control/resumability */ setUpstreamHandle(handle: string, type: 'oai-responses'): void; - /** Communicates the finish reason to the client */ - setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason): void; - /** Update the metrics, sent twice (after the first call, and then at the end of the transmission) */ updateMetrics(update: Partial): void; diff --git a/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts b/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts index 63d11696f..412210f24 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/parsers/anthropic.parser.ts @@ -570,7 +570,7 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction { case 'message_stop': AnthropicWire_API_Message_Create.event_MessageStop_schema.parse(JSON.parse(eventData)); if (ANTHROPIC_DEBUG_EVENT_SEQUENCE) console.log('ant message_stop'); - return pt.setEnded('done-dialect'); + return pt.setDialectEnded('done-dialect'); // Anthropic: stop message // UNDOCUMENTED - Occasionally, the server will send errors, such as {'type': 'error', 'error': {'type': 'overloaded_error', 'message': 'Overloaded'}} case 'error': diff --git a/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts b/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts index cc4c2f1ff..b65ceaa01 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/parsers/gemini.parser.ts @@ -93,6 +93,43 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st 
sentRequestedModelName = true; } + // -> Stats - before candidates so endings won't interfere/block + if (generationChunk.usageMetadata) { + const metricsUpdate: AixWire_Particles.CGSelectMetrics = { + TIn: generationChunk.usageMetadata.promptTokenCount, + TOut: generationChunk.usageMetadata.candidatesTokenCount, + }; + + // Add reasoning tokens if available + if (generationChunk.usageMetadata.thoughtsTokenCount) { + metricsUpdate.TOutR = generationChunk.usageMetadata.thoughtsTokenCount; + metricsUpdate.TOut = (metricsUpdate.TOut ?? 0) + metricsUpdate.TOutR; // in gemini candidatesTokenCount does not include reasoning tokens + } + + // Subtract auto-cached (read) input tokens + if (generationChunk.usageMetadata.cachedContentTokenCount) { + metricsUpdate.TCacheRead = generationChunk.usageMetadata.cachedContentTokenCount; + if ((metricsUpdate.TIn ?? 0) > metricsUpdate.TCacheRead) + metricsUpdate.TIn = (metricsUpdate.TIn ?? 0) - metricsUpdate.TCacheRead; + } + + if (isStreaming && timeToFirstEvent !== undefined) + metricsUpdate.dtStart = timeToFirstEvent; + + // the first end-1 packet will be skipped (when streaming) + if (!skipComputingTotalsOnce) { + metricsUpdate.dtAll = Date.now() - parserCreationTimestamp; + if (!isStreaming && metricsUpdate.dtAll > timeToFirstEvent) + metricsUpdate.dtInner = metricsUpdate.dtAll - timeToFirstEvent; + if (isStreaming && metricsUpdate.TOut) + metricsUpdate.vTOutInner = Math.round(100 * 1000 /*ms/s*/ * metricsUpdate.TOut / (metricsUpdate.dtInner || metricsUpdate.dtAll)) / 100; + } + // the second (end) packet will be sent + skipComputingTotalsOnce = false; + + pt.updateMetrics(metricsUpdate); + } + // -> Prompt Safety Blocking if (generationChunk.promptFeedback?.blockReason) { const { blockReason, safetyRatings } = generationChunk.promptFeedback; @@ -262,15 +299,22 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st switch (candidate0.finishReason) { case 'STOP': + // FORMER NOTE: // this is expected
for every fragment up to the end, when it may switch to one of the reasons below in the last packet // we cannot assume this signals a good ending, however it will be `pt` to set it to 'ok' if not set to an issue by the end + + // NEW NOTE: + // 'STOP' seems to only be sent at the end now + pt.setTokenStopReason('ok') + pt.setDialectEnded('done-dialect'); // Gemini: generation finished successfully break; case 'MAX_TOKENS': pt.setTokenStopReason('out-of-tokens'); - // NOTE: we call setEnded instead of setDialectTerminatingIssue, because we don't want an extra message appended, + // NOTE: we call setDialectEnded instead of setDialectTerminatingIssue, because we don't want an extra message appended, // as we know that 'out-of-tokens' will likely append a brick wall (simple/universal enough). - return pt.setEnded('issue-dialect'); + pt.setDialectEnded('issue-dialect'); // Gemini: max tokens reached + break; // will set both TokenStop and TerminatingIssue case 'SAFETY': @@ -323,44 +367,7 @@ export function createGeminiGenerateContentResponseParser(requestedModelName: st return pt.setDialectTerminatingIssue(`unexpected Gemini finish reason: ${candidate0?.finishReason})`, null, 'srv-warn'); } } - } /* end of .candidates */ - - // -> Stats - if (generationChunk.usageMetadata) { - const metricsUpdate: AixWire_Particles.CGSelectMetrics = { - TIn: generationChunk.usageMetadata.promptTokenCount, - TOut: generationChunk.usageMetadata.candidatesTokenCount, - }; - - // Add reasoning tokens if available - if (generationChunk.usageMetadata.thoughtsTokenCount) { - metricsUpdate.TOutR = generationChunk.usageMetadata.thoughtsTokenCount; - metricsUpdate.TOut = (metricsUpdate.TOut ?? 
0) + metricsUpdate.TOutR; // in gemini candidatesTokenCount does not include reasoning tokens - } - - // Subtract auto-cached (read) input tokens - if (generationChunk.usageMetadata.cachedContentTokenCount) { - metricsUpdate.TCacheRead = generationChunk.usageMetadata.cachedContentTokenCount; - if ((metricsUpdate.TIn ?? 0) > metricsUpdate.TCacheRead) - metricsUpdate.TIn = (metricsUpdate.TIn ?? 0) - metricsUpdate.TCacheRead; - } - - if (isStreaming && timeToFirstEvent !== undefined) - metricsUpdate.dtStart = timeToFirstEvent; - - // the first end-1 packet will be skipped (when streaming) - if (!skipComputingTotalsOnce) { - metricsUpdate.dtAll = Date.now() - parserCreationTimestamp; - if (!isStreaming && metricsUpdate.dtAll > timeToFirstEvent) - metricsUpdate.dtInner = metricsUpdate.dtAll - timeToFirstEvent; - if (isStreaming && metricsUpdate.TOut) - metricsUpdate.vTOutInner = Math.round(100 * 1000 /*ms/s*/ * metricsUpdate.TOut / (metricsUpdate.dtInner || metricsUpdate.dtAll)) / 100; - } - // the second (end) packet will be sent - skipComputingTotalsOnce = false; - - pt.updateMetrics(metricsUpdate); - } + } /* end of .candidates (single candidate is ensured) */ }; } diff --git a/src/modules/aix/server/dispatch/chatGenerate/parsers/openai.responses.parser.ts b/src/modules/aix/server/dispatch/chatGenerate/parsers/openai.responses.parser.ts index 873beca42..ea12ba3e3 100644 --- a/src/modules/aix/server/dispatch/chatGenerate/parsers/openai.responses.parser.ts +++ b/src/modules/aix/server/dispatch/chatGenerate/parsers/openai.responses.parser.ts @@ -323,6 +323,9 @@ export function createOpenAIResponsesEventParser(): ChatGenerateParseFunction { if (metrics) pt.updateMetrics(metrics); } + + // -> End of the response + pt.setDialectEnded('done-dialect'); // OpenAI Responses: 'response.completed' break; case 'response.failed': diff --git a/tools/develop/llm-parameter-sweep/sweep.ts b/tools/develop/llm-parameter-sweep/sweep.ts index 143ccf6d5..50122852a 100644 --- 
a/tools/develop/llm-parameter-sweep/sweep.ts +++ b/tools/develop/llm-parameter-sweep/sweep.ts @@ -16,7 +16,7 @@ import * as fs from 'node:fs'; import * as path from 'node:path'; import type { AixAPI_Access, AixAPI_Model, AixAPIChatGenerate_Request, AixWire_Particles } from '~/modules/aix/server/api/aix.wiretypes'; -import type { IParticleTransmitter, ParticleServerLogLevel } from '~/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter'; +import type { IParticleTransmitter, ParticleCGDialectEndReason, ParticleServerLogLevel } from '~/modules/aix/server/dispatch/chatGenerate/parsers/IParticleTransmitter'; import type { ModelDescriptionSchema } from '~/modules/llms/server/llm.server.types'; import { listModelsRunDispatch } from '~/modules/llms/server/listModels.dispatch'; import { createChatGenerateDispatch } from '~/modules/aix/server/dispatch/chatGenerate/chatGenerate.dispatch'; @@ -287,7 +287,7 @@ class SweepCollectorTransmitter implements IParticleTransmitter { get hasError(): boolean { return this.dialectIssue !== null; } // Parser-initiated Control - setEnded(reason: 'done-dialect' | 'issue-dialect'): void { + setDialectEnded(reason: ParticleCGDialectEndReason): void { this.endReason = reason; }