Persona: port Throttle

This commit is contained in:
Enrico Ros
2024-07-04 03:18:07 -07:00
parent 065f30ac38
commit 42e97eed4c
+46 -36
View File
@@ -102,55 +102,37 @@ export async function llmGenerateContentStream(
messagesHistory: VChatMessageIn[],
contextName: VChatStreamContextName,
contextRef: VChatContextRef,
throttleUnits: number, // 0: disable, 1: default throttle (12Hz), 2+ reduce frequency with the square root
parallelViewCount: number, // 0: disable, 1: default throttle (12Hz), 2+ reduce frequency with the square root
abortSignal: AbortSignal,
onMessageUpdated: (incrementalMessage: Partial<StreamMessageUpdate>, messageComplete: boolean) => void,
): Promise<StreamMessageStatus> {
const returnStatus: StreamMessageStatus = {
outcome: 'success',
errorMessage: undefined,
};
const returnStatus: StreamMessageStatus = { outcome: 'success', errorMessage: undefined };
// Throttling setup
let lastCallTime = 0;
let throttleDelay = 1000 / 12; // 12 messages per second works well for 60Hz displays (single chat, and 24 in 4 chats, see the square root below)
if (throttleUnits > 1)
throttleDelay = Math.round(throttleDelay * Math.sqrt(throttleUnits));
function throttledEditMessage(updatedMessage: Partial<StreamMessageUpdate>) {
const now = Date.now();
if (throttleUnits === 0 || now - lastCallTime >= throttleDelay) {
onMessageUpdated(updatedMessage, false);
lastCallTime = now;
}
}
const throttler = new ThrottleFunctionCall(parallelViewCount);
// TODO: should clean this up once we have multi-fragment streaming/recombination
const incrementalAnswer: StreamMessageUpdate = {
fragments: [],
};
console.log('PERSONA HERE');
try {
const onUpdate = (update: StreamingClientUpdate, done: boolean) => {
// console.log('PERSONA UPDATE', update, done);
const textSoFar = update.textSoFar;
await aixStreamingChatGenerate(llmId, messagesHistory, contextName, contextRef, null, null, abortSignal,
(update: StreamingClientUpdate, done: boolean) => {
// grow the incremental message
if (textSoFar) incrementalAnswer.fragments = messageFragmentsReplaceLastContentText(incrementalAnswer.fragments, textSoFar);
if (update.originLLM) incrementalAnswer.originLLM = update.originLLM;
if (update.typing !== undefined)
incrementalAnswer.pendingIncomplete = update.typing ? true : undefined;
// grow the incremental message
if (update.textSoFar) incrementalAnswer.fragments = messageFragmentsReplaceLastContentText(incrementalAnswer.fragments, update.textSoFar);
if (update.originLLM) incrementalAnswer.originLLM = update.originLLM;
if (update.typing !== undefined)
incrementalAnswer.pendingIncomplete = update.typing ? true : undefined;
// Update the data store, with optional max-frequency throttling (e.g. OpenAI is downsamped 50 -> 12Hz)
// This can be toggled from the settings
throttledEditMessage(incrementalAnswer);
};
await aixStreamingChatGenerate(llmId, messagesHistory, contextName, contextRef, null, null, abortSignal, onUpdate);
// throttle the update
throttler.handleUpdate(() => {
onMessageUpdated(incrementalAnswer, false);
});
},
);
} catch (error: any) {
if (error?.name !== 'AbortError') {
@@ -164,7 +146,35 @@ export async function llmGenerateContentStream(
}
// Ensure the last content is flushed out, and mark as complete
onMessageUpdated({ ...incrementalAnswer, pendingIncomplete: undefined }, true);
throttler.finalize(() => {
onMessageUpdated({ ...incrementalAnswer, pendingIncomplete: undefined }, true);
});
return returnStatus;
}
}
export class ThrottleFunctionCall {
private readonly throttleDelay: number;
private lastCallTime: number = 0;
constructor(throttleUnits: number) {
// 12 messages per second works well for 60Hz displays (single chat, and 24 in 4 chats, see the square root below)
const baseDelayMs = 1000 / 12;
this.throttleDelay = throttleUnits === 0 ? 0
: throttleUnits > 1 ? Math.round(baseDelayMs * Math.sqrt(throttleUnits))
: baseDelayMs;
}
handleUpdate(fn: () => void): void {
const now = Date.now();
if (this.throttleDelay === 0 || this.lastCallTime === 0 || now - this.lastCallTime >= this.throttleDelay) {
fn();
this.lastCallTime = now;
}
}
finalize(fn: () => void): void {
fn(); // Always execute the final update
}
}