diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 0d1e41a9e1f..176103d682f 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -8,6 +8,7 @@ import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' import { generateRequestId } from '@/lib/core/utils/request' import { generateId } from '@/lib/core/utils/uuid' import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' +import { getWorkflowById } from '@/lib/workflows/utils' import { executeJobInline, executeScheduleJob, @@ -115,7 +116,6 @@ export async function GET(request: NextRequest) { } try { - const { getWorkflowById } = await import('@/lib/workflows/utils') const resolvedWorkflow = schedule.workflowId ? await getWorkflowById(schedule.workflowId) : null diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index d314e8563bb..053d328b0dd 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -20,9 +20,6 @@ export async function GET( const { provider } = await params const requestId = generateShortId() - const LOCK_KEY = `${provider}-polling-lock` - let lockValue: string | undefined - try { const authError = verifyCronAuth(request, `${provider} webhook polling`) if (authError) return authError @@ -31,29 +28,38 @@ export async function GET( return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 }) } - lockValue = requestId - const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) - if (!locked) { - return NextResponse.json( - { - success: true, - message: 'Polling already in progress – skipped', - requestId, - status: 'skip', - }, - { status: 202 } - ) - } + const LOCK_KEY = `${provider}-polling-lock` + let lockValue: string | undefined + + try { + lockValue = requestId + const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) + if (!locked) { + return NextResponse.json( + { + success: true, + message: 'Polling already in progress – skipped', + requestId, + status: 'skip', + }, + { status: 202 } + ) + } - const results = await pollProvider(provider) + const results = await pollProvider(provider) - return NextResponse.json({ - success: true, - message: `${provider} polling completed`, - requestId, - status: 'completed', - ...results, - }) + return NextResponse.json({ + success: true, + message: `${provider} polling completed`, + requestId, + status: 'completed', + ...results, + }) + } finally { + if (lockValue) { + await releaseLock(LOCK_KEY, lockValue).catch(() => {}) + } + } } catch (error) { logger.error(`Error during ${provider} polling (${requestId}):`, error) return NextResponse.json( @@ -65,9 +71,5 @@ export async function GET( }, { status: 500 } ) - } finally { - if (lockValue) { - await releaseLock(LOCK_KEY, lockValue).catch(() => {}) - } } } diff --git a/apps/sim/lib/webhooks/polling/gmail.ts b/apps/sim/lib/webhooks/polling/gmail.ts index 7db8587d2c2..7ca379194f2 100644 --- a/apps/sim/lib/webhooks/polling/gmail.ts +++ b/apps/sim/lib/webhooks/polling/gmail.ts @@ -151,44 +151,68 @@ async function fetchNewEmails( let latestHistoryId = config.historyId if (useHistoryApi) { - const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}` + const messageIds = new Set() + let pageToken: string | undefined - const historyResponse = await fetch(historyUrl, { - headers: { Authorization: `Bearer ${accessToken}` }, - }) + do { + let historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}&historyTypes=messageAdded` + if (pageToken) { + historyUrl += `&pageToken=${pageToken}` + } - if (!historyResponse.ok) { - const errorData = await historyResponse.json() - logger.error(`[${requestId}] Gmail history API error:`, { - status: historyResponse.status, - statusText: historyResponse.statusText, - error: errorData, + const historyResponse = await fetch(historyUrl, { + headers: { Authorization: `Bearer ${accessToken}` }, }) - logger.info(`[${requestId}] Falling back to search API after history API failure`) - return searchEmails(accessToken, config, requestId, logger) - } + if (!historyResponse.ok) { + const status = historyResponse.status + const errorData = await historyResponse.json().catch(() => ({})) + logger.error(`[${requestId}] Gmail history API error:`, { + status, + statusText: historyResponse.statusText, + error: errorData, + }) + + if (status === 403 || status === 429) { + throw new Error( + `Gmail API error ${status} — skipping to retry next poll cycle: ${JSON.stringify(errorData)}` + ) + } - const historyData = await historyResponse.json() + logger.info(`[${requestId}] Falling back to search API after history API error ${status}`) + const searchResult = await searchEmails(accessToken, config, requestId, logger) + if (searchResult.emails.length === 0) { + const freshHistoryId = await getGmailProfileHistoryId(accessToken, requestId, logger) + if (freshHistoryId) { + logger.info( + `[${requestId}] Fetched fresh historyId ${freshHistoryId} after invalid historyId (was: ${config.historyId})` + ) + return { emails: [], latestHistoryId: freshHistoryId } + } + } + return searchResult + } - if (!historyData.history || !historyData.history.length) { - return { emails: [], latestHistoryId } - } + const historyData = await historyResponse.json() - if (historyData.historyId) { - latestHistoryId = historyData.historyId - } + if (historyData.historyId) { + latestHistoryId = historyData.historyId + } - const messageIds = new Set() - for (const history of historyData.history) { - if (history.messagesAdded) { - for (const messageAdded of history.messagesAdded) { - messageIds.add(messageAdded.message.id) + if (historyData.history) { + for (const history of historyData.history) { + if (history.messagesAdded) { + for (const messageAdded of history.messagesAdded) { + messageIds.add(messageAdded.message.id) + } + } } } - } - if (messageIds.size === 0) { + pageToken = historyData.nextPageToken + } while (pageToken) + + if (!messageIds.size) { return { emails: [], latestHistoryId } } @@ -352,6 +376,29 @@ async function searchEmails( } } +async function getGmailProfileHistoryId( + accessToken: string, + requestId: string, + logger: ReturnType +): Promise { + try { + const response = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/profile', { + headers: { Authorization: `Bearer ${accessToken}` }, + }) + if (!response.ok) { + logger.warn( + `[${requestId}] Failed to fetch Gmail profile for fresh historyId: ${response.status}` + ) + return null + } + const profile = await response.json() + return (profile.historyId as string | undefined) ?? null + } catch (error) { + logger.warn(`[${requestId}] Error fetching Gmail profile:`, error) + return null + } +} + async function getEmailDetails(accessToken: string, messageId: string): Promise { const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full` @@ -442,9 +489,7 @@ async function processEmails( if (headers.date) { try { date = new Date(headers.date).toISOString() - } catch (_e) { - // Keep date as null if parsing fails - } + } catch (_e) {} } else if (email.internalDate) { date = new Date(Number.parseInt(email.internalDate)).toISOString() } diff --git a/apps/sim/lib/webhooks/polling/imap.ts b/apps/sim/lib/webhooks/polling/imap.ts index e5822aa8882..f82a8bb0bbb 100644 --- a/apps/sim/lib/webhooks/polling/imap.ts +++ b/apps/sim/lib/webhooks/polling/imap.ts @@ -22,6 +22,7 @@ interface ImapWebhookConfig { includeAttachments: boolean lastProcessedUid?: number lastProcessedUidByMailbox?: Record + uidValidityByMailbox?: Record lastCheckedTimestamp?: string maxEmailsPerPoll?: number } @@ -90,48 +91,90 @@ export const imapPollingHandler: PollingProviderHandler = { return 'failure' } - const { emails, latestUidByMailbox } = await fetchNewEmails( - config, - requestId, - hostValidation.resolvedIP!, - logger - ) - const pollTimestamp = new Date().toISOString() + const client = new ImapFlow({ + host: hostValidation.resolvedIP!, + servername: config.host, + port: config.port || 993, + secure: config.secure ?? true, + auth: { + user: config.username, + pass: config.password, + }, + tls: { rejectUnauthorized: true }, + logger: false, + }) + + let emails: Awaited>['emails'] = [] + let latestUidByMailbox: Record = {} + let uidValidityByMailbox: Record = {} - if (!emails || !emails.length) { - await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger) - await markWebhookSuccess(webhookId, logger) - logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) - return 'success' - } + try { + await client.connect() + + const result = await fetchNewEmails(client, config, requestId, logger) + emails = result.emails + latestUidByMailbox = result.latestUidByMailbox + uidValidityByMailbox = result.uidValidityByMailbox + + const pollTimestamp = new Date().toISOString() + + if (!emails.length) { + await updateImapState( + webhookId, + latestUidByMailbox, + pollTimestamp, + config, + logger, + uidValidityByMailbox + ) + await markWebhookSuccess(webhookId, logger) + logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) + await client.logout() + return 'success' + } - logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) + logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) - const { processedCount, failedCount } = await processEmails( - emails, - webhookData, - workflowData, - config, - requestId, - hostValidation.resolvedIP!, - logger - ) + const { processedCount, failedCount } = await processEmails( + emails, + webhookData, + workflowData, + config, + client, + requestId, + logger + ) + + await updateImapState( + webhookId, + latestUidByMailbox, + pollTimestamp, + config, + logger, + uidValidityByMailbox + ) - await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger) + await client.logout() - if (failedCount > 0 && processedCount === 0) { - await markWebhookFailed(webhookId, logger) - logger.warn( - `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` + if (failedCount > 0 && processedCount === 0) { + await markWebhookFailed(webhookId, logger) + logger.warn( + `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` + ) + return 'failure' + } + + await markWebhookSuccess(webhookId, logger) + logger.info( + `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}` ) - return 'failure' + return 'success' + } catch (innerError) { + try { + await client.logout() + } catch {} + throw innerError } - - await markWebhookSuccess(webhookId, logger) - logger.info( - `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}` - ) - return 'success' } catch (error) { logger.error(`[${requestId}] Error processing IMAP webhook ${webhookId}:`, error) await markWebhookFailed(webhookId, logger) @@ -145,13 +188,35 @@ async function updateImapState( uidByMailbox: Record, timestamp: string, config: ImapWebhookConfig, - logger: ReturnType + logger: ReturnType, + uidValidityByMailbox: Record ) { const existingUidByMailbox = config.lastProcessedUidByMailbox || {} - const mergedUidByMailbox = { ...existingUidByMailbox } + const prevUidValidity = config.uidValidityByMailbox || {} + + const resetMailboxes = new Set( + Object.entries(uidValidityByMailbox) + .filter( + ([mailbox, validity]) => + prevUidValidity[mailbox] !== undefined && prevUidValidity[mailbox] !== validity + ) + .map(([mailbox]) => mailbox) + ) + + const mergedUidByMailbox: Record = {} + + for (const [mailbox, uid] of Object.entries(existingUidByMailbox)) { + if (!resetMailboxes.has(mailbox)) { + mergedUidByMailbox[mailbox] = uid + } + } for (const [mailbox, uid] of Object.entries(uidByMailbox)) { - mergedUidByMailbox[mailbox] = Math.max(uid, mergedUidByMailbox[mailbox] || 0) + if (resetMailboxes.has(mailbox)) { + mergedUidByMailbox[mailbox] = uid + } else { + mergedUidByMailbox[mailbox] = Math.max(uid, mergedUidByMailbox[mailbox] || 0) + } } await updateWebhookProviderConfig( @@ -159,30 +224,18 @@ async function updateImapState( { lastProcessedUidByMailbox: mergedUidByMailbox, lastCheckedTimestamp: timestamp, + uidValidityByMailbox, }, logger ) } async function fetchNewEmails( + client: ImapFlow, config: ImapWebhookConfig, requestId: string, - resolvedIP: string, logger: ReturnType ) { - const client = new ImapFlow({ - host: resolvedIP, - servername: config.host, - port: config.port || 993, - secure: config.secure ?? true, - auth: { - user: config.username, - pass: config.password, - }, - tls: { rejectUnauthorized: true }, - logger: false, - }) - const emails: Array<{ uid: number mailboxPath: string @@ -193,97 +246,93 @@ async function fetchNewEmails( const mailboxes = getMailboxesToCheck(config) const latestUidByMailbox: Record = { ...(config.lastProcessedUidByMailbox || {}) } + const uidValidityByMailbox: Record = { ...(config.uidValidityByMailbox || {}) } - try { - await client.connect() - - const maxEmails = config.maxEmailsPerPoll || 25 - let totalEmailsCollected = 0 + const maxEmails = config.maxEmailsPerPoll || 25 + let totalEmailsCollected = 0 - for (const mailboxPath of mailboxes) { - if (totalEmailsCollected >= maxEmails) break + for (const mailboxPath of mailboxes) { + if (totalEmailsCollected >= maxEmails) break - try { - await client.mailboxOpen(mailboxPath) - - let searchCriteria: Record = { unseen: true } - if (config.searchCriteria) { - if (typeof config.searchCriteria === 'object') { - searchCriteria = config.searchCriteria as unknown as Record - } else if (typeof config.searchCriteria === 'string') { - try { - searchCriteria = JSON.parse(config.searchCriteria) - } catch { - logger.warn(`[${requestId}] Invalid search criteria JSON, using default`) - } - } - } + try { + const mailbox = await client.mailboxOpen(mailboxPath) - const lastUidForMailbox = latestUidByMailbox[mailboxPath] || config.lastProcessedUid + const currentUidValidity = mailbox.uidValidity.toString() + const storedUidValidity = uidValidityByMailbox[mailboxPath] - if (lastUidForMailbox) { - searchCriteria = { ...searchCriteria, uid: `${lastUidForMailbox + 1}:*` } - } + if (storedUidValidity && storedUidValidity !== currentUidValidity) { + logger.warn( + `[${requestId}] UIDVALIDITY changed for ${mailboxPath} (${storedUidValidity} -> ${currentUidValidity}), discarding stored UID` + ) + delete latestUidByMailbox[mailboxPath] + } + uidValidityByMailbox[mailboxPath] = currentUidValidity - if (config.lastCheckedTimestamp) { - const lastChecked = new Date(config.lastCheckedTimestamp) - const bufferTime = new Date(lastChecked.getTime() - 60000) - searchCriteria = { ...searchCriteria, since: bufferTime } - } else { - const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000) - searchCriteria = { ...searchCriteria, since: oneDayAgo } + let searchCriteria: Record = { unseen: true } + if (config.searchCriteria) { + if (typeof config.searchCriteria === 'object') { + searchCriteria = config.searchCriteria as unknown as Record + } else if (typeof config.searchCriteria === 'string') { + try { + searchCriteria = JSON.parse(config.searchCriteria) + } catch { + logger.warn(`[${requestId}] Invalid search criteria JSON, using default`) + } } + } - let messageUids: number[] = [] - try { - const searchResult = await client.search(searchCriteria, { uid: true }) - messageUids = searchResult === false ? [] : searchResult - } catch { - continue - } + const lastUidForMailbox = latestUidByMailbox[mailboxPath] - if (messageUids.length === 0) continue + if (lastUidForMailbox) { + searchCriteria = { ...searchCriteria, uid: `${lastUidForMailbox + 1}:*` } + } - messageUids.sort((a, b) => a - b) - const remainingSlots = maxEmails - totalEmailsCollected - const uidsToProcess = messageUids.slice(0, remainingSlots) + if (config.lastCheckedTimestamp) { + const lastChecked = new Date(config.lastCheckedTimestamp) + const bufferTime = new Date(lastChecked.getTime() - 60000) + searchCriteria = { ...searchCriteria, since: bufferTime } + } else { + const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000) + searchCriteria = { ...searchCriteria, since: oneDayAgo } + } - if (uidsToProcess.length > 0) { - latestUidByMailbox[mailboxPath] = Math.max( - ...uidsToProcess, - latestUidByMailbox[mailboxPath] || 0 - ) - } + let messageUids: number[] = [] + try { + const searchResult = await client.search(searchCriteria, { uid: true }) + messageUids = searchResult === false ? [] : searchResult + } catch { + continue + } - for await (const msg of client.fetch( - uidsToProcess, - { uid: true, envelope: true, bodyStructure: true, source: true }, - { uid: true } - )) { - emails.push({ - uid: msg.uid, - mailboxPath, - envelope: msg.envelope, - bodyStructure: msg.bodyStructure, - source: msg.source, - }) - totalEmailsCollected++ + if (messageUids.length === 0) continue + + messageUids.sort((a, b) => a - b) + const remainingSlots = maxEmails - totalEmailsCollected + const uidsToProcess = messageUids.slice(0, remainingSlots) + + for await (const msg of client.fetch( + uidsToProcess, + { uid: true, envelope: true, bodyStructure: true, source: true }, + { uid: true } + )) { + emails.push({ + uid: msg.uid, + mailboxPath, + envelope: msg.envelope, + bodyStructure: msg.bodyStructure, + source: msg.source, + }) + if (msg.uid > (latestUidByMailbox[mailboxPath] || 0)) { + latestUidByMailbox[mailboxPath] = msg.uid } - } catch (mailboxError) { - logger.warn(`[${requestId}] Error processing mailbox ${mailboxPath}:`, mailboxError) + totalEmailsCollected++ } + } catch (mailboxError) { + logger.warn(`[${requestId}] Error processing mailbox ${mailboxPath}:`, mailboxError) } - - await client.logout() - return { emails, latestUidByMailbox } - } catch (error) { - try { - await client.logout() - } catch { - // Ignore logout errors - } - throw error } + + return { emails, latestUidByMailbox, uidValidityByMailbox } } function getMailboxesToCheck(config: ImapWebhookConfig): string[] { @@ -331,9 +380,7 @@ function extractTextFromSource(source: Buffer): { text: string; html: string } { if (lowerPart.includes('base64')) { try { text = Buffer.from(text.replace(/\s/g, ''), 'base64').toString('utf-8') - } catch { - // Keep as-is if base64 decode fails - } + } catch {} } } } else if (lowerPart.includes('content-type: text/html')) { @@ -348,9 +395,7 @@ function extractTextFromSource(source: Buffer): { text: string; html: string } { if (lowerPart.includes('base64')) { try { html = Buffer.from(html.replace(/\s/g, ''), 'base64').toString('utf-8') - } catch { - // Keep as-is if base64 decode fails - } + } catch {} } } } @@ -405,9 +450,7 @@ function extractAttachmentsFromSource( mimeType, size: buffer.length, }) - } catch { - // Skip if decode fails - } + } catch {} } } } @@ -437,34 +480,17 @@ async function processEmails( webhookData: PollWebhookContext['webhookData'], workflowData: PollWebhookContext['workflowData'], config: ImapWebhookConfig, + client: ImapFlow, requestId: string, - resolvedIP: string, logger: ReturnType ) { let processedCount = 0 let failedCount = 0 - const client = new ImapFlow({ - host: resolvedIP, - servername: config.host, - port: config.port || 993, - secure: config.secure ?? true, - auth: { - user: config.username, - pass: config.password, - }, - tls: { rejectUnauthorized: true }, - logger: false, - }) - let currentOpenMailbox: string | null = null const lockState: { lock: MailboxLockObject | null } = { lock: null } try { - if (config.markAsRead) { - await client.connect() - } - for (const email of emails) { try { await pollingIdempotency.executeWithIdempotency( @@ -541,7 +567,7 @@ async function processEmails( lockState.lock = await client.getMailboxLock(email.mailboxPath) currentOpenMailbox = email.mailboxPath } - await client.messageFlagsAdd({ uid: email.uid }, ['\\Seen'], { uid: true }) + await client.messageFlagsAdd(email.uid, ['\\Seen'], { uid: true }) } catch (flagError) { logger.warn( `[${requestId}] Failed to mark message ${email.uid} as read:`, @@ -565,15 +591,10 @@ async function processEmails( } } } finally { - if (config.markAsRead) { + if (lockState.lock) { try { - if (lockState.lock) { - lockState.lock.release() - } - await client.logout() - } catch { - // Ignore logout errors - } + lockState.lock.release() + } catch {} } } diff --git a/apps/sim/lib/webhooks/polling/outlook.ts b/apps/sim/lib/webhooks/polling/outlook.ts index e6874940c61..faef69776e5 100644 --- a/apps/sim/lib/webhooks/polling/outlook.ts +++ b/apps/sim/lib/webhooks/polling/outlook.ts @@ -1,5 +1,6 @@ import { htmlToText } from 'html-to-text' import { pollingIdempotency } from '@/lib/core/idempotency/service' +import { fetchWithRetry } from '@/lib/knowledge/documents/utils' import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types' import { markWebhookFailed, @@ -166,6 +167,12 @@ export const outlookPollingHandler: PollingProviderHandler = { }, } +/** Hard cap on total emails fetched per poll to prevent unbounded pagination loops. */ +const OUTLOOK_HARD_MAX_EMAILS = 200 + +/** Number of items to request per Graph API page. Decoupled from the total cap so pagination actually runs. */ +const OUTLOOK_PAGE_SIZE = 50 + async function fetchNewOutlookEmails( accessToken: string, config: OutlookWebhookConfig, @@ -181,53 +188,77 @@ async function fetchNewOutlookEmails( 'id,conversationId,subject,bodyPreview,body,from,toRecipients,ccRecipients,receivedDateTime,sentDateTime,hasAttachments,isRead,parentFolderId' ) params.append('$orderby', 'receivedDateTime desc') - params.append('$top', (config.maxEmailsPerPoll || 25).toString()) + const maxEmails = Math.min(config.maxEmailsPerPoll || 25, OUTLOOK_HARD_MAX_EMAILS) + params.append('$top', OUTLOOK_PAGE_SIZE.toString()) if (config.lastCheckedTimestamp) { const lastChecked = new Date(config.lastCheckedTimestamp) const bufferTime = new Date(lastChecked.getTime() - 60000) params.append('$filter', `receivedDateTime gt ${bufferTime.toISOString()}`) } + const allEmails: OutlookEmail[] = [] + let nextUrl: string | undefined = `${apiUrl}?${params.toString()}` + logger.info(`[${requestId}] Fetching emails from: ${nextUrl}`) - const fullUrl = `${apiUrl}?${params.toString()}` - logger.info(`[${requestId}] Fetching emails from: ${fullUrl}`) + while (nextUrl && allEmails.length < maxEmails) { + const response = await fetchWithRetry(nextUrl, { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + }) - const response = await fetch(fullUrl, { - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - }) + if (!response.ok) { + const errorData = await response + .json() + .catch(() => ({ error: { message: 'Unknown error' } })) + logger.error(`[${requestId}] Microsoft Graph API error:`, { + status: response.status, + statusText: response.statusText, + error: errorData, + }) + throw new Error( + `Microsoft Graph API error: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}` + ) + } - if (!response.ok) { - const errorData = await response.json().catch(() => ({ error: { message: 'Unknown error' } })) - logger.error(`[${requestId}] Microsoft Graph API error:`, { - status: response.status, - statusText: response.statusText, - error: errorData, - }) - throw new Error( - `Microsoft Graph API error: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}` - ) + const data = await response.json() + const pageEmails: OutlookEmail[] = data.value || [] + const remaining = maxEmails - allEmails.length + allEmails.push(...pageEmails.slice(0, remaining)) + + nextUrl = + allEmails.length < maxEmails ? (data['@odata.nextLink'] as string | undefined) : undefined + + if (pageEmails.length === 0) break } - const data = await response.json() - const emails = data.value || [] + logger.info(`[${requestId}] Fetched ${allEmails.length} emails total`) + + const emails = allEmails let resolvedFolderIds: Map | undefined + let skipFolderFilter = false if (config.folderIds && config.folderIds.length > 0) { - const hasWellKnownFolders = config.folderIds.some(isWellKnownFolderName) - if (hasWellKnownFolders) { + const wellKnownFolders = config.folderIds.filter(isWellKnownFolderName) + if (wellKnownFolders.length > 0) { resolvedFolderIds = await resolveWellKnownFolderIds( accessToken, config.folderIds, requestId, logger ) + if (resolvedFolderIds.size < wellKnownFolders.length) { + logger.warn( + `[${requestId}] Could not resolve all well-known folders (${resolvedFolderIds.size}/${wellKnownFolders.length}) — skipping folder filter to avoid incorrect results` + ) + skipFolderFilter = true + } } } - const filteredEmails = filterEmailsByFolder(emails, config, resolvedFolderIds) + const filteredEmails = skipFolderFilter + ? emails + : filterEmailsByFolder(emails, config, resolvedFolderIds) logger.info( `[${requestId}] Fetched ${emails.length} emails, ${filteredEmails.length} after filtering` @@ -262,12 +293,14 @@ async function resolveWellKnownFolderId( logger: ReturnType ): Promise { try { - const response = await fetch(`https://graph.microsoft.com/v1.0/me/mailFolders/${folderName}`, { - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - }) + const response = await fetchWithRetry( + `https://graph.microsoft.com/v1.0/me/mailFolders/${folderName}`, + { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + } + ) if (!response.ok) { logger.warn( @@ -455,12 +488,11 @@ async function downloadOutlookAttachments( const attachments: OutlookAttachment[] = [] try { - const response = await fetch( + const response = await fetchWithRetry( `https://graph.microsoft.com/v1.0/me/messages/${messageId}/attachments`, { headers: { Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', }, } ) @@ -511,14 +543,17 @@ async function markOutlookEmailAsRead( logger: ReturnType ) { try { - const response = await fetch(`https://graph.microsoft.com/v1.0/me/messages/${messageId}`, { - method: 'PATCH', - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ isRead: true }), - }) + const response = await fetchWithRetry( + `https://graph.microsoft.com/v1.0/me/messages/${messageId}`, + { + method: 'PATCH', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ isRead: true }), + } + ) if (!response.ok) { logger.error( diff --git a/apps/sim/lib/webhooks/polling/rss.ts b/apps/sim/lib/webhooks/polling/rss.ts index 31044fc8924..5f52ad8afdc 100644 --- a/apps/sim/lib/webhooks/polling/rss.ts +++ b/apps/sim/lib/webhooks/polling/rss.ts @@ -12,7 +12,7 @@ import { } from '@/lib/webhooks/polling/utils' import { processPolledWebhookEvent } from '@/lib/webhooks/processor' -const MAX_GUIDS_TO_TRACK = 100 +const MAX_GUIDS_TO_TRACK = 500 interface RssWebhookConfig { feedUrl: string @@ -87,10 +87,15 @@ export const rssPollingHandler: PollingProviderHandler = { } const now = new Date() - const { feed, items: newItems } = await fetchNewRssItems(config, requestId, logger) + const { + feed, + items: newItems, + etag, + lastModified, + } = await fetchNewRssItems(config, requestId, logger) if (!newItems.length) { - await updateRssState(webhookId, now.toISOString(), [], config, logger) + await updateRssState(webhookId, now.toISOString(), [], config, logger, etag, lastModified) await markWebhookSuccess(webhookId, logger) logger.info(`[${requestId}] No new items found for webhook ${webhookId}`) return 'success' @@ -108,10 +113,23 @@ export const rssPollingHandler: PollingProviderHandler = { ) const newGuids = newItems - .map((item) => item.guid || item.link || '') + .map( + (item) => + item.guid || + item.link || + (item.title && item.pubDate ? `${item.title}-${item.pubDate}` : '') + ) .filter((guid) => guid.length > 0) - await updateRssState(webhookId, now.toISOString(), newGuids, config, logger) + await updateRssState( + webhookId, + now.toISOString(), + newGuids, + config, + logger, + etag, + lastModified + ) if (failedCount > 0 && processedCount === 0) { await markWebhookFailed(webhookId, logger) @@ -139,7 +157,9 @@ async function updateRssState( timestamp: string, newGuids: string[], config: RssWebhookConfig, - logger: ReturnType + logger: ReturnType, + etag?: string, + lastModified?: string ) { const existingGuids = config.lastSeenGuids || [] const allGuids = [...newGuids, ...existingGuids].slice(0, MAX_GUIDS_TO_TRACK) @@ -149,6 +169,8 @@ async function updateRssState( { lastCheckedTimestamp: timestamp, lastSeenGuids: allGuids, + ...(etag !== undefined ? { etag } : {}), + ...(lastModified !== undefined ? { lastModified } : {}), }, logger ) @@ -158,7 +180,7 @@ async function fetchNewRssItems( config: RssWebhookConfig, requestId: string, logger: ReturnType -): Promise<{ feed: RssFeed; items: RssItem[] }> { +): Promise<{ feed: RssFeed; items: RssItem[]; etag?: string; lastModified?: string }> { try { const urlValidation = await validateUrlWithDNS(config.feedUrl, 'feedUrl') if (!urlValidation.isValid) { @@ -166,24 +188,45 @@ async function fetchNewRssItems( throw new Error(`Invalid RSS feed URL: ${urlValidation.error}`) } + const headers: Record = { + 'User-Agent': 'Sim/1.0 RSS Poller', + Accept: 'application/rss+xml, application/xml, text/xml, */*', + } + if (config.etag) { + headers['If-None-Match'] = config.etag + } + if (config.lastModified) { + headers['If-Modified-Since'] = config.lastModified + } + const response = await secureFetchWithPinnedIP(config.feedUrl, urlValidation.resolvedIP!, { - headers: { - 'User-Agent': 'Sim/1.0 RSS Poller', - Accept: 'application/rss+xml, application/xml, text/xml, */*', - }, + headers, timeout: 30000, }) + if (response.status === 304) { + logger.info(`[${requestId}] RSS feed not modified (304) for ${config.feedUrl}`) + return { + feed: { items: [] } as RssFeed, + items: [], + etag: response.headers.get('etag') ?? config.etag, + lastModified: response.headers.get('last-modified') ?? config.lastModified, + } + } + if (!response.ok) { await response.text().catch(() => {}) throw new Error(`Failed to fetch RSS feed: ${response.status} ${response.statusText}`) } + const newEtag = response.headers.get('etag') ?? undefined + const newLastModified = response.headers.get('last-modified') ?? undefined + const xmlContent = await response.text() const feed = await parser.parseString(xmlContent) if (!feed.items || !feed.items.length) { - return { feed: feed as RssFeed, items: [] } + return { feed: feed as RssFeed, items: [], etag: newEtag, lastModified: newLastModified } } const lastCheckedTime = config.lastCheckedTimestamp @@ -192,7 +235,10 @@ async function fetchNewRssItems( const lastSeenGuids = new Set(config.lastSeenGuids || []) const newItems = feed.items.filter((item) => { - const itemGuid = item.guid || item.link || '' + const itemGuid = + item.guid || + item.link || + (item.title && item.pubDate ? `${item.title}-${item.pubDate}` : '') if (itemGuid && lastSeenGuids.has(itemGuid)) { return false @@ -220,7 +266,12 @@ async function fetchNewRssItems( `[${requestId}] Found ${newItems.length} new items (processing ${limitedItems.length})` ) - return { feed: feed as RssFeed, items: limitedItems as RssItem[] } + return { + feed: feed as RssFeed, + items: limitedItems as RssItem[], + etag: newEtag, + lastModified: newLastModified, + } } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error fetching RSS feed:`, errorMessage) @@ -241,7 +292,17 @@ async function processRssItems( for (const item of items) { try { - const itemGuid = item.guid || item.link || `${item.title}-${item.pubDate}` + const itemGuid = + item.guid || + item.link || + (item.title && item.pubDate ? `${item.title}-${item.pubDate}` : '') + + if (!itemGuid) { + logger.warn( + `[${requestId}] Skipping RSS item with no identifiable GUID for webhook ${webhookData.id}` + ) + continue + } await pollingIdempotency.executeWithIdempotency( 'rss',