diff --git a/src/proxy/aws.ts b/src/proxy/aws.ts index 0dedaca..5c92aab 100644 --- a/src/proxy/aws.ts +++ b/src/proxy/aws.ts @@ -3,6 +3,7 @@ import { createProxyMiddleware } from "http-proxy-middleware"; import { v4 } from "uuid"; import { config } from "../config"; import { logger } from "../logger"; +import { keyPool } from "../shared/key-management"; import { createQueueMiddleware } from "./queue"; import { ipLimiter } from "./rate-limit"; import { handleProxyError } from "./middleware/common"; @@ -134,6 +135,7 @@ const awsProxy = createQueueMiddleware( on: { proxyReq: createOnProxyReqHandler({ pipeline: [ + (_, req) => keyPool.throttle(req.key!), applyQuotaLimits, // Credentials are added by signAwsRequest preprocessor languageFilter, diff --git a/src/proxy/middleware/request/add-key.ts b/src/proxy/middleware/request/add-key.ts index 507c292..0b4a441 100644 --- a/src/proxy/middleware/request/add-key.ts +++ b/src/proxy/middleware/request/add-key.ts @@ -59,6 +59,7 @@ export const addKey: ProxyRequestMiddleware = (proxyReq, req) => { } } + keyPool.throttle(assignedKey); req.key = assignedKey; req.log.info( { diff --git a/src/proxy/middleware/response/handle-streamed-response.ts b/src/proxy/middleware/response/handle-streamed-response.ts index b07c86b..51175aa 100644 --- a/src/proxy/middleware/response/handle-streamed-response.ts +++ b/src/proxy/middleware/response/handle-streamed-response.ts @@ -4,7 +4,7 @@ import { promisify } from "util"; import { buildFakeSse, copySseResponseHeaders, - initializeSseStream + initializeSseStream, } from "../../../shared/streaming"; import { enqueue } from "../../queue"; import { decodeResponseBody, RawResponseBodyHandler, RetryableError } from "."; @@ -83,7 +83,7 @@ export const handleStreamedResponse: RawResponseBodyHandler = async ( return aggregator.getFinalResponse(); } catch (err) { if (err instanceof RetryableError) { - keyPool.markRateLimited(req.key!) + keyPool.markRateLimited(req.key!); req.log.warn( { key: req.key!.hash, retryCount: req.retryCount }, `Re-enqueueing request due to retryable error during streaming response.` diff --git a/src/shared/key-management/anthropic/provider.ts b/src/shared/key-management/anthropic/provider.ts index 0f5b30a..9b7fab9 100644 --- a/src/shared/key-management/anthropic/provider.ts +++ b/src/shared/key-management/anthropic/provider.ts @@ -153,11 +153,6 @@ export class AnthropicKeyProvider implements KeyProvider { const selectedKey = keysByPriority[0]; selectedKey.lastUsed = now; - selectedKey.rateLimitedAt = now; - // Intended to throttle the queue processor as otherwise it will just - // flood the API with requests and we want to wait a sec to see if we're - // going to get a rate limit error on this key. - selectedKey.rateLimitedUntil = now + KEY_REUSE_DELAY; return { ...selectedKey }; } @@ -226,4 +221,11 @@ export class AnthropicKeyProvider implements KeyProvider { }); this.checker?.scheduleNextCheck(); } + + public throttle(hash: string) { + const key = this.keys.find((k) => k.hash === hash)!; + const now = Date.now(); + key.rateLimitedAt = now; + key.rateLimitedUntil = now + KEY_REUSE_DELAY; + } } diff --git a/src/shared/key-management/aws/provider.ts b/src/shared/key-management/aws/provider.ts index a4136cd..17b97d7 100644 --- a/src/shared/key-management/aws/provider.ts +++ b/src/shared/key-management/aws/provider.ts @@ -37,13 +37,13 @@ export interface AwsBedrockKey extends Key, AwsBedrockKeyUsage { * Upon being rate limited, a key will be locked out for this many milliseconds * while we wait for other concurrent requests to finish. */ -const RATE_LIMIT_LOCKOUT = 1000; +const RATE_LIMIT_LOCKOUT = 4000; /** * Upon assigning a key, we will wait this many milliseconds before allowing it * to be used again. This is to prevent the queue from flooding a key with too * many requests while we wait to learn whether previous ones succeeded. */ -const KEY_REUSE_DELAY = 500; +const KEY_REUSE_DELAY = 250; export class AwsBedrockKeyProvider implements KeyProvider { readonly service = "aws"; @@ -131,11 +131,6 @@ export class AwsBedrockKeyProvider implements KeyProvider { const selectedKey = keysByPriority[0]; selectedKey.lastUsed = now; - selectedKey.rateLimitedAt = now; - // Intended to throttle the queue processor as otherwise it will just - // flood the API with requests and we want to wait a sec to see if we're - // going to get a rate limit error on this key. - selectedKey.rateLimitedUntil = now + KEY_REUSE_DELAY; return { ...selectedKey }; } @@ -199,4 +194,11 @@ export class AwsBedrockKeyProvider implements KeyProvider { this.update(hash, { lastChecked: 0, isDisabled: false }) ); } + + public throttle(hash: string) { + const key = this.keys.find((k) => k.hash === hash)!; + const now = Date.now(); + key.rateLimitedAt = now; + key.rateLimitedUntil = now + KEY_REUSE_DELAY; + } } diff --git a/src/shared/key-management/index.ts b/src/shared/key-management/index.ts index 78459d3..edd1c63 100644 --- a/src/shared/key-management/index.ts +++ b/src/shared/key-management/index.ts @@ -63,6 +63,7 @@ export interface KeyProvider { getLockoutPeriod(model: Model): number; markRateLimited(hash: string): void; recheck(): void; + throttle(hash: string): void; } export const keyPool = new KeyPool(); @@ -80,4 +81,4 @@ export { export { AnthropicKey } from "./anthropic/provider"; export { OpenAIKey } from "./openai/provider"; export { GooglePalmKey } from "./palm/provider"; -export { AwsBedrockKey } from "./aws/provider"; \ No newline at end of file +export { AwsBedrockKey } from "./aws/provider"; diff --git a/src/shared/key-management/key-pool.ts b/src/shared/key-management/key-pool.ts index 07a7b4c..8ba4adf 100644 --- a/src/shared/key-management/key-pool.ts +++ b/src/shared/key-management/key-pool.ts @@ -72,6 +72,11 @@ export class KeyPool { }, 0); } + public throttle(key: Key) { + const provider = this.getKeyProvider(key.service); + provider.throttle(key.hash); + } + public incrementUsage(key: Key, model: string, tokens: number): void { const provider = this.getKeyProvider(key.service); provider.incrementUsage(key.hash, model, tokens); diff --git a/src/shared/key-management/openai/provider.ts b/src/shared/key-management/openai/provider.ts index 364648f..8b230a9 100644 --- a/src/shared/key-management/openai/provider.ts +++ b/src/shared/key-management/openai/provider.ts @@ -221,15 +221,6 @@ export class OpenAIKeyProvider implements KeyProvider { const selectedKey = keysByPriority[0]; selectedKey.lastUsed = now; - - // When a key is selected, we rate-limit it for a brief period of time to - // prevent the queue processor from immediately flooding it with requests - // while the initial request is still being processed (which is when we will - // get new rate limit headers). - // Instead, we will let a request through every second until the key - // becomes fully saturated and locked out again. - selectedKey.rateLimitedAt = now; - selectedKey.rateLimitRequestsReset = KEY_REUSE_DELAY; return { ...selectedKey }; } @@ -383,20 +374,16 @@ export class OpenAIKeyProvider implements KeyProvider { this.checker?.scheduleNextCheck(); } - /** Writes key status to disk. */ - // public writeKeyStatus() { - // const keys = this.keys.map((key) => ({ - // key: key.key, - // isGpt4: key.isGpt4, - // usage: key.usage, - // hardLimit: key.hardLimit, - // isDisabled: key.isDisabled, - // })); - // fs.writeFileSync( - // path.join(__dirname, "..", "keys.json"), - // JSON.stringify(keys, null, 2) - // ); - // } + /** + * Called when a key is selected for a request, briefly disabling it to + * avoid spamming the API with requests while we wait to learn whether this + * key is already rate limited. + */ + public throttle(hash: string) { + const key = this.keys.find((k) => k.hash === hash)!; + key.rateLimitedAt = Date.now(); + key.rateLimitRequestsReset = KEY_REUSE_DELAY; + } } /** diff --git a/src/shared/key-management/palm/provider.ts b/src/shared/key-management/palm/provider.ts index ce8c8a7..0e80e7d 100644 --- a/src/shared/key-management/palm/provider.ts +++ b/src/shared/key-management/palm/provider.ts @@ -122,11 +122,6 @@ export class GooglePalmKeyProvider implements KeyProvider { const selectedKey = keysByPriority[0]; selectedKey.lastUsed = now; - selectedKey.rateLimitedAt = now; - // Intended to throttle the queue processor as otherwise it will just - // flood the API with requests and we want to wait a sec to see if we're - // going to get a rate limit error on this key. - selectedKey.rateLimitedUntil = now + KEY_REUSE_DELAY; return { ...selectedKey }; } @@ -186,4 +181,11 @@ export class GooglePalmKeyProvider implements KeyProvider { } public recheck() {} + + public throttle(hash: string) { + const key = this.keys.find((k) => k.hash === hash)!; + const now = Date.now(); + key.rateLimitedAt = now; + key.rateLimitedUntil = now + KEY_REUSE_DELAY; + } }