Skip to content

Commit 234604a

Browse files
committed
improvement(polling): fix correctness and efficiency across all polling handlers
- Gmail: paginate history API, add historyTypes filter, differentiate 403/429, fetch fresh historyId on fallback to break 404 retry loop - Outlook: follow @odata.nextLink pagination, use fetchWithRetry for all Graph calls, fix $top alignment, skip folder filter on partial resolution failure, remove Content-Type from GET requests - RSS: add conditional GET (ETag/If-None-Match), raise GUID cap to 500, fix 304 ETag capture per RFC 9111, align GUID tracking with idempotency fallback key - IMAP: single connection reuse, UIDVALIDITY tracking per mailbox, advance UID only on successful fetch, fix messageFlagsAdd range type, remove cross-mailbox legacy UID fallback - Dispatch polling via trigger.dev task with per-provider concurrency key; fall back to synchronous Redis-locked polling for self-hosted
1 parent 9fbe514 commit 234604a

6 files changed

Lines changed: 473 additions & 256 deletions

File tree

Lines changed: 66 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { verifyCronAuth } from '@/lib/auth/internal'
4+
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
45
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
56
import { generateShortId } from '@/lib/core/utils/uuid'
67
import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling'
8+
import { providerPolling } from '@/background/provider-polling'
79

810
const logger = createLogger('PollingAPI')
911

@@ -20,9 +22,6 @@ export async function GET(
2022
const { provider } = await params
2123
const requestId = generateShortId()
2224

23-
const LOCK_KEY = `${provider}-polling-lock`
24-
let lockValue: string | undefined
25-
2625
try {
2726
const authError = verifyCronAuth(request, `${provider} webhook polling`)
2827
if (authError) return authError
@@ -31,29 +30,75 @@ export async function GET(
3130
return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 })
3231
}
3332

34-
lockValue = requestId
35-
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
36-
if (!locked) {
37-
return NextResponse.json(
38-
{
33+
// When trigger.dev is enabled, dispatch polling as an async task and return immediately.
34+
// Per-provider concurrency (concurrencyKey) ensures only one poll per provider runs at a time,
35+
// while different providers (gmail vs outlook) can poll in parallel.
36+
if (isTriggerDevEnabled) {
37+
try {
38+
const handle = await providerPolling.trigger(
39+
{ provider, requestId },
40+
{
41+
concurrencyKey: provider,
42+
tags: [`provider:${provider}`],
43+
}
44+
)
45+
46+
logger.info(`[${requestId}] Dispatched ${provider} polling to trigger.dev`, {
47+
runId: handle.id,
48+
})
49+
50+
return NextResponse.json({
3951
success: true,
40-
message: 'Polling already in progress – skipped',
52+
message: `${provider} polling dispatched`,
4153
requestId,
42-
status: 'skip',
43-
},
44-
{ status: 202 }
45-
)
54+
runId: handle.id,
55+
status: 'dispatched',
56+
})
57+
} catch (triggerError) {
58+
// If trigger.dev is unavailable, fall through to synchronous polling below.
59+
logger.warn(
60+
`[${requestId}] Trigger.dev dispatch failed for ${provider}, falling back to synchronous polling`,
61+
{
62+
error: triggerError instanceof Error ? triggerError.message : String(triggerError),
63+
}
64+
)
65+
}
4666
}
4767

48-
const results = await pollProvider(provider)
68+
// Fallback: synchronous polling when trigger.dev is not enabled (self-hosted).
69+
// Redis lock prevents concurrent polls for the same provider.
70+
const LOCK_KEY = `${provider}-polling-lock`
71+
let lockValue: string | undefined
72+
73+
try {
74+
lockValue = requestId
75+
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
76+
if (!locked) {
77+
return NextResponse.json(
78+
{
79+
success: true,
80+
message: 'Polling already in progress – skipped',
81+
requestId,
82+
status: 'skip',
83+
},
84+
{ status: 202 }
85+
)
86+
}
4987

50-
return NextResponse.json({
51-
success: true,
52-
message: `${provider} polling completed`,
53-
requestId,
54-
status: 'completed',
55-
...results,
56-
})
88+
const results = await pollProvider(provider)
89+
90+
return NextResponse.json({
91+
success: true,
92+
message: `${provider} polling completed`,
93+
requestId,
94+
status: 'completed',
95+
...results,
96+
})
97+
} finally {
98+
if (lockValue) {
99+
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
100+
}
101+
}
57102
} catch (error) {
58103
logger.error(`Error during ${provider} polling (${requestId}):`, error)
59104
return NextResponse.json(
@@ -65,9 +110,5 @@ export async function GET(
65110
},
66111
{ status: 500 }
67112
)
68-
} finally {
69-
if (lockValue) {
70-
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
71-
}
72113
}
73114
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { createLogger } from '@sim/logger'
2+
import { task } from '@trigger.dev/sdk'
3+
import { pollProvider } from '@/lib/webhooks/polling'
4+
5+
const logger = createLogger('TriggerProviderPolling')
6+
7+
export type ProviderPollingPayload = {
8+
provider: string
9+
requestId: string
10+
}
11+
12+
export const providerPolling = task({
13+
id: 'provider-polling',
14+
machine: 'medium-1x',
15+
maxDuration: 300,
16+
retry: {
17+
maxAttempts: 1,
18+
},
19+
queue: {
20+
name: 'provider-polling',
21+
concurrencyLimit: 1,
22+
},
23+
run: async (payload: ProviderPollingPayload) => {
24+
const { provider, requestId } = payload
25+
26+
logger.info(`[${requestId}] Starting ${provider} polling`)
27+
28+
const result = await pollProvider(provider)
29+
30+
logger.info(`[${requestId}] ${provider} polling completed`, result)
31+
32+
return result
33+
},
34+
})

apps/sim/lib/webhooks/polling/gmail.ts

Lines changed: 76 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -151,44 +151,70 @@ async function fetchNewEmails(
151151
let latestHistoryId = config.historyId
152152

153153
if (useHistoryApi) {
154-
const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
154+
const messageIds = new Set<string>()
155+
let pageToken: string | undefined
155156

156-
const historyResponse = await fetch(historyUrl, {
157-
headers: { Authorization: `Bearer ${accessToken}` },
158-
})
157+
do {
158+
let historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}&historyTypes=messageAdded`
159+
if (pageToken) {
160+
historyUrl += `&pageToken=${pageToken}`
161+
}
159162

160-
if (!historyResponse.ok) {
161-
const errorData = await historyResponse.json()
162-
logger.error(`[${requestId}] Gmail history API error:`, {
163-
status: historyResponse.status,
164-
statusText: historyResponse.statusText,
165-
error: errorData,
163+
const historyResponse = await fetch(historyUrl, {
164+
headers: { Authorization: `Bearer ${accessToken}` },
166165
})
167166

168-
logger.info(`[${requestId}] Falling back to search API after history API failure`)
169-
return searchEmails(accessToken, config, requestId, logger)
170-
}
167+
if (!historyResponse.ok) {
168+
const status = historyResponse.status
169+
const errorData = await historyResponse.json().catch(() => ({}))
170+
logger.error(`[${requestId}] Gmail history API error:`, {
171+
status,
172+
statusText: historyResponse.statusText,
173+
error: errorData,
174+
})
175+
176+
if (status === 403 || status === 429) {
177+
throw new Error(
178+
`Gmail API error ${status} — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
179+
)
180+
}
171181

172-
const historyData = await historyResponse.json()
182+
logger.info(`[${requestId}] Falling back to search API after history API error ${status}`)
183+
const searchResult = await searchEmails(accessToken, config, requestId, logger)
184+
// When search finds 0 emails after a history API failure, the stored historyId is likely
185+
// invalid. Fetch a fresh one from the profile API to break the potential 404 retry loop.
186+
if (searchResult.emails.length === 0) {
187+
const freshHistoryId = await getGmailProfileHistoryId(accessToken, requestId, logger)
188+
if (freshHistoryId) {
189+
logger.info(
190+
`[${requestId}] Fetched fresh historyId ${freshHistoryId} after invalid historyId (was: ${config.historyId})`
191+
)
192+
return { emails: [], latestHistoryId: freshHistoryId }
193+
}
194+
}
195+
return searchResult
196+
}
173197

174-
if (!historyData.history || !historyData.history.length) {
175-
return { emails: [], latestHistoryId }
176-
}
198+
const historyData = await historyResponse.json()
177199

178-
if (historyData.historyId) {
179-
latestHistoryId = historyData.historyId
180-
}
200+
if (historyData.historyId) {
201+
latestHistoryId = historyData.historyId
202+
}
181203

182-
const messageIds = new Set<string>()
183-
for (const history of historyData.history) {
184-
if (history.messagesAdded) {
185-
for (const messageAdded of history.messagesAdded) {
186-
messageIds.add(messageAdded.message.id)
204+
if (historyData.history) {
205+
for (const history of historyData.history) {
206+
if (history.messagesAdded) {
207+
for (const messageAdded of history.messagesAdded) {
208+
messageIds.add(messageAdded.message.id)
209+
}
210+
}
187211
}
188212
}
189-
}
190213

191-
if (messageIds.size === 0) {
214+
pageToken = historyData.nextPageToken
215+
} while (pageToken)
216+
217+
if (!messageIds.size) {
192218
return { emails: [], latestHistoryId }
193219
}
194220

@@ -352,6 +378,29 @@ async function searchEmails(
352378
}
353379
}
354380

381+
async function getGmailProfileHistoryId(
382+
accessToken: string,
383+
requestId: string,
384+
logger: ReturnType<typeof import('@sim/logger').createLogger>
385+
): Promise<string | null> {
386+
try {
387+
const response = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/profile', {
388+
headers: { Authorization: `Bearer ${accessToken}` },
389+
})
390+
if (!response.ok) {
391+
logger.warn(
392+
`[${requestId}] Failed to fetch Gmail profile for fresh historyId: ${response.status}`
393+
)
394+
return null
395+
}
396+
const profile = await response.json()
397+
return (profile.historyId as string | undefined) ?? null
398+
} catch (error) {
399+
logger.warn(`[${requestId}] Error fetching Gmail profile:`, error)
400+
return null
401+
}
402+
}
403+
355404
async function getEmailDetails(accessToken: string, messageId: string): Promise<GmailEmail> {
356405
const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full`
357406

0 commit comments

Comments
 (0)