From 7f9256573991dbd1c6b722a03490f82ffbf7a0e0 Mon Sep 17 00:00:00 2001
From: nai-degen
Date: Sun, 7 Jan 2024 16:19:22 -0600
Subject: [PATCH] SSE queueing adjustments, untested

---
 .../response/handle-streamed-response.ts |  2 +-
 src/proxy/middleware/response/index.ts   | 94 ++++++++++++++-----
 src/proxy/queue.ts                       | 44 ++++++---
 3 files changed, 105 insertions(+), 35 deletions(-)

diff --git a/src/proxy/middleware/response/handle-streamed-response.ts b/src/proxy/middleware/response/handle-streamed-response.ts
index 25575f3..54bc43b 100644
--- a/src/proxy/middleware/response/handle-streamed-response.ts
+++ b/src/proxy/middleware/response/handle-streamed-response.ts
@@ -88,7 +88,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
       `Re-enqueueing request due to retryable error during streaming response.`
     );
     req.retryCount++;
-    enqueue(req);
+    await enqueue(req);
   } else {
     const { message, stack, lastEvent } = err;
     const eventText = JSON.stringify(lastEvent, null, 2) ?? "undefined"
diff --git a/src/proxy/middleware/response/index.ts b/src/proxy/middleware/response/index.ts
index b6d2b02..d485a5b 100644
--- a/src/proxy/middleware/response/index.ts
+++ b/src/proxy/middleware/response/index.ts
@@ -152,13 +152,13 @@ export const createOnProxyResHandler = (apiMiddleware: ProxyResMiddleware) => {
   };
 };

-function reenqueueRequest(req: Request) {
+async function reenqueueRequest(req: Request) {
   req.log.info(
     { key: req.key?.hash, retryCount: req.retryCount },
     `Re-enqueueing request due to retryable error`
   );
   req.retryCount++;
-  enqueue(req);
+  await enqueue(req);
 }

 /**
@@ -301,14 +301,14 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
         } else if (errorPayload.error?.code === "billing_hard_limit_reached") {
           // For some reason, some models return this 400 error instead of the
           // same 429 billing error that other models return.
-          handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
+          await handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
         } else {
           errorPayload.proxy_note = `The upstream API rejected the request. Your prompt may be too long for ${req.body?.model}.`;
         }
         break;
       case "anthropic":
       case "aws":
-        maybeHandleMissingPreambleError(req, errorPayload);
+        await maybeHandleMissingPreambleError(req, errorPayload);
         break;
       default:
         assertNever(service);
@@ -343,20 +343,20 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
   } else if (statusCode === 429) {
     switch (service) {
       case "openai":
-        handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
+        await handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
         break;
       case "anthropic":
-        handleAnthropicRateLimitError(req, errorPayload);
+        await handleAnthropicRateLimitError(req, errorPayload);
         break;
       case "aws":
-        handleAwsRateLimitError(req, errorPayload);
+        await handleAwsRateLimitError(req, errorPayload);
         break;
       case "azure":
       case "mistral-ai":
-        handleAzureRateLimitError(req, errorPayload);
+        await handleAzureRateLimitError(req, errorPayload);
         break;
       case "google-ai":
-        handleGoogleAIRateLimitError(req, errorPayload);
+        await handleGoogleAIRateLimitError(req, errorPayload);
         break;
       default:
         assertNever(service);
@@ -428,7 +428,7 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
  * }
  * ```
  */
-function maybeHandleMissingPreambleError(
+async function maybeHandleMissingPreambleError(
   req: Request,
   errorPayload: ProxiedErrorPayload
 ) {
@@ -441,27 +441,27 @@ function maybeHandleMissingPreambleError(
       "Request failed due to missing preamble. Key will be marked as such for subsequent requests."
     );
     keyPool.update(req.key!, { requiresPreamble: true });
-    reenqueueRequest(req);
+    await reenqueueRequest(req);
     throw new RetryableError("Claude request re-enqueued to add preamble.");
   } else {
     errorPayload.proxy_note = `Proxy received unrecognized error from Anthropic. Check the specific error for more information.`;
   }
 }

-function handleAnthropicRateLimitError(
+async function handleAnthropicRateLimitError(
   req: Request,
   errorPayload: ProxiedErrorPayload
 ) {
   if (errorPayload.error?.type === "rate_limit_error") {
     keyPool.markRateLimited(req.key!);
-    reenqueueRequest(req);
+    await reenqueueRequest(req);
     throw new RetryableError("Claude rate-limited request re-enqueued.");
   } else {
     errorPayload.proxy_note = `Unrecognized rate limit error from Anthropic. Key may be over quota.`;
   }
 }

-function handleAwsRateLimitError(
+async function handleAwsRateLimitError(
   req: Request,
   errorPayload: ProxiedErrorPayload
 ) {
@@ -469,7 +469,7 @@
   switch (errorType) {
     case "ThrottlingException":
       keyPool.markRateLimited(req.key!);
-      reenqueueRequest(req);
+      await reenqueueRequest(req);
       throw new RetryableError("AWS rate-limited request re-enqueued.");
     case "ModelNotReadyException":
       errorPayload.proxy_note = `The requested model is overloaded. Try again in a few seconds.`;
@@ -479,11 +479,11 @@
   }
 }

-function handleOpenAIRateLimitError(
+async function handleOpenAIRateLimitError(
   req: Request,
   tryAgainMessage: string,
   errorPayload: ProxiedErrorPayload
-): Record<string, any> {
+): Promise<Record<string, any>> {
   const type = errorPayload.error?.type;
   switch (type) {
     case "insufficient_quota":
@@ -512,8 +512,58 @@
       }

       // Per-minute request or token rate limit is exceeded, which we can retry
-      reenqueueRequest(req);
+      await reenqueueRequest(req);
       throw new RetryableError("Rate-limited request re-enqueued.");
+    // WIP/nonfunctional
+    // case "tokens_usage_based":
+    //   // Weird new rate limit type that seems limited to preview models.
+    //   // Distinct from `tokens` type. Can be per-minute or per-day.
+    //
+    //   // I've seen reports of this error for 500k tokens/day and 10k tokens/min.
+    //   // 10k tokens per minute is problematic, because this is much less than
+    //   // GPT4-Turbo's max context size for a single prompt and is effectively a
+    //   // cap on the max context size for just that key+model, which the app is
+    //   // not able to deal with.
+    //
+    //   // Similarly if there is a 500k tokens per day limit and 450k tokens have
+    //   // been used today, the max context for that key becomes 50k tokens until
+    //   // the next day and becomes progressively smaller as more tokens are used.
+    //
+    //   // To work around these keys we will first retry the request a few times.
+    //   // After that we will reject the request, and if it's a per-day limit we
+    //   // will also disable the key.
+    //
+    //   // "Rate limit reached for gpt-4-1106-preview in organization org-xxxxxxxxxxxxxxxxxxx on tokens_usage_based per day: Limit 500000, Used 460000, Requested 50000"
+    //   // "Rate limit reached for gpt-4-1106-preview in organization org-xxxxxxxxxxxxxxxxxxx on tokens_usage_based per min: Limit 10000, Requested 40000"
+    //
+    //   const regex =
+    //     /Rate limit reached for .+ in organization .+ on \w+ per (day|min): Limit (\d+)(?:, Used (\d+))?, Requested (\d+)/;
+    //   const [, period, limit, used, requested] =
+    //     errorPayload.error?.message?.match(regex) || [];
+    //
+    //   req.log.warn(
+    //     { key: req.key?.hash, period, limit, used, requested },
+    //     "Received `tokens_usage_based` rate limit error from OpenAI."
+    //   );
+    //
+    //   if (!period || !limit || !requested) {
+    //     errorPayload.proxy_note = `Unrecognized rate limit error from OpenAI. (${errorPayload.error?.message})`;
+    //     break;
+    //   }
+    //
+    //   if (req.retryCount < 2) {
+    //     await reenqueueRequest(req);
+    //     throw new RetryableError("Rate-limited request re-enqueued.");
+    //   }
+    //
+    //   if (period === "min") {
+    //     errorPayload.proxy_note = `Assigned key can't be used for prompts longer than ${limit} tokens, and no other keys are available right now. Reduce the length of your prompt or try again in a few minutes.`;
+    //   } else {
+    //     errorPayload.proxy_note = `Assigned key has reached its per-day request limit for this model. Try another model.`;
+    //   }
+    //
+    //   keyPool.markRateLimited(req.key!);
+    //   break;
     default:
       errorPayload.proxy_note = `This is likely a temporary error with OpenAI. Try again in a few seconds.`;
       break;
@@ -521,7 +571,7 @@
   return errorPayload;
 }

-function handleAzureRateLimitError(
+async function handleAzureRateLimitError(
   req: Request,
   errorPayload: ProxiedErrorPayload
 ) {
@@ -529,7 +579,7 @@
   switch (code) {
     case "429":
       keyPool.markRateLimited(req.key!);
-      reenqueueRequest(req);
+      await reenqueueRequest(req);
       throw new RetryableError("Rate-limited request re-enqueued.");
     default:
       errorPayload.proxy_note = `Unrecognized rate limit error from Azure (${code}). Please report this.`;
@@ -538,7 +588,7 @@
 }

 //{"error":{"code":429,"message":"Resource has been exhausted (e.g. check quota).","status":"RESOURCE_EXHAUSTED"}
-function handleGoogleAIRateLimitError(
+async function handleGoogleAIRateLimitError(
   req: Request,
   errorPayload: ProxiedErrorPayload
 ) {
@@ -546,7 +596,7 @@
   switch (status) {
     case "RESOURCE_EXHAUSTED":
       keyPool.markRateLimited(req.key!);
-      reenqueueRequest(req);
+      await reenqueueRequest(req);
       throw new RetryableError("Rate-limited request re-enqueued.");
     default:
       errorPayload.proxy_note = `Unrecognized rate limit error from Google AI (${status}). Please report this.`;
diff --git a/src/proxy/queue.ts b/src/proxy/queue.ts
index b35f335..698aa29 100644
--- a/src/proxy/queue.ts
+++ b/src/proxy/queue.ts
@@ -41,6 +41,7 @@ const LOAD_THRESHOLD = parseFloat(process.env.LOAD_THRESHOLD ?? "50");
 const PAYLOAD_SCALE_FACTOR = parseFloat(
   process.env.PAYLOAD_SCALE_FACTOR ?? "6"
 );
+const QUEUE_JOIN_TIMEOUT = 5000;

 /**
  * Returns an identifier for a request. This is used to determine if a
@@ -64,7 +65,7 @@ const sharesIdentifierWith = (incoming: Request) => (queued: Request) =>

 const isFromSharedIp = (req: Request) => SHARED_IP_ADDRESSES.has(req.ip);

-export function enqueue(req: Request) {
+export async function enqueue(req: Request) {
   const enqueuedRequestCount = queue.filter(sharesIdentifierWith(req)).length;

   let isGuest = req.user?.token === undefined;
@@ -96,7 +97,7 @@
   if (stream === "true" || stream === true || req.isStreaming) {
     const res = req.res!;
     if (!res.headersSent) {
-      initStreaming(req);
+      await initStreaming(req);
     }
     registerHeartbeat(req);
   } else if (getProxyLoad() > LOAD_THRESHOLD) {
@@ -391,20 +392,39 @@ function killQueuedRequest(req: Request) {
   }
 }

-function initStreaming(req: Request) {
+async function initStreaming(req: Request) {
   const res = req.res!;
   initializeSseStream(res);

-  if (req.query.badSseParser) {
-    // Some clients have a broken SSE parser that doesn't handle comments
-    // correctly. These clients can pass ?badSseParser=true to
-    // disable comments in the SSE stream.
-    res.write(getHeartbeatPayload());
-    return;
-  }
+  const joinMsg = `: joining queue at position ${
+    queue.length
+  }\n\n${getHeartbeatPayload()}`;

-  res.write(`: joining queue at position ${queue.length}\n\n`);
-  res.write(getHeartbeatPayload());
+  let drainTimeout: NodeJS.Timeout;
+  const welcome = new Promise<void>((resolve, reject) => {
+    const onDrain = () => {
+      clearTimeout(drainTimeout);
+      req.log.debug(`Client finished consuming join message.`);
+      res.off("drain", onDrain);
+      resolve();
+    };
+
+    drainTimeout = setTimeout(() => {
+      res.off("drain", onDrain);
+      res.destroy();
+      reject(new Error("Unresponsive streaming client; killing connection"));
+    }, QUEUE_JOIN_TIMEOUT);
+
+    if (!res.write(joinMsg)) {
+      req.log.warn("Kernel buffer is full; holding client request.");
+      res.once("drain", onDrain);
+    } else {
+      clearTimeout(drainTimeout);
+      resolve();
+    }
+  });
+
+  await welcome;
 }

 /**
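
Reviewer note, not part of the patch: the new initStreaming only resolves once the client has consumed the SSE "joining queue" message, and destroys connections that fail to drain within QUEUE_JOIN_TIMEOUT. A minimal standalone sketch of that backpressure-with-timeout pattern follows; writeWithDrainTimeout and its parameters are illustrative names under assumed semantics, not identifiers from this patch.

import { Writable } from "stream";

// Write `payload` to `dest` and resolve once it has been accepted or flushed.
// If the kernel buffer is backed up, wait for the 'drain' event; if the peer
// does not drain within `timeoutMs`, destroy the stream and reject.
function writeWithDrainTimeout(
  dest: Writable,
  payload: string,
  timeoutMs: number
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // write() returns true when the chunk fit under the highWaterMark.
    if (dest.write(payload)) return resolve();

    const timer = setTimeout(() => {
      dest.off("drain", onDrain);
      dest.destroy();
      reject(new Error("Unresponsive streaming client; killing connection"));
    }, timeoutMs);

    const onDrain = () => {
      clearTimeout(timer);
      resolve();
    };
    dest.once("drain", onDrain);
  });
}

// Hypothetical usage mirroring the patched initStreaming:
//   await writeWithDrainTimeout(res, joinMsg, QUEUE_JOIN_TIMEOUT);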