
Commit e38c062 ("Updates")

1 parent 987092c

5 files changed

Lines changed: 283 additions & 336 deletions

File tree

apps/webapp/app/v3/services/aiRunFilterService.server.ts

Lines changed: 37 additions & 23 deletions
@@ -30,7 +30,7 @@ const AIFilterResponseSchema = z
 export interface QueryQueues {
   query(
     search: string | undefined,
-    type: "task" | "custom" | undefined
+    type: "task" | "custom" | undefined,
   ): Promise<{
     queues: string[];
   }>;
@@ -39,14 +39,14 @@ export interface QueryQueues {
 export interface QueryVersions {
   query(
     versionPrefix: string | undefined,
-    isCurrent: boolean | undefined
+    isCurrent: boolean | undefined,
   ): Promise<
     | {
-        versions: string[];
-      }
+      versions: string[];
+    }
     | {
-        version: string;
-      }
+      version: string;
+    }
   >;
 }
 
@@ -64,13 +64,13 @@ export interface QueryTasks {
 
 export type AIFilterResult =
   | {
-      success: true;
-      filters: TaskRunListSearchFilters;
-    }
+    success: true;
+    filters: TaskRunListSearchFilters;
+  }
   | {
-      success: false;
-      error: string;
-    };
+    success: false;
+    error: string;
+  };
 
 export class AIRunFilterService {
   constructor(
@@ -80,7 +80,7 @@ export class AIRunFilterService {
       queryQueues: QueryQueues;
       queryTasks: QueryTasks;
     },
-    private readonly model: LanguageModelV1 = openai("gpt-4o-mini")
+    private readonly model: LanguageModelV1 = openai("gpt-4o-mini"),
   ) {}
 
   async call(text: string, environmentId: string): Promise<AIFilterResult> {
@@ -92,7 +92,9 @@ export class AIRunFilterService {
         lookupTags: tool({
           description: "Look up available tags in the environment",
           parameters: z.object({
-            query: z.string().optional().describe("Optional search query to filter tags"),
+            query: z.string().optional().describe(
+              "Optional search query to filter tags",
+            ),
           }),
           execute: async ({ query }) => {
             return await this.queryFns.queryTags.query(query);
@@ -110,22 +112,27 @@ export class AIRunFilterService {
               .string()
               .optional()
               .describe(
-                "Optional version name to filter (e.g. 20250701.1), it uses contains to compare. Don't pass `latest` or `current`, the query has to be in the reverse date format specified. Leave out to get all recent versions."
+                "Optional version name to filter (e.g. 20250701.1), it uses contains to compare. Don't pass `latest` or `current`, the query has to be in the reverse date format specified. Leave out to get all recent versions.",
               ),
           }),
           execute: async ({ versionPrefix, isCurrent }) => {
-            return await this.queryFns.queryVersions.query(versionPrefix, isCurrent);
+            return await this.queryFns.queryVersions.query(
+              versionPrefix,
+              isCurrent,
+            );
          },
        }),
        lookupQueues: tool({
          description: "Look up available queues in the environment",
          parameters: z.object({
-            query: z.string().optional().describe("Optional search query to filter queues"),
+            query: z.string().optional().describe(
+              "Optional search query to filter queues",
+            ),
            type: z
              .enum(["task", "custom"])
              .optional()
              .describe(
-                "Filter by queue type, only do this if the user specifies it explicitly."
+                "Filter by queue type, only do this if the user specifies it explicitly.",
              ),
          }),
          execute: async ({ query, type }) => {
@@ -142,12 +149,15 @@ export class AIRunFilterService {
        }),
      },
      maxSteps: 5,
-      system: `You are an AI assistant that converts natural language descriptions into structured filter parameters for a task run filtering system.
+      system:
+        `You are an AI assistant that converts natural language descriptions into structured filter parameters for a task run filtering system.
 
 Available filter options:
 - statuses: Array of run statuses (PENDING, EXECUTING, COMPLETED_SUCCESSFULLY, COMPLETED_WITH_ERRORS, CANCELED, TIMED_OUT, CRASHED, etc.)
 - period: Time period string (e.g., "1h", "7d", "30d", "1y")
-- from/to: ISO date string. Today's date is ${new Date().toISOString()}, if they only specify a day use the current month. If they don't specify a year use the current year. If they don't specify a time of day use midnight.
+- from/to: ISO date string. Today's date is ${
+        new Date().toISOString()
+      }, if they only specify a day use the current month. If they don't specify a year use the current year. If they don't specify a time of day use midnight.
 - tags: Array of tag names to filter by. Use the lookupTags tool to get the tags.
 - tasks: Array of task identifiers to filter by. Use the lookupTasks tool to get the tasks.
 - machines: Array of machine presets (micro, small, small-2x, medium, large, xlarge, etc.)
@@ -159,7 +169,7 @@ export class AIRunFilterService {
 - scheduleId: Specific schedule ID to filter by
 
 
-Common patterns to recognize:
+Common workflows to recognize:
 - "failed runs" → statuses: ["COMPLETED_WITH_ERRORS", "CRASHED", "TIMED_OUT", "SYSTEM_FAILURE"].
 - "runs not dequeued yet" → statuses: ["PENDING", "PENDING_VERSION", "DELAYED"]
 - If they say "only failed" then only use "COMPLETED_WITH_ERRORS".
@@ -232,7 +242,9 @@ export class AIRunFilterService {
      }
 
      // Validate the filters against the schema to catch any issues
-      const validationResult = AIFilters.safeParse(result.experimental_output.filters);
+      const validationResult = AIFilters.safeParse(
+        result.experimental_output.filters,
+      );
      if (!validationResult.success) {
        logger.error("AI filter validation failed", {
          errors: validationResult.error.errors,
@@ -252,7 +264,9 @@ export class AIRunFilterService {
          from: validationResult.data.from
            ? new Date(validationResult.data.from).getTime()
            : undefined,
-          to: validationResult.data.to ? new Date(validationResult.data.to).getTime() : undefined,
+          to: validationResult.data.to
+            ? new Date(validationResult.data.to).getTime()
+            : undefined,
        },
      };
    } catch (error) {
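The hunks above are formatting-only, but the pattern they reformat (zod-typed tools handed to the AI SDK so the model can look up tags, versions, and queues before emitting filters) is easy to lose in the noise. Here is a minimal sketch of that tool-calling setup, assuming the Vercel AI SDK (`ai`, `@ai-sdk/openai`) and `zod`; `fetchTags` and `filtersFromText` are hypothetical stand-ins for illustration, not code from this commit.

```ts
// Minimal sketch of the zod-typed tool-calling pattern, not the service itself.
import { openai } from "@ai-sdk/openai";
import { generateText, tool } from "ai";
import { z } from "zod";

// Hypothetical stand-in for this.queryFns.queryTags.query
declare function fetchTags(query?: string): Promise<{ tags: string[] }>;

export async function filtersFromText(text: string): Promise<string> {
  const result = await generateText({
    model: openai("gpt-4o-mini"),
    maxSteps: 5, // allow a few rounds of tool calls before the final answer
    system: "Convert natural language into structured run filter parameters.",
    prompt: text,
    tools: {
      lookupTags: tool({
        description: "Look up available tags in the environment",
        parameters: z.object({
          query: z.string().optional().describe("Optional search query to filter tags"),
        }),
        // The model decides when to call this; the result is fed back to it
        execute: async ({ query }) => fetchTags(query),
      }),
    },
  });
  return result.text;
}
```

`maxSteps` is what lets the model call a lookup tool, read its result, and only then produce the final output; the real service additionally validates that output with `AIFilters.safeParse`, as the last hunks show.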
Lines changed: 61 additions & 79 deletions
@@ -1,106 +1,88 @@
 ---
-title: "Data Processing & ETL"
-description: "Build reliable data processing and ETL pipelines with automatic retries, progress tracking, and no timeout limits using Trigger.dev"
+title: "Data processing & ETL workflows"
+sidebarTitle: "Data processing & ETL"
+description: "Learn how to use Trigger.dev for data processing and ETL including web scraping, database synchronization, batch enrichment, and streaming analytics workflows"
 ---
 
 import UseCasesCards from "/snippets/use-cases-cards.mdx";
 
 ## Overview
 
-Data processing and ETL (Extract, Transform, Load) workflows require handling large datasets, complex transformations, and reliable data movement between systems. Build robust data pipelines in TypeScript with automatic retries, progress tracking, and no timeout limits; perfect for web scraping, database synchronization, real-time analytics, and large-scale data transformation.
+Build data pipelines that process large datasets without timeouts. Handle streaming analytics, batch enrichment, web scraping, database sync, and file processing with automatic retries and progress tracking.
 
-## Basic data processing and ETL workflow implementation
+## Featured examples
 
-A typical ETL pipeline:
-
-1. **Extract**: Pull from APIs, databases, S3, or web scraping
-2. **Transform**: Clean, validate, enrich data
-3. **Load**: Write to warehouse, database, or storage
-4. **Monitor**: Track progress, handle failures
-
-Each step is durable and retryable—if transformation fails, Trigger.dev automatically retries without re-extracting source data thanks to [checkpoint-resume](/how-it-works#the-checkpoint-resume-system) and [idempotency keys](/idempotency).
-
-Trigger.dev is ideal for ETL pipelines because there are no [timeout limits](/runs/max-duration) (process datasets for hours or days), [batchTriggerAndWait()](/triggering#yourtask-batchtriggerandwait) parallelizes across thousands of records with [queue.concurrencyLimit](/queue-concurrency) to respect API rate limits, [metadata](/runs/metadata) + [realtime](/realtime) stream row-by-row progress to dashboards, and [schedules.task()](/tasks/scheduled) handles recurring jobs with cron syntax.
-
-## Data processing workflow examples
-
-<CardGroup cols={2}>
+<CardGroup cols={3}>
   <Card
     title="Realtime CSV importer"
     icon="book"
     href="/guides/example-projects/realtime-csv-importer"
   >
-    Import CSV files with progress tracking streamed to the frontend.
+    Import CSV files with progress streamed live to frontend.
   </Card>
   <Card title="Web scraper with BrowserBase" icon="book" href="/guides/examples/scrape-hacker-news">
-    Scrape Hacker News using BrowserBase and Puppeteer, summarize with ChatGPT.
-  </Card>
-  <Card title="Firecrawl" icon="book" href="/guides/examples/firecrawl-url-crawl">
-    Crawl URLs and return LLM-ready markdown using Firecrawl.
+    Scrape websites using BrowserBase and Puppeteer.
   </Card>
   <Card
     title="Supabase database operations"
     icon="book"
     href="/guides/examples/supabase-database-operations"
   >
-    Run CRUD operations on a Supabase database table.
-  </Card>
-  <Card title="Sequin database triggers" icon="book" href="/guides/frameworks/sequin">
-    Trigger tasks from database changes using Sequin's CDC platform.
-  </Card>
-  <Card
-    title="Sync Vercel environment variables"
-    icon="book"
-    href="/guides/examples/vercel-sync-env-vars"
-  >
-    Automatically sync environment variables from Vercel projects.
+    Run CRUD operations on Supabase database tables.
   </Card>
 </CardGroup>
 
-## Production use cases
-
-<Card title="Papermark customer story" href="https://trigger.dev/customers/papermark-customer-story">
-
-  Read how Papermark processes thousands of documents per month using Trigger.dev.
-
-</Card>
-
-## Common data processing patterns
-
-### Scheduled Data Syncs
-
-Run ETL jobs on a schedule to keep systems in sync:
-
-- Daily database exports and backups
-- Hourly API data pulls and transformations
-- Real-time webhook processing and routing
-- Periodic data warehouse updates
-
-### Event-Driven Processing
-
-Respond to data events with automated workflows:
-
-- Process new database records as they're created
-- Transform uploaded files immediately
-- React to webhook events from external systems
-- Handle real-time data streams
-
-### Batch Processing
-
-Process large datasets efficiently:
-
-- Import CSV files with thousands of rows
-- Bulk update records across systems
-- Process queued data in parallel batches
-- Generate reports from aggregated data
-
-### Pipeline Orchestration
-
-Chain multiple processing steps together:
-
-- Extract from API → Transform → Load to database
-- Web scraping → Data cleaning → Analysis → Storage
-- File upload → Validation → Processing → Notification
-- Multi-source data aggregation and enrichment
+## Why Trigger.dev for data processing
+
+**Process datasets for hours without timeouts**
+
+Handle multi-hour transformations, large file processing, or complete database exports. No execution time limits.
+
+**Parallel processing with built-in rate limiting**
+
+Process thousands of records simultaneously while respecting API rate limits. Scale efficiently without overwhelming downstream services.
+
+**Stream progress to your users in real-time**
+
+Show row-by-row processing status updating live in your dashboard. Users see exactly where processing is and how long remains.
+
+## Common workflows
+
+Here are some basic examples of data processing and ETL workflows:
+
+<Tabs>
+  <Tab title="ETL pipeline">
+    <Steps>
+      <Step title="Extract">Pull from APIs, databases, S3, or web scraping</Step>
+      <Step title="Transform">Clean, validate, enrich data</Step>
+      <Step title="Load">Write to warehouse, database, or storage</Step>
+      <Step title="Monitor">Track progress, handle failures</Step>
+    </Steps>
+  </Tab>
+  <Tab title="Web scraping">
+    <Steps>
+      <Step title="Navigate">Load target pages with headless browser</Step>
+      <Step title="Extract">Pull content, links, structured data</Step>
+      <Step title="Transform">Clean HTML, parse JSON, normalize data</Step>
+      <Step title="Store">Save to database or file storage</Step>
+    </Steps>
+  </Tab>
+  <Tab title="Batch enrichment">
+    <Steps>
+      <Step title="Query">Fetch records needing enrichment</Step>
+      <Step title="Enrich">Call external APIs in parallel batches</Step>
+      <Step title="Validate">Check data quality and completeness</Step>
+      <Step title="Update">Write enriched data back to database</Step>
+    </Steps>
+  </Tab>
+  <Tab title="File processing">
+    <Steps>
+      <Step title="Upload">Receive file via webhook or storage event</Step>
+      <Step title="Parse">Read CSV, JSON, XML, or binary format</Step>
+      <Step title="Process">Transform, validate, chunk large files</Step>
+      <Step title="Import">Bulk insert to database or data warehouse</Step>
+    </Steps>
+  </Tab>
+</Tabs>
 
 <UseCasesCards />
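The docs text this commit removes named the mechanisms behind the new "Batch enrichment" tab: `batchTriggerAndWait()` for fan-out, `queue.concurrencyLimit` for rate limiting, and run `metadata` for progress. A minimal sketch of that workflow with the Trigger.dev v3 SDK follows; the task ids, the `concurrencyLimit` of 10, and `enrichFromApi` are illustrative assumptions, not part of this commit.

```ts
// Minimal sketch of the "batch enrichment" workflow, assuming @trigger.dev/sdk/v3.
import { metadata, task } from "@trigger.dev/sdk/v3";

// Hypothetical call to whatever external API performs the enrichment
declare function enrichFromApi(recordId: string): Promise<Record<string, unknown>>;

export const enrichRecord = task({
  id: "enrich-record",
  // Cap parallel child runs so the downstream API's rate limit is respected
  queue: { concurrencyLimit: 10 },
  run: async (payload: { recordId: string }) => {
    return await enrichFromApi(payload.recordId);
  },
});

export const enrichBatch = task({
  id: "enrich-batch",
  run: async (payload: { recordIds: string[] }) => {
    // Fan out one child run per record and wait for all of them to finish
    const results = await enrichRecord.batchTriggerAndWait(
      payload.recordIds.map((recordId) => ({ payload: { recordId } })),
    );
    const succeeded = results.runs.filter((run) => run.ok).length;
    // Expose counts on the run's metadata so Realtime subscribers can show them
    metadata.set("processed", succeeded);
    return { succeeded, failed: results.runs.length - succeeded };
  },
});
```

`batchTriggerAndWait` parallelizes across records while the queue's `concurrencyLimit` keeps the external API within its rate limit; setting metadata is what makes the progress visible to a dashboard or frontend.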
