Closed

19 commits
0b209c7
feat: add @trigger.dev/ai package with TriggerChatTransport
cursoragent Feb 15, 2026
c6fdda8
test: add comprehensive unit tests for TriggerChatTransport
cursoragent Feb 15, 2026
9241cd6
refactor: polish TriggerChatTransport implementation
cursoragent Feb 15, 2026
1d44a83
test: add abort signal, multiple sessions, and body merging tests
cursoragent Feb 15, 2026
ec633c1
chore: add changeset for @trigger.dev/ai package
cursoragent Feb 15, 2026
e70734a
refactor: remove internal ChatSessionState from public exports
cursoragent Feb 15, 2026
cf2490f
feat: support dynamic accessToken function for token refresh
cursoragent Feb 15, 2026
1ad672f
refactor: avoid double-resolving accessToken in sendMessages
cursoragent Feb 15, 2026
05232a2
feat: add chat transport and AI chat helpers to @trigger.dev/sdk
cursoragent Feb 15, 2026
dad08fa
test: move chat transport tests to @trigger.dev/sdk
cursoragent Feb 15, 2026
a26d67e
refactor: delete packages/ai/ — moved to @trigger.dev/sdk subpaths
cursoragent Feb 15, 2026
9727039
chore: update changeset to target @trigger.dev/sdk
cursoragent Feb 15, 2026
adc0933
fix: address CodeRabbit review feedback
cursoragent Feb 15, 2026
876454c
docs(ai): add AI Chat with useChat guide
cursoragent Feb 15, 2026
993ed9b
feat(reference): add ai-chat Next.js reference project
cursoragent Feb 15, 2026
01dc75a
fix(reference): use compatible @ai-sdk v3 packages, await convertToMo…
cursoragent Feb 15, 2026
076c32b
Use a single run with iterative waitpoint token completions
ericallam Feb 21, 2026
735e845
Added tool example
ericallam Feb 21, 2026
16bb046
expose a useTriggerChatTransport hook
ericallam Feb 21, 2026
42 changes: 42 additions & 0 deletions .changeset/ai-sdk-chat-transport.md
@@ -0,0 +1,42 @@
---
"@trigger.dev/sdk": minor
---

Add AI SDK chat transport integration via two new subpath exports:

**`@trigger.dev/sdk/chat`** (frontend, browser-safe):
- `TriggerChatTransport` — custom `ChatTransport` for the AI SDK's `useChat` hook that runs chat completions as durable Trigger.dev tasks
- `createChatTransport()` — factory function

```tsx
import { useChat } from "@ai-sdk/react";
import { TriggerChatTransport } from "@trigger.dev/sdk/chat";

const { messages, sendMessage } = useChat({
  transport: new TriggerChatTransport({
    task: "my-chat-task",
    accessToken,
  }),
});
```

**`@trigger.dev/sdk/ai`** (backend, extends existing `ai.tool`/`ai.currentToolOptions`):
- `chatTask()` — pre-typed task wrapper with auto-pipe support
- `pipeChat()` — pipe a `StreamTextResult` or stream to the frontend
- `CHAT_STREAM_KEY` — the default stream key constant
- `ChatTaskPayload` type

```ts
import { chatTask } from "@trigger.dev/sdk/ai";
import { openai } from "@ai-sdk/openai";
import { streamText, convertToModelMessages } from "ai";

export const myChatTask = chatTask({
  id: "my-chat-task",
  run: async ({ messages }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages: convertToModelMessages(messages),
    });
  },
});
```

Comment on lines +29 to +41

⚠️ Potential issue | 🟡 Minor

Add missing openai provider import in backend snippet.

`openai("gpt-4o")` is used but not imported, which makes the example non-copy-pasteable as written.

Proposed doc snippet fix:

 import { chatTask } from "@trigger.dev/sdk/ai";
 import { streamText, convertToModelMessages } from "ai";
+import { openai } from "@ai-sdk/openai";
257 changes: 257 additions & 0 deletions .scratch/plan-graceful-oversized-batch-items.md
@@ -0,0 +1,257 @@
# Graceful handling of oversized batch items

## Prerequisites

This plan builds on top of PR #2980 which provides:
- `TriggerFailedTaskService` at `apps/webapp/app/runEngine/services/triggerFailedTask.server.ts` - creates pre-failed TaskRuns with proper trace events, waitpoint connections, and parent run associations
- `engine.createFailedTaskRun()` on RunEngine - creates a SYSTEM_FAILURE run with associated waitpoints
- Retry support in `processItemCallback` with `attempt` and `isFinalAttempt` params
- The callback already uses `TriggerFailedTaskService` for items that fail after retries

## Problem

When the NDJSON parser in `createNdjsonParserStream` detects an oversized line, it throws inside the TransformStream's `transform()` method. This aborts the request body stream (due to `pipeThrough` coupling), causing the client's `fetch()` to see `TypeError: fetch failed` instead of the server's 400 response. The SDK treats this as a connection error and retries with exponential backoff (~25s wasted).

## Goal

Instead of throwing, treat oversized items as per-item failures that flow through the existing batch failure pipeline. The batch seals normally, other items process fine, and the user sees a clear failure for the specific oversized item(s).

## Approach

The NDJSON parser emits an error marker object instead of throwing. `StreamBatchItemsService` detects these markers and enqueues the item to the FairQueue with error metadata in its options. The `processItemCallback` (already enhanced with `TriggerFailedTaskService` in PR #2980) detects the error metadata and creates a pre-failed run via `TriggerFailedTaskService`, which handles all the waitpoint/trace machinery.

## Changes

### 1. Byte-level key extractor for oversized lines

**`apps/webapp/app/runEngine/services/streamBatchItems.server.ts`** - new function

Add `extractIndexAndTask(bytes: Uint8Array): { index: number; task: string }` - a state machine that extracts top-level `"index"` and `"task"` values from raw bytes without decoding the full line.

How it works:
- Scan bytes tracking JSON nesting depth (count `{`/`[` vs `}`/`]`)
- At depth 1 (inside the top-level object), look for byte sequences matching `"index"` and `"task"` key patterns
- For `"index"`: after the `:`, parse the digit sequence as a number
- For `"task"`: after the `:`, find opening `"`, read bytes until closing `"`, decode just that slice
- Stop when both found, or after scanning 512 bytes (whichever comes first)
- Fallback: `index = -1`, `task = "unknown"` if not found

This avoids decoding/allocating the full 3MB line - only the first few hundred bytes are examined.
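A minimal sketch of this extractor, with one stated simplification: it decodes only the first 512 bytes and matches the keys with regexes rather than running the byte-level depth-tracking state machine described above (so an `"index"` or `"task"` key inside a nested object within the prefix could in principle be mis-matched, which the depth-aware version avoids). Names and fallbacks follow the plan:

```typescript
// Illustrative sketch of extractIndexAndTask. Simplification: decode a small
// prefix and use regexes instead of a byte-level state machine; the real
// version tracks JSON nesting depth to only match top-level keys.
function extractIndexAndTask(bytes: Uint8Array): { index: number; task: string } {
  // Never decode the full (multi-MB) line: only the first 512 bytes.
  const head = new TextDecoder().decode(bytes.subarray(0, 512));
  const indexMatch = head.match(/"index"\s*:\s*(\d+)/);
  const taskMatch = head.match(/"task"\s*:\s*"([^"]*)"/);
  return {
    index: indexMatch ? Number(indexMatch[1]) : -1, // fallback when not found
    task: taskMatch ? taskMatch[1] : "unknown", // fallback when not found
  };
}
```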

### 2. Modify `createNdjsonParserStream` to emit error markers

**`apps/webapp/app/runEngine/services/streamBatchItems.server.ts`**

Define a marker type:
```typescript
type OversizedItemMarker = {
  __batchItemError: "OVERSIZED";
  index: number;
  task: string;
  actualSize: number;
  maxSize: number;
};
```

**Case 1 - Complete line exceeds limit** (newline found, `newlineIndex > maxItemBytes`):
- Call `extractLine(newlineIndex)` to consume the line from the buffer
- Call `extractIndexAndTask(lineBytes)` on the extracted bytes
- `controller.enqueue(marker)` instead of throwing
- Increment `lineNumber` and continue

**Case 2 - Incomplete line exceeds limit** (no newline, `totalBytes > maxItemBytes`):
- Call `extractIndexAndTask(concatenateChunks())` on current buffer
- `controller.enqueue(marker)`
- Clear the buffer (`chunks = []; totalBytes = 0`)
- Return from transform (don't throw)

**Case 3 - Flush with oversized remaining** (`totalBytes > maxItemBytes` in flush):
- Same as case 2 but in `flush()`.
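Case 1 can be sketched as a pure helper that the `transform()` would call on each complete line: it returns either the parsed item or an `OversizedItemMarker`, and the caller enqueues whichever comes back instead of throwing. This is an illustrative sketch, not the actual implementation; the key extraction is a regex stand-in for the byte-level extractor from change 1, inlined here for self-containment:

```typescript
type OversizedItemMarker = {
  __batchItemError: "OVERSIZED";
  index: number;
  task: string;
  actualSize: number;
  maxSize: number;
};

// Illustrative helper for Case 1: classify one complete NDJSON line.
// transform() would controller.enqueue() the result either way, so
// oversized lines become markers rather than thrown errors.
function classifyLine(lineBytes: Uint8Array, maxItemBytes: number): unknown {
  if (lineBytes.byteLength <= maxItemBytes) {
    // Within the limit: decode and JSON-parse as before.
    return JSON.parse(new TextDecoder().decode(lineBytes));
  }
  // Oversized: recover only index/task from a small prefix (regex stand-in
  // for the byte-level extractor described in change 1).
  const head = new TextDecoder().decode(lineBytes.subarray(0, 512));
  const marker: OversizedItemMarker = {
    __batchItemError: "OVERSIZED",
    index: Number(head.match(/"index"\s*:\s*(\d+)/)?.[1] ?? -1),
    task: head.match(/"task"\s*:\s*"([^"]*)"/)?.[1] ?? "unknown",
    actualSize: lineBytes.byteLength,
    maxSize: maxItemBytes,
  };
  return marker;
}
```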

### 3. Handle markers in `StreamBatchItemsService`

**`apps/webapp/app/runEngine/services/streamBatchItems.server.ts`** - in the `for await` loop

Before the existing `BatchItemNDJSONSchema.safeParse(rawItem)`, check for the marker:

```typescript
if (rawItem && typeof rawItem === "object" && (rawItem as any).__batchItemError === "OVERSIZED") {
  const marker = rawItem as OversizedItemMarker;
  const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;

  const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;

  // Enqueue the item normally but with error metadata in options.
  // The processItemCallback will detect __error and use TriggerFailedTaskService
  // to create a pre-failed run with proper waitpoint connections.
  const batchItem: BatchItem = {
    task: marker.task,
    payload: "{}",
    payloadType: "application/json",
    options: {
      __error: errorMessage,
      __errorCode: "PAYLOAD_TOO_LARGE",
    },
  };

  const result = await this._engine.enqueueBatchItem(batchId, environment.id, itemIndex, batchItem);

  if (result.enqueued) {
    itemsAccepted++;
  } else {
    itemsDeduplicated++;
  }
  lastIndex = itemIndex;
  continue;
}
```

### 4. Handle `__error` items in `processItemCallback`

**`apps/webapp/app/v3/runEngineHandlers.server.ts`** - in the `setupBatchQueueCallbacks` function

In the `processItemCallback`, before the `TriggerTaskService.call()`, check for `__error` in `item.options`:

```typescript
const itemError = item.options?.__error as string | undefined;
if (itemError) {
  const errorCode = (item.options?.__errorCode as string) ?? "ITEM_ERROR";

  // Use TriggerFailedTaskService to create a pre-failed run.
  // This creates a proper TaskRun with waitpoint connections so the
  // parent's batchTriggerAndWait resolves correctly for this item.
  const failedRunId = await triggerFailedTaskService.call({
    taskId: item.task,
    environment,
    payload: item.payload ?? "{}",
    payloadType: item.payloadType,
    errorMessage: itemError,
    errorCode: errorCode as TaskRunErrorCodes,
    parentRunId: meta.parentRunId,
    resumeParentOnCompletion: meta.resumeParentOnCompletion,
    batch: { id: batchId, index: itemIndex },
    traceContext: meta.traceContext as Record<string, unknown> | undefined,
    spanParentAsLink: meta.spanParentAsLink,
  });

  if (failedRunId) {
    span.setAttribute("batch.result.pre_failed", true);
    span.setAttribute("batch.result.run_id", failedRunId);
    span.end();
    return { success: true as const, runId: failedRunId };
  }

  // Fallback if TriggerFailedTaskService fails
  span.end();
  return { success: false as const, error: itemError, errorCode };
}
```

Note: this returns `{ success: true, runId }` because the pre-failed run IS a real run. The BatchQueue records it as a success (run was created). The run itself is already in SYSTEM_FAILURE status, so the batch completion flow handles it correctly.

If `environment` is null (environment not found), fall through to the existing environment-not-found handling which already uses `triggerFailedTaskService.callWithoutTraceEvents()` on `isFinalAttempt`.

### 5. Handle undefined/null payload in BatchQueue serialization

**`internal-packages/run-engine/src/batch-queue/index.ts`** - in `#handleMessage`

Both payload serialization blocks (in the `success: false` branch and the `catch` block) do:
```typescript
const str = typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
innerSpan?.setAttribute("batch.payloadSize", str.length);
```

`JSON.stringify(undefined)` returns `undefined`, causing `str.length` to crash. Fix both:
```typescript
const str =
  item.payload === undefined || item.payload === null
    ? "{}"
    : typeof item.payload === "string"
      ? item.payload
      : JSON.stringify(item.payload);
```

### 6. Remove stale error handling in route

**`apps/webapp/app/routes/api.v3.batches.$batchId.items.ts`**

The `error.message.includes("exceeds maximum size")` branch is no longer reachable since oversized items don't throw. Remove that condition, keep the `"Invalid JSON"` check.

### 7. Remove `BatchItemTooLargeError` and SDK pre-validation

**`packages/core/src/v3/apiClient/errors.ts`** - remove `BatchItemTooLargeError` class

**`packages/core/src/v3/apiClient/index.ts`**:
- Remove `BatchItemTooLargeError` import
- Remove `instanceof BatchItemTooLargeError` check in the retry catch block
- Remove `MAX_BATCH_ITEM_BYTES` constant
- Remove size validation from `createNdjsonStream` (revert `encodeAndValidate` to simple encode)

**`packages/trigger-sdk/src/v3/shared.ts`** - remove `BatchItemTooLargeError` import and handling in `buildBatchErrorMessage`

**`packages/trigger-sdk/src/v3/index.ts`** - remove `BatchItemTooLargeError` re-export

### 8. Update tests

**`apps/webapp/test/engine/streamBatchItems.test.ts`**:
- Update "should reject lines exceeding maxItemBytes" to assert `OversizedItemMarker` emission instead of throw
- Update "should reject unbounded accumulation without newlines" similarly
- Update the emoji byte-size test to assert marker instead of throw

### 9. Update reference project test task

**`references/hello-world/src/trigger/batches.ts`**:
- Remove `BatchItemTooLargeError` import
- Update `batchSealFailureOversizedPayload` task to test the new behavior:
- Send 2 items: one normal, one oversized (~3.2MB)
- Assert `batchTriggerAndWait` returns (doesn't throw)
- Assert `results.runs[0].ok === true` (normal item succeeded)
- Assert `results.runs[1].ok === false` (oversized item failed)
- Assert error message contains "too large"

## Data flow

```text
NDJSON bytes arrive
  |
createNdjsonParserStream
  |-- Line <= limit --> parse JSON --> enqueue object
  `-- Line > limit --> extractIndexAndTask(bytes) --> enqueue OversizedItemMarker
  |
StreamBatchItemsService for-await loop
  |-- OversizedItemMarker --> engine.enqueueBatchItem() with __error in options
  `-- Normal item --> validate --> engine.enqueueBatchItem()
  |
FairQueue consumer (#handleMessage)
  |-- __error in options --> processItemCallback detects it
  |     --> TriggerFailedTaskService.call()
  |     --> Creates pre-failed TaskRun with SYSTEM_FAILURE status
  |     --> Proper waitpoint + TaskRunWaitpoint connections created
  |     --> Returns { success: true, runId: failedRunFriendlyId }
  `-- Normal item --> TriggerTaskService.call() --> creates normal run
  |
Batch sealing: enqueuedCount === runCount (all items go through enqueueBatchItem)
Batch completion: all items have runs (real or pre-failed), waitpoints resolve normally
Parent run: batchTriggerAndWait resolves with per-item results
```
Comment on lines +216 to +238
⚠️ Potential issue | 🟡 Minor

Add a language to the fenced block to satisfy markdownlint MD040.

The flow diagram fence is unlabeled. Use a language tag (e.g., text) to pass linting.

Suggested fix
-```
+```text
 NDJSON bytes arrive
   |
 createNdjsonParserStream
@@
 Parent run: batchTriggerAndWait resolves with per-item results


## Why this works

The key insight is that `TriggerFailedTaskService` (from PR #2980) creates a real `TaskRun` in `SYSTEM_FAILURE` status. This means:
1. A RUN waitpoint is created and connected to the parent via `TaskRunWaitpoint` with correct `batchId`/`batchIndex`
2. The run is immediately completed, which completes the waitpoint
3. The SDK's `waitForBatch` resolver for that index fires with the error result
4. The batch completion flow counts this as a processed item (it's a real run)
5. No special-casing needed in the batch completion callback

## Verification

1. Rebuild `@trigger.dev/core`, `@trigger.dev/sdk`, `@internal/run-engine`
2. Restart webapp + trigger dev
3. Trigger `batch-seal-failure-oversized` task - should complete in ~2-3s with:
- Normal item: `ok: true`
- Oversized item: `ok: false` with "too large" error
4. Run NDJSON parser tests: updated tests assert marker emission instead of throws
5. Run `pnpm run build --filter @internal/run-engine --filter @trigger.dev/core --filter @trigger.dev/sdk`
1 change: 1 addition & 0 deletions docs/docs.json
@@ -74,6 +74,7 @@
"tags",
"runs/metadata",
"tasks/streams",
"guides/ai-chat",
"run-usage",
"context",
"runs/priority",