improves streaming error handling

This commit is contained in:
nai-degen
2023-12-14 04:39:06 -06:00
parent de34d41918
commit 5599a83ae4
7 changed files with 176 additions and 104 deletions
+39 -18
View File
@@ -2,7 +2,7 @@ import { Request, Response } from "express";
import httpProxy from "http-proxy";
import { ZodError } from "zod";
import { generateErrorMessage } from "zod-error";
import { buildFakeSse } from "../../shared/streaming";
import { makeCompletionSSE } from "../../shared/streaming";
import { assertNever } from "../../shared/utils";
import { QuotaExceededError } from "./request/preprocessors/apply-quota-limits";
@@ -40,11 +40,13 @@ export function writeErrorResponse(
req: Request,
res: Response,
statusCode: number,
statusMessage: string,
errorPayload: Record<string, any>
) {
const errorSource = errorPayload.error?.type?.startsWith("proxy")
? "proxy"
: "upstream";
const msg =
statusCode === 500
? `The proxy encountered an error while trying to process your prompt.`
: `The proxy encountered an error while trying to send your prompt to the upstream service.`;
// If we're mid-SSE stream, send a data event with the error payload and end
// the stream. Otherwise just send a normal error response.
@@ -52,10 +54,15 @@ export function writeErrorResponse(
res.headersSent ||
String(res.getHeader("content-type")).startsWith("text/event-stream")
) {
const errorTitle = `${errorSource} error (${statusCode})`;
const errorContent = JSON.stringify(errorPayload, null, 2);
const msg = buildFakeSse(errorTitle, errorContent, req);
res.write(msg);
const event = makeCompletionSSE({
format: req.inboundApi,
title: `Proxy error (HTTP ${statusCode} ${statusMessage})`,
message: `${msg} Further technical details are provided below.`,
obj: errorPayload,
reqId: req.id,
model: req.body?.model,
});
res.write(event);
res.write(`data: [DONE]\n\n`);
res.end();
} else {
@@ -77,8 +84,9 @@ export const classifyErrorAndSend = (
res: Response
) => {
try {
const { status, userMessage, ...errorDetails } = classifyError(err);
writeErrorResponse(req, res, status, {
const { statusCode, statusMessage, userMessage, ...errorDetails } =
classifyError(err);
writeErrorResponse(req, res, statusCode, statusMessage, {
error: { message: userMessage, ...errorDetails },
});
} catch (error) {
@@ -88,14 +96,17 @@ export const classifyErrorAndSend = (
function classifyError(err: Error): {
/** HTTP status code returned to the client. */
status: number;
statusCode: number;
/** HTTP status message returned to the client. */
statusMessage: string;
/** Message displayed to the user. */
userMessage: string;
/** Short error type, e.g. "proxy_validation_error". */
type: string;
} & Record<string, any> {
const defaultError = {
status: 500,
statusCode: 500,
statusMessage: "Internal Server Error",
userMessage: `Reverse proxy error: ${err.message}`,
type: "proxy_internal_error",
stack: err.stack,
@@ -112,19 +123,26 @@ function classifyError(err: Error): {
return `At '${rest.pathComponent}': ${issue.message}`;
},
});
return { status: 400, userMessage, type: "proxy_validation_error" };
return {
statusCode: 400,
statusMessage: "Bad Request",
userMessage,
type: "proxy_validation_error",
};
case "ForbiddenError":
// Mimics a ban notice from OpenAI, thrown when blockZoomerOrigins blocks
// a request.
return {
status: 403,
statusCode: 403,
statusMessage: "Forbidden",
userMessage: `Your account has been disabled for violating our terms of service.`,
type: "organization_account_disabled",
code: "policy_violation",
};
case "QuotaExceededError":
return {
status: 429,
statusCode: 429,
statusMessage: "Too Many Requests",
userMessage: `You've exceeded your token quota for this model type.`,
type: "proxy_quota_exceeded",
info: (err as QuotaExceededError).quotaInfo,
@@ -134,21 +152,24 @@ function classifyError(err: Error): {
switch (err.code) {
case "ENOTFOUND":
return {
status: 502,
statusCode: 502,
statusMessage: "Bad Gateway",
userMessage: `Reverse proxy encountered a DNS error while trying to connect to the upstream service.`,
type: "proxy_network_error",
code: err.code,
};
case "ECONNREFUSED":
return {
status: 502,
statusCode: 502,
statusMessage: "Bad Gateway",
userMessage: `Reverse proxy couldn't connect to the upstream service.`,
type: "proxy_network_error",
code: err.code,
};
case "ECONNRESET":
return {
status: 504,
statusCode: 504,
statusMessage: "Gateway Timeout",
userMessage: `Reverse proxy timed out while waiting for the upstream service to respond.`,
type: "proxy_network_error",
code: err.code,
@@ -1,7 +1,7 @@
import { pipeline } from "stream";
import { promisify } from "util";
import {
buildFakeSse,
makeCompletionSSE,
copySseResponseHeaders,
initializeSseStream,
} from "../../../shared/streaming";
@@ -90,8 +90,18 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
req.retryCount++;
enqueue(req);
} else {
const errorEvent = buildFakeSse("stream-error", err.message, req);
res.write(`${errorEvent}data: [DONE]\n\n`);
const { message, stack, lastEvent } = err;
const eventText = JSON.stringify(lastEvent, null, 2) ?? "undefined";
const errorEvent = makeCompletionSSE({
format: req.inboundApi,
title: "Proxy stream error",
message: "An unexpected error occurred while streaming the response.",
obj: { message, stack, lastEvent: eventText },
reqId: req.id,
model: req.body?.model,
});
res.write(errorEvent);
res.write(`data: [DONE]\n\n`);
res.end();
}
throw err;
+11 -8
View File
@@ -192,7 +192,7 @@ export const decodeResponseBody: RawResponseBodyHandler = async (
} else {
const errorMessage = `Proxy received response with unsupported content-encoding: ${contentEncoding}`;
req.log.warn({ contentEncoding, key: req.key?.hash }, errorMessage);
writeErrorResponse(req, res, 500, {
writeErrorResponse(req, res, 500, "Internal Server Error", {
error: errorMessage,
contentEncoding,
});
@@ -209,7 +209,9 @@ export const decodeResponseBody: RawResponseBodyHandler = async (
} catch (error: any) {
const errorMessage = `Proxy received response with invalid JSON: ${error.message}`;
req.log.warn({ error: error.stack, key: req.key?.hash }, errorMessage);
writeErrorResponse(req, res, 500, { error: errorMessage });
writeErrorResponse(req, res, 500, "Internal Server Error", {
error: errorMessage,
});
return reject(errorMessage);
}
});
@@ -237,6 +239,7 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
body
) => {
const statusCode = proxyRes.statusCode || 500;
const statusMessage = proxyRes.statusMessage || "Internal Server Error";
if (statusCode < 400) {
return;
@@ -253,16 +256,16 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
} catch (parseError) {
// Likely Bad Gateway or Gateway Timeout from upstream's reverse proxy
const hash = req.key?.hash;
const statusMessage = proxyRes.statusMessage || "Unknown error";
req.log.warn({ statusCode, statusMessage, key: hash }, parseError.message);
const errorObject = {
statusCode,
statusMessage: proxyRes.statusMessage,
error: parseError.message,
proxy_note: `This is likely a temporary error with the upstream service.`,
status: statusCode,
statusMessage,
proxy_note: `Proxy got back an error, but it was not in JSON format. This is likely a temporary problem with the upstream service.`,
};
writeErrorResponse(req, res, statusCode, errorObject);
writeErrorResponse(req, res, statusCode, statusMessage, errorObject);
throw new HttpError(statusCode, parseError.message);
}
@@ -397,7 +400,7 @@ const handleUpstreamErrors: ProxyResHandlerWithBody = async (
);
}
writeErrorResponse(req, res, statusCode, errorPayload);
writeErrorResponse(req, res, statusCode, statusMessage, errorPayload);
throw new HttpError(statusCode, errorPayload.error?.message);
};
@@ -93,6 +93,7 @@ export class SSEMessageTransformer extends Transform {
this.push(transformedMessage);
callback();
} catch (err) {
err.lastEvent = chunk?.toString();
this.log.error(err, "Error transforming SSE message");
callback(err);
}
@@ -7,6 +7,7 @@ import { logger } from "../../../../logger";
import { RetryableError } from "../index";
import { APIFormat } from "../../../../shared/key-management";
import StreamArray from "stream-json/streamers/StreamArray";
import { makeCompletionSSE } from "../../../../shared/streaming";
const log = logger.child({ module: "sse-stream-adapter" });
@@ -29,8 +30,8 @@ type AwsEventStreamMessage = {
export class SSEStreamAdapter extends Transform {
private readonly isAwsStream;
private readonly isGoogleStream;
private parser = new Parser();
private jsonStream = StreamArray.withParser();
private awsParser = new Parser();
private jsonParser = StreamArray.withParser();
private partialMessage = "";
private decoder = new StringDecoder("utf8");
@@ -40,13 +41,14 @@ export class SSEStreamAdapter extends Transform {
options?.contentType === "application/vnd.amazon.eventstream";
this.isGoogleStream = options?.api === "google-ai";
this.parser.on("data", (data: AwsEventStreamMessage) => {
this.awsParser.on("data", (data: AwsEventStreamMessage) => {
const message = this.processAwsEvent(data);
if (message) {
this.push(Buffer.from(message + "\n\n"), "utf8");
}
});
this.jsonStream.on("data", (data: { value: any }) => {
this.jsonParser.on("data", (data: { value: any }) => {
const message = this.processGoogleValue(data.value);
if (message) {
this.push(Buffer.from(message + "\n\n"), "utf8");
@@ -70,11 +72,16 @@ export class SSEStreamAdapter extends Transform {
);
throw new RetryableError("AWS request throttled mid-stream");
} else {
log.error(
{ event: eventStr },
"Received unexpected AWS event stream message"
);
return getFakeErrorCompletion("proxy AWS error", eventStr);
log.error({ event: eventStr }, "Received bad AWS stream event");
return makeCompletionSSE({
format: "anthropic",
title: "Proxy stream error",
message:
"The proxy received malformed or unexpected data from AWS while streaming.",
obj: event,
reqId: "proxy-sse-adapter-message",
model: "",
});
}
} else {
const { bytes } = payload;
@@ -91,26 +98,36 @@ export class SSEStreamAdapter extends Transform {
// Google doesn't use event streams and just sends elements in an array over
// a long-lived HTTP connection. Needs stream-json to parse the array.
protected processGoogleValue(value: any): string | null {
if ("candidates" in value) {
return `data: ${JSON.stringify(value)}`;
} else {
log.error(
{ value },
"Received unexpected Google AI event stream message"
);
return getFakeErrorCompletion(
"proxy Google AI error",
JSON.stringify(value)
);
try {
const candidates = value.candidates ?? [{}];
const hasParts = candidates[0].content?.parts?.length > 0;
if (hasParts) {
return `data: ${JSON.stringify(value)}`;
} else {
log.error({ event: value }, "Received bad Google AI event");
return `data: ${makeCompletionSSE({
format: "google-ai",
title: "Proxy stream error",
message:
"The proxy received malformed or unexpected data from Google AI while streaming.",
obj: value,
reqId: "proxy-sse-adapter-message",
model: "",
})}`;
}
} catch (error) {
error.lastEvent = value;
this.emit("error", error);
return null;
}
}
_transform(chunk: Buffer, _encoding: BufferEncoding, callback: Function) {
try {
if (this.isAwsStream) {
this.parser.write(chunk);
this.awsParser.write(chunk);
} else if (this.isGoogleStream) {
this.jsonStream.write(chunk);
this.jsonParser.write(chunk);
} else {
// We may receive multiple (or partial) SSE messages in a single chunk,
// so we need to buffer and emit separate stream events for full
@@ -131,22 +148,9 @@ export class SSEStreamAdapter extends Transform {
}
callback();
} catch (error) {
error.lastEvent = chunk?.toString();
this.emit("error", error);
callback(error);
}
}
}
/**
 * Builds a fake Anthropic-style `completion` SSE event that surfaces a proxy
 * error to the client inside the stream itself, so users see the failure in
 * their chat output instead of a silently dropped connection.
 * @param type - Short error category, echoed as `stop_reason` and in the code fence.
 * @param message - Error detail text rendered inside the fenced block.
 * @returns A ready-to-write SSE frame (`event:` line plus `data:` payload).
 */
function getFakeErrorCompletion(type: string, message: string) {
  // Wrap the details in a markdown code fence so chat frontends render them verbatim.
  const content = `\`\`\`\n[${type}: ${message}]\n\`\`\`\n`;
  const payload = {
    log_id: "aws-proxy-sse-message",
    stop_reason: type,
    completion:
      "\nProxy encountered an error during streaming response.\n" + content,
    truncated: false,
    stop: null,
    model: "",
  };
  return `event: completion\n` + `data: ${JSON.stringify(payload)}\n\n`;
}
+19 -16
View File
@@ -14,8 +14,12 @@
import crypto from "crypto";
import type { Handler, Request } from "express";
import { keyPool } from "../shared/key-management";
import { getModelFamilyForRequest, MODEL_FAMILIES, ModelFamily } from "../shared/models";
import { buildFakeSse, initializeSseStream } from "../shared/streaming";
import {
getModelFamilyForRequest,
MODEL_FAMILIES,
ModelFamily,
} from "../shared/models";
import { makeCompletionSSE, initializeSseStream } from "../shared/streaming";
import { logger } from "../logger";
import { getUniqueIps, SHARED_IP_ADDRESSES } from "./rate-limit";
import { RequestPreprocessor } from "./middleware/request";
@@ -189,10 +193,10 @@ function processQueue() {
reqs.filter(Boolean).forEach((req) => {
if (req?.proceed) {
const modelFamily = getModelFamilyForRequest(req!);
req.log.info({
retries: req.retryCount,
partition: modelFamily,
}, `Dequeuing request.`);
req.log.info(
{ retries: req.retryCount, partition: modelFamily },
`Dequeuing request.`
);
req.proceed();
}
});
@@ -367,8 +371,15 @@ function killQueuedRequest(req: Request) {
try {
const message = `Your request has been terminated by the proxy because it has been in the queue for more than 5 minutes.`;
if (res.headersSent) {
const fakeErrorEvent = buildFakeSse("proxy queue error", message, req);
res.write(fakeErrorEvent);
const event = makeCompletionSSE({
format: req.inboundApi,
title: "Proxy queue error",
message,
reqId: String(req.id),
model: req.body?.model,
});
res.write(event);
res.write(`data: [DONE]\n\n`);
res.end();
} else {
res.status(500).json({ error: message });
@@ -451,14 +462,6 @@ function removeProxyMiddlewareEventListeners(req: Request) {
export function registerHeartbeat(req: Request) {
const res = req.res!;
const currentSize = getHeartbeatSize();
req.log.debug({
currentSize,
HEARTBEAT_INTERVAL,
PAYLOAD_SCALE_FACTOR,
MAX_HEARTBEAT_SIZE,
}, "Joining queue with heartbeat.");
let isBufferFull = false;
let bufferFullCount = 0;
req.heartbeatInterval = setInterval(() => {