improves handle-streamed-response comments/docs [skip-ci]
This commit is contained in:
@@ -15,14 +15,18 @@ import { keyPool } from "../../../shared/key-management";
|
||||
const pipelineAsync = promisify(pipeline);
|
||||
|
||||
/**
|
||||
* Consume the SSE stream and forward events to the client. Once the stream is
|
||||
* stream is closed, resolve with the full response body so that subsequent
|
||||
* middleware can work with it.
|
||||
* `handleStreamedResponse` consumes and transforms a streamed response from the
|
||||
* upstream service, forwarding events to the client in their requested format.
|
||||
* After the entire stream has been consumed, it resolves with the full response
|
||||
* body so that subsequent middleware in the chain can process it as if it were
|
||||
* a non-streaming response.
|
||||
*
|
||||
* Typically we would only need of the raw response handlers to execute, but
|
||||
* in the event a streamed request results in a non-200 response, we need to
|
||||
* fall back to the non-streaming response handler so that the error handler
|
||||
* can inspect the error response.
|
||||
* In the event of an error, the request's streaming flag is unset and the non-
|
||||
* streaming response handler is called instead.
|
||||
*
|
||||
* If the error is retryable, that handler will re-enqueue the request and also
|
||||
* reset the streaming flag. Unfortunately the streaming flag is set and unset
|
||||
* in multiple places, so it's hard to keep track of.
|
||||
*/
|
||||
export const handleStreamedResponse: RawResponseBodyHandler = async (
|
||||
proxyRes,
|
||||
@@ -48,8 +52,8 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
|
||||
`Starting to proxy SSE stream.`
|
||||
);
|
||||
|
||||
// Users waiting in the queue already have a SSE connection open for the
|
||||
// heartbeat, so we can't always send the stream headers.
|
||||
// Typically, streaming will have already been initialized by the request
|
||||
// queue to send heartbeat pings.
|
||||
if (!res.headersSent) {
|
||||
copySseResponseHeaders(proxyRes, res);
|
||||
initializeSseStream(res);
|
||||
@@ -58,8 +62,11 @@ export const handleStreamedResponse: RawResponseBodyHandler = async (
|
||||
const prefersNativeEvents = req.inboundApi === req.outboundApi;
|
||||
const contentType = proxyRes.headers["content-type"];
|
||||
|
||||
// Adapter turns some arbitrary stream (binary, JSON, etc.) into SSE events.
|
||||
const adapter = new SSEStreamAdapter({ contentType, api: req.outboundApi });
|
||||
// Aggregator compiles all events into a single response object.
|
||||
const aggregator = new EventAggregator({ format: req.outboundApi });
|
||||
// Transformer converts events to the user's requested format.
|
||||
const transformer = new SSEMessageTransformer({
|
||||
inputFormat: req.outboundApi,
|
||||
inputApiVersion: String(req.headers["anthropic-version"]),
|
||||
|
||||
Reference in New Issue
Block a user