Aix: push down throttling.

This commit is contained in:
Enrico Ros
2024-10-14 22:52:18 -07:00
parent d105e28ca8
commit 1b60f1062f
+20 -16
View File
@@ -244,13 +244,11 @@ export async function aixChatGenerateText_Simple(
aixContext,
aixStreaming,
abortSignal,
!aixStreaming ? undefined : (ll: AixChatGenerateContent_LL, isDone: boolean) => {
if (isDone) return; // optimization
clientOptions?.throttleParallelThreads ?? 0,
!aixStreaming ? undefined : (ll: AixChatGenerateContent_LL, _isDone: boolean /* we want to issue this, in case the next action is an exception */) => {
_llToText(ll, state);
if (onTextStreamUpdate && state.text !== null) {
// TODO: throttler? or push it down to the lower level
if (onTextStreamUpdate && state.text !== null)
onTextStreamUpdate(state.text, false, state.generator);
}
},
);
@@ -403,9 +401,6 @@ export async function aixChatGenerateContent_DMessage<TServiceSettings extends o
// apply any vendor-specific rate limit
await llmVendor.rateLimitChatGenerate?.(llm, llmServiceSettings);
// decimator for the updates
const throttler = (onStreamingUpdate && clientOptions.throttleParallelThreads) ? new ThrottleFunctionCall(clientOptions.throttleParallelThreads) : null;
// Abort: if the operation is non-abortable, we can't use the AbortSignal
if (clientOptions.abortSignal === 'NON_ABORTABLE') {
// [DEV] UGLY: here we have non-abortable operations -- we silence the warning, but something may be done in the future
@@ -414,15 +409,12 @@ export async function aixChatGenerateContent_DMessage<TServiceSettings extends o
}
// Aix Low-Level Chat Generation
const llAccumulator = await _aixChatGenerateContent_LL(aixAccess, aixModel, aixChatGenerate, aixContext, aixStreaming, clientOptions.abortSignal,
const llAccumulator = await _aixChatGenerateContent_LL(aixAccess, aixModel, aixChatGenerate, aixContext, aixStreaming, clientOptions.abortSignal, clientOptions.throttleParallelThreads ?? 0,
(ll: AixChatGenerateContent_LL, isDone: boolean) => {
if (isDone) return;
_llToDMessage(ll, dMessage);
if (isDone) return; // optimization, as there aren't branches between here and the final update below
if (onStreamingUpdate) {
if (throttler)
throttler.decimate(() => onStreamingUpdate(dMessage, false));
else
onStreamingUpdate(dMessage, false);
_llToDMessage(ll, dMessage);
onStreamingUpdate(dMessage, false);
}
},
);
@@ -512,6 +504,7 @@ export interface AixChatGenerateContent_LL {
* @param aixContext specifies the scope of the caller, such as what's the high level objective of this call
* @param aixStreaming requests the source to provide incremental updates
* @param abortSignal allows the caller to stop the operation
* @param throttleParallelThreads allows the caller to limit the number of parallel threads
*
* The output is an accumulator object with the fragments, and the generator
* pieces (metrics, model name, token stop reason)
@@ -529,6 +522,7 @@ async function _aixChatGenerateContent_LL(
aixStreaming: boolean,
// others
abortSignal: AbortSignal,
throttleParallelThreads: number | undefined,
// optional streaming callback
onReassemblyUpdate?: (accumulator: AixChatGenerateContent_LL, isDone: boolean) => void,
): Promise<AixChatGenerateContent_LL> {
@@ -540,6 +534,11 @@ async function _aixChatGenerateContent_LL(
};
const contentReassembler = new ContentReassembler(accumulator_LL);
// Initialize throttler if throttling is enabled
const throttler = (onReassemblyUpdate && throttleParallelThreads)
? new ThrottleFunctionCall(throttleParallelThreads)
: null;
try {
// tRPC Aix Chat Generation (streaming) API - inside the try block for deployment path errors
@@ -561,7 +560,12 @@ async function _aixChatGenerateContent_LL(
// reassemble the particles
for await (const particle of particles) {
contentReassembler.reassembleParticle(particle, abortSignal.aborted);
onReassemblyUpdate?.(accumulator_LL, false);
if (onReassemblyUpdate) {
if (throttler)
throttler.decimate(() => onReassemblyUpdate(accumulator_LL, false));
else
onReassemblyUpdate(accumulator_LL, false);
}
}
} catch (error: any) {