Skip to content

improvement(hitl): streaming, async support + update docs #4058

Merged
icecrasher321 merged 9 commits intostagingfrom
improvement/hitl-streaming
Apr 9, 2026
Merged

improvement(hitl): streaming, async support + update docs #4058
icecrasher321 merged 9 commits intostagingfrom
improvement/hitl-streaming

Conversation

@icecrasher321
Copy link
Copy Markdown
Collaborator

Summary

Respect headers passed in via initial execute calls and update docs to match expected behavior and add relevant endpoints.

Type of Change

  • Bug fix
  • Other: UX Improvement

Testing

Tested manually. Will test async support on staging environment.

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@cursor
Copy link
Copy Markdown

cursor bot commented Apr 8, 2026

PR Summary

Medium Risk
Changes workflow resume behavior to branch into sync, SSE streaming, or queued async execution based on stored execution snapshots, adding new background job plumbing; regressions could impact HITL resumption and job dispatch reliability.

Overview
Human-in-the-loop resume now mirrors the original execution mode. The POST /api/resume/{workflowId}/{executionId}/{contextId} route reads executionMode (and selectedOutputs) from the stored execution snapshot and returns either a full JSON result (sync), an SSE stream (stream), or 202 after dispatching a background resume job (async).

Async resume execution is now a first-class job type. Adds resume-execution support across async backends (Trigger.dev and BullMQ), including a new background task (background/resume-execution.ts), a BullMQ worker processor, queue registration, and lookup support.

Streaming infrastructure was generalized and pause-aware. createStreamingResponse now takes an executeFn callback (instead of workflow/input/user), tracks completed blocks to avoid emitting partial selected outputs, and emits a final SSE payload that includes status: "paused" (with full pause output) when execution pauses.

API routes and UI were updated to use the new streaming executor interface. Chat/form/workflow execute endpoints now pass executeFn that calls executeWorkflow with explicit executionMode, tests were adjusted accordingly, and the output selection UI excludes human_in_the_loop blocks. Docs were updated to reflect new resume input access (<blockId.fieldName>), _resume payloads, execution-mode behavior, and polling guidance.

Reviewed by Cursor Bugbot for commit 8075708. Configure here.

@vercel
Copy link
Copy Markdown

vercel bot commented Apr 8, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
docs Skipped Skipped Apr 9, 2026 0:24am

Request Review

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Apr 8, 2026

Greptile Summary

This PR adds streaming and async support to the HITL resume endpoint by storing the original executionMode in the execution snapshot and reading it back on resume, so API callers get the same response type (SSE stream, sync JSON, or 202 async) they used in the initial execute call. It also refactors createStreamingResponse to accept an executeFn callback instead of bundling workflow/input/executingUserId inside the utility, making the function reusable across both initial executions and resumes.

Confidence Score: 5/5

Safe to merge — all new execution paths are gated behind correct isApiCaller + executionMode checks, with graceful fallbacks.

The only finding is a P2 style suggestion (missing abortSignal propagation in streaming resume). All critical paths — validation, snapshot serialization, queue registration, worker registration — are implemented correctly and symmetrically with existing patterns. Backwards compatibility is maintained via the ?? 'sync' default for old snapshots.

apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts — streaming resume timeout/abort handling

Vulnerabilities

No security concerns identified. The resume endpoint continues to validate workflow access via validateWorkflowAccess and preprocessExecution before branching into any of the new execution modes. Async dispatch re-uses the same authenticated userId and workspaceId already obtained during validation.

Important Files Changed

Filename Overview
apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts Core change: reads executionMode from the stored snapshot and dispatches streaming/sync/async resume accordingly. Logic is correct; minor gap is that abortSignal is not forwarded to startResumeExecution in the stream branch.
apps/sim/lib/workflows/streaming/streaming.ts Refactored to accept executeFn callback, making the streaming utility reusable for resume; adds completedBlockIds tracking to filter output correctly on pause/resume.
apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts Adds optional sendEvent, onStream, and onBlockComplete callbacks to StartResumeExecutionArgs so callers can inject streaming hooks; adds getPausedExecutionById helper for background job use.
apps/sim/background/resume-execution.ts New TriggerDev background task for async resume; fetches paused execution and delegates to PauseResumeManager.startResumeExecution.
apps/sim/worker/processors/resume.ts New BullMQ processor for resume-execution queue; mirrors the pattern of other processors.
apps/sim/executor/execution/snapshot-serializer.ts Persists executionMode into the pause snapshot so it survives across the pause/resume boundary.

Sequence Diagram

sequenceDiagram
    participant Client
    participant ExecuteRoute as POST /execute
    participant Snapshot as ExecutionSnapshot
    participant HITL as HITL Block
    participant ResumeRoute as POST /resume
    participant Manager as PauseResumeManager

    Client->>ExecuteRoute: POST (sync/stream/async header)
    ExecuteRoute->>Snapshot: store executionMode in metadata
    ExecuteRoute->>HITL: workflow reaches HITL block
    HITL-->>ExecuteRoute: pause, serialize snapshot
    ExecuteRoute-->>Client: paused response + _resume URLs

    Client->>ResumeRoute: POST /resume/.../contextId
    ResumeRoute->>Snapshot: getStoredSnapshotConfig(executionMode)
    alt executionMode == stream
        ResumeRoute->>Manager: createStreamingResponse(executeFn)
        Manager-->>Client: SSE stream (blocks after HITL)
    else executionMode == sync (default)
        ResumeRoute->>Manager: startResumeExecution()
        Manager-->>Client: JSON result
    else executionMode == async
        ResumeRoute->>Manager: enqueue resume-execution job
        Manager-->>Client: 202 Accepted
    end
Loading

Reviews (2): Last reviewed commit: "fix tests" | Re-trigger Greptile

@icecrasher321
Copy link
Copy Markdown
Collaborator Author

bugbot run

@icecrasher321
Copy link
Copy Markdown
Collaborator Author

bugbot run

@icecrasher321
Copy link
Copy Markdown
Collaborator Author

bugbot run

@icecrasher321
Copy link
Copy Markdown
Collaborator Author

bugbot run

Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Bugbot reviewed your changes and found no new issues!

Comment @cursor review or bugbot run to trigger another review on this PR

Reviewed by Cursor Bugbot for commit 8075708. Configure here.

@icecrasher321 icecrasher321 merged commit 3e85218 into staging Apr 9, 2026
12 checks passed
@waleedlatif1 waleedlatif1 deleted the improvement/hitl-streaming branch April 9, 2026 07:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant