diff --git a/.server-changes/batch-rate-limiter-fix.md b/.server-changes/batch-rate-limiter-fix.md new file mode 100644 index 00000000000..4982839c3a3 --- /dev/null +++ b/.server-changes/batch-rate-limiter-fix.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Move batch queue global rate limiter from FairQueue claim phase to BatchQueue worker queue consumer for accurate per-item rate limiting. Add worker queue depth cap to prevent unbounded growth that could cause visibility timeouts. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 7868a38ac56..e5b221164b1 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -993,6 +993,9 @@ const EnvironmentSchema = z // Global rate limit: max items processed per second across all consumers // If not set, no global rate limiting is applied BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(), + // Max items in the worker queue before claiming pauses (protects visibility timeouts) + // If not set, no depth limit is applied + BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH: z.coerce.number().int().positive().optional(), ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 037f7c6dced..b0e3673eef7 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -198,6 +198,8 @@ function createRunEngine() { globalRateLimiter: env.BATCH_QUEUE_GLOBAL_RATE_LIMIT ? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT) : undefined, + // Worker queue depth cap - prevents unbounded growth protecting visibility timeouts + workerQueueMaxDepth: env.BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH, retry: { maxAttempts: 6, minTimeoutInMs: 1_000, diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 312bf4772f7..22401f76b6e 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -18,6 +18,7 @@ import { isAbortError, WorkerQueueManager, type FairQueueOptions, + type GlobalRateLimiter, } from "@trigger.dev/redis-worker"; import { BatchCompletionTracker } from "./completionTracker.js"; import type { @@ -76,6 +77,7 @@ export class BatchQueue { private abortController: AbortController; private workerQueueConsumerLoops: Promise[] = []; private workerQueueBlockingTimeoutSeconds: number; + private globalRateLimiter?: GlobalRateLimiter; private batchedSpanManager: BatchedSpanManager; // Metrics @@ -87,6 +89,7 @@ export class BatchQueue { private batchProcessingDurationHistogram?: Histogram; private itemQueueTimeHistogram?: Histogram; private workerQueueLengthGauge?: ObservableGauge; + private rateLimitDeniedCounter?: Counter; constructor(private options: BatchQueueOptions) { this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info"); @@ -95,6 +98,7 @@ export class BatchQueue { this.maxAttempts = options.retry?.maxAttempts ?? 1; this.abortController = new AbortController(); this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10; + this.globalRateLimiter = options.globalRateLimiter; // Initialize metrics if meter is provided if (options.meter) { @@ -174,8 +178,9 @@ export class BatchQueue { }, }, ], - // Optional global rate limiter to limit max items/sec across all consumers - globalRateLimiter: options.globalRateLimiter, + // Worker queue depth cap to prevent unbounded growth (protects visibility timeouts) + workerQueueMaxDepth: options.workerQueueMaxDepth, + workerQueueDepthCheckId: BATCH_WORKER_QUEUE_ID, // Enable retry with DLQ disabled when retry config is provided. // BatchQueue handles the "final failure" in its own processing loop, // so we don't need the DLQ - we just need the retry scheduling. @@ -608,6 +613,11 @@ export class BatchQueue { unit: "ms", }); + this.rateLimitDeniedCounter = meter.createCounter("batch_queue.rate_limit_denied", { + description: "Number of times the global rate limiter denied processing", + unit: "denials", + }); + this.workerQueueLengthGauge = meter.createObservableGauge("batch_queue.worker_queue.length", { description: "Number of items waiting in the batch worker queue", unit: "items", @@ -641,6 +651,42 @@ export class BatchQueue { } try { + // Rate limit per-item at the processing level (1 token per message). + // Loop until allowed so multiple consumers don't all rush through after one sleep. + if (this.globalRateLimiter) { + while (this.isRunning) { + const result = await this.globalRateLimiter.limit(); + if (result.allowed) { + break; + } + this.rateLimitDeniedCounter?.add(1); + const waitMs = Math.max(10, (result.resetAt ?? Date.now()) - Date.now()); + if (waitMs > 0) { + await new Promise((resolve, reject) => { + const onAbort = () => { + clearTimeout(timer); + reject(this.abortController.signal.reason); + }; + const timer = setTimeout(() => { + // Must remove listener when timeout fires, otherwise listeners accumulate + // (the { once: true } option only removes on abort, not on timeout) + this.abortController.signal.removeEventListener("abort", onAbort); + resolve(); + }, waitMs); + if (this.abortController.signal.aborted) { + clearTimeout(timer); + reject(this.abortController.signal.reason); + return; + } + this.abortController.signal.addEventListener("abort", onAbort, { once: true }); + }); + } + } + if (!this.isRunning) { + break; + } + } + await this.batchedSpanManager.withBatchedSpan( loopId, async (span) => { diff --git a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts index 98c7b0401f6..a4d4e338089 100644 --- a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts +++ b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts @@ -1,6 +1,7 @@ import { redisTest } from "@internal/testcontainers"; import { describe, expect, vi } from "vitest"; import { BatchQueue } from "../index.js"; +import type { GlobalRateLimiter } from "@trigger.dev/redis-worker"; import type { CompleteBatchResult, InitializeBatchOptions, BatchItem } from "../types.js"; vi.setConfig({ testTimeout: 60_000 }); @@ -658,4 +659,121 @@ describe("BatchQueue", () => { } ); }); + + describe("global rate limiter at worker queue consumer level", () => { + redisTest( + "should call rate limiter before each processing attempt", + async ({ redisContainer }) => { + let limitCallCount = 0; + const rateLimiter: GlobalRateLimiter = { + async limit() { + limitCallCount++; + return { allowed: true }; + }, + }; + + const queue = new BatchQueue({ + redis: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + keyPrefix: "test:", + }, + drr: { quantum: 5, maxDeficit: 50 }, + consumerCount: 1, + consumerIntervalMs: 50, + startConsumers: true, + globalRateLimiter: rateLimiter, + }); + + let completionResult: CompleteBatchResult | null = null; + + try { + queue.onProcessItem(async ({ itemIndex }) => { + return { success: true, runId: `run_${itemIndex}` }; + }); + + queue.onBatchComplete(async (result) => { + completionResult = result; + }); + + const itemCount = 5; + await queue.initializeBatch(createInitOptions("batch1", "env1", itemCount)); + await enqueueItems(queue, "batch1", "env1", createBatchItems(itemCount)); + + await vi.waitFor( + () => { + expect(completionResult).not.toBeNull(); + }, + { timeout: 10000 } + ); + + expect(completionResult!.successfulRunCount).toBe(itemCount); + // Rate limiter is called before each blockingPop, including iterations + // where no message is available, so count >= items processed + expect(limitCallCount).toBeGreaterThanOrEqual(itemCount); + } finally { + await queue.close(); + } + } + ); + + redisTest( + "should delay processing when rate limited", + async ({ redisContainer }) => { + let limitCallCount = 0; + const rateLimiter: GlobalRateLimiter = { + async limit() { + limitCallCount++; + // Rate limit the first 3 calls, then allow + if (limitCallCount <= 3) { + return { allowed: false, resetAt: Date.now() + 100 }; + } + return { allowed: true }; + }, + }; + + const queue = new BatchQueue({ + redis: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + keyPrefix: "test:", + }, + drr: { quantum: 5, maxDeficit: 50 }, + consumerCount: 1, + consumerIntervalMs: 50, + startConsumers: true, + globalRateLimiter: rateLimiter, + }); + + let completionResult: CompleteBatchResult | null = null; + + try { + queue.onProcessItem(async ({ itemIndex }) => { + return { success: true, runId: `run_${itemIndex}` }; + }); + + queue.onBatchComplete(async (result) => { + completionResult = result; + }); + + await queue.initializeBatch(createInitOptions("batch1", "env1", 3)); + await enqueueItems(queue, "batch1", "env1", createBatchItems(3)); + + // Should still complete despite initial rate limiting + await vi.waitFor( + () => { + expect(completionResult).not.toBeNull(); + }, + { timeout: 10000 } + ); + + expect(completionResult!.successfulRunCount).toBe(3); + // Rate limiter was called more times than items due to initial rejections + expect(limitCallCount).toBeGreaterThan(3); + } finally { + await queue.close(); + } + } + ); + }); }); diff --git a/internal-packages/run-engine/src/batch-queue/types.ts b/internal-packages/run-engine/src/batch-queue/types.ts index f472ff72bb5..aa02d84717a 100644 --- a/internal-packages/run-engine/src/batch-queue/types.ts +++ b/internal-packages/run-engine/src/batch-queue/types.ts @@ -213,8 +213,15 @@ export type BatchQueueOptions = { /** * Optional global rate limiter to limit processing across all consumers. * When configured, limits the max items/second processed globally. + * Rate limiting happens at the worker queue consumer level (1 token per item). */ globalRateLimiter?: GlobalRateLimiter; + /** + * Maximum number of items allowed in the worker queue before claiming pauses. + * Prevents unbounded worker queue growth which could cause visibility timeouts. + * Disabled by default (undefined = no limit). + */ + workerQueueMaxDepth?: number; /** Logger instance */ logger?: Logger; logLevel?: LogLevel; diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 846252398ed..9f6d19a9f42 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -398,6 +398,7 @@ export class RunEngine { consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100, defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10, globalRateLimiter: options.batchQueue?.globalRateLimiter, + workerQueueMaxDepth: options.batchQueue?.workerQueueMaxDepth, startConsumers: startBatchQueueConsumers, retry: options.batchQueue?.retry, tracer: options.tracer, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index d0b12320f4f..9af808754ce 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -109,6 +109,8 @@ export type RunEngineOptions = { defaultConcurrency?: number; /** Optional global rate limiter to limit processing across all consumers */ globalRateLimiter?: GlobalRateLimiter; + /** Maximum worker queue depth before claiming pauses (protects visibility timeouts) */ + workerQueueMaxDepth?: number; /** Retry configuration for failed batch items */ retry?: { /** Maximum number of attempts (including the first). Default: 1 (no retries) */ diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index cf9d7d61977..0c0b7921a6b 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -24,7 +24,6 @@ import type { FairQueueKeyProducer, FairQueueOptions, FairScheduler, - GlobalRateLimiter, QueueCooloffState, QueueDescriptor, SchedulerContext, @@ -97,8 +96,9 @@ export class FairQueue { private maxCooloffStatesSize: number; private queueCooloffStates = new Map(); - // Global rate limiter - private globalRateLimiter?: GlobalRateLimiter; + // Worker queue backpressure + private workerQueueMaxDepth: number; + private workerQueueDepthCheckId?: string; // Consumer tracing private consumerTraceMaxIterations: number; @@ -152,8 +152,9 @@ export class FairQueue { this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000; this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000; - // Global rate limiter - this.globalRateLimiter = options.globalRateLimiter; + // Worker queue backpressure + this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0; + this.workerQueueDepthCheckId = options.workerQueueDepthCheckId; // Consumer tracing this.consumerTraceMaxIterations = options.consumerTraceMaxIterations ?? 500; @@ -1110,16 +1111,17 @@ export class FairQueue { maxClaimCount = Math.min(maxClaimCount, availableCapacity); } - // Check global rate limit - wait if rate limited - if (this.globalRateLimiter) { - const result = await this.globalRateLimiter.limit(); - if (!result.allowed && result.resetAt) { - const waitMs = Math.max(0, result.resetAt - Date.now()); - if (waitMs > 0) { - this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId }); - await new Promise((resolve) => setTimeout(resolve, waitMs)); - } + // Check worker queue depth to prevent unbounded growth. + // Messages in the worker queue are already in-flight with a visibility timeout. + // If the queue is too deep, consumers can't keep up, and messages risk timing out. + if (this.workerQueueMaxDepth > 0 && this.workerQueueDepthCheckId) { + const depth = await this.workerQueueManager.getLength(this.workerQueueDepthCheckId); + if (depth >= this.workerQueueMaxDepth) { + return 0; } + // Cap claim size to remaining capacity so we don't overshoot the depth limit + const remainingCapacity = this.workerQueueMaxDepth - depth; + maxClaimCount = Math.min(maxClaimCount, remainingCapacity); } // Claim batch of messages with visibility timeout diff --git a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts index 345d97c04d4..d827da15907 100644 --- a/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts @@ -1282,4 +1282,94 @@ describe("FairQueue", () => { ); }); + describe("worker queue depth cap", () => { + redisTest( + "should respect worker queue max depth and resume after draining", + { timeout: 30000 }, + async ({ redisOptions }) => { + const processed: string[] = []; + keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 100, + maxDeficit: 1000, + }); + + const workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + + // Create FairQueue with a small depth cap + const maxDepth = 3; + const queue = new TestFairQueueHelper(redisOptions, keys, { + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: 1, + consumerCount: 1, + consumerIntervalMs: 50, + visibilityTimeoutMs: 30000, + workerQueueMaxDepth: maxDepth, + workerQueueDepthCheckId: TEST_WORKER_QUEUE_ID, + startConsumers: false, + }); + + // Use a slow handler to let the worker queue build up + queue.onMessage(async (ctx) => { + await new Promise((resolve) => setTimeout(resolve, 200)); + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + + // Enqueue 10 messages + const totalMessages = 10; + for (let i = 0; i < totalMessages; i++) { + await queue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: `msg-${i}` }, + }); + } + + // Start processing and track peak worker queue depth + let peakDepth = 0; + let polling = true; + const depthPoller = (async () => { + while (polling) { + const depth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID); + if (depth > peakDepth) { + peakDepth = depth; + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } + })(); + + queue.start(); + + // Verify all messages eventually get processed (depth cap doesn't permanently block) + await vi.waitFor( + () => { + expect(processed.length).toBe(totalMessages); + }, + { timeout: 25000 } + ); + + polling = false; + await depthPoller; + + // Verify the depth cap was respected during processing + expect(peakDepth).toBeLessThanOrEqual(maxDepth); + + // Verify the worker queue is drained + const finalDepth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID); + expect(finalDepth).toBe(0); + + await workerQueueManager.close(); + await queue.close(); + } + ); + }); + }); diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index d10cad1d0d4..3bc56b599fa 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -446,9 +446,23 @@ export interface FairQueueOptions 0 and the system uses a single shared worker queue. + * If not set, depth checking is disabled even if workerQueueMaxDepth is set. + */ + workerQueueDepthCheckId?: string; } // ============================================================================