Skip to content

fix(batch): move batch queue global rate limiter to worker consumer level#3166

Merged
ericallam merged 7 commits intomainfrom
feature/tri-7697-fix-batch-queue-global-rate-limiter
Mar 3, 2026
Merged

fix(batch): move batch queue global rate limiter to worker consumer level#3166
ericallam merged 7 commits intomainfrom
feature/tri-7697-fix-batch-queue-global-rate-limiter

Conversation

@ericallam
Copy link
Member

The global rate limiter was being applied at the FairQueue claim phase,
consuming 1 token per queue-claim-attempt rather than per item processed.
With many small queues (each batch is its own queue), consumers burned
through tokens on empty or single-item queues, causing aggressive
throttling well below the intended items/sec limit.

Changes:

  • Move rate limiter from FairQueue claim phase to BatchQueue worker queue
    consumer loop (before blockingPop), so each token = 1 item processed
  • Replace the FairQueue rate limiter with a worker queue depth cap to
    prevent unbounded growth that could cause visibility timeouts
  • Add BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH env var (optional, disabled by default)

…evel

The global rate limiter was being applied at the FairQueue claim phase,
  consuming 1 token per queue-claim-attempt rather than per item processed.
  With many small queues (each batch is its own queue), consumers burned
  through tokens on empty or single-item queues, causing aggressive
  throttling well below the intended items/sec limit.

  Changes:
  - Move rate limiter from FairQueue claim phase to BatchQueue worker queue
    consumer loop (before blockingPop), so each token = 1 item processed
  - Replace the FairQueue rate limiter with a worker queue depth cap to
    prevent unbounded growth that could cause visibility timeouts
  - Add BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH env var (optional, disabled by default)
@changeset-bot
Copy link

changeset-bot bot commented Mar 3, 2026

⚠️ No Changeset found

Latest commit: a39c602

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cf67b8a and a39c602.

📒 Files selected for processing (2)
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/types.ts
📜 Recent review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (27)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: sdk-compat / Cloudflare Workers
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
  • GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: sdk-compat / Deno Runtime
  • GitHub Check: sdk-compat / Bun Runtime
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

**/*.{ts,tsx}: In TypeScript SDK usage, always import from @trigger.dev/sdk, never from @trigger.dev/sdk/v3 or use deprecated client.defineJob
Import from @trigger.dev/core subpaths only, never from the root
Use the Run Engine 2.0 (@internal/run-engine) and redis-worker for all new work, not legacy V1 MarQS queue or deprecated V1 functions

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier before committing

Files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
🧠 Learnings (13)
📓 Common learnings
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:07:56.462Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: internal-packages/run-engine/src/batch-queue/tests/index.test.ts:711-713
Timestamp: 2026-03-03T13:07:27.810Z
Learning: In `internal-packages/run-engine/src/batch-queue/tests/index.test.ts`, test assertions for rate limiter stubs can use `toBeGreaterThanOrEqual` rather than exact equality (`toBe`) because the consumer loop may call the rate limiter during empty pops in addition to actual item processing, and this over-calling is acceptable in integration tests.
📚 Learning: 2026-03-03T13:07:56.462Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:07:56.462Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.162Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.162Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/queue.ts : Job queue abstraction should be Redis-backed in src/queue.ts

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.162Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.162Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/worker.ts : Worker loop and job processing should implement concurrency control in src/worker.ts

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.162Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.162Z
Learning: Applies to packages/redis-worker/**/*@(job|queue|worker|background).{ts,tsx} : Use trigger.dev/redis-worker for all new background job implementations, replacing graphile-worker and zodworker

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-03T13:07:27.810Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: internal-packages/run-engine/src/batch-queue/tests/index.test.ts:711-713
Timestamp: 2026-03-03T13:07:27.810Z
Learning: In `internal-packages/run-engine/src/batch-queue/tests/index.test.ts`, test assertions for rate limiter stubs can use `toBeGreaterThanOrEqual` rather than exact equality (`toBe`) because the consumer loop may call the rate limiter during empty pops in addition to actual item processing, and this over-calling is acceptable in integration tests.

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:42:41.093Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-02T12:42:41.093Z
Learning: Applies to **/*.{ts,tsx} : Use the Run Engine 2.0 (internal/run-engine) and redis-worker for all new work, not legacy V1 MarQS queue or deprecated V1 functions

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:42:56.102Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: apps/webapp/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:42:56.102Z
Learning: Applies to apps/webapp/app/v3/*Worker.server.ts : New background job workers should use `trigger.dev/redis-worker` (e.g., `commonWorker.server.ts`, `alertsWorker.server.ts`, `batchTriggerWorker.server.ts`), not zodworker or graphile-worker

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:43:37.897Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/core/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:37.897Z
Learning: Exercise caution with changes to trigger.dev/core as they affect both the customer-facing SDK and server-side webapp - breaking changes can impact deployed user tasks and the platform simultaneously

Applied to files:

  • packages/redis-worker/src/fair-queue/types.ts
📚 Learning: 2026-03-02T12:43:25.243Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: internal-packages/run-engine/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:25.243Z
Learning: Applies to internal-packages/run-engine/src/engine/systems/**/*.ts : Integrate OpenTelemetry tracer and meter instrumentation in RunEngine systems for observability

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Use `trigger.dev/redis-worker` for background job and worker system needs in the webapp and run engine

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
🧬 Code graph analysis (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)
packages/redis-worker/src/fair-queue/types.ts (1)
  • GlobalRateLimiter (15-21)
🔇 Additional comments (5)
packages/redis-worker/src/fair-queue/types.ts (1)

449-465: Backpressure option docs look clear and consistent.

The new workerQueueMaxDepth/workerQueueDepthCheckId comments clearly define enablement behavior and dependency, and align with the per-item rate-limiter move.

internal-packages/run-engine/src/batch-queue/index.ts (4)

21-21: LGTM!

Clean wiring for the optional GlobalRateLimiter dependency—type import, private field declaration, and constructor assignment are all correctly implemented.

Also applies to: 80-80, 101-101


616-619: LGTM!

The rateLimitDeniedCounter metric is properly defined with no high-cardinality attributes, following the coding guidelines for OTEL metrics.


181-183: LGTM!

Worker queue depth cap configuration is correctly wired, using the existing BATCH_WORKER_QUEUE_ID constant for the depth check. This provides backpressure to protect against visibility timeouts when the rate limiter moves to the consumer level.


654-688: LGTM!

The rate limiter implementation correctly addresses all prior review concerns:

  1. Minimum backoff: Math.max(10, ...) on line 663 prevents tight loops when resetAt is undefined or in the past.
  2. Retry until allowed: The while (this.isRunning) loop ensures consumers don't proceed until rate limit allows.
  3. Abort-aware sleep: The promise properly handles abort signals with listener cleanup to prevent memory leaks.

The abort handling pattern correctly removes the event listener both on timeout (line 673) and on abort (via { once: true }), preventing listener accumulation.


Walkthrough

This PR moves global rate limiting out of FairQueue's claim phase into BatchQueue's worker-queue consumer so rate limits apply per-item. FairQueue removes GlobalRateLimiter and implements backpressure via workerQueueMaxDepth and workerQueueDepthCheckId to cap claims. A new env var BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH is added and propagated through RunEngine into BatchQueue as workerQueueMaxDepth. Tests and a changelog note were added to verify per-item rate-limiter behavior and that the worker-queue depth is capped and drains.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Description check ⚠️ Warning The description explains the problem, solution, and specific changes clearly. However, it does not follow the provided template structure with Testing, Changelog, and Checklist sections. Restructure the description to match the template: add checklist, Testing section, and Changelog section as specified in the repository template.
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: moving the batch queue global rate limiter from FairQueue claim phase to BatchQueue worker consumer level for per-item accuracy.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/tri-7697-fix-batch-queue-global-rate-limiter

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)

656-657: ⚠️ Potential issue | 🟡 Minor

Potential tight loop if resetAt is undefined or in the past.

When result.resetAt is undefined or a past timestamp, waitMs computes to 0, skipping the sleep entirely. If the rate limiter repeatedly returns {allowed: false} without a valid future resetAt, this becomes a tight CPU-spinning loop.

Consider adding a minimum wait and a fallback offset:

🛡️ Proposed defensive fix
-              const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now());
+              const waitMs = Math.max(1, (result.resetAt ?? Date.now() + 100) - Date.now());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal-packages/run-engine/src/batch-queue/index.ts` around lines 656 -
657, The tight-loop risk comes from computing waitMs solely as Math.max(0,
(result.resetAt ?? Date.now()) - Date.now()) when result.resetAt can be
undefined or past; change the logic around waitMs/result.resetAt to provide a
defensive fallback and minimum delay (e.g., introduce constants MIN_WAIT_MS and
FALLBACK_OFFSET_MS) and compute waitMs as Math.max(MIN_WAIT_MS, (
(result.resetAt ?? (Date.now() + FALLBACK_OFFSET_MS)) - Date.now() )); keep the
existing conditional that sleeps only when waitMs > 0 but use this new waitMs to
avoid CPU spinning when the rate limiter returns allowed: false without a future
resetAt.
🧹 Nitpick comments (2)
packages/redis-worker/src/fair-queue/index.ts (1)

155-157: Guard against silent backpressure misconfiguration.

If workerQueueMaxDepth > 0 but workerQueueDepthCheckId is unset, depth capping is effectively disabled without signal. Consider logging a warning (or throwing) during construction for this option combination.

Suggested constructor guard
    // Worker queue backpressure
    this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0;
    this.workerQueueDepthCheckId = options.workerQueueDepthCheckId;
+   if (this.workerQueueMaxDepth > 0 && !this.workerQueueDepthCheckId) {
+     this.logger.warn("workerQueueMaxDepth is set but workerQueueDepthCheckId is missing; backpressure is disabled");
+   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/redis-worker/src/fair-queue/index.ts` around lines 155 - 157, The
constructor currently allows workerQueueMaxDepth > 0 while
workerQueueDepthCheckId is unset, silently disabling depth checks; in the
constructor (where workerQueueMaxDepth and workerQueueDepthCheckId are assigned)
add a guard that detects this mismatch and either log a clear warning via the
module logger or throw an Error, referencing the fields workerQueueMaxDepth and
workerQueueDepthCheckId and the constructor function to locate the change;
ensure the message explains that depth capping requires workerQueueDepthCheckId
when workerQueueMaxDepth > 0 and include the offending config values for easier
debugging.
internal-packages/run-engine/src/batch-queue/index.ts (1)

658-670: Consider removing abort listener on timer completion (optional).

When the timer fires before abort, the listener remains attached until stop() eventually fires the abort signal. While this is not a leak (due to once: true), removing the listener on timer completion is slightly cleaner.

♻️ Optional cleanup pattern
               await new Promise<void>((resolve, reject) => {
                 const timer = setTimeout(resolve, waitMs);
                 const onAbort = () => {
                   clearTimeout(timer);
                   reject(this.abortController.signal.reason);
                 };
                 if (this.abortController.signal.aborted) {
                   clearTimeout(timer);
                   reject(this.abortController.signal.reason);
                   return;
                 }
                 this.abortController.signal.addEventListener("abort", onAbort, { once: true });
+                // Clean up listener if timer fires first
+                const originalResolve = resolve;
+                setTimeout(() => {
+                  this.abortController.signal.removeEventListener("abort", onAbort);
+                  originalResolve();
+                }, waitMs);
               });

Alternatively, a simpler approach using AbortSignal.timeout() pattern if you want to explore that direction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal-packages/run-engine/src/batch-queue/index.ts` around lines 658 -
670, The wait promise leaves the abort listener attached until abort fires even
though the timer resolved; in the block creating the Promise in batch-queue's
wait logic, update the timer-resolution path to remove the previously-registered
onAbort listener from this.abortController.signal (call
removeEventListener("abort", onAbort)) before resolving, or replace the whole
pattern with AbortSignal.timeout(...) if preferred; ensure you still
clearTimeout(timer) and handle the immediate-abort check and reject path the
same way so onAbort remains correctly referenced for removal.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@internal-packages/run-engine/src/batch-queue/index.ts`:
- Around line 656-657: The tight-loop risk comes from computing waitMs solely as
Math.max(0, (result.resetAt ?? Date.now()) - Date.now()) when result.resetAt can
be undefined or past; change the logic around waitMs/result.resetAt to provide a
defensive fallback and minimum delay (e.g., introduce constants MIN_WAIT_MS and
FALLBACK_OFFSET_MS) and compute waitMs as Math.max(MIN_WAIT_MS, (
(result.resetAt ?? (Date.now() + FALLBACK_OFFSET_MS)) - Date.now() )); keep the
existing conditional that sleeps only when waitMs > 0 but use this new waitMs to
avoid CPU spinning when the rate limiter returns allowed: false without a future
resetAt.

---

Nitpick comments:
In `@internal-packages/run-engine/src/batch-queue/index.ts`:
- Around line 658-670: The wait promise leaves the abort listener attached until
abort fires even though the timer resolved; in the block creating the Promise in
batch-queue's wait logic, update the timer-resolution path to remove the
previously-registered onAbort listener from this.abortController.signal (call
removeEventListener("abort", onAbort)) before resolving, or replace the whole
pattern with AbortSignal.timeout(...) if preferred; ensure you still
clearTimeout(timer) and handle the immediate-abort check and reject path the
same way so onAbort remains correctly referenced for removal.

In `@packages/redis-worker/src/fair-queue/index.ts`:
- Around line 155-157: The constructor currently allows workerQueueMaxDepth > 0
while workerQueueDepthCheckId is unset, silently disabling depth checks; in the
constructor (where workerQueueMaxDepth and workerQueueDepthCheckId are assigned)
add a guard that detects this mismatch and either log a clear warning via the
module logger or throw an Error, referencing the fields workerQueueMaxDepth and
workerQueueDepthCheckId and the constructor function to locate the change;
ensure the message explains that depth capping requires workerQueueDepthCheckId
when workerQueueMaxDepth > 0 and include the offending config values for easier
debugging.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8d2b9ca and 22fdaff.

📒 Files selected for processing (4)
  • internal-packages/run-engine/src/batch-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/tests/index.test.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal-packages/run-engine/src/batch-queue/tests/index.test.ts
  • packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
  • GitHub Check: sdk-compat / Cloudflare Workers
  • GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: sdk-compat / Bun Runtime
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

**/*.{ts,tsx}: In TypeScript SDK usage, always import from @trigger.dev/sdk, never from @trigger.dev/sdk/v3 or use deprecated client.defineJob
Import from @trigger.dev/core subpaths only, never from the root
Use the Run Engine 2.0 (@internal/run-engine) and redis-worker for all new work, not legacy V1 MarQS queue or deprecated V1 functions

Files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier before committing

Files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
🧠 Learnings (13)
📓 Common learnings
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:07:56.462Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: internal-packages/run-engine/src/batch-queue/tests/index.test.ts:711-713
Timestamp: 2026-03-03T13:07:27.810Z
Learning: In `internal-packages/run-engine/src/batch-queue/tests/index.test.ts`, test assertions for rate limiter stubs can use `toBeGreaterThanOrEqual` rather than exact equality (`toBe`) because the consumer loop may call the rate limiter during empty pops in addition to actual item processing, and this over-calling is acceptable in integration tests.
📚 Learning: 2026-03-03T13:07:56.462Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:07:56.462Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.162Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.162Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/queue.ts : Job queue abstraction should be Redis-backed in src/queue.ts

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.162Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.162Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/worker.ts : Worker loop and job processing should implement concurrency control in src/worker.ts

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.162Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.162Z
Learning: Applies to packages/redis-worker/**/*@(job|queue|worker|background).{ts,tsx} : Use trigger.dev/redis-worker for all new background job implementations, replacing graphile-worker and zodworker

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:42:41.093Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-02T12:42:41.093Z
Learning: Applies to **/*.{ts,tsx} : Use the Run Engine 2.0 (internal/run-engine) and redis-worker for all new work, not legacy V1 MarQS queue or deprecated V1 functions

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-03T13:07:27.810Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: internal-packages/run-engine/src/batch-queue/tests/index.test.ts:711-713
Timestamp: 2026-03-03T13:07:27.810Z
Learning: In `internal-packages/run-engine/src/batch-queue/tests/index.test.ts`, test assertions for rate limiter stubs can use `toBeGreaterThanOrEqual` rather than exact equality (`toBe`) because the consumer loop may call the rate limiter during empty pops in addition to actual item processing, and this over-calling is acceptable in integration tests.

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-03-02T12:42:56.102Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: apps/webapp/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:42:56.102Z
Learning: Applies to apps/webapp/app/v3/*Worker.server.ts : New background job workers should use `trigger.dev/redis-worker` (e.g., `commonWorker.server.ts`, `alertsWorker.server.ts`, `batchTriggerWorker.server.ts`), not zodworker or graphile-worker

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-02-10T16:18:48.654Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 2980
File: apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx:512-515
Timestamp: 2026-02-10T16:18:48.654Z
Learning: In apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx, environment.queueSizeLimit is a per-queue maximum that is configured at the environment level, not a shared limit across all queues. Each queue can have up to environment.queueSizeLimit items queued independently.

Applied to files:

  • packages/redis-worker/src/fair-queue/index.ts
📚 Learning: 2026-03-02T12:43:25.243Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: internal-packages/run-engine/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:25.243Z
Learning: Applies to internal-packages/run-engine/src/engine/systems/**/*.ts : Integrate OpenTelemetry tracer and meter instrumentation in RunEngine systems for observability

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2025-11-27T16:26:58.661Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-11-27T16:26:58.661Z
Learning: Use `trigger.dev/redis-worker` for background job and worker system needs in the webapp and run engine

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.

Applied to files:

  • internal-packages/run-engine/src/batch-queue/index.ts
🧬 Code graph analysis (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)
packages/redis-worker/src/fair-queue/types.ts (1)
  • GlobalRateLimiter (15-21)
🔇 Additional comments (5)
packages/redis-worker/src/fair-queue/index.ts (2)

99-101: Backpressure state addition looks clean.

The new private fields are well-scoped and typed, and they align with the depth-cap behavior introduced later in the claim path.


1114-1125: Depth-based claim throttling is correctly enforced.

The early return at cap plus maxClaimCount reduction to remaining capacity is a solid backpressure implementation for this path.

internal-packages/run-engine/src/batch-queue/index.ts (3)

21-21: LGTM!

Type import for GlobalRateLimiter is correctly added from the redis-worker package.


80-80: LGTM!

The optional globalRateLimiter field is properly typed and assigned from options, enabling per-item rate limiting at the consumer level as intended by this PR.

Also applies to: 100-100


180-182: LGTM!

Worker queue depth configuration is correctly passed to FairQueue, enabling backpressure to prevent unbounded queue growth as described in the PR objectives.

coderabbitai[bot]

This comment was marked as resolved.

@ericallam ericallam marked this pull request as ready for review March 3, 2026 13:52
devin-ai-integration[bot]

This comment was marked as resolved.

Copy link
Collaborator

@nicktrn nicktrn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couple of comments, none blocking

@ericallam ericallam merged commit dee6f1d into main Mar 3, 2026
38 checks passed
@ericallam ericallam deleted the feature/tri-7697-fix-batch-queue-global-rate-limiter branch March 3, 2026 14:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants