From 8d2b9cac865884ac6f790d02f3b560bb42577975 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 3 Mar 2026 11:54:57 +0000 Subject: [PATCH 1/7] fix(batch): move batch queue global rate limiter to worker consumer level The global rate limiter was being applied at the FairQueue claim phase, consuming 1 token per queue-claim-attempt rather than per item processed. With many small queues (each batch is its own queue), consumers burned through tokens on empty or single-item queues, causing aggressive throttling well below the intended items/sec limit. Changes: - Move rate limiter from FairQueue claim phase to BatchQueue worker queue consumer loop (before blockingPop), so each token = 1 item processed - Replace the FairQueue rate limiter with a worker queue depth cap to prevent unbounded growth that could cause visibility timeouts - Add BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH env var (optional, disabled by default) --- .server-changes/batch-rate-limiter-fix.md | 6 + apps/webapp/app/env.server.ts | 3 + apps/webapp/app/v3/runEngine.server.ts | 2 + .../run-engine/src/batch-queue/index.ts | 19 ++- .../src/batch-queue/tests/index.test.ts | 118 ++++++++++++++++++ .../run-engine/src/batch-queue/types.ts | 7 ++ .../run-engine/src/engine/index.ts | 1 + .../run-engine/src/engine/types.ts | 2 + packages/redis-worker/src/fair-queue/index.ts | 27 ++-- .../src/fair-queue/tests/fairQueue.test.ts | 72 +++++++++++ packages/redis-worker/src/fair-queue/types.ts | 20 ++- 11 files changed, 258 insertions(+), 19 deletions(-) create mode 100644 .server-changes/batch-rate-limiter-fix.md diff --git a/.server-changes/batch-rate-limiter-fix.md b/.server-changes/batch-rate-limiter-fix.md new file mode 100644 index 00000000000..4982839c3a3 --- /dev/null +++ b/.server-changes/batch-rate-limiter-fix.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Move batch queue global rate limiter from FairQueue claim phase to BatchQueue worker queue consumer for accurate per-item rate limiting. Add worker queue depth cap to prevent unbounded growth that could cause visibility timeouts. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 7868a38ac56..e5b221164b1 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -993,6 +993,9 @@ const EnvironmentSchema = z // Global rate limit: max items processed per second across all consumers // If not set, no global rate limiting is applied BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(), + // Max items in the worker queue before claiming pauses (protects visibility timeouts) + // If not set, no depth limit is applied + BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH: z.coerce.number().int().positive().optional(), ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 037f7c6dced..b0e3673eef7 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -198,6 +198,8 @@ function createRunEngine() { globalRateLimiter: env.BATCH_QUEUE_GLOBAL_RATE_LIMIT ? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT) : undefined, + // Worker queue depth cap - prevents unbounded growth protecting visibility timeouts + workerQueueMaxDepth: env.BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH, retry: { maxAttempts: 6, minTimeoutInMs: 1_000, diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 312bf4772f7..a09865f6110 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -18,6 +18,7 @@ import { isAbortError, WorkerQueueManager, type FairQueueOptions, + type GlobalRateLimiter, } from "@trigger.dev/redis-worker"; import { BatchCompletionTracker } from "./completionTracker.js"; import type { @@ -76,6 +77,7 @@ export class BatchQueue { private abortController: AbortController; private workerQueueConsumerLoops: Promise[] = []; private workerQueueBlockingTimeoutSeconds: number; + private globalRateLimiter?: GlobalRateLimiter; private batchedSpanManager: BatchedSpanManager; // Metrics @@ -95,6 +97,7 @@ export class BatchQueue { this.maxAttempts = options.retry?.maxAttempts ?? 1; this.abortController = new AbortController(); this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10; + this.globalRateLimiter = options.globalRateLimiter; // Initialize metrics if meter is provided if (options.meter) { @@ -174,8 +177,9 @@ export class BatchQueue { }, }, ], - // Optional global rate limiter to limit max items/sec across all consumers - globalRateLimiter: options.globalRateLimiter, + // Worker queue depth cap to prevent unbounded growth (protects visibility timeouts) + workerQueueMaxDepth: options.workerQueueMaxDepth, + workerQueueDepthCheckId: BATCH_WORKER_QUEUE_ID, // Enable retry with DLQ disabled when retry config is provided. // BatchQueue handles the "final failure" in its own processing loop, // so we don't need the DLQ - we just need the retry scheduling. @@ -641,6 +645,17 @@ export class BatchQueue { } try { + // Rate limit per-item at the processing level (1 token per message) + if (this.globalRateLimiter) { + const result = await this.globalRateLimiter.limit(); + if (!result.allowed && result.resetAt) { + const waitMs = Math.max(0, result.resetAt - Date.now()); + if (waitMs > 0) { + await new Promise((resolve) => setTimeout(resolve, waitMs)); + } + } + } + await this.batchedSpanManager.withBatchedSpan( loopId, async (span) => { diff --git a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts index 98c7b0401f6..9861cc785e5 100644 --- a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts +++ b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts @@ -1,6 +1,7 @@ import { redisTest } from "@internal/testcontainers"; import { describe, expect, vi } from "vitest"; import { BatchQueue } from "../index.js"; +import type { GlobalRateLimiter } from "@trigger.dev/redis-worker"; import type { CompleteBatchResult, InitializeBatchOptions, BatchItem } from "../types.js"; vi.setConfig({ testTimeout: 60_000 }); @@ -658,4 +659,121 @@ describe("BatchQueue", () => { } ); }); + + describe("global rate limiter at worker queue consumer level", () => { + redisTest( + "should call rate limiter once per item processed", + async ({ redisContainer }) => { + let limitCallCount = 0; + const rateLimiter: GlobalRateLimiter = { + async limit() { + limitCallCount++; + return { allowed: true }; + }, + }; + + const queue = new BatchQueue({ + redis: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + keyPrefix: "test:", + }, + drr: { quantum: 5, maxDeficit: 50 }, + consumerCount: 1, + consumerIntervalMs: 50, + startConsumers: true, + globalRateLimiter: rateLimiter, + }); + + let completionResult: CompleteBatchResult | null = null; + + try { + queue.onProcessItem(async ({ itemIndex }) => { + return { success: true, runId: `run_${itemIndex}` }; + }); + + queue.onBatchComplete(async (result) => { + completionResult = result; + }); + + const itemCount = 5; + await queue.initializeBatch(createInitOptions("batch1", "env1", itemCount)); + await enqueueItems(queue, "batch1", "env1", createBatchItems(itemCount)); + + await vi.waitFor( + () => { + expect(completionResult).not.toBeNull(); + }, + { timeout: 10000 } + ); + + expect(completionResult!.successfulRunCount).toBe(itemCount); + // Rate limiter should be called at least once per item processed + // (may be called more due to consumer loop iterations with empty pops) + expect(limitCallCount).toBeGreaterThanOrEqual(itemCount); + } finally { + await queue.close(); + } + } + ); + + redisTest( + "should delay processing when rate limited", + async ({ redisContainer }) => { + let limitCallCount = 0; + const rateLimiter: GlobalRateLimiter = { + async limit() { + limitCallCount++; + // Rate limit the first 3 calls, then allow + if (limitCallCount <= 3) { + return { allowed: false, resetAt: Date.now() + 100 }; + } + return { allowed: true }; + }, + }; + + const queue = new BatchQueue({ + redis: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + keyPrefix: "test:", + }, + drr: { quantum: 5, maxDeficit: 50 }, + consumerCount: 1, + consumerIntervalMs: 50, + startConsumers: true, + globalRateLimiter: rateLimiter, + }); + + let completionResult: CompleteBatchResult | null = null; + + try { + queue.onProcessItem(async ({ itemIndex }) => { + return { success: true, runId: `run_${itemIndex}` }; + }); + + queue.onBatchComplete(async (result) => { + completionResult = result; + }); + + await queue.initializeBatch(createInitOptions("batch1", "env1", 3)); + await enqueueItems(queue, "batch1", "env1", createBatchItems(3)); + + // Should still complete despite initial rate limiting + await vi.waitFor( + () => { + expect(completionResult).not.toBeNull(); + }, + { timeout: 10000 } + ); + + expect(completionResult!.successfulRunCount).toBe(3); + // Rate limiter was called more times than items due to initial rejections + expect(limitCallCount).toBeGreaterThan(3); + } finally { + await queue.close(); + } + } + ); + }); }); diff --git a/internal-packages/run-engine/src/batch-queue/types.ts b/internal-packages/run-engine/src/batch-queue/types.ts index f472ff72bb5..aa02d84717a 100644 --- a/internal-packages/run-engine/src/batch-queue/types.ts +++ b/internal-packages/run-engine/src/batch-queue/types.ts @@ -213,8 +213,15 @@ export type BatchQueueOptions = { /** * Optional global rate limiter to limit processing across all consumers. * When configured, limits the max items/second processed globally. + * Rate limiting happens at the worker queue consumer level (1 token per item). */ globalRateLimiter?: GlobalRateLimiter; + /** + * Maximum number of items allowed in the worker queue before claiming pauses. + * Prevents unbounded worker queue growth which could cause visibility timeouts. + * Disabled by default (undefined = no limit). + */ + workerQueueMaxDepth?: number; /** Logger instance */ logger?: Logger; logLevel?: LogLevel; diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 846252398ed..9f6d19a9f42 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -398,6 +398,7 @@ export class RunEngine { consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100, defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10, globalRateLimiter: options.batchQueue?.globalRateLimiter, + workerQueueMaxDepth: options.batchQueue?.workerQueueMaxDepth, startConsumers: startBatchQueueConsumers, retry: options.batchQueue?.retry, tracer: options.tracer, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index d0b12320f4f..9af808754ce 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -109,6 +109,8 @@ export type RunEngineOptions = { defaultConcurrency?: number; /** Optional global rate limiter to limit processing across all consumers */ globalRateLimiter?: GlobalRateLimiter; + /** Maximum worker queue depth before claiming pauses (protects visibility timeouts) */ + workerQueueMaxDepth?: number; /** Retry configuration for failed batch items */ retry?: { /** Maximum number of attempts (including the first). Default: 1 (no retries) */ diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index cf9d7d61977..0050095cc50 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -24,7 +24,6 @@ import type { FairQueueKeyProducer, FairQueueOptions, FairScheduler, - GlobalRateLimiter, QueueCooloffState, QueueDescriptor, SchedulerContext, @@ -97,8 +96,9 @@ export class FairQueue { private maxCooloffStatesSize: number; private queueCooloffStates = new Map(); - // Global rate limiter - private globalRateLimiter?: GlobalRateLimiter; + // Worker queue backpressure + private workerQueueMaxDepth: number; + private workerQueueDepthCheckId?: string; // Consumer tracing private consumerTraceMaxIterations: number; @@ -152,8 +152,9 @@ export class FairQueue { this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000; this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000; - // Global rate limiter - this.globalRateLimiter = options.globalRateLimiter; + // Worker queue backpressure + this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0; + this.workerQueueDepthCheckId = options.workerQueueDepthCheckId; // Consumer tracing this.consumerTraceMaxIterations = options.consumerTraceMaxIterations ?? 500; @@ -1110,15 +1111,13 @@ export class FairQueue { maxClaimCount = Math.min(maxClaimCount, availableCapacity); } - // Check global rate limit - wait if rate limited - if (this.globalRateLimiter) { - const result = await this.globalRateLimiter.limit(); - if (!result.allowed && result.resetAt) { - const waitMs = Math.max(0, result.resetAt - Date.now()); - if (waitMs > 0) { - this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId }); - await new Promise((resolve) => setTimeout(resolve, waitMs)); - } + // Check worker queue depth to prevent unbounded growth. + // Messages in the worker queue are already in-flight with a visibility timeout. + // If the queue is too deep, consumers can't keep up, and messages risk timing out. + if (this.workerQueueMaxDepth > 0 && this.workerQueueDepthCheckId) { + const depth = await this.workerQueueManager.getLength(this.workerQueueDepthCheckId); + if (depth >= this.workerQueueMaxDepth) { + return 0; } } diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts index 345d97c04d4..e5dc8d5e2b4 100644 --- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts @@ -1282,4 +1282,76 @@ describe("FairQueue", () => { ); }); + describe("worker queue depth cap", () => { + redisTest( + "should respect worker queue max depth and resume after draining", + { timeout: 30000 }, + async ({ redisOptions }) => { + const processed: string[] = []; + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 100, + maxDeficit: 1000, + }); + + const workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + + // Create FairQueue with a small depth cap + const maxDepth = 3; + const queue = new TestFairQueueHelper(redisOptions, keys, { + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: 1, + consumerCount: 1, + consumerIntervalMs: 50, + visibilityTimeoutMs: 30000, + workerQueueMaxDepth: maxDepth, + workerQueueDepthCheckId: TEST_WORKER_QUEUE_ID, + startConsumers: false, + }); + + // Use a slow handler to let the worker queue build up + queue.onMessage(async (ctx) => { + await new Promise((resolve) => setTimeout(resolve, 200)); + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + + // Enqueue 10 messages + const totalMessages = 10; + for (let i = 0; i < totalMessages; i++) { + await queue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: `msg-${i}` }, + }); + } + + // Start processing + queue.start(); + + // Verify all messages eventually get processed (depth cap doesn't permanently block) + await vi.waitFor( + () => { + expect(processed.length).toBe(totalMessages); + }, + { timeout: 25000 } + ); + + // Verify the worker queue is drained + const finalDepth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID); + expect(finalDepth).toBe(0); + + await workerQueueManager.close(); + await queue.close(); + } + ); + }); + }); diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index d10cad1d0d4..07b58be0e5c 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -446,9 +446,23 @@ export interface FairQueueOptions 0 and the system uses a single shared worker queue. + * If not set, depth checking is disabled even if workerQueueMaxDepth is set. + */ + workerQueueDepthCheckId?: string; } // ============================================================================ From 8e142be35e9cb715bdec3507c1376fdc23dc19a3 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 3 Mar 2026 13:03:20 +0000 Subject: [PATCH 2/7] fix rate limiter loop --- .../run-engine/src/batch-queue/index.ts | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index a09865f6110..c889c2852ca 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -645,15 +645,34 @@ export class BatchQueue { } try { - // Rate limit per-item at the processing level (1 token per message) + // Rate limit per-item at the processing level (1 token per message). + // Loop until allowed so multiple consumers don't all rush through after one sleep. if (this.globalRateLimiter) { - const result = await this.globalRateLimiter.limit(); - if (!result.allowed && result.resetAt) { - const waitMs = Math.max(0, result.resetAt - Date.now()); + while (this.isRunning) { + const result = await this.globalRateLimiter.limit(); + if (result.allowed) { + break; + } + const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now()); if (waitMs > 0) { - await new Promise((resolve) => setTimeout(resolve, waitMs)); + await new Promise((resolve, reject) => { + const timer = setTimeout(resolve, waitMs); + const onAbort = () => { + clearTimeout(timer); + reject(this.abortController.signal.reason); + }; + if (this.abortController.signal.aborted) { + clearTimeout(timer); + reject(this.abortController.signal.reason); + return; + } + this.abortController.signal.addEventListener("abort", onAbort, { once: true }); + }); } } + if (!this.isRunning) { + break; + } } await this.batchedSpanManager.withBatchedSpan( From acece57c69dec513582d3a26461b6475626016d3 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 3 Mar 2026 13:06:33 +0000 Subject: [PATCH 3/7] clarify test assertions --- .../run-engine/src/batch-queue/tests/index.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts index 9861cc785e5..a4d4e338089 100644 --- a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts +++ b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts @@ -662,7 +662,7 @@ describe("BatchQueue", () => { describe("global rate limiter at worker queue consumer level", () => { redisTest( - "should call rate limiter once per item processed", + "should call rate limiter before each processing attempt", async ({ redisContainer }) => { let limitCallCount = 0; const rateLimiter: GlobalRateLimiter = { @@ -708,8 +708,8 @@ describe("BatchQueue", () => { ); expect(completionResult!.successfulRunCount).toBe(itemCount); - // Rate limiter should be called at least once per item processed - // (may be called more due to consumer loop iterations with empty pops) + // Rate limiter is called before each blockingPop, including iterations + // where no message is available, so count >= items processed expect(limitCallCount).toBeGreaterThanOrEqual(itemCount); } finally { await queue.close(); From 22fdaff19e810b6600c6ad02b34b7f7c0ccfcbdd Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 3 Mar 2026 13:21:28 +0000 Subject: [PATCH 4/7] fix worker queue depth enforcement --- packages/redis-worker/src/fair-queue/index.ts | 3 +++ .../src/fair-queue/tests/fairQueue.test.ts | 20 ++++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 0050095cc50..0c0b7921a6b 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -1119,6 +1119,9 @@ export class FairQueue { if (depth >= this.workerQueueMaxDepth) { return 0; } + // Cap claim size to remaining capacity so we don't overshoot the depth limit + const remainingCapacity = this.workerQueueMaxDepth - depth; + maxClaimCount = Math.min(maxClaimCount, remainingCapacity); } // Claim batch of messages with visibility timeout diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts index e5dc8d5e2b4..d827da15907 100644 --- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts @@ -1333,7 +1333,19 @@ describe("FairQueue", () => { }); } - // Start processing + // Start processing and track peak worker queue depth + let peakDepth = 0; + let polling = true; + const depthPoller = (async () => { + while (polling) { + const depth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID); + if (depth > peakDepth) { + peakDepth = depth; + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } + })(); + queue.start(); // Verify all messages eventually get processed (depth cap doesn't permanently block) @@ -1344,6 +1356,12 @@ describe("FairQueue", () => { { timeout: 25000 } ); + polling = false; + await depthPoller; + + // Verify the depth cap was respected during processing + expect(peakDepth).toBeLessThanOrEqual(maxDepth); + // Verify the worker queue is drained const finalDepth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID); expect(finalDepth).toBe(0); From 3a3227d007440ffcf6d350ada256330d6d04dc76 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 3 Mar 2026 13:35:04 +0000 Subject: [PATCH 5/7] cleanup abort signal listeners when the timeout fires --- internal-packages/run-engine/src/batch-queue/index.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index c889c2852ca..bda3f96539a 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -656,11 +656,16 @@ export class BatchQueue { const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now()); if (waitMs > 0) { await new Promise((resolve, reject) => { - const timer = setTimeout(resolve, waitMs); const onAbort = () => { clearTimeout(timer); reject(this.abortController.signal.reason); }; + const timer = setTimeout(() => { + // Must remove listener when timeout fires, otherwise listeners accumulate + // (the { once: true } option only removes on abort, not on timeout) + this.abortController.signal.removeEventListener("abort", onAbort); + resolve(); + }, waitMs); if (this.abortController.signal.aborted) { clearTimeout(timer); reject(this.abortController.signal.reason); From cf67b8aaeb53bd605e5fe682683740d1796cef79 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 3 Mar 2026 14:05:29 +0000 Subject: [PATCH 6/7] Add batch_queue.rate_limit_denied counter metric --- internal-packages/run-engine/src/batch-queue/index.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index bda3f96539a..e7155e580d9 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -89,6 +89,7 @@ export class BatchQueue { private batchProcessingDurationHistogram?: Histogram; private itemQueueTimeHistogram?: Histogram; private workerQueueLengthGauge?: ObservableGauge; + private rateLimitDeniedCounter?: Counter; constructor(private options: BatchQueueOptions) { this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info"); @@ -612,6 +613,11 @@ export class BatchQueue { unit: "ms", }); + this.rateLimitDeniedCounter = meter.createCounter("batch_queue.rate_limit_denied", { + description: "Number of times the global rate limiter denied processing", + unit: "denials", + }); + this.workerQueueLengthGauge = meter.createObservableGauge("batch_queue.worker_queue.length", { description: "Number of items waiting in the batch worker queue", unit: "items", @@ -653,6 +659,7 @@ export class BatchQueue { if (result.allowed) { break; } + this.rateLimitDeniedCounter?.add(1); const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now()); if (waitMs > 0) { await new Promise((resolve, reject) => { From a39c602eaa1e46c08fb53410013329195b2f00e4 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 3 Mar 2026 14:25:57 +0000 Subject: [PATCH 7/7] always wait at least 10ms to prevent event loop blocking --- internal-packages/run-engine/src/batch-queue/index.ts | 2 +- packages/redis-worker/src/fair-queue/types.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index e7155e580d9..22401f76b6e 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -660,7 +660,7 @@ export class BatchQueue { break; } this.rateLimitDeniedCounter?.add(1); - const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now()); + const waitMs = Math.max(10, (result.resetAt ?? Date.now()) - Date.now()); if (waitMs > 0) { await new Promise((resolve, reject) => { const onAbort = () => { diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index 07b58be0e5c..3bc56b599fa 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -453,7 +453,7 @@ export interface FairQueueOptions