The global rate limiter was being applied at the FairQueue claim phase,
consuming 1 token per queue-claim-attempt rather than per item processed.
With many small queues (each batch is its own queue), consumers burned
through tokens on empty or single-item queues, causing aggressive
throttling well below the intended items/sec limit.
Changes:
- Move rate limiter from FairQueue claim phase to BatchQueue worker queue
consumer loop (before blockingPop), so each token = 1 item processed
- Replace the FairQueue rate limiter with a worker queue depth cap to
prevent unbounded growth that could cause visibility timeouts
- Add BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH env var (optional, disabled by default)
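The per-item semantics can be sketched with a minimal in-memory token bucket standing in for the global rate limiter. This is illustrative only — `makeTokenBucket` is not the actual implementation; only the `{ allowed, resetAt }` result shape mirrors the rate limiter interface discussed in this PR:

```typescript
type RateLimitResult = { allowed: boolean; resetAt?: number };

// Minimal fixed-window token bucket: `maxPerSecond` tokens per 1s window.
// Each successful check() consumes exactly one token — i.e. one item.
// `now` is injectable for deterministic testing.
function makeTokenBucket(maxPerSecond: number, now: () => number = Date.now) {
  let tokens = maxPerSecond;
  let windowStart = now();
  return {
    check(): RateLimitResult {
      const t = now();
      if (t - windowStart >= 1000) {
        tokens = maxPerSecond; // new window: refill
        windowStart = t;
      }
      if (tokens > 0) {
        tokens -= 1;
        return { allowed: true };
      }
      // Denied: report when the current window resets
      return { allowed: false, resetAt: windowStart + 1000 };
    },
  };
}
```

A consumer loop would call `check()` once before each blocking pop, sleeping until `resetAt` when denied, so throughput converges on the intended items/sec regardless of how many small queues exist.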
Walkthrough

This PR moves global rate limiting out of FairQueue's claim phase into BatchQueue's worker-queue consumer, so rate limits apply per item. FairQueue removes GlobalRateLimiter and instead implements backpressure via a worker-queue depth cap.
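The depth-cap backpressure on the claim path can be sketched as a pure budget function. This is a sketch of the idea, not FairQueue's actual code — the name `claimBudget` and its signature are illustrative:

```typescript
// How many messages a claim iteration may take, given the current
// worker-queue depth. A cap of 0 means backpressure is disabled.
function claimBudget(
  currentDepth: number,
  maxDepth: number,
  maxClaimCount: number
): number {
  if (maxDepth <= 0) return maxClaimCount; // feature disabled
  const remaining = maxDepth - currentDepth;
  if (remaining <= 0) return 0; // at or over the cap: claim nothing
  return Math.min(maxClaimCount, remaining); // claim only up to capacity
}
```

Because the depth is rechecked each iteration, a per-iteration overshoot bounded by the claim batch size is tolerable: growth beyond the cap cannot be sustained.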
♻️ Duplicate comments (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)
656-657: ⚠️ Potential issue | 🟡 Minor

Potential tight loop if `resetAt` is undefined or in the past.

When `result.resetAt` is undefined or a past timestamp, `waitMs` computes to 0, skipping the sleep entirely. If the rate limiter repeatedly returns `{ allowed: false }` without a valid future `resetAt`, this becomes a tight CPU-spinning loop. Consider adding a minimum wait and a fallback offset:

🛡️ Proposed defensive fix

```diff
- const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now());
+ const waitMs = Math.max(1, (result.resetAt ?? Date.now() + 100) - Date.now());
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `internal-packages/run-engine/src/batch-queue/index.ts` around lines 656-657: the tight-loop risk comes from computing `waitMs` solely as `Math.max(0, (result.resetAt ?? Date.now()) - Date.now())` when `result.resetAt` can be undefined or past. Provide a defensive fallback and minimum delay (e.g., introduce constants `MIN_WAIT_MS` and `FALLBACK_OFFSET_MS`) and compute `waitMs` as `Math.max(MIN_WAIT_MS, (result.resetAt ?? (Date.now() + FALLBACK_OFFSET_MS)) - Date.now())`; keep the existing conditional that sleeps only when `waitMs > 0`, so the consumer avoids CPU spinning when the rate limiter returns `allowed: false` without a future `resetAt`.
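The defensive computation suggested above can be factored into a small pure helper. The constants and the helper name are illustrative (the concrete values `1` and `100` ms are assumptions, not mandated by the codebase):

```typescript
const MIN_WAIT_MS = 1;
const FALLBACK_OFFSET_MS = 100;

// Compute how long to sleep after the rate limiter denies a request.
// Guards against an undefined or past `resetAt` producing waitMs === 0,
// which would otherwise spin the consumer loop at 100% CPU.
function computeWaitMs(
  resetAt: number | undefined,
  now: number = Date.now()
): number {
  const target = resetAt ?? now + FALLBACK_OFFSET_MS;
  return Math.max(MIN_WAIT_MS, target - now);
}
```

Keeping the helper pure (with `now` injectable) makes the boundary cases — undefined, past, and future `resetAt` — trivially unit-testable.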
🧹 Nitpick comments (2)
packages/redis-worker/src/fair-queue/index.ts (1)
155-157: Guard against silent backpressure misconfiguration.

If `workerQueueMaxDepth > 0` but `workerQueueDepthCheckId` is unset, depth capping is effectively disabled without any signal. Consider logging a warning (or throwing) during construction for this option combination.

Suggested constructor guard

```diff
  // Worker queue backpressure
  this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0;
  this.workerQueueDepthCheckId = options.workerQueueDepthCheckId;
+ if (this.workerQueueMaxDepth > 0 && !this.workerQueueDepthCheckId) {
+   this.logger.warn(
+     "workerQueueMaxDepth is set but workerQueueDepthCheckId is missing; backpressure is disabled"
+   );
+ }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `packages/redis-worker/src/fair-queue/index.ts` around lines 155-157: the constructor currently allows `workerQueueMaxDepth > 0` while `workerQueueDepthCheckId` is unset, silently disabling depth checks. Where the two fields are assigned, add a guard that detects this mismatch and either logs a clear warning via the module logger or throws an Error. The message should explain that depth capping requires `workerQueueDepthCheckId` when `workerQueueMaxDepth > 0`, and include the offending config values for easier debugging.

internal-packages/run-engine/src/batch-queue/index.ts (1)
658-670: Consider removing the abort listener on timer completion (optional).

When the timer fires before abort, the listener remains attached until `stop()` eventually fires the abort signal. While this is not a leak (due to `once: true`), removing the listener on timer completion is slightly cleaner.

♻️ Optional cleanup pattern

```diff
  await new Promise<void>((resolve, reject) => {
-   const timer = setTimeout(resolve, waitMs);
+   // Clean up the abort listener if the timer fires first
+   const timer = setTimeout(() => {
+     this.abortController.signal.removeEventListener("abort", onAbort);
+     resolve();
+   }, waitMs);
    const onAbort = () => {
      clearTimeout(timer);
      reject(this.abortController.signal.reason);
    };
    if (this.abortController.signal.aborted) {
      clearTimeout(timer);
      reject(this.abortController.signal.reason);
      return;
    }
    this.abortController.signal.addEventListener("abort", onAbort, { once: true });
  });
```

Alternatively, a simpler approach using the `AbortSignal.timeout()` pattern may be worth exploring.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `internal-packages/run-engine/src/batch-queue/index.ts` around lines 658-670: the wait promise leaves the abort listener attached after the timer resolves. Update the timer-resolution path to remove the previously registered `onAbort` listener from `this.abortController.signal` (via `removeEventListener("abort", onAbort)`) before resolving, or replace the whole pattern with `AbortSignal.timeout(...)` if preferred; keep `clearTimeout(timer)` and the immediate-abort check and reject path the same, so `onAbort` remains correctly referenced for removal.
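The cleanup pattern discussed above can be isolated into a reusable abortable sleep. This is a sketch under the assumption that only a standard `AbortSignal` is available (`abortableSleep` is a hypothetical helper name, not the actual BatchQueue code):

```typescript
// Sleep for `ms`, rejecting early with `signal.reason` if the signal aborts.
// The abort listener is removed when the timer fires first, so listeners
// do not accumulate on the signal across many short sleeps.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(signal.reason);
    };
    const timer = setTimeout(() => {
      // Timer won the race: detach the listener before resolving
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}
```

Because the resolve and reject paths each tear down the other's resources (`clearTimeout` on abort, `removeEventListener` on timeout), neither a stray timer nor a stray listener survives the race.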
📒 Files selected for processing (4)
- internal-packages/run-engine/src/batch-queue/index.ts
- internal-packages/run-engine/src/batch-queue/tests/index.test.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- internal-packages/run-engine/src/batch-queue/tests/index.test.ts
- packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
🔇 Additional comments (5)
packages/redis-worker/src/fair-queue/index.ts (2)
99-101: Backpressure state addition looks clean.

The new private fields are well-scoped and typed, and they align with the depth-cap behavior introduced later in the claim path.

1114-1125: Depth-based claim throttling is correctly enforced.

The early return at the cap, plus the `maxClaimCount` reduction to remaining capacity, is a solid backpressure implementation for this path.

internal-packages/run-engine/src/batch-queue/index.ts (3)

21-21: LGTM!

Type import for `GlobalRateLimiter` is correctly added from the redis-worker package.

80-80: LGTM!

The optional `globalRateLimiter` field is properly typed and assigned from options, enabling per-item rate limiting at the consumer level as intended by this PR.

Also applies to: 100-100

180-182: LGTM!

Worker queue depth configuration is correctly passed to FairQueue, enabling backpressure to prevent unbounded queue growth as described in the PR objectives.
nicktrn left a comment
couple of comments, none blocking