SSE queueing adjustments, untested

nai-degen
2024-01-07 16:19:22 -06:00
parent 936d3c0721
commit 7f92565739
3 changed files with 105 additions and 35 deletions
@@ -88,7 +88,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
`Re-enqueueing request due to retryable error during streaming response.`
);
req.retryCount++;
enqueue(req);
await enqueue(req);
} else {
const { message, stack, lastEvent } = err;
const eventText = JSON.stringify(lastEvent, null, 2) ?? "undefined"
+72 -22
@@ -152,13 +152,13 @@ export const createOnProxyResHandler = (apiMiddleware: ProxyResMiddleware) => {
};
};
function reenqueueRequest(req: Request) {
async function reenqueueRequest(req: Request) {
req.log.info(
{ key: req.key?.hash, retryCount: req.retryCount },
`Re-enqueueing request due to retryable error`
);
req.retryCount++;
enqueue(req);
await enqueue(req);
}
/**
@@ -301,14 +301,14 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
} else if (errorPayload.error?.code === "billing_hard_limit_reached") {
// For some reason, some models return this 400 error instead of the
// same 429 billing error that other models return.
handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
await handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
} else {
errorPayload.proxy_note = `The upstream API rejected the request. Your prompt may be too long for ${req.body?.model}.`;
}
break;
case "anthropic":
case "aws":
maybeHandleMissingPreambleError(req, errorPayload);
await maybeHandleMissingPreambleError(req, errorPayload);
break;
default:
assertNever(service);
@@ -343,20 +343,20 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
} else if (statusCode === 429) {
switch (service) {
case "openai":
handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
await handleOpenAIRateLimitError(req, tryAgainMessage, errorPayload);
break;
case "anthropic":
handleAnthropicRateLimitError(req, errorPayload);
await handleAnthropicRateLimitError(req, errorPayload);
break;
case "aws":
handleAwsRateLimitError(req, errorPayload);
await handleAwsRateLimitError(req, errorPayload);
break;
case "azure":
case "mistral-ai":
handleAzureRateLimitError(req, errorPayload);
await handleAzureRateLimitError(req, errorPayload);
break;
case "google-ai":
handleGoogleAIRateLimitError(req, errorPayload);
await handleGoogleAIRateLimitError(req, errorPayload);
break;
default:
assertNever(service);
@@ -428,7 +428,7 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
* }
* ```
*/
function maybeHandleMissingPreambleError(
async function maybeHandleMissingPreambleError(
req: Request,
errorPayload: ProxiedErrorPayload
) {
@@ -441,27 +441,27 @@ function maybeHandleMissingPreambleError(
"Request failed due to missing preamble. Key will be marked as such for subsequent requests."
);
keyPool.update(req.key!, { requiresPreamble: true });
reenqueueRequest(req);
await reenqueueRequest(req);
throw new RetryableError("Claude request re-enqueued to add preamble.");
} else {
errorPayload.proxy_note = `Proxy received unrecognized error from Anthropic. Check the specific error for more information.`;
}
}
function handleAnthropicRateLimitError(
async function handleAnthropicRateLimitError(
req: Request,
errorPayload: ProxiedErrorPayload
) {
if (errorPayload.error?.type === "rate_limit_error") {
keyPool.markRateLimited(req.key!);
reenqueueRequest(req);
await reenqueueRequest(req);
throw new RetryableError("Claude rate-limited request re-enqueued.");
} else {
errorPayload.proxy_note = `Unrecognized rate limit error from Anthropic. Key may be over quota.`;
}
}
function handleAwsRateLimitError(
async function handleAwsRateLimitError(
req: Request,
errorPayload: ProxiedErrorPayload
) {
@@ -469,7 +469,7 @@ function handleAwsRateLimitError(
switch (errorType) {
case "ThrottlingException":
keyPool.markRateLimited(req.key!);
reenqueueRequest(req);
await reenqueueRequest(req);
throw new RetryableError("AWS rate-limited request re-enqueued.");
case "ModelNotReadyException":
errorPayload.proxy_note = `The requested model is overloaded. Try again in a few seconds.`;
@@ -479,11 +479,11 @@ function handleAwsRateLimitError(
}
}
function handleOpenAIRateLimitError(
async function handleOpenAIRateLimitError(
req: Request,
tryAgainMessage: string,
errorPayload: ProxiedErrorPayload
): Record<string, any> {
): Promise<Record<string, any>> {
const type = errorPayload.error?.type;
switch (type) {
case "insufficient_quota":
@@ -512,8 +512,58 @@ function handleOpenAIRateLimitError(
}
// Per-minute request or token rate limit is exceeded, which we can retry
reenqueueRequest(req);
await reenqueueRequest(req);
throw new RetryableError("Rate-limited request re-enqueued.");
// WIP/nonfunctional
// case "tokens_usage_based":
// // Weird new rate limit type that seems limited to preview models.
// // Distinct from `tokens` type. Can be per-minute or per-day.
//
// // I've seen reports of this error for 500k tokens/day and 10k tokens/min.
// // 10k tokens per minute is problematic, because this is much less than
// // GPT4-Turbo's max context size for a single prompt and is effectively a
// // cap on the max context size for just that key+model, which the app is
// // not able to deal with.
//
// // Similarly if there is a 500k tokens per day limit and 450k tokens have
// // been used today, the max context for that key becomes 50k tokens until
// // the next day and becomes progressively smaller as more tokens are used.
//
// // To work around these keys we will first retry the request a few times.
// // After that we will reject the request, and if it's a per-day limit we
// // will also disable the key.
//
// // "Rate limit reached for gpt-4-1106-preview in organization org-xxxxxxxxxxxxxxxxxxx on tokens_usage_based per day: Limit 500000, Used 460000, Requested 50000"
// // "Rate limit reached for gpt-4-1106-preview in organization org-xxxxxxxxxxxxxxxxxxx on tokens_usage_based per min: Limit 10000, Requested 40000"
//
// const regex =
// /Rate limit reached for .+ in organization .+ on \w+ per (day|min): Limit (\d+)(?:, Used (\d+))?, Requested (\d+)/;
// const [, period, limit, used, requested] =
// errorPayload.error?.message?.match(regex) || [];
//
// req.log.warn(
// { key: req.key?.hash, period, limit, used, requested },
// "Received `tokens_usage_based` rate limit error from OpenAI."
// );
//
// if (!period || !limit || !requested) {
// errorPayload.proxy_note = `Unrecognized rate limit error from OpenAI. (${errorPayload.error?.message})`;
// break;
// }
//
// if (req.retryCount < 2) {
// await reenqueueRequest(req);
// throw new RetryableError("Rate-limited request re-enqueued.");
// }
//
// if (period === "min") {
// errorPayload.proxy_note = `Assigned key can't be used for prompts longer than ${limit} tokens, and no other keys are available right now. Reduce the length of your prompt or try again in a few minutes.`;
// } else {
// errorPayload.proxy_note = `Assigned key has reached its per-day request limit for this model. Try another model.`;
// }
//
// keyPool.markRateLimited(req.key!);
// break;
default:
errorPayload.proxy_note = `This is likely a temporary error with OpenAI. Try again in a few seconds.`;
break;
@@ -521,7 +571,7 @@ function handleOpenAIRateLimitError(
return errorPayload;
}
function handleAzureRateLimitError(
async function handleAzureRateLimitError(
req: Request,
errorPayload: ProxiedErrorPayload
) {
@@ -529,7 +579,7 @@ function handleAzureRateLimitError(
switch (code) {
case "429":
keyPool.markRateLimited(req.key!);
reenqueueRequest(req);
await reenqueueRequest(req);
throw new RetryableError("Rate-limited request re-enqueued.");
default:
errorPayload.proxy_note = `Unrecognized rate limit error from Azure (${code}). Please report this.`;
@@ -538,7 +588,7 @@ function handleAzureRateLimitError(
}
//{"error":{"code":429,"message":"Resource has been exhausted (e.g. check quota).","status":"RESOURCE_EXHAUSTED"}
function handleGoogleAIRateLimitError(
async function handleGoogleAIRateLimitError(
req: Request,
errorPayload: ProxiedErrorPayload
) {
@@ -546,7 +596,7 @@ function handleGoogleAIRateLimitError(
switch (status) {
case "RESOURCE_EXHAUSTED":
keyPool.markRateLimited(req.key!);
reenqueueRequest(req);
await reenqueueRequest(req);
throw new RetryableError("Rate-limited request re-enqueued.");
default:
errorPayload.proxy_note = `Unrecognized rate limit error from Google AI (${status}). Please report this.`;
+32 -12
@@ -41,6 +41,7 @@ const LOAD_THRESHOLD = parseFloat(process.env.LOAD_THRESHOLD ?? "50");
const PAYLOAD_SCALE_FACTOR = parseFloat(
process.env.PAYLOAD_SCALE_FACTOR ?? "6"
);
const QUEUE_JOIN_TIMEOUT = 5000;
/**
* Returns an identifier for a request. This is used to determine if a
@@ -64,7 +65,7 @@ const sharesIdentifierWith = (incoming: Request) => (queued: Request) =>
const isFromSharedIp = (req: Request) => SHARED_IP_ADDRESSES.has(req.ip);
export function enqueue(req: Request) {
export async function enqueue(req: Request) {
const enqueuedRequestCount = queue.filter(sharesIdentifierWith(req)).length;
let isGuest = req.user?.token === undefined;
@@ -96,7 +97,7 @@ export function enqueue(req: Request) {
if (stream === "true" || stream === true || req.isStreaming) {
const res = req.res!;
if (!res.headersSent) {
initStreaming(req);
await initStreaming(req);
}
registerHeartbeat(req);
} else if (getProxyLoad() > LOAD_THRESHOLD) {
@@ -391,20 +392,39 @@ function killQueuedRequest(req: Request) {
}
}
function initStreaming(req: Request) {
async function initStreaming(req: Request) {
const res = req.res!;
initializeSseStream(res);
if (req.query.badSseParser) {
// Some clients have a broken SSE parser that doesn't handle comments
// correctly. These clients can pass ?badSseParser=true to
// disable comments in the SSE stream.
res.write(getHeartbeatPayload());
return;
}
const joinMsg = `: joining queue at position ${
queue.length
}\n\n${getHeartbeatPayload()}`;
res.write(`: joining queue at position ${queue.length}\n\n`);
res.write(getHeartbeatPayload());
let drainTimeout: NodeJS.Timeout;
const welcome = new Promise<void>((resolve, reject) => {
const onDrain = () => {
clearTimeout(drainTimeout);
req.log.debug(`Client finished consuming join message.`);
res.off("drain", onDrain);
resolve();
};
drainTimeout = setTimeout(() => {
res.off("drain", onDrain);
res.destroy();
reject(new Error("Unreponsive streaming client; killing connection"));
}, QUEUE_JOIN_TIMEOUT);
if (!res.write(joinMsg)) {
req.log.warn("Kernel buffer is full; holding client request.");
res.once("drain", onDrain);
} else {
clearTimeout(drainTimeout);
resolve();
}
});
await welcome;
}
/**
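For reference, a minimal sketch (not part of the diff) of the write/drain backpressure pattern the reworked initStreaming relies on: res.write() returns false once Node's internal buffer is full, and the 'drain' event signals that it is safe to continue. The function and parameter names below are illustrative, and the default timeout is an assumption mirroring QUEUE_JOIN_TIMEOUT.

import type { Writable } from "stream";

// Write one payload and resolve once the client has accepted it, or destroy
// the connection if it does not drain within the timeout. Illustrative only.
function writeOrWaitForDrain(
  out: Writable,
  payload: string,
  timeoutMs = 5000
): Promise<void> {
  return new Promise((resolve, reject) => {
    // write() returns true if the chunk fit in the internal buffer; false
    // means the buffer is full and we must wait for 'drain' before writing more.
    if (out.write(payload)) {
      resolve();
      return;
    }
    const onDrain = () => {
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      out.off("drain", onDrain);
      out.destroy(); // unresponsive client; drop the connection
      reject(new Error("Client did not drain the stream in time"));
    }, timeoutMs);
    out.once("drain", onDrain);
  });
}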