diff --git a/src/server/trpc/trpc.fetchers.retrier.ts b/src/server/trpc/trpc.fetchers.retrier.ts index afaccdc56..ad734c441 100644 --- a/src/server/trpc/trpc.fetchers.retrier.ts +++ b/src/server/trpc/trpc.fetchers.retrier.ts @@ -1,4 +1,5 @@ import { TRPCFetcherError } from '~/server/trpc/trpc.router.fetchers'; +import { abortableDelay } from '~/server/wire'; const AIX_DEBUG_SERVER_RETRY = true; @@ -138,24 +139,7 @@ export function createRetryablePromise(operationFn: () => Promise, abortSi console.log(`[fetchers.retrier] 🔄 Retrying attempt ${attemptNumber - 1}/${rp.maxAttempts - 1} after ${delayMs}ms delay`); // abortable wait - await new Promise((resolveDelay) => { - if (abortSignal.aborted || delayMs <= 0) { - resolveDelay(); - return; - } - - const timer = setTimeout(resolveDelay, delayMs); - - const onAbort = () => { - clearTimeout(timer); - resolveDelay(); - }; - - abortSignal.addEventListener('abort', onAbort, { once: true }); - }); - - // check if the wait was aborted - if (abortSignal.aborted) { + if (await abortableDelay(delayMs, abortSignal)) { reject(error); return; } diff --git a/src/server/wire.ts b/src/server/wire.ts index e4686d2bf..50b629650 100644 --- a/src/server/wire.ts +++ b/src/server/wire.ts @@ -134,6 +134,30 @@ export function createEmptyReadableStream(): ReadableStream { } +/** + * Used in retry logic to wait between attempts while respecting abort signals. + * @returns True if aborted, false if completed normally + */ +export function abortableDelay(delayMs: number, abortSignal: AbortSignal): Promise { + return new Promise((resolve) => { + // pre-check: already aborted or invalid delay + if (abortSignal.aborted || delayMs <= 0) { + resolve(abortSignal.aborted); + return; + } + + const timer = setTimeout(() => resolve(false), delayMs); + + const onAbort = () => { + clearTimeout(timer); + resolve(true); + }; + + abortSignal.addEventListener('abort', onAbort, { once: true }); + }); +} + + /** * Small debugging utility to log train of events, used on the server-side * for incoming packets (e.g. SSE).