Chat Generation metrics:

This commit is contained in:
Enrico Ros
2024-08-17 15:20:35 -07:00
parent c6d4f2834e
commit 6265868658
10 changed files with 159 additions and 112 deletions
+3 -3
View File
@@ -122,14 +122,14 @@ export async function llmGenerateContentStream(
(update: StreamingClientUpdate, done: boolean) => {
// convert the StreamingClientUpdate to the StreamMessageUpdate
const { typing, stats, fragments, originLLM, metadata } = update;
const { typing, metrics, fragments, modelName, metadata } = update;
messageOverwrite.pendingIncomplete = typing ? true : undefined;
messageOverwrite.fragments = fragments;
messageOverwrite.originLLM = originLLM;
messageOverwrite.originLLM = modelName;
if (metadata)
messageOverwrite.metadata = metadata;
console.log('overwrite', messageOverwrite, stats);
console.log('overwrite', messageOverwrite, metrics);
// throttle the update - and skip the last done message
if (!done)
@@ -0,0 +1,21 @@
/**
 * Per-generation metrics. This is a stored type - IMPORTANT: do not break.
 * In particular this is used 'as' AixWire_Particles.ChatGenerateMetrics
 */
export type DChatGenerateMetrics = {

  // T = Tokens
  TIn?: number;         // input (prompt) tokens
  TCacheRead?: number;  // tokens read from the provider's prompt cache
  TCacheWrite?: number; // tokens written to the provider's prompt cache
  TOut?: number;        // generated (completion) tokens
  TAll?: number;        // aggregate: TIn + TOut + TCacheRead + TCacheWrite

  // v = Tokens/s
  vTOutInner?: number;  // TOut / dtInner
  vTOutAll?: number;    // TOut / dtAll

  // dt = milliseconds
  dtStart?: number;     // time to the first event
  dtInner?: number;     // time as measured/reported by the provider
  dtAll?: number;       // total wall-clock time of the generation

};
+1 -1
View File
@@ -65,8 +65,8 @@ export class PartReassembler {
// handled outside
case 'end':
case 'set-metrics':
case 'set-model':
case 'update-counts':
break;
case '_debugRequest':
+21 -17
View File
@@ -1,6 +1,7 @@
import type { ChatStreamingInputSchema } from '~/modules/llms/server/llm.server.streaming';
import { findServiceAccessOrThrow } from '~/modules/llms/vendors/vendor.helpers';
import type { DChatGenerateMetrics } from '~/common/stores/metrics/metrics.types';
import type { DLLMId } from '~/common/stores/llms/dllm.types';
import type { DMessageContentFragment } from '~/common/stores/chat/chat.fragments';
import type { DMessageMetadata } from '~/common/stores/chat/chat.message';
@@ -18,16 +19,11 @@ import { PartReassembler } from './PartReassembler';
export type StreamingClientUpdate = {
// interpreted
typing: boolean;
stats?: {
chatIn?: number,
chatOut?: number,
chatOutRate?: number,
chatTimeInner?: number,
};
metrics: DChatGenerateMetrics;
// replacers in DMessage
fragments: DMessageContentFragment[];
originLLM: string;
modelName: string;
// additive to DMessage
metadata?: DMessageMetadata;
@@ -70,7 +66,11 @@ export async function aixStreamingChatGenerate<TServiceSettings extends object =
// execute via the vendor
// return await vendor.streamingChatGenerateOrThrow(aixAccess, llmId, llmOptions, messages, contextName, contextRef, functions, forceFunctionName, abortSignal, onUpdate);
return await _aix_LL_ChatGenerateContent(aixAccess, aixModel, aixChatGenerate, aixContext, streaming, abortSignal, onUpdate);
const value = await _aix_LL_ChatGenerateContent(aixAccess, aixModel, aixChatGenerate, aixContext, streaming, abortSignal, onUpdate);
console.log(value.metrics);
return value;
}
@@ -168,10 +168,10 @@ async function _aix_LL_ChatGenerateContent(
let messageAccumulator: StreamingClientUpdate = {
typing: true,
// stats: not added until we have it
fragments: [],
originLLM: aixModel.id,
metrics: {},
modelName: aixModel.id,
typing: true,
// metadata: not set because additive and overwriting when set
};
@@ -208,13 +208,11 @@ async function _aix_LL_ChatGenerateContent(
break;
}
break;
case 'set-model':
messageAccumulator.originLLM = update.name;
case 'set-metrics':
messageAccumulator.metrics = update.metrics;
break;
case 'update-counts':
messageAccumulator.stats = update.counts;
case 'set-model':
messageAccumulator.modelName = update.name;
break;
}
}
@@ -238,6 +236,12 @@ async function _aix_LL_ChatGenerateContent(
}
}
// add aggregate metrics
const metrics = messageAccumulator.metrics;
metrics.TAll = (metrics.TIn || 0) + (metrics.TOut || 0) + (metrics.TCacheRead || 0) + (metrics.TCacheWrite || 0);
if (metrics.TOut !== undefined && metrics.dtAll !== undefined && metrics.dtAll > 0)
metrics.vTOutAll = Math.round(100 * metrics.TOut / (metrics.dtAll / 1000)) / 100;
// and we're done
messageAccumulator = {
...messageAccumulator,
+9 -12
View File
@@ -1,5 +1,7 @@
import { z } from 'zod';
// We return for now the same object that ends up stored
import type { DChatGenerateMetrics } from '~/common/stores/metrics/metrics.types';
// Used to align Particles to the TypeScript definitions from the frontend-side, on 'chat.fragments.ts'
import type { DMessageToolResponsePart } from '~/common/stores/chat/chat.fragments';
@@ -439,8 +441,8 @@ export namespace AixWire_Particles {
// | { cg: 'start' } // not really used for now
| { cg: 'end', reason: CGEndReason, tokenStopReason: GCTokenStopReason }
| { cg: 'issue', issueId: CGIssueId, issueText: string }
| { cg: 'set-metrics', metrics: ChatGenerateMetrics }
| { cg: 'set-model', name: string }
| { cg: 'update-counts', counts: Partial<ChatGenerateCounts> }
| { cg: '_debugRequest', security: 'dev-env', request: { url: string, headers: string, body: string } }; // may generalize this in the future
export type CGEndReason = // the reason for the end of the chat generation
@@ -469,17 +471,12 @@ export namespace AixWire_Particles {
| 'filter-recitation' // recitation filter (e.g. recitation)
| 'out-of-tokens'; // got out of tokens
export type ChatGenerateCounts = {
chatIn?: number,
chatInCacheRead?: number,
chatInCacheWrite?: number,
chatOut?: number,
chatOutRate?: number,
chatTimeStart?: number,
chatTimeInner?: number,
chatTimeTotal?: number,
};
/**
 * NOTE: break compatibility with this D-stored-type only when we
* start to need backwards-incompatible Particle->Reassembler flexibility,
* which can't be just extended in the D-stored-type.
*/
export type ChatGenerateMetrics = Omit<DChatGenerateMetrics, 'TAll' | 'vTOutAll'>;
// TextParticle / PartParticle - keep in line with the DMessage*Part counterparts
@@ -45,10 +45,10 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
// Token stop reason
private tokenStopReason: AixWire_Particles.GCTokenStopReason | undefined = undefined;
// Counters
private accCounts: AixWire_Particles.ChatGenerateCounts | undefined = undefined;
private sentCounts: boolean = false;
private freshCounts: boolean = false;
// Metrics
private accMetrics: AixWire_Particles.ChatGenerateMetrics | undefined = undefined;
private sentMetrics: boolean = false;
private freshMetrics: boolean = false;
constructor(private readonly prettyDialect: string, _throttleTimeMs: number | undefined) {
@@ -75,13 +75,13 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
/// aix.router.ts
* emitParticles(): Generator<AixWire_Particles.ChatGenerateOp> {
// Counters: emit at the beginning and the end -- if there's data to transmit
if (!this.sentCounts && this.freshCounts && this.accCounts) {
this.sentCounts = true;
this.freshCounts = false;
// Metrics: emit at the beginning and the end -- if there's data to transmit
if (!this.sentMetrics && this.freshMetrics && this.accMetrics) {
this.sentMetrics = true;
this.freshMetrics = false;
this.transmissionQueue.push({
cg: 'update-counts',
counts: this.accCounts,
cg: 'set-metrics',
metrics: this.accMetrics,
});
}
@@ -105,7 +105,7 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
* flushParticles(): Generator<AixWire_Particles.ChatGenerateOp> {
this._queueParticleS();
this.sentCounts = false; // enable sending counters again
this.sentMetrics = false; // enable sending metrics again
return yield* this.emitParticles();
}
@@ -271,19 +271,19 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
});
}
/** Update the counters, sent twice (after the first call, and then at the end of the transmission) */
setCounters(counts: AixWire_Particles.ChatGenerateCounts) {
if (!this.accCounts)
this.accCounts = {};
/** Update the metrics, sent twice (after the first call, and then at the end of the transmission) */
updateMetrics(update: Partial<AixWire_Particles.ChatGenerateMetrics>) {
if (!this.accMetrics)
this.accMetrics = {};
// similar to Object.assign, but takes care of removing the "undefined" entries
for (const key in counts) {
const value = (counts as any)[key] as number | undefined;
for (const key in update) {
const value = (update as any)[key] as number | undefined;
if (value !== undefined)
(this.accCounts as any)[key] = value;
(this.accMetrics as any)[key] = value;
}
this.freshCounts = true;
this.freshMetrics = true;
}
}
@@ -47,7 +47,7 @@ export interface IParticleTransmitter {
/** Communicates the finish reason to the client */
setTokenStopReason(reason: AixWire_Particles.GCTokenStopReason): void;
/** Update the counters, sent twice (after the first call, and then at the end of the transmission) */
setCounters(counts: AixWire_Particles.ChatGenerateCounts): void;
/** Update the metrics, sent twice (after the first call, and then at the end of the transmission) */
updateMetrics(update: Partial<AixWire_Particles.ChatGenerateMetrics>): void;
}
@@ -1,5 +1,6 @@
import { safeErrorString } from '~/server/wire';
import type { AixWire_Particles } from '../../../api/aix.wiretypes';
import type { ChatGenerateParseFunction } from '../chatGenerate.dispatch';
import type { IParticleTransmitter } from '../IParticleTransmitter';
import { IssueSymbols } from '../ChatGenerateTransmitter';
@@ -78,13 +79,16 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction {
pt.setModelName(responseMessage.model);
if (responseMessage.usage) {
chatInTokens = responseMessage.usage.input_tokens;
pt.setCounters({
chatIn: chatInTokens,
chatInCacheRead: responseMessage.usage.cache_read_input_tokens || 0,
chatInCacheWrite: responseMessage.usage.cache_creation_input_tokens || 0,
chatOut: responseMessage.usage.output_tokens,
chatTimeStart: timeToFirstEvent,
});
const metricsUpdate: AixWire_Particles.ChatGenerateMetrics = {
TIn: chatInTokens,
TOut: responseMessage.usage.output_tokens,
dtStart: timeToFirstEvent,
};
if (responseMessage.usage.cache_read_input_tokens !== undefined)
metricsUpdate.TCacheRead = responseMessage.usage.cache_read_input_tokens;
if (responseMessage.usage.cache_creation_input_tokens !== undefined)
metricsUpdate.TCacheWrite = responseMessage.usage.cache_creation_input_tokens;
pt.updateMetrics(metricsUpdate);
}
break;
@@ -170,15 +174,16 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction {
pt.setTokenStopReason(tokenStopReason);
if (usage?.output_tokens && messageStartTime) {
const elapsedTimeSeconds = (Date.now() - messageStartTime) / 1000;
const elapsedTimeMilliseconds = Date.now() - messageStartTime;
const elapsedTimeSeconds = elapsedTimeMilliseconds / 1000;
const chatOutRate = elapsedTimeSeconds > 0 ? usage.output_tokens / elapsedTimeSeconds : 0;
pt.setCounters({
chatIn: chatInTokens !== undefined ? chatInTokens : -1,
chatOut: usage.output_tokens,
chatOutRate: Math.round(chatOutRate * 100) / 100, // Round to 2 decimal places
chatTimeStart: timeToFirstEvent,
chatTimeInner: elapsedTimeSeconds,
chatTimeTotal: Date.now() - parserCreationTimestamp,
pt.updateMetrics({
TIn: chatInTokens !== undefined ? chatInTokens : -1,
TOut: usage.output_tokens,
vTOutInner: Math.round(chatOutRate * 100) / 100, // Round to 2 decimal places
dtStart: timeToFirstEvent,
dtInner: elapsedTimeMilliseconds,
dtAll: Date.now() - parserCreationTimestamp,
});
}
} else
@@ -205,7 +210,7 @@ export function createAnthropicMessageParser(): ChatGenerateParseFunction {
export function createAnthropicMessageParserNS(): ChatGenerateParseFunction {
let messageStartTime: number = Date.now();
const parserCreationTimestamp = Date.now();
return function(pt: IParticleTransmitter, fullData: string): void {
@@ -246,18 +251,22 @@ export function createAnthropicMessageParserNS(): ChatGenerateParseFunction {
// -> Stats
if (usage) {
const elapsedTimeSeconds = (Date.now() - messageStartTime) / 1000;
const elapsedTimeMilliseconds = Date.now() - parserCreationTimestamp;
const elapsedTimeSeconds = elapsedTimeMilliseconds / 1000;
const chatOutRate = elapsedTimeSeconds > 0 ? usage.output_tokens / elapsedTimeSeconds : 0;
pt.setCounters({
chatIn: usage.input_tokens,
chatInCacheRead: usage.cache_read_input_tokens || 0,
chatInCacheWrite: usage.cache_creation_input_tokens || 0,
chatOut: usage.output_tokens,
chatOutRate: Math.round(chatOutRate * 100) / 100, // Round to 2 decimal places
// chatTimeStart: // meaningless non-streaming
// chatTimeInner: // we don't know
chatTimeTotal: elapsedTimeSeconds,
});
const metricsUpdate: AixWire_Particles.ChatGenerateMetrics = {
TIn: usage.input_tokens,
TOut: usage.output_tokens,
vTOutInner: Math.round(chatOutRate * 100) / 100, // Round to 2 decimal places
// dtStart: // we don't know
// dtInner: // we don't know
dtAll: elapsedTimeMilliseconds,
};
if (usage.cache_read_input_tokens !== undefined)
metricsUpdate.TCacheRead = usage.cache_read_input_tokens;
if (usage.cache_creation_input_tokens !== undefined)
metricsUpdate.TCacheWrite = usage.cache_creation_input_tokens;
pt.updateMetrics(metricsUpdate);
}
};
}
@@ -1,3 +1,4 @@
import type { AixWire_Particles } from '../../../api/aix.wiretypes';
import type { ChatGenerateParseFunction } from '../chatGenerate.dispatch';
import type { IParticleTransmitter } from '../IParticleTransmitter';
import { IssueSymbols } from '../ChatGenerateTransmitter';
@@ -26,7 +27,7 @@ export function createGeminiGenerateContentResponseParser(modelId: string): Chat
const modelName = modelId.replace('models/', '');
let hasBegun = false;
let timeToFirstEvent: number;
let skipSendingTotalTimeOnce = true;
let skipComputingTotalsOnce = true;
// this can throw, it's caught by the caller
return function(pt: IParticleTransmitter, eventData: string): void {
@@ -153,14 +154,22 @@ export function createGeminiGenerateContentResponseParser(modelId: string): Chat
// -> Stats
if (generationChunk.usageMetadata) {
pt.setCounters({
chatIn: generationChunk.usageMetadata.promptTokenCount,
chatOut: generationChunk.usageMetadata.candidatesTokenCount,
chatTimeStart: timeToFirstEvent,
// chatTimeInner: // not reported
...skipSendingTotalTimeOnce ? {} : { chatTimeTotal: Date.now() - parserCreationTimestamp }, // the second...end-1 packets will be held
});
skipSendingTotalTimeOnce = false;
const metricsUpdate: AixWire_Particles.ChatGenerateMetrics = {
TIn: generationChunk.usageMetadata.promptTokenCount,
TOut: generationChunk.usageMetadata.candidatesTokenCount,
dtStart: timeToFirstEvent,
};
// the first end-1 packet will be skipped
if (!skipComputingTotalsOnce) {
metricsUpdate.dtAll = Date.now() - parserCreationTimestamp;
if (metricsUpdate.dtAll > timeToFirstEvent)
metricsUpdate.dtInner = metricsUpdate.dtAll - timeToFirstEvent;
if (metricsUpdate.TOut)
metricsUpdate.vTOutInner = Math.round(100 * 1000 /*ms/s*/ * metricsUpdate.TOut / (metricsUpdate.dtInner || metricsUpdate.dtAll)) / 100;
}
pt.updateMetrics(metricsUpdate);
// the second (end) packet will be sent
skipComputingTotalsOnce = false;
}
};
@@ -1,6 +1,7 @@
import { safeErrorString } from '~/server/wire';
import { serverSideId } from '~/server/api/trpc.nanoid';
import type { AixWire_Particles } from '../../../api/aix.wiretypes';
import type { ChatGenerateParseFunction } from '../chatGenerate.dispatch';
import type { IParticleTransmitter } from '../IParticleTransmitter';
import { IssueSymbols } from '../ChatGenerateTransmitter';
@@ -87,14 +88,17 @@ export function createOpenAIChatCompletionsChunkParser(): ChatGenerateParseFunct
// -> Stats
if (json.usage) {
if (json.usage.completion_tokens !== undefined)
pt.setCounters({
chatIn: json.usage.prompt_tokens || -1,
chatOut: json.usage.completion_tokens,
...timeToFirstEvent !== undefined ? { chatTimeStart: timeToFirstEvent } : {},
// chatTimeInner: openAI is not reporting the time as seen by the servers
chatTimeTotal: Date.now() - parserCreationTimestamp,
});
if (json.usage.completion_tokens !== undefined) {
const metricsUpdate: AixWire_Particles.ChatGenerateMetrics = {
TIn: json.usage.prompt_tokens || -1,
TOut: json.usage.completion_tokens,
// dtInner: openAI is not reporting the time as seen by the servers
dtAll: Date.now() - parserCreationTimestamp,
};
if (timeToFirstEvent !== undefined)
metricsUpdate.dtStart = timeToFirstEvent;
pt.updateMetrics(metricsUpdate);
}
// [OpenAI] Expected correct case: the last object has usage, but an empty choices array
if (!json.choices.length)
@@ -106,14 +110,16 @@ export function createOpenAIChatCompletionsChunkParser(): ChatGenerateParseFunct
timeToFirstEvent = undefined;
if (json.x_groq?.usage) {
const { prompt_tokens, completion_tokens, completion_time } = json.x_groq.usage;
pt.setCounters({
chatIn: prompt_tokens,
chatOut: completion_tokens,
chatOutRate: (completion_tokens && completion_time) ? Math.round((completion_tokens / completion_time) * 100) / 100 : undefined,
...timeToFirstEvent !== undefined ? { chatTimeStart: timeToFirstEvent } : {},
chatTimeInner: Math.round((completion_time || 0) * 1000),
chatTimeTotal: Date.now() - parserCreationTimestamp,
});
const metricsUpdate: AixWire_Particles.ChatGenerateMetrics = {
TIn: prompt_tokens,
TOut: completion_tokens,
vTOutInner: (completion_tokens && completion_time) ? Math.round((completion_tokens / completion_time) * 100) / 100 : undefined,
dtInner: Math.round((completion_time || 0) * 1000),
dtAll: Date.now() - parserCreationTimestamp,
};
if (timeToFirstEvent !== undefined)
metricsUpdate.dtStart = timeToFirstEvent;
pt.updateMetrics(metricsUpdate);
}
// expect: 1 completion, or stop
@@ -227,12 +233,13 @@ export function createOpenAIChatCompletionsParserNS(): ChatGenerateParseFunction
// -> Stats
if (json.usage)
pt.setCounters({
chatIn: json.usage.prompt_tokens,
chatOut: json.usage.completion_tokens,
// chatTimeStart: ... // not meaningful for non-streaming
// chatTimeInner: ... // not measured/reported by OpenAI
chatTimeTotal: Date.now() - parserCreationTimestamp,
pt.updateMetrics({
TIn: json.usage.prompt_tokens,
TOut: json.usage.completion_tokens,
// vTOutInner: ... // we don't have the inner time to compute this
// dtStart: ... // not meaningful for non-streaming
// dtInner: ... // not measured/reported by OpenAI
dtAll: Date.now() - parserCreationTimestamp,
});
// Assumption/validate: expect 1 completion, or stop