feat(deploy): stage streamed payloads to a temp file for replication#536
feat(deploy): stage streamed payloads to a temp file for replication#536kriszyp wants to merge 10 commits into
Conversation
When a deploy_component is replicated to peers, the origin needs to keep a copy of the payload so it can re-stream it to each peer. The payload Readable is consumed once by local extraction; without staging, peer replication would have nothing to send. deployComponent now stages the payload to a temp file before extraction when there are nodes to replicate to and the payload is a Readable (non-package, streaming deploys). Local extraction re-sources from the staged file, which keeps backpressure behavior identical to the non-replicated case. The temp file is cleaned up in a finally block after replication completes (success, failure, or skipped). Local-only deploys keep their zero-disk-copy property; package-identifier deploys are unaffected. This is the core-side staging needed for harper-pro's direct-HTTPS deploy relay (slice 3b, addressing #524). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
2 blockers unchanged from the prior run. 1. Staged payload leaks on load-phase failureFile: 2.
|
…into feat/deploy-component-payload-staging
…into feat/deploy-component-payload-staging
| } catch (err) { | ||
| progress?.emit('phase', { phase: 'extract_or_install', status: 'error', message: err?.message ?? String(err) }); | ||
| // Clean up the staged payload on early failure so we don't leak disk. | ||
| stagedPayload?.cleanup().catch(() => {}); |
There was a problem hiding this comment.
This catches prepareApplication failures, but there's a gap between this catch and the replicateOperation finally: the load-phase throw lastError path (the if (lastError) throw lastError block that runs when the pseudo-resource load fails, after line ~452) is not covered by either cleanup handler. If a multi-node streaming deploy reaches the load phase and the component fails to load, stagedPayload leaks.
| stagedPayload?.cleanup().catch(() => {}); | |
| stagedPayload?.cleanup().catch(() => {}); |
Same one-liner before throw lastError in the load-error branch (or restructure with a single outer try/finally wrapping both phases).
…into feat/deploy-component-payload-staging # Conflicts: # components/operations.js
…into feat/deploy-component-payload-staging
| ): Promise<{ path: string; cleanup: () => Promise<void> }> { | ||
| const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`)); | ||
| const path = join(dir, 'payload.tar.gz'); | ||
| await pipeline(source, createWriteStream(path)); |
There was a problem hiding this comment.
Temp dir leaks on pipeline failure (still open from prior run)
mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory. The error-propagation test asserts the rejection propagates but does not assert the temp dir is removed.
| await pipeline(source, createWriteStream(path)); | |
| const path = join(dir, 'payload.tar.gz'); | |
| try { | |
| await pipeline(source, createWriteStream(path)); | |
| } catch (err) { | |
| await rm(dir, { recursive: true, force: true }).catch(() => {}); | |
| throw err; | |
| } |
Also add a test assertion: after stagePayloadToTempFile rejects, stat(path.dirname(...)) should throw ENOENT.
When useSse=true, httpRequest uses streamResponse mode and returns a raw IncomingMessage. The previous fallback branch read response.body which is undefined in that mode (e.g. on a 401 auth failure). Drain the stream chunks manually so error bodies are captured correctly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| ): Promise<{ path: string; cleanup: () => Promise<void> }> { | ||
| const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`)); | ||
| const path = join(dir, 'payload.tar.gz'); | ||
| await pipeline(source, createWriteStream(path)); |
There was a problem hiding this comment.
Temp dir leaks on pipeline failure (carry from prior run)
mkdtemp already created dir before pipeline runs. If pipeline rejects, the caller never receives a cleanup handle — the directory is orphaned. The error-propagation test asserts the rejection propagates but doesn't assert the temp dir is removed afterward.
| await pipeline(source, createWriteStream(path)); | |
| try { | |
| await pipeline(source, createWriteStream(path)); | |
| } catch (err) { | |
| await rm(dir, { recursive: true, force: true }).catch(() => {}); | |
| throw err; | |
| } |
…, error stringify - parseSSE: use StringDecoder so multi-byte chars split across chunk boundaries decode correctly instead of corrupting (e.g. box-drawing chars, emojis) - createSSEResponseStream: gate writes behind an active flag; listen to stream close/end to unsubscribe the emitter immediately on client disconnect so the subscription doesn't linger for the full operation lifetime - cliOperations SSE error: JSON.stringify fallback when sseError has no .message so the error renders as JSON rather than [object Object] Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| ): Promise<{ path: string; cleanup: () => Promise<void> }> { | ||
| const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`)); | ||
| const path = join(dir, 'payload.tar.gz'); | ||
| await pipeline(source, createWriteStream(path)); |
There was a problem hiding this comment.
Temp dir leaks on pipeline failure (carry from prior run — still open)
mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory.
| await pipeline(source, createWriteStream(path)); | |
| const path = join(dir, 'payload.tar.gz'); | |
| try { | |
| await pipeline(source, createWriteStream(path)); | |
| } catch (err) { | |
| await rm(dir, { recursive: true, force: true }).catch(() => {}); | |
| throw err; | |
| } |
| ): Promise<{ path: string; cleanup: () => Promise<void> }> { | ||
| const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`)); | ||
| const path = join(dir, 'payload.tar.gz'); | ||
| await pipeline(source, createWriteStream(path)); |
There was a problem hiding this comment.
Temp dir leaks on pipeline failure (carry from prior run — still open)
mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory.
| await pipeline(source, createWriteStream(path)); | |
| const path = join(dir, 'payload.tar.gz'); | |
| try { | |
| await pipeline(source, createWriteStream(path)); | |
| } catch (err) { | |
| await rm(dir, { recursive: true, force: true }).catch(() => {}); | |
| throw err; | |
| } |
Per the SSE spec, if a data value contains newlines each line must be emitted as a separate data: field. A single data: field with embedded newlines is not valid. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| ): Promise<{ path: string; cleanup: () => Promise<void> }> { | ||
| const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`)); | ||
| const path = join(dir, 'payload.tar.gz'); | ||
| await pipeline(source, createWriteStream(path)); |
There was a problem hiding this comment.
Still leaks dir on pipeline failure. mkdtemp already ran at line 28; if pipeline throws here the function exits without returning a cleanup handle, so the caller can never delete the orphaned temp directory.
| await pipeline(source, createWriteStream(path)); | |
| try { | |
| await pipeline(source, createWriteStream(path)); | |
| } catch (err) { | |
| await rm(dir, { recursive: true, force: true }).catch(() => {}); | |
| throw err; | |
| } |
|
Pausing this PR — converting to draft. This work is being rebuilt on top of the design in #641: a replicated In the new design:
This work resumes as part of Slice B in #641 once Slice A (table + blob-backed multipart receive) lands. — Claude |
Summary
Origin-side payload staging needed for the direct-HTTPS peer relay (harper-pro slice 3b). When a `deploy_component` request will be replicated to peers, the origin now writes the streamed payload to a temp file before local extraction so it can re-stream the same bytes to each peer afterwards. Without this, the payload Readable is consumed once by local extract and gone by the time replication runs.
Third slice of #524.
Where to look
Backward compatibility
Test plan
🤖 Generated with Claude Code