From 1b60f1062f3ed2e8311650b96daa712f773fab33 Mon Sep 17 00:00:00 2001 From: Enrico Ros Date: Mon, 14 Oct 2024 22:52:18 -0700 Subject: [PATCH] Aix: push down throttling. --- src/modules/aix/client/aix.client.ts | 36 +++++++++++++++------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/modules/aix/client/aix.client.ts b/src/modules/aix/client/aix.client.ts index 178bc40ae..c524cb4bd 100644 --- a/src/modules/aix/client/aix.client.ts +++ b/src/modules/aix/client/aix.client.ts @@ -244,13 +244,11 @@ export async function aixChatGenerateText_Simple( aixContext, aixStreaming, abortSignal, - !aixStreaming ? undefined : (ll: AixChatGenerateContent_LL, isDone: boolean) => { - if (isDone) return; // optimization + clientOptions?.throttleParallelThreads ?? 0, + !aixStreaming ? undefined : (ll: AixChatGenerateContent_LL, _isDone: boolean /* we want to issue this, in case the next action is an exception */) => { _llToText(ll, state); - if (onTextStreamUpdate && state.text !== null) { - // TODO: throttler? or push it down to the lower level + if (onTextStreamUpdate && state.text !== null) onTextStreamUpdate(state.text, false, state.generator); - } }, ); @@ -403,9 +401,6 @@ export async function aixChatGenerateContent_DMessage { - if (isDone) return; - _llToDMessage(ll, dMessage); + if (isDone) return; // optimization, as there aren't branches between here and the final update below if (onStreamingUpdate) { - if (throttler) - throttler.decimate(() => onStreamingUpdate(dMessage, false)); - else - onStreamingUpdate(dMessage, false); + _llToDMessage(ll, dMessage); + onStreamingUpdate(dMessage, false); } }, ); @@ -512,6 +504,7 @@ export interface AixChatGenerateContent_LL { * @param aixContext specifies the scope of the caller, such as what's the high level objective of this call * @param aixStreaming requests the source to provide incremental updates * @param abortSignal allows the caller to stop the operation + * @param throttleParallelThreads allows the caller to limit the number of parallel threads * * The output is an accumulator object with the fragments, and the generator * pieces (metrics, model name, token stop reason) @@ -529,6 +522,7 @@ async function _aixChatGenerateContent_LL( aixStreaming: boolean, // others abortSignal: AbortSignal, + throttleParallelThreads: number | undefined, // optional streaming callback onReassemblyUpdate?: (accumulator: AixChatGenerateContent_LL, isDone: boolean) => void, ): Promise { @@ -540,6 +534,11 @@ async function _aixChatGenerateContent_LL( }; const contentReassembler = new ContentReassembler(accumulator_LL); + // Initialize throttler if throttling is enabled + const throttler = (onReassemblyUpdate && throttleParallelThreads) + ? new ThrottleFunctionCall(throttleParallelThreads) + : null; + try { // tRPC Aix Chat Generation (streaming) API - inside the try block for deployment path errors @@ -561,7 +560,12 @@ async function _aixChatGenerateContent_LL( // reassemble the particles for await (const particle of particles) { contentReassembler.reassembleParticle(particle, abortSignal.aborted); - onReassemblyUpdate?.(accumulator_LL, false); + if (onReassemblyUpdate) { + if (throttler) + throttler.decimate(() => onReassemblyUpdate(accumulator_LL, false)); + else + onReassemblyUpdate(accumulator_LL, false); + } } } catch (error: any) {