feat(deploy): stage streamed payloads to a temp file for replication#536

Draft
kriszyp wants to merge 10 commits into feat/deploy-component-progress from feat/deploy-component-payload-staging

Conversation

@kriszyp
Member

@kriszyp kriszyp commented May 14, 2026

Stacked on #531 — please review #530 and #531 first; this PR will be retargeted to `main` after both land.

Summary

Origin-side payload staging needed for the direct-HTTPS peer relay (harper-pro slice 3b). When a `deploy_component` request will be replicated to peers, the origin now writes the streamed payload to a temp file before local extraction so it can re-stream the same bytes to each peer afterwards. Without this, the payload Readable is consumed once by local extract and gone by the time replication runs.

Third slice of #524.

Where to look

  • `components/payloadStaging.ts` — single-purpose utility that pipelines a Readable into a temp `payload.tar.gz` and returns a cleanup callback. The trade-off comment in the file header explains why we write-then-read instead of teeing in flight (simpler code, identical backpressure for extraction; disk speed isn't the bottleneck).
  • `components/operations.js` — staging gate in `deployComponent`. Skipped when:
    • the deploy is package-identifier-based (`req.package`, no `payload` stream),
    • `req.replicated === false` (the operation is itself a replicated deploy that the peer received, so no further fan-out),
    • or there are no peers (`server.nodes.length === 0` — single-node deploys keep their zero-disk-copy property).
  • Cleanup runs in a `finally` after `server.replication.replicateOperation(req)`, plus a defensive cleanup if `prepareApplication` fails before replication.
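The three skip conditions above can be read as a single predicate. The following is a minimal sketch, not the code in `components/operations.js`: the `DeployRequest` shape and the `shouldStagePayload` name are hypothetical, and the peer count is passed in rather than read from `server.nodes`.

```typescript
import { Readable } from 'node:stream';

// Hypothetical request shape; only the fields named in the PR description are modeled.
interface DeployRequest {
	package?: string;
	payload?: unknown;
	replicated?: boolean;
}

// Returns true only when the streamed payload must be kept for peer replication.
function shouldStagePayload(req: DeployRequest, nodeCount: number): boolean {
	if (req.package) return false; // package-identifier deploy, no payload stream
	if (!(req.payload instanceof Readable)) return false; // nothing streamable to stage
	if (req.replicated === false) return false; // already a replicated deploy received by a peer
	return nodeCount > 0; // single-node deploys keep their zero-disk-copy property
}
```

Keeping the gate in one place makes it easier to test each skip condition without simulating a cluster.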

Backward compatibility

  • The staging is invisible to the operations API contract: `req.payload` is replaced internally with a `createReadStream` over the temp file before the `Application` is constructed, so every downstream code path (`Application.extractApplication`, the SSE phase emitter, anything reading `application.payload`) keeps seeing a `Readable` exactly as before.
  • Single-node and package-identifier deploys: zero change in behavior, no temp file written.
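As an illustration of why the swap is invisible downstream, the sketch below points a request's `payload` at a fresh `createReadStream` over a staged file and shows that two independent consumers read the same bytes. The `DeployRequest` shape, `pointPayloadAtStagedFile`, `readAll`, and `demoReread` are hypothetical names for this example, and the staged file is written directly with `writeFile` instead of being streamed in.

```typescript
import { createReadStream } from 'node:fs';
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { Readable } from 'node:stream';

// Hypothetical request shape; only the payload field matters here.
interface DeployRequest {
	payload?: Readable;
}

// Replaces req.payload with a new stream over the staged file, so consumers still see a Readable.
function pointPayloadAtStagedFile(req: DeployRequest, stagedPath: string): void {
	req.payload = createReadStream(stagedPath);
}

// Collects a stream into a single Buffer.
async function readAll(stream: Readable): Promise<Buffer> {
	const chunks: Buffer[] = [];
	for await (const chunk of stream) chunks.push(Buffer.from(chunk));
	return Buffer.concat(chunks);
}

// Stages some bytes, then reads them twice: once for "local extraction", once for "a peer".
async function demoReread(): Promise<boolean> {
	const dir = await mkdtemp(join(tmpdir(), 'harper-deploy-sketch-'));
	const stagedPath = join(dir, 'payload.tar.gz');
	try {
		await writeFile(stagedPath, 'fake tarball bytes');
		const local: DeployRequest = {};
		const peer: DeployRequest = {};
		pointPayloadAtStagedFile(local, stagedPath);
		pointPayloadAtStagedFile(peer, stagedPath);
		const [a, b] = await Promise.all([readAll(local.payload!), readAll(peer.payload!)]);
		return a.equals(b) && a.toString() === 'fake tarball bytes';
	} finally {
		await rm(dir, { recursive: true, force: true });
	}
}
```

Each consumer gets its own stream, so one reader finishing does not exhaust the payload for the next.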

Test plan

  • `npx mocha unitTests/components/payloadStaging.test.js` — 5/5 pass (round-trip contents, cleanup removes file+dir, double-cleanup is safe, path-traversal sanitization keeps the staged dir under tmpdir, source-stream errors propagate).
  • `npx mocha unitTests/server/fastifyRoutes/operations.test.js` — 21/21 pass (existing deployComponent tests; the staging branch is only entered when nodes are present, which these tests don't simulate).
  • `npx mocha unitTests/components/packageComponent.test.js unitTests/server/serverHelpers/multipartParser.test.js unitTests/bin/multipartBuilder.test.js unitTests/server/serverHelpers/progressEmitter.test.js unitTests/bin/sseConsumer.test.js` — 32/32 pass (slice 1 + slice 2 unaffected).
  • End-to-end replicated deploy — exercised in harper-pro's slice 3b integration test, not here.

🤖 Generated with Claude Code

When a deploy_component is replicated to peers, the origin needs to keep
a copy of the payload so it can re-stream it to each peer. The payload
Readable is consumed once by local extraction; without staging, peer
replication would have nothing to send.

deployComponent now stages the payload to a temp file before extraction
when there are nodes to replicate to and the payload is a Readable
(non-package, streaming deploys). Local extraction re-sources from the
staged file, which keeps backpressure behavior identical to the
non-replicated case. The temp file is cleaned up in a finally block
after replication completes (success, failure, or skipped).

Local-only deploys keep their zero-disk-copy property; package-identifier
deploys are unaffected.

This is the core-side staging needed for harper-pro's direct-HTTPS deploy
relay (slice 3b, addressing #524).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kriszyp kriszyp requested a review from a team as a code owner May 14, 2026 15:15
@kriszyp kriszyp marked this pull request as draft May 14, 2026 15:18
@claude
Contributor

claude Bot commented May 14, 2026

2 blockers unchanged from the prior run.

1. Staged payload leaks on load-phase failure

File: components/operations.js (load-phase throw lastError)
What: The throw lastError that fires after loadComponent exits deployComponent before it reaches the replicateOperation try/finally block. stagedPayload?.cleanup() runs only in that finally and in the prepareApplication catch block, and neither covers this code path.
Why it matters: Every multi-node streaming deploy that fails the load check leaves its staged temp file on disk, so repeated failures accumulate temp files without bound.
Suggested fix: Add stagedPayload?.cleanup().catch(() => {}) immediately before throw lastError in the load-error branch.

2. stagePayloadToTempFile leaks temp dir on pipeline failure

File: components/payloadStaging.ts:30 (inline comment posted)
What: mkdtemp creates the temp dir before pipeline runs. If pipeline rejects, the function throws without returning a cleanup handle, so the caller has no way to delete the orphaned directory. The new test covers error propagation but not the resulting disk leak.
Suggested fix:

try {
    await pipeline(source, createWriteStream(path));
} catch (err) {
    await rm(dir, { recursive: true, force: true }).catch(() => {});
    throw err;
}

Comment thread components/operations.js
} catch (err) {
progress?.emit('phase', { phase: 'extract_or_install', status: 'error', message: err?.message ?? String(err) });
// Clean up the staged payload on early failure so we don't leak disk.
stagedPayload?.cleanup().catch(() => {});
Contributor

This catches prepareApplication failures, but there's a gap between this catch and the replicateOperation finally: the load-phase throw lastError path (the if (lastError) throw lastError block that runs when the pseudo-resource load fails, after line ~452) is not covered by either cleanup handler. If a multi-node streaming deploy reaches the load phase and the component fails to load, stagedPayload leaks.

Suggested change
stagedPayload?.cleanup().catch(() => {});
stagedPayload?.cleanup().catch(() => {});

Same one-liner before throw lastError in the load-error branch (or restructure with a single outer try/finally wrapping both phases).
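A minimal sketch of the single outer try/finally alternative, assuming the deploy phases can be passed in as one async callback. `withStagedPayload` and its parameters are hypothetical names, not identifiers from `components/operations.js`.

```typescript
type Cleanup = () => Promise<void>;

// Runs all deploy phases and always releases the staged payload afterwards,
// regardless of which phase throws (prepare, load, or replicate).
async function withStagedPayload<T>(cleanup: Cleanup | undefined, phases: () => Promise<T>): Promise<T> {
	try {
		return await phases();
	} finally {
		// Swallow cleanup errors so they never mask the original failure.
		await cleanup?.().catch(() => {});
	}
}
```

With this shape there is only one cleanup call site, so a newly added early exit between phases cannot skip it.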

kriszyp and others added 3 commits May 19, 2026 08:04
…into feat/deploy-component-payload-staging

# Conflicts:
#	components/operations.js
): Promise<{ path: string; cleanup: () => Promise<void> }> {
const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`));
const path = join(dir, 'payload.tar.gz');
await pipeline(source, createWriteStream(path));
Contributor

Temp dir leaks on pipeline failure (still open from prior run)

mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory. The error-propagation test asserts the rejection propagates but does not assert the temp dir is removed.

Suggested change
- await pipeline(source, createWriteStream(path));
+ const path = join(dir, 'payload.tar.gz');
+ try {
+     await pipeline(source, createWriteStream(path));
+ } catch (err) {
+     await rm(dir, { recursive: true, force: true }).catch(() => {});
+     throw err;
+ }

Also add a test assertion: after stagePayloadToTempFile rejects, stat(path.dirname(...)) should throw ENOENT.
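One way such a check could look, as a sketch under stated assumptions: the function below is a simplified stand-in for `stagePayloadToTempFile` that takes a directory prefix directly (the real signature uses `projectName` and a `sanitize` helper not shown here), and because the caller never learns the directory name on failure, the check scans `tmpdir()` for leftovers with a unique prefix instead of calling `stat`. `failingStageLeavesNoDir` is a hypothetical name.

```typescript
import { createWriteStream } from 'node:fs';
import { mkdtemp, readdir, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Simplified stand-in for the staging utility, with the suggested cleanup-on-failure applied.
async function stagePayloadToTempFile(source: Readable, prefix: string) {
	const dir = await mkdtemp(join(tmpdir(), prefix));
	const path = join(dir, 'payload.tar.gz');
	try {
		await pipeline(source, createWriteStream(path));
	} catch (err) {
		// Remove the directory here, since no cleanup handle is returned on failure.
		await rm(dir, { recursive: true, force: true }).catch(() => {});
		throw err;
	}
	return { path, cleanup: () => rm(dir, { recursive: true, force: true }) };
}

// Feeds a source that fails mid-stream and reports whether any staged directory survived.
async function failingStageLeavesNoDir(): Promise<boolean> {
	const prefix = `harper-deploy-sketch-${process.pid}-${Date.now()}-`;
	const source = Readable.from(
		(async function* () {
			yield Buffer.from('partial bytes');
			await new Promise((resolve) => setTimeout(resolve, 20));
			throw new Error('source failed');
		})()
	);
	let rejected = false;
	try {
		await stagePayloadToTempFile(source, prefix);
	} catch {
		rejected = true;
	}
	const leftovers = (await readdir(tmpdir())).filter((name) => name.startsWith(prefix));
	return rejected && leftovers.length === 0;
}
```

In a mocha test, the boolean returned by `failingStageLeavesNoDir` would be asserted to be true.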

When useSse=true, httpRequest uses streamResponse mode and returns a raw
IncomingMessage. The previous fallback branch read response.body which is
undefined in that mode (e.g. on a 401 auth failure). Drain the stream
chunks manually so error bodies are captured correctly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
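Draining a raw response stream by hand might look like the sketch below. `readStreamBody` is a hypothetical helper name; it relies only on the fact that Node's `IncomingMessage` is a `Readable` and therefore async-iterable.

```typescript
import { Readable } from 'node:stream';

// Collects every chunk from a readable stream and decodes the result as UTF-8.
async function readStreamBody(stream: Readable): Promise<string> {
	const chunks: Buffer[] = [];
	for await (const chunk of stream) {
		chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
	}
	// Concatenate before decoding so multi-byte characters split across chunks stay intact.
	return Buffer.concat(chunks).toString('utf8');
}
```

An error body such as a 401 response can then be passed to `JSON.parse` or printed as-is.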
): Promise<{ path: string; cleanup: () => Promise<void> }> {
const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`));
const path = join(dir, 'payload.tar.gz');
await pipeline(source, createWriteStream(path));
Contributor

Temp dir leaks on pipeline failure (carry from prior run)

mkdtemp already created dir before pipeline runs. If pipeline rejects, the caller never receives a cleanup handle — the directory is orphaned. The error-propagation test asserts the rejection propagates but doesn't assert the temp dir is removed afterward.

Suggested change
- await pipeline(source, createWriteStream(path));
+ try {
+     await pipeline(source, createWriteStream(path));
+ } catch (err) {
+     await rm(dir, { recursive: true, force: true }).catch(() => {});
+     throw err;
+ }

kriszyp and others added 2 commits May 20, 2026 03:15
…, error stringify

- parseSSE: use StringDecoder so multi-byte chars split across chunk boundaries
  decode correctly instead of corrupting (e.g. box-drawing chars, emojis)
- createSSEResponseStream: gate writes behind an active flag; listen to
  stream close/end to unsubscribe the emitter immediately on client disconnect
  so the subscription doesn't linger for the full operation lifetime
- cliOperations SSE error: JSON.stringify fallback when sseError has no
  .message so the error renders as JSON rather than [object Object]

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
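The `StringDecoder` point can be illustrated with a small sketch. `decodeChunks` is a hypothetical helper, not the `parseSSE` implementation; it only shows how Node's `StringDecoder` buffers an incomplete multi-byte sequence until the remaining bytes arrive.

```typescript
import { StringDecoder } from 'node:string_decoder';

// Decodes a sequence of byte chunks as UTF-8, tolerating characters split across chunk boundaries.
function decodeChunks(chunks: Buffer[]): string {
	const decoder = new StringDecoder('utf8');
	let text = '';
	for (const chunk of chunks) {
		// write() returns only complete characters and holds back a trailing partial sequence.
		text += decoder.write(chunk);
	}
	// end() flushes whatever is still buffered.
	return text + decoder.end();
}
```

Calling `chunk.toString()` on each chunk separately would instead produce U+FFFD replacement characters whenever a box-drawing character or emoji straddles two chunks.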
): Promise<{ path: string; cleanup: () => Promise<void> }> {
const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`));
const path = join(dir, 'payload.tar.gz');
await pipeline(source, createWriteStream(path));
Contributor

Temp dir leaks on pipeline failure (carry from prior run — still open)

mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory.

Suggested change
- await pipeline(source, createWriteStream(path));
+ const path = join(dir, 'payload.tar.gz');
+ try {
+     await pipeline(source, createWriteStream(path));
+ } catch (err) {
+     await rm(dir, { recursive: true, force: true }).catch(() => {});
+     throw err;
+ }

Per the SSE spec, if a data value contains newlines each line must be
emitted as a separate data: field. A single data: field with embedded
newlines is not valid.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
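A sketch of the framing rule described in this commit message, assuming a hypothetical `formatSseEvent` helper (not the actual `createSSEResponseStream` code): the data value is split on line breaks and each line is written as its own `data:` field, followed by the blank line that terminates the event.

```typescript
// Serializes one server-sent event, emitting a separate data: field per line of the payload.
function formatSseEvent(event: string, data: string): string {
	// The SSE format treats CRLF, CR, and LF as line terminators, so split on all three.
	const dataLines = data.split(/\r\n|\r|\n/).map((line) => `data: ${line}`);
	// The two trailing empty strings produce the blank line that ends the event.
	return [`event: ${event}`, ...dataLines, '', ''].join('\n');
}
```

A conforming client rejoins the `data:` lines with `\n`, so the receiver sees the original multi-line string.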
): Promise<{ path: string; cleanup: () => Promise<void> }> {
const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`));
const path = join(dir, 'payload.tar.gz');
await pipeline(source, createWriteStream(path));
Contributor

Still leaks dir on pipeline failure. mkdtemp already ran at line 28; if pipeline throws here the function exits without returning a cleanup handle, so the caller can never delete the orphaned temp directory.

Suggested change
- await pipeline(source, createWriteStream(path));
+ try {
+     await pipeline(source, createWriteStream(path));
+ } catch (err) {
+     await rm(dir, { recursive: true, force: true }).catch(() => {});
+     throw err;
+ }

@kriszyp
Member Author

kriszyp commented May 20, 2026

Pausing this PR — converting to draft.

This work is being rebuilt on top of the design in #641: a replicated hdb_deployment system table that owns the deploy lifecycle, audit trail, and (because Harper's BLOB_CHUNK replication already supports chunked, back-pressured blob transfer) the payload delivery channel.

In the new design:

  • The ProgressEmitter from this PR becomes one of two subscribers — the second being a DeploymentRecorder that writes the persistent record. SSE remains the live channel for the CLI; get_deployment becomes content-negotiated to serve the same stream to Studio.
  • _stagedPayloadPath (#536) and the direct-HTTPS peer relay (harper-pro#146) both go away; peers read the payload directly from the replicated blob attribute on the row.

This work resumes as part of Slice B in #641 once Slice A (table + blob-backed multipart receive) lands.

— Claude
