Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/batch-rate-limiter-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Move batch queue global rate limiter from FairQueue claim phase to BatchQueue worker queue consumer for accurate per-item rate limiting. Add worker queue depth cap to prevent unbounded growth that could cause visibility timeouts.
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,9 @@ const EnvironmentSchema = z
// Global rate limit: max items processed per second across all consumers
// If not set, no global rate limiting is applied
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
// Max items in the worker queue before claiming pauses (protects visibility timeouts)
// If not set, no depth limit is applied
BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH: z.coerce.number().int().positive().optional(),

ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ function createRunEngine() {
globalRateLimiter: env.BATCH_QUEUE_GLOBAL_RATE_LIMIT
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)
: undefined,
// Worker queue depth cap - prevents unbounded growth, protecting visibility timeouts
workerQueueMaxDepth: env.BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH,
retry: {
maxAttempts: 6,
minTimeoutInMs: 1_000,
Expand Down
50 changes: 48 additions & 2 deletions internal-packages/run-engine/src/batch-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
isAbortError,
WorkerQueueManager,
type FairQueueOptions,
type GlobalRateLimiter,
} from "@trigger.dev/redis-worker";
import { BatchCompletionTracker } from "./completionTracker.js";
import type {
Expand Down Expand Up @@ -76,6 +77,7 @@ export class BatchQueue {
private abortController: AbortController;
private workerQueueConsumerLoops: Promise<void>[] = [];
private workerQueueBlockingTimeoutSeconds: number;
private globalRateLimiter?: GlobalRateLimiter;
private batchedSpanManager: BatchedSpanManager;

// Metrics
Expand All @@ -87,6 +89,7 @@ export class BatchQueue {
private batchProcessingDurationHistogram?: Histogram;
private itemQueueTimeHistogram?: Histogram;
private workerQueueLengthGauge?: ObservableGauge;
private rateLimitDeniedCounter?: Counter;

constructor(private options: BatchQueueOptions) {
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
Expand All @@ -95,6 +98,7 @@ export class BatchQueue {
this.maxAttempts = options.retry?.maxAttempts ?? 1;
this.abortController = new AbortController();
this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10;
this.globalRateLimiter = options.globalRateLimiter;

// Initialize metrics if meter is provided
if (options.meter) {
Expand Down Expand Up @@ -174,8 +178,9 @@ export class BatchQueue {
},
},
],
// Optional global rate limiter to limit max items/sec across all consumers
globalRateLimiter: options.globalRateLimiter,
// Worker queue depth cap to prevent unbounded growth (protects visibility timeouts)
workerQueueMaxDepth: options.workerQueueMaxDepth,
workerQueueDepthCheckId: BATCH_WORKER_QUEUE_ID,
// Enable retry with DLQ disabled when retry config is provided.
// BatchQueue handles the "final failure" in its own processing loop,
// so we don't need the DLQ - we just need the retry scheduling.
Expand Down Expand Up @@ -608,6 +613,11 @@ export class BatchQueue {
unit: "ms",
});

this.rateLimitDeniedCounter = meter.createCounter("batch_queue.rate_limit_denied", {
description: "Number of times the global rate limiter denied processing",
unit: "denials",
});

this.workerQueueLengthGauge = meter.createObservableGauge("batch_queue.worker_queue.length", {
description: "Number of items waiting in the batch worker queue",
unit: "items",
Expand Down Expand Up @@ -641,6 +651,42 @@ export class BatchQueue {
}

try {
// Rate limit per-item at the processing level (1 token per message).
// Loop until allowed so multiple consumers don't all rush through after one sleep.
if (this.globalRateLimiter) {
while (this.isRunning) {
const result = await this.globalRateLimiter.limit();
if (result.allowed) {
break;
}
this.rateLimitDeniedCounter?.add(1);
const waitMs = Math.max(10, (result.resetAt ?? Date.now()) - Date.now());
if (waitMs > 0) {
await new Promise<void>((resolve, reject) => {
const onAbort = () => {
clearTimeout(timer);
reject(this.abortController.signal.reason);
};
const timer = setTimeout(() => {
// Must remove listener when timeout fires, otherwise listeners accumulate
// (the { once: true } option only removes on abort, not on timeout)
this.abortController.signal.removeEventListener("abort", onAbort);
resolve();
}, waitMs);
if (this.abortController.signal.aborted) {
clearTimeout(timer);
reject(this.abortController.signal.reason);
return;
}
this.abortController.signal.addEventListener("abort", onAbort, { once: true });
});
}
}
if (!this.isRunning) {
break;
}
}

await this.batchedSpanManager.withBatchedSpan(
loopId,
async (span) => {
Expand Down
118 changes: 118 additions & 0 deletions internal-packages/run-engine/src/batch-queue/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { redisTest } from "@internal/testcontainers";
import { describe, expect, vi } from "vitest";
import { BatchQueue } from "../index.js";
import type { GlobalRateLimiter } from "@trigger.dev/redis-worker";
import type { CompleteBatchResult, InitializeBatchOptions, BatchItem } from "../types.js";

vi.setConfig({ testTimeout: 60_000 });
Expand Down Expand Up @@ -658,4 +659,121 @@ describe("BatchQueue", () => {
}
);
});

// Exercises BatchQueue's `globalRateLimiter` option with stubbed limiters:
// one that always allows (counting calls) and one that denies the first few
// calls before allowing, to verify rate-limited processing still completes.
describe("global rate limiter at worker queue consumer level", () => {
redisTest(
"should call rate limiter before each processing attempt",
async ({ redisContainer }) => {
// Stub limiter: always allows, but records how many times it was consulted.
let limitCallCount = 0;
const rateLimiter: GlobalRateLimiter = {
async limit() {
limitCallCount++;
return { allowed: true };
},
};

// Single consumer polling every 50ms against the per-test Redis container.
const queue = new BatchQueue({
redis: {
host: redisContainer.getHost(),
port: redisContainer.getPort(),
keyPrefix: "test:",
},
drr: { quantum: 5, maxDeficit: 50 },
consumerCount: 1,
consumerIntervalMs: 50,
startConsumers: true,
globalRateLimiter: rateLimiter,
});

// Captured by the onBatchComplete callback below; non-null once the batch finishes.
let completionResult: CompleteBatchResult | null = null;

try {
// Every item "succeeds" immediately with a synthetic run id.
queue.onProcessItem(async ({ itemIndex }) => {
return { success: true, runId: `run_${itemIndex}` };
});

queue.onBatchComplete(async (result) => {
completionResult = result;
});

const itemCount = 5;
await queue.initializeBatch(createInitOptions("batch1", "env1", itemCount));
await enqueueItems(queue, "batch1", "env1", createBatchItems(itemCount));

// Poll until the batch completion callback has fired (up to 10s).
await vi.waitFor(
() => {
expect(completionResult).not.toBeNull();
},
{ timeout: 10000 }
);

expect(completionResult!.successfulRunCount).toBe(itemCount);
// Rate limiter is called before each blockingPop, including iterations
// where no message is available, so count >= items processed
expect(limitCallCount).toBeGreaterThanOrEqual(itemCount);
} finally {
// Always shut down consumers/connections, even on assertion failure.
await queue.close();
}
}
);

redisTest(
"should delay processing when rate limited",
async ({ redisContainer }) => {
// Stub limiter: denies the first 3 calls with a resetAt 100ms in the
// future, then allows every subsequent call.
let limitCallCount = 0;
const rateLimiter: GlobalRateLimiter = {
async limit() {
limitCallCount++;
// Rate limit the first 3 calls, then allow
if (limitCallCount <= 3) {
return { allowed: false, resetAt: Date.now() + 100 };
}
return { allowed: true };
},
};

// Same single-consumer setup as the test above.
const queue = new BatchQueue({
redis: {
host: redisContainer.getHost(),
port: redisContainer.getPort(),
keyPrefix: "test:",
},
drr: { quantum: 5, maxDeficit: 50 },
consumerCount: 1,
consumerIntervalMs: 50,
startConsumers: true,
globalRateLimiter: rateLimiter,
});

let completionResult: CompleteBatchResult | null = null;

try {
queue.onProcessItem(async ({ itemIndex }) => {
return { success: true, runId: `run_${itemIndex}` };
});

queue.onBatchComplete(async (result) => {
completionResult = result;
});

await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
await enqueueItems(queue, "batch1", "env1", createBatchItems(3));

// Should still complete despite initial rate limiting
await vi.waitFor(
() => {
expect(completionResult).not.toBeNull();
},
{ timeout: 10000 }
);

expect(completionResult!.successfulRunCount).toBe(3);
// Rate limiter was called more times than items due to initial rejections
expect(limitCallCount).toBeGreaterThan(3);
} finally {
await queue.close();
}
}
);
});
});
7 changes: 7 additions & 0 deletions internal-packages/run-engine/src/batch-queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,15 @@ export type BatchQueueOptions = {
/**
* Optional global rate limiter to limit processing across all consumers.
* When configured, limits the max items/second processed globally.
* Rate limiting happens at the worker queue consumer level (1 token per item).
*/
globalRateLimiter?: GlobalRateLimiter;
/**
* Maximum number of items allowed in the worker queue before claiming pauses.
* Prevents unbounded worker queue growth which could cause visibility timeouts.
* Disabled by default (undefined = no limit).
*/
workerQueueMaxDepth?: number;
/** Logger instance */
logger?: Logger;
logLevel?: LogLevel;
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ export class RunEngine {
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
globalRateLimiter: options.batchQueue?.globalRateLimiter,
workerQueueMaxDepth: options.batchQueue?.workerQueueMaxDepth,
startConsumers: startBatchQueueConsumers,
retry: options.batchQueue?.retry,
tracer: options.tracer,
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ export type RunEngineOptions = {
defaultConcurrency?: number;
/** Optional global rate limiter to limit processing across all consumers */
globalRateLimiter?: GlobalRateLimiter;
/** Maximum worker queue depth before claiming pauses (protects visibility timeouts) */
workerQueueMaxDepth?: number;
/** Retry configuration for failed batch items */
retry?: {
/** Maximum number of attempts (including the first). Default: 1 (no retries) */
Expand Down
30 changes: 16 additions & 14 deletions packages/redis-worker/src/fair-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import type {
FairQueueKeyProducer,
FairQueueOptions,
FairScheduler,
GlobalRateLimiter,
QueueCooloffState,
QueueDescriptor,
SchedulerContext,
Expand Down Expand Up @@ -97,8 +96,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
private maxCooloffStatesSize: number;
private queueCooloffStates = new Map<string, QueueCooloffState>();

// Global rate limiter
private globalRateLimiter?: GlobalRateLimiter;
// Worker queue backpressure
private workerQueueMaxDepth: number;
private workerQueueDepthCheckId?: string;

// Consumer tracing
private consumerTraceMaxIterations: number;
Expand Down Expand Up @@ -152,8 +152,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000;
this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000;

// Global rate limiter
this.globalRateLimiter = options.globalRateLimiter;
// Worker queue backpressure
this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0;
this.workerQueueDepthCheckId = options.workerQueueDepthCheckId;

// Consumer tracing
this.consumerTraceMaxIterations = options.consumerTraceMaxIterations ?? 500;
Expand Down Expand Up @@ -1110,16 +1111,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
maxClaimCount = Math.min(maxClaimCount, availableCapacity);
}

// Check global rate limit - wait if rate limited
if (this.globalRateLimiter) {
const result = await this.globalRateLimiter.limit();
if (!result.allowed && result.resetAt) {
const waitMs = Math.max(0, result.resetAt - Date.now());
if (waitMs > 0) {
this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId });
await new Promise((resolve) => setTimeout(resolve, waitMs));
}
// Check worker queue depth to prevent unbounded growth.
// Messages in the worker queue are already in-flight with a visibility timeout.
// If the queue is too deep, consumers can't keep up, and messages risk timing out.
if (this.workerQueueMaxDepth > 0 && this.workerQueueDepthCheckId) {
const depth = await this.workerQueueManager.getLength(this.workerQueueDepthCheckId);
if (depth >= this.workerQueueMaxDepth) {
return 0;
}
// Cap claim size to remaining capacity so we don't overshoot the depth limit
const remainingCapacity = this.workerQueueMaxDepth - depth;
maxClaimCount = Math.min(maxClaimCount, remainingCapacity);
}

// Claim batch of messages with visibility timeout
Expand Down
Loading