diff --git a/CHANGELOG.md b/CHANGELOG.md index cfb88efc..a400d21a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,37 @@ project uses [Semantic Versioning](https://semver.org/). Detailed per-release notes are on the [GitHub Releases page](https://github.com/TeoSlayer/pilotprotocol/releases). +## [Unreleased] + +Reliable P2P data transfer across NAT. Tag intentionally held for review. + +### Added +- **Chunked, ACK'd, resumable file transfer (`TypeFileStream`).** `pilotctl + send-file` now streams files in 48 KiB chunks with per-chunk ACKs, an + end-to-end SHA-256 integrity check, and automatic resume from the last + contiguous byte after an interrupted transfer. Replaces the single + atomic frame that stalled large transfers on any non-trivial path. + Backward compatible: falls back to the legacy `TypeFile` path when the + receiver is too old to answer the stream handshake. `--no-stream` forces + the legacy path. +- **`pilotctl prefer-direct `** and **`send-file --prefer-direct`** — + drop a peer's tunnel + cached resolution so the next dial re-runs the + full resolve + NAT hole-punch flow and prefers the direct path. +- `send-file` reports `transport`, `sha256`, and `throughput_mbps`; adds + `--timeout`. + +### Fixed +- **NAT traversal now actually establishes (and holds) a direct path.** The + relay→direct upgrade sent a one-way probe that a stateful NAT/firewall + always dropped, so peers stayed on the beacon relay indefinitely. The + daemon now runs a beacon-coordinated hole-punch and immediately probes + the peer's real address to promote the path, retrying every 15 s (was + 5 min). Result on the dual-NAT rig: relay→direct in ~8 s, held through a + 50 MB transfer, ~7–15× the relay throughput. +- **Dual-NAT key-exchange convergence.** Key exchange is now sent over both + the direct and relay paths, so two NAT'd peers reconverge in ~1 RTT + instead of waiting 28 s–3 min for blackhole detection. 
+ ## [1.11.2] - 2026-06-15 ### Added diff --git a/cmd/pilotctl/main.go b/cmd/pilotctl/main.go index 6726e298..193f8abe 100644 --- a/cmd/pilotctl/main.go +++ b/cmd/pilotctl/main.go @@ -231,6 +231,20 @@ func formatBytes(b uint64) string { } } +// fmtCount renders a large count compactly: 12345 → "12.3K", 5400000 → "5.4M"; counts below 10000 are printed exactly. +func fmtCount(n uint64) string { + switch { + case n >= 1_000_000_000: + return fmt.Sprintf("%.1fB", float64(n)/1e9) + case n >= 1_000_000: + return fmt.Sprintf("%.1fM", float64(n)/1e6) + case n >= 10_000: + return fmt.Sprintf("%.1fK", float64(n)/1e3) + default: + return fmt.Sprintf("%d", n) + } +} + // --- Env / config helpers --- func getSocket() string { @@ -738,7 +752,7 @@ Send echo packets and report round-trip latency. Flags: --count number of packets to send (default: 4) - --timeout per-ping deadline (default: 30s) + --timeout per-ping deadline (default: 5s) --reuse-conn reuse tunnel connection across packets --trace print per-step timing breakdown to stderr @@ -809,42 +823,56 @@ See also: pilotctl approve, pilotctl pending, pilotctl trust `, "peers": `Usage: pilotctl peers [flags] -List currently connected peers and their connection quality. +Summarize currently connected peers. Shows a one-line breakdown +(encrypted+authenticated / relay / direct) and then only the exceptions — +peers that are unencrypted or unauthenticated. -Columns: NODE ID, ENCRYPTED, AUTH, PATH (direct/relay) - PATH=relay means this peer is behind symmetric NAT and traffic goes through - the beacon. It's normal but adds ~50-150ms latency vs a direct path. +PATH=relay means a peer is behind symmetric NAT and traffic goes through +the beacon. It's normal but adds ~50-150ms latency vs a direct path. 
Flags: + --all full peer table instead of exceptions only + --limit rows to show (default 20, 0 = all) --search filter by node ID substring See also: pilotctl ping — measure RTT to a specific peer `, - "inbox": `Usage: pilotctl inbox [flags] + "inbox": `Usage: pilotctl inbox [read ] [flags] -Show messages received via send-message from other agents. +Show messages received from other agents — newest first, 10 by default. -Flags: - --clear delete all inbox messages after displaying them - --trace include relative age and byte count per message +Subcommands: + read print the full body of one message -Tip: use with send-message --wait to get a reply inline: - pilotctl send-message list-agents --data '/data {}' --wait +Flags: + --latest full body of the single newest message (after filters) + --limit how many to show (default 10, 0 = all) + --from only messages from this address or hostname + --since only messages newer than a duration (5m, 1h) or RFC3339 time + --full full bodies instead of one-line previews + --clear delete the matched messages (all if no filters) + --before with --clear: only delete messages older than this + +Agent patterns: + pilotctl --json inbox --latest # newest reply, full body + pilotctl --json inbox --from list-agents --limit 3 # last 3 from one peer + pilotctl inbox --clear --before 24h # keep today, purge older + +Tip: send-message --wait already returns the matching reply inline; the +inbox is for replies that arrive later or that you want to re-read. `, "info": `Usage: pilotctl info [flags] -Print full daemon state: address, hostname, uptime, peers, encryption, -beacon, active connections, traffic counters. 
- -Key fields: - Peers total / direct / relay breakdown - Encryption per-peer encrypted + authenticated counts - Beacon which beacon was picked for relay and NAT traversal - Handshakes pending approvals (run 'pilotctl pending' to act on them) - Traffic cumulative bytes and packet counts since start +Print full daemon state, grouped: + identity hostname, address, node ID, key fingerprint, email + network peers (encrypted/relay/direct), pending handshakes, + beacon, connections, ports + traffic cumulative bytes and packet counts since start + skills agent tools with the pilot skill installed -See also: pilotctl health — lightweight status check - pilotctl peers — per-peer detail +See also: pilotctl health — lightweight status check + pilotctl peers — per-peer detail + pilotctl connections — per-connection detail `, "health": `Usage: pilotctl health @@ -938,15 +966,20 @@ To reject: pilotctl reject [reason] Note: nodes in the embedded trusted-agents list are auto-approved on first contact. `, - "trust": `Usage: pilotctl trust + "trust": `Usage: pilotctl trust [flags] -List all nodes this daemon currently trusts, with mutual status and approval time. +List the nodes this daemon currently trusts, newest first (20 by default). -MUTUAL=yes means the other node also approved you — messages flow freely. -MUTUAL=no means you approved them but they haven't approved you yet, or vice versa. +Mutual trust is the norm and prints nothing; rows tagged "one-way" mean +you approved them but they haven't approved you yet, or vice versa. + +Flags: + --limit rows to show (default 20, 0 = all) + --search filter by node ID (or hostname when known) See also: pilotctl pending — incoming requests waiting for your approval pilotctl handshake — initiate trust with a new peer + pilotctl untrust — revoke trust with a node `, "trusted": `Usage: pilotctl trusted @@ -1060,11 +1093,14 @@ Sends FIN to the remote side. 
// Messaging "received": `Usage: pilotctl received [flags] -Show raw received messages (lower-level than inbox — includes all ports, -not just the data-exchange inbox port). +List files received via send-file (~/.pilot/received/), newest first. +Shows the 10 newest by default. Flags: - --clear delete all received messages after displaying them + --limit show at most N files (default: 10, 0 = all) + --since only files newer than a duration (5m, 1h) or timestamp + --clear delete the matched files + --before with --clear: only delete files older than `, "dgram": `Usage: pilotctl dgram --data @@ -1101,13 +1137,15 @@ Flags: --count number of entries to show (default: 5) --scope filter by scope tag (e.g. protocol, cli, networks) `, - "context": `Usage: pilotctl context + "context": `Usage: pilotctl context [command] Print machine-readable metadata for every command: name, description, argument templates, and return field names. Used by agent tools (Claude Code, OpenClaw, etc.) to auto-generate command invocations. -Output is a JSON object keyed by command name. +With no argument, prints the full catalog as a JSON object keyed by command +name. With a command name (e.g. 'pilotctl context send-message' or +'pilotctl context daemon start'), prints only that command's entry. `, "skills": `Usage: pilotctl skills [subcommand] @@ -1116,9 +1154,9 @@ Manage the SKILL.md files the daemon installs for each detected agent tool use pilotctl without manual setup. Subcommands: - status (default) show install state for each detected tool - paths print install paths only — one per line, shell-friendly - check run one reconcile pass right now (re-installs if missing/outdated) + status [--verbose] (default) per-tool install state; --verbose adds per-file detail + paths print install paths only — one per line, shell-friendly + check run one reconcile pass right now (re-installs if missing/outdated) The daemon reconciles skill files every 15 minutes automatically. 
`, @@ -1138,10 +1176,41 @@ Flags: Publish a message to a topic on a remote node. `, - "send-file": `Usage: pilotctl send-file + "send-file": `Usage: pilotctl send-file [--timeout ] [--prefer-direct] [--no-stream] + +Send a file to a remote node via the data-exchange stream. Only the +legacy single-frame path is capped at 256 MiB (the frame ceiling) unless +both daemons have raised PILOT_DATAEXCHANGE_MAX_FRAME — see the dataexchange package. -Send a file to a remote node via the reliable data-exchange stream. -The receiver gets the filename and contents; an ACK is printed on success. +Flags: + --timeout give up if the receiver does not ACK within this + window (default 90s). Use a value comfortably + larger than (file size / expected throughput) + + receiver disk-flush time. On timeout the sender + exits with a non-zero code and a clear hint + instead of hanging until SO_KEEPALIVE trips + (~120s by default on the OS). + --prefer-direct drop the existing tunnel + sticky relay flag for + this peer before dialing, so the daemon retries + a direct UDP path instead of reusing the + beacon-mediated relay tunnel. Useful when ping + works but send-file hangs — typical sign of a + relay path that established once and got stuck. + Best-effort: if the peer is genuinely behind a + symmetric NAT the daemon will still fall back to + relay within the dial retry budget. + +What you see during a transfer (TTY only): + sending to self-rewriting elapsed line + (--json suppresses it for agent consumption) + +Reliability (streamed transfer, the default): + - File is sent in 48 KiB chunks with per-chunk ACKs and verified + end-to-end with SHA-256. + - An interrupted transfer resumes from the last contiguous byte. + - A receiver too old for the stream handshake gets the legacy + single-frame path (atomic, no resume, no content hash); + --no-stream forces that path. 
`, // appstore: keep the help block here in lockstep with @@ -1229,7 +1298,7 @@ Management commands: pilotctl disconnect Mailbox: - pilotctl received [--clear] + pilotctl received [--limit ] [--since ] [--clear [--before ]] pilotctl inbox [--clear] Service Agents: @@ -1359,7 +1428,7 @@ dispatch: case "config": cmdConfig(cmdArgs) case "context": - cmdContext() + cmdContext(cmdArgs) // Daemon lifecycle case "daemon": @@ -1476,9 +1545,11 @@ dispatch: case "pending": cmdPending() case "trust": - cmdTrust() + cmdTrust(cmdArgs) case "trusted": cmdTrusted(cmdArgs) + case "prefer-direct": + cmdPreferDirect(cmdArgs) // Networks case "network": @@ -1738,13 +1809,47 @@ func cmdConfig(args []string) { if _, ok := cfg["socket"]; !ok { cfg["socket"] = getSocket() } - output(cfg) + if jsonOutput { + output(cfg) + return + } + + // Human mode: aligned key-value list, config_path pinned to the top + // (it's the answer to "which file am I editing?"). Keys are padded + // before styling so ANSI escapes don't break the column alignment. + keys := make([]string, 0, len(cfg)) + width := len("config_path") + for k := range cfg { + if k == "config_path" { + continue + } + keys = append(keys, k) + if len(k) > width { + width = len(k) + } + } + sort.Strings(keys) + + renderValue := func(v interface{}) string { + if s, ok := v.(string); ok { + return s + } + b, _ := json.Marshal(v) + return string(b) + } + fmt.Printf("%s %s\n", sDim(fmt.Sprintf("%-*s", width, "config_path")), sAccent(renderValue(cfg["config_path"]))) + for _, k := range keys { + fmt.Printf("%s %s\n", sDim(fmt.Sprintf("%-*s", width, k)), renderValue(cfg[k])) + } } // ===================== CONTEXT ===================== -func cmdContext() { - ctx := map[string]interface{}{ +// contextCatalog returns the full machine-readable command catalog that +// `pilotctl context` dumps. Kept as a function so cmdContext can also +// serve single-command lookups without holding the map at package scope. 
+func contextCatalog() map[string]interface{} { + return map[string]interface{}{ "version": "1.3", "note": "Core commands cover everything an agent needs. Use 'pilotctl extras ' for operator/admin operations. 'pilot-gateway' is a separate installed binary.", @@ -1909,7 +2014,7 @@ func cmdContext() { "send-message": map[string]interface{}{ "args": []string{"", "--data ", "[--type text|json|binary]", "[--count ]", "[--reuse-conn]"}, "description": "Send a typed message to a node via data exchange (port 1001). --count N sends N messages; --reuse-conn shares one connection across all N (env: PILOT_SENDMSG_REUSE_CONN=1). Default type: text", - "returns": "target, type, bytes, ack, reuse_conn", + "returns": "target, to, type, bytes, ack, reuse_conn", }, "send-file": map[string]interface{}{ "args": []string{"", ""}, @@ -1917,14 +2022,14 @@ func cmdContext() { "returns": "filename, bytes, destination, ack", }, "inbox": map[string]interface{}{ - "args": []string{"[--clear]"}, - "description": "List received messages (~/.pilot/inbox/). --clear to delete all", - "returns": "messages [{type, from, data, received_at}], total, dir", + "args": []string{"[read ]", "[--latest]", "[--limit ]", "[--from ]", "[--since ]", "[--full]", "[--clear [--before ]]"}, + "description": "List received messages newest-first (~/.pilot/inbox/). Default limit 10 with previews; --latest for the newest full body; read for one message", + "returns": "messages [{id, from, received_at, type, bytes, preview|data}], total, shown, dir", }, "received": map[string]interface{}{ - "args": []string{"[--clear]"}, - "description": "List received files (~/.pilot/received/). --clear to delete all", - "returns": "files [{name, bytes, modified, path}], total, dir", + "args": []string{"[--limit ]", "[--since ]", "[--clear [--before ]]"}, + "description": "List received files (~/.pilot/received/) newest-first. 
Default limit 10 in text mode; --clear deletes the matched set", + "returns": "files [{name, bytes, modified, path}], total, shown, dir", }, // Pub/Sub @@ -1948,7 +2053,7 @@ func cmdContext() { "ping": map[string]interface{}{ "args": []string{"", "[--count ]", "[--timeout ]"}, "description": "Ping a node via echo port (port 7). Default 4 pings", - "returns": "target, results [{seq, bytes, rtt_ms, error}], timeout (bool)", + "returns": "target, to, results [{seq, bytes, rtt_ms, error}], timeout (bool)", }, }, @@ -2061,7 +2166,42 @@ func cmdContext() { }, "config_file": "~/.pilot/config.json", } - output(ctx) +} + +// cmdContext prints the full command catalog, or — with a positional +// argument — only that command's entry: `pilotctl context send-message` +// returns the one JSON object instead of the whole ~18 KB dump. +// Multi-word commands work too: `pilotctl context daemon start`. +func cmdContext(args []string) { + ctx := contextCatalog() + _, pos := parseFlags(args) + if len(pos) == 0 { + output(ctx) + return + } + name := strings.Join(pos, " ") + commands, _ := ctx["commands"].(map[string]interface{}) + if entry, ok := commands[name]; ok { + output(entry) + return + } + // Fall back to the extras and gateway catalogs so e.g. + // `pilotctl context policy set` resolves too. 
+ for _, section := range []string{"extras", "pilot_gateway"} { + sec, _ := ctx[section].(map[string]interface{}) + cmds, _ := sec["commands"].(map[string]interface{}) + if entry, ok := cmds[name]; ok { + output(entry) + return + } + } + valid := make([]string, 0, len(commands)) + for k := range commands { + valid = append(valid, k) + } + sort.Strings(valid) + fatalCode("not_found", "unknown command %q — valid: %s (extras/gateway names also accepted)", + name, strings.Join(valid, ", ")) } func cmdExtrasHelp() { @@ -2632,8 +2772,8 @@ func cmdDaemonStatus(args []string) { if jsonOutput { output(result) } else { - fmt.Println("Daemon: stopped") - fmt.Printf(" start with: pilotctl daemon start\n") + fmt.Printf("%s %s stopped\n", statusDot("err"), sBold("pilot-daemon")) + fmt.Printf(" %s\n", sDim("start: pilotctl daemon start")) } return } @@ -2642,7 +2782,12 @@ func cmdDaemonStatus(args []string) { info, err := d.Info() if err != nil { result["responsive"] = false - output(result) + if jsonOutput { + output(result) + } else { + fmt.Printf("%s %s unresponsive %s\n", statusDot("err"), sBold("pilot-daemon"), sDim("— socket accepted the connection but info failed")) + fmt.Printf(" %s\n", sDim("restart: pilotctl daemon stop && pilotctl daemon start")) + } return } @@ -2658,21 +2803,27 @@ func cmdDaemonStatus(args []string) { result["connections"] = int(info["connections"].(float64)) if !jsonOutput { + // The socket responded, so the daemon IS running — regardless of + // what the PID file says (it may be missing or stale when the + // daemon was started by launchd/systemd instead of pilotctl). 
uptime := info["uptime_secs"].(float64) hours := int(uptime) / 3600 mins := (int(uptime) % 3600) / 60 secs := int(uptime) % 60 - statusStr := "stopped" + fmt.Printf("%s %s running\n", statusDot("ok"), sBold("pilot-daemon")) + meta := fmt.Sprintf("uptime %02d:%02d:%02d", hours, mins, secs) if running { - statusStr = "running" + meta += fmt.Sprintf(" · pid %d", pid) } - fmt.Printf("Daemon: %s (pid %d)\n", statusStr, pid) - fmt.Printf(" Node ID: %d\n", int(info["node_id"].(float64))) - fmt.Printf(" Address: %s\n", info["address"]) + meta += fmt.Sprintf(" · %d connection(s)", int(info["connections"].(float64))) + fmt.Printf(" %s\n", sDim(meta)) + + nodeLine := fmt.Sprintf("%d · %s", int(info["node_id"].(float64)), info["address"]) if h, ok := info["hostname"].(string); ok && h != "" { - fmt.Printf(" Hostname: %s\n", h) + nodeLine += " · " + h } - fmt.Printf(" Uptime: %02d:%02d:%02d\n", hours, mins, secs) + fmt.Printf(" node %s\n", sAccent(nodeLine)) + peers := int(info["peers"].(float64)) encPeers := 0 if ep, ok := info["encrypted_peers"].(float64); ok { @@ -2686,14 +2837,12 @@ func cmdDaemonStatus(args []string) { if hp, ok := info["handshake_pending_count"].(float64); ok { pending = int(hp) } - peerLine := fmt.Sprintf("%d", peers) - if peers > 0 { - peerLine = fmt.Sprintf("%d (%d encrypted, %d via relay)", peers, encPeers, relayPeers) + fmt.Printf(" peers %s %s\n", sBold(fmt.Sprintf("%d", peers)), sDim(fmt.Sprintf("(%d encrypted, %d via relay)", encPeers, relayPeers))) + if !running { + fmt.Printf(" %s\n", sDim("pid file stale — daemon was likely started by launchd/systemd")) } - fmt.Printf(" Peers: %s\n", peerLine) - fmt.Printf(" Connections: %d\n", int(info["connections"].(float64))) if pending > 0 { - fmt.Printf(" Handshakes: %d pending\n", pending) + fmt.Printf(" %s %s\n", sWarn(fmt.Sprintf("%d pending handshake(s)", pending)), sDim("— review with: pilotctl pending")) } return } @@ -3526,25 +3675,111 @@ func cmdDgram(args []string) { } } +// cmdSendFile transfers 
a file via the dataexchange overlay stream. +// +// The default path is the chunked TypeFileStream transfer (streamSendFile). +// The legacy single-frame path kept below as a fallback still carries +// four reliability primitives without changing the wire format: +// +// 1. **Bounded ACK wait.** The original code blocked on `client.Recv()` +// forever; if the receiver crashed mid-write, the sender hung until +// SO_KEEPALIVE finally fired (~120s on most kernels). We now race the +// receive against a configurable `--timeout` timer (default 90s) +// and close the connection on expiry so the underlying goroutine +// unblocks. +// 2. **Progress indicator.** While the transfer is in flight, an +// elapsed-time line is rewritten on stderr (TTY-only, hidden under +// --json) so the user knows the command is alive. Uses the existing +// startWaitProgress helper. +// 3. **Throughput in the result.** The JSON output now carries +// elapsed_ms and a megabits-per-second rate, so agents can see at a +// glance whether the transfer was abnormally slow. +// 4. **Sharper error messages.** Timeouts and receiver-side ERR ACKs +// get distinct exit codes and surface the next command to run +// ("pilotctl ping" / "pilotctl peers"), matching the house pattern. func cmdSendFile(args []string) { - if len(args) < 2 { - fatalCode("invalid_argument", "usage: pilotctl send-file ") + flags, pos := parseFlags(args) + if len(pos) < 2 { + fatalCode("invalid_argument", "usage: pilotctl send-file [--timeout ]") } + // Default 90s is comfortable for transfers up to a hundred MiB over + // a relay path; users with bigger files or slower peers should bump + // it explicitly. We intentionally do not derive timeout from file + // size — that hides the failure mode where the receiver hangs + // post-write (the actual symptom of the original bug). 
+ timeout := flagDuration(flags, "timeout", 90*time.Second) + d := connectDriver() defer d.Close() - target, err := parseAddrOrHostname(d, args[0]) + target, err := parseAddrOrHostname(d, pos[0]) if err != nil { fatalCode("invalid_argument", "%v", err) } // Auto-handshake to peers in the embedded trusted-agents list. // Best-effort: warns on stderr and continues if handshake fails. - // (send-file uses positional args — no flag map; pass false.) maybeAutoHandshake(d, target, false) - filePath := args[1] + // --prefer-direct breaks the daemon out of a stuck-on-relay tunnel + // BEFORE we dial port 1001. Without this, a previously-established + // relay tunnel is reused and the dial inherits its broken stream + // behavior. We send the IPC, log what the daemon reset, and proceed + // regardless — an old daemon returns "unknown command" which we + // treat as a best-effort hint, not a hard failure. + if flagBool(flags, "prefer-direct") { + resp, perr := d.PreferDirect(target.Node) + switch { + case perr != nil && strings.Contains(perr.Error(), "unknown command"): + fmt.Fprintln(os.Stderr, sDim("--prefer-direct: daemon does not support it (pre-v1.12.0); proceeding with existing tunnel")) + case perr != nil: + fmt.Fprintln(os.Stderr, sDim("--prefer-direct: "+perr.Error()+" (continuing)")) + default: + had, _ := resp["had_tunnel"].(bool) + wasActive, _ := resp["was_relay_active"].(bool) + wasPinned, _ := resp["was_relay_pinned"].(bool) + fmt.Fprintln(os.Stderr, sDim(fmt.Sprintf("--prefer-direct: tunnel=%v relay_was_active=%v relay_was_pinned=%v", + had, wasActive, wasPinned))) + } + } + + filePath := pos[1] + filename := filepath.Base(filePath) + + fi, err := os.Stat(filePath) + if err != nil { + if os.IsNotExist(err) { + fatalCode("not_found", "file not found: %s", filePath) + } + if os.IsPermission(err) { + fatalCode("internal", "permission denied: %s", filePath) + } + fatalCode("internal", "stat file: %v", err) + } + if fi.IsDir() { + fatalCode("invalid_argument", "%s 
is a directory, not a file", filePath) + } + size := fi.Size() + + // Streamed transfer (default): chunked, ACK'd, resumable, end-to-end + // SHA-256 verified — no per-frame size cap, and big files no longer + // collapse into one giant frame that stalls over relay (or over a + // direct link that flips to relay under one-way load). Falls back to + // the single-frame TypeFile path when the receiver is too old to + // understand TypeFileStream (it never sends an INIT-ACK). + if !flagBool(flags, "no-stream") { + if res, serr := streamSendFile(d, target, filePath, filename, size, timeout); serr == nil { + outputOK(res) + return + } else if !errors.Is(serr, dataexchange.ErrStreamUnsupported) { + fatalHint("connection_failed", + "check reachability: pilotctl ping "+target.String()+" · for very large/slow links raise --timeout", + "streamed send-file failed: %v", serr) + } + fmt.Fprintln(os.Stderr, sDim("receiver does not support streamed transfer (pre-v1.12.0); falling back to single-frame TypeFile")) + } + data, err := os.ReadFile(filePath) if err != nil { if os.IsNotExist(err) { @@ -3559,13 +3794,12 @@ func cmdSendFile(args []string) { // Reject files that would exceed the data-exchange frame cap before // opening the connection — keeps the failure path clean and avoids // streaming a quarter-gigabyte just to have the receiver close. - if len(data) > dataexchange.MaxFrameSize { + if uint64(len(data)) > uint64(dataexchange.MaxFrameSize) { // compare in 64 bits: uint32(len(data)) wraps for files ≥ 4 GiB and would slip past the cap fatalCode("invalid_argument", - "file too large: %d bytes (max %d)", len(data), dataexchange.MaxFrameSize) + "file too large: %d bytes (max %d) for the legacy single-frame path. 
Use the default streamed transfer (omit --no-stream) against a v1.12.0+ receiver.", + len(data), dataexchange.MaxFrameSize) } - filename := filepath.Base(filePath) - client, err := dataexchange.Dial(d, target) if err != nil { hint := classifyDaemonError(err) @@ -3577,25 +3811,66 @@ func cmdSendFile(args []string) { } defer client.Close() + stop := startWaitProgress(fmt.Sprintf("sending %s to %s", filename, target)) + start := time.Now() + if err := client.SendFile(filename, data); err != nil { + stop() fatalCode("connection_failed", "send failed: %v", err) } - // Read ACK - ack, err := client.Recv() - if err != nil { - // Sender wrote all bytes but never got the receiver's ACK back - // (likely receiver crashed or restarted mid-transfer). That's - // not a silent success — surface as a loud error so callers - // don't mistake it for full delivery. - fatalCode("connection_failed", - "send wrote all bytes but no ACK from receiver: %v", err) + // Wait for the ACK with a bounded deadline. The dataexchange.Client + // does not expose a Recv-with-context, so we run the read in a + // goroutine and race it against a timer. On timeout we close the + // connection — that unblocks the goroutine's ReadFrame with an + // error, which we then drop on the floor because we've already + // decided the transfer is a failure. + type ackResult struct { + frame *dataexchange.Frame + err error + } + ackCh := make(chan ackResult, 1) + go func() { + f, err := client.Recv() + ackCh <- ackResult{f, err} + }() + + var ack *dataexchange.Frame + select { + case res := <-ackCh: + ack = res.frame + if res.err != nil { + stop() + // Sender wrote all bytes but never got the receiver's ACK + // back (likely receiver crashed or restarted mid-transfer). 
+ fatalHint("connection_failed", + "the receiver may have crashed or restarted mid-transfer · check reachability: pilotctl ping "+target.String(), + "send wrote all bytes but no ACK from receiver: %v", res.err) + } + case <-time.After(timeout): + stop() + // Closing the conn lets the goroutine unwind. We deliberately + // don't wait for it here — we've already given the receiver its + // budget. + _ = client.Close() + fatalHint("timeout", + "the receiver did not ACK within "+timeout.String()+" · check reachability: pilotctl ping "+target.String()+" · for very large files try --timeout 5m", + "send-file timed out waiting for ACK from %s after %s", target, timeout) + } + + stop() + elapsed := time.Since(start) + mbps := 0.0 + if elapsed > 0 { + mbps = (float64(len(data)) * 8.0) / (1e6 * elapsed.Seconds()) } result := map[string]interface{}{ - "filename": filename, - "bytes": len(data), - "destination": target.String(), + "filename": filename, + "bytes": len(data), + "destination": target.String(), + "elapsed_ms": elapsed.Milliseconds(), + "throughput_mbps": mbps, } if ack != nil { ackText := string(ack.Payload) @@ -3610,6 +3885,54 @@ func cmdSendFile(args []string) { outputOK(result) } +// streamSendFile transfers filePath with the chunked, ACK'd, resumable +// TypeFileStream protocol. It returns a result map on success; the sentinel +// dataexchange.ErrStreamUnsupported tells the caller to fall back to the +// single-frame TypeFile path (the receiver is too old). timeout bounds the +// wait for any single ACK and for the receiver's final verification. 
+func streamSendFile(d *driver.Driver, target protocol.Addr, filePath, filename string, size int64, timeout time.Duration) (map[string]interface{}, error) { + client, err := dataexchange.Dial(d, target) + if err != nil { + return nil, err + } + defer client.Close() + + f, err := os.Open(filePath) + if err != nil { + return nil, err + } + defer f.Close() + + stop := startWaitProgress(fmt.Sprintf("streaming %s to %s", filename, target)) + start := time.Now() + res, serr := client.SendFileStream(filename, f, size, timeout) + stop() + if serr != nil { + return nil, serr + } + if !res.OK { + return nil, fmt.Errorf("receiver rejected file: %s", res.Message) + } + + elapsed := time.Since(start) + mbps := 0.0 + if elapsed > 0 { + mbps = (float64(res.TotalBytes) * 8.0) / (1e6 * elapsed.Seconds()) + } + return map[string]interface{}{ + "filename": filename, + "bytes": res.TotalBytes, + "bytes_sent": res.BytesSent, + "bytes_resumed": res.BytesResumed, + "sha256": res.Sha256, + "destination": target.String(), + "elapsed_ms": elapsed.Milliseconds(), + "throughput_mbps": mbps, + "transport": "filestream", + "verified": res.OK, + }, nil +} + func cmdSendMessage(args []string) { flags, pos := parseFlags(args) if len(pos) < 1 { @@ -3776,6 +4099,7 @@ func cmdSendMessage(args []string) { r := sendOne(cl, 0, false) result := map[string]interface{}{ "target": target.String(), + "to": target.String(), "type": msgType, } for k, v := range r { @@ -3789,7 +4113,9 @@ func cmdSendMessage(args []string) { if !jsonOutput { fmt.Fprintf(os.Stderr, "waiting for reply from %s (up to %s)...\n", pos[0], waitDur) } + stop := startWaitProgress("waiting for reply") reply, err := waitForInboxReply(agentHint, inboxCutoff, waitDur) + stop() if err != nil { fatalCode("timeout", "%v", err) } @@ -3810,6 +4136,7 @@ func cmdSendMessage(args []string) { } outputOK(map[string]interface{}{ "target": target.String(), + "to": target.String(), "type": msgType, "reuse_conn": true, "results": results, @@ -3828,6 
+4155,7 @@ func cmdSendMessage(args []string) { } outputOK(map[string]interface{}{ "target": target.String(), + "to": target.String(), "type": msgType, "reuse_conn": false, "results": results, @@ -4038,8 +4366,7 @@ func cmdHandshake(args []string) { fmt.Printf("already trusted with node %d — ready to communicate\n", nodeID) } else { fmt.Printf("handshake request sent to node %d\n", nodeID) - fmt.Printf(" next: node %d must approve — or send a handshake back for auto-approval\n", nodeID) - fmt.Printf(" check: pilotctl trust\n") + fmt.Println(sDim(" they must approve on their side · check status: pilotctl trust · propagation can take ~60s")) } } } @@ -4062,7 +4389,7 @@ func cmdApprove(args []string) { output(result) } else { fmt.Printf("trust established with node %d\n", nodeID) - fmt.Printf(" try: pilotctl ping %d\n", nodeID) + fmt.Println(sDim(fmt.Sprintf(" trust is now mutual — message them: pilotctl send-message %d --data \"hi\"", nodeID))) } } @@ -4091,6 +4418,44 @@ func cmdReject(args []string) { } } +// cmdPreferDirect drops the daemon's tunnel + sticky routing state for a +// peer so the next dial retries a fresh direct UDP path. +// +// Use case: ping works (the small UDP fits through the beacon +// relay just fine) but send-file hangs ~120s and EOFs — symptom +// of a relay-mediated tunnel that established once and got stuck for +// stream traffic. Calling prefer-direct + retrying the dial routes the +// next attempt through ensureTunnel's resolve-and-punch path, which +// usually re-establishes a working direct UDP path. 
+func cmdPreferDirect(args []string) { + if len(args) < 1 { + fatalCode("invalid_argument", "usage: pilotctl prefer-direct ") + } + d := connectDriver() + defer d.Close() + + nodeID := resolveToNodeID(d, args[0]) + resp, err := d.PreferDirect(nodeID) + if err != nil { + if strings.Contains(err.Error(), "unknown command") { + fatalHint("not_implemented", + "upgrade the daemon: brew upgrade pilotprotocol (or re-run install.sh)", + "daemon does not support prefer-direct (pre-v1.12.0)") + } + fatalCode("connection_failed", "prefer-direct: %v", err) + } + if jsonOutput { + outputOK(resp) + return + } + had, _ := resp["had_tunnel"].(bool) + wasActive, _ := resp["was_relay_active"].(bool) + wasPinned, _ := resp["was_relay_pinned"].(bool) + fmt.Printf("reset routing state for node %d\n", nodeID) + fmt.Println(sDim(fmt.Sprintf(" tunnel was up: %v · relay was active: %v · relay was pinned: %v", had, wasActive, wasPinned))) + fmt.Println(sDim(" next dial will re-resolve from registry and prefer direct; falls back to relay if direct still fails")) +} + func cmdUntrust(args []string) { if len(args) < 1 { fatalCode("invalid_argument", "usage: pilotctl untrust ") @@ -4103,7 +4468,12 @@ func cmdUntrust(args []string) { if err != nil { fatalCode("connection_failed", "untrust: %v", err) } - outputOK(map[string]interface{}{"node_id": nodeID}) + if jsonOutput { + outputOK(map[string]interface{}{"node_id": nodeID}) + return + } + fmt.Printf("trust revoked for node %d\n", nodeID) + fmt.Println(sDim(fmt.Sprintf(" re-establish later: pilotctl handshake %d", nodeID))) } func cmdPending() { @@ -4131,18 +4501,28 @@ func cmdPending() { return } - fmt.Printf("%-10s %-40s %s\n", "NODE ID", "JUSTIFICATION", "RECEIVED") + fmt.Printf("%s\n\n", sBold(fmt.Sprintf("Pending handshakes — %d", len(pending)))) + now := time.Now() for _, p := range pending { req := p.(map[string]interface{}) nodeID := int(req["node_id"].(float64)) justification, _ := req["justification"].(string) receivedAt := 
int64(req["received_at"].(float64)) t := time.Unix(receivedAt, 0) - fmt.Printf("%-10d %-40s %s\n", nodeID, justification, t.Format("2006-01-02 15:04:05")) + fmt.Printf(" %s %s\n", sAccent(fmt.Sprintf("node %d", nodeID)), sDim(fmt.Sprintf("· %s ago (%s)", fmtDuration(now.Sub(t)), t.Format("2006-01-02 15:04")))) + if justification != "" { + fmt.Printf(" %s\n", inboxPreview(justification, 160)) + } + fmt.Printf(" %s\n\n", sDim(fmt.Sprintf("accept: pilotctl approve %d · decline: pilotctl reject %d \"reason\"", nodeID, nodeID))) } } -func cmdTrust() { +func cmdTrust(args []string) { + flags, _ := parseFlags(args) + limit := flagInt(flags, "limit", 20) + _, limitExplicit := flags["limit"] + search := flagString(flags, "search", "") + d := connectDriver() defer d.Close() @@ -4156,19 +4536,75 @@ func cmdTrust() { trusted = []interface{}{} } + // Filter by --search (node id substring; hostname too when present). + if search != "" { + needle := strings.ToLower(search) + var matched []interface{} + for _, t := range trusted { + rec, _ := t.(map[string]interface{}) + if rec == nil { + continue + } + nodeIDStr := fmt.Sprintf("%d", int(rec["node_id"].(float64))) + hostname, _ := rec["hostname"].(string) + if strings.Contains(nodeIDStr, needle) || strings.Contains(strings.ToLower(hostname), needle) { + matched = append(matched, t) + } + } + trusted = matched + if trusted == nil { + trusted = []interface{}{} + } + } + + // Newest first by approval time. + sort.SliceStable(trusted, func(i, j int) bool { + ri, _ := trusted[i].(map[string]interface{}) + rj, _ := trusted[j].(map[string]interface{}) + ai, _ := ri["approved_at"].(float64) + aj, _ := rj["approved_at"].(float64) + return ai > aj + }) + + total := len(trusted) + if jsonOutput { - output(map[string]interface{}{"trusted": trusted}) + // Shape unchanged; "total" is the pre-limit count. The list is + // only bounded when --limit is passed explicitly so existing + // agent invocations keep seeing the full set. 
+ list := trusted + if limitExplicit && limit > 0 && len(list) > limit { + list = list[:limit] + } + output(map[string]interface{}{"trusted": list, "total": total}) return } - if len(trusted) == 0 { - fmt.Println("no trusted peers") - fmt.Println(" establish trust: pilotctl handshake \"reason\"") + if total == 0 { + if search != "" { + fmt.Printf("no trusted peers matching %q\n", search) + } else { + fmt.Println("no trusted peers") + fmt.Println(" establish trust: pilotctl handshake \"reason\"") + } return } - fmt.Printf("%-10s %-10s %-10s %s\n", "NODE ID", "MUTUAL", "NETWORK", "APPROVED AT") - for _, t := range trusted { + shown := trusted + if limit > 0 && len(shown) > limit { + shown = shown[:limit] + } + header := sBold(fmt.Sprintf("Trusted peers — %d", total)) + qualifier := "" + if search != "" { + qualifier = fmt.Sprintf(" · matching %q", search) + } + if len(shown) < total { + qualifier += fmt.Sprintf(" · showing %d newest", len(shown)) + } + fmt.Printf("%s%s\n\n", header, sDim(qualifier)) + + for _, t := range shown { rec := t.(map[string]interface{}) nodeID := int(rec["node_id"].(float64)) mutual := false @@ -4182,16 +4618,22 @@ func cmdTrust() { approvedAt := int64(rec["approved_at"].(float64)) at := time.Unix(approvedAt, 0) - mutualStr := "no" - if mutual { - mutualStr = "yes" + id := sAccent(fmt.Sprintf("node %d", nodeID)) + if hostname, _ := rec["hostname"].(string); hostname != "" { + id += " " + sAccent(hostname) } - netStr := "-" + meta := fmt.Sprintf("· approved %s", at.Format("2006-01-02 15:04")) if network > 0 { - netStr = fmt.Sprintf("%d", network) + meta += fmt.Sprintf(" · net %d", network) + } + line := fmt.Sprintf(" %s %s", id, sDim(meta)) + if !mutual { + // MUTUAL=yes is the norm — only the asymmetric case is news. 
+ line += " " + sWarn("one-way") } - fmt.Printf("%-10d %-10s %-10s %s\n", nodeID, mutualStr, netStr, at.Format("2006-01-02 15:04:05")) + fmt.Println(line) } + fmt.Printf("\n%s\n", sDim("show more: --limit (0 = all) · search: --search · revoke: pilotctl untrust · json: --json")) } // ===================== MANAGEMENT ===================== @@ -4309,11 +4751,8 @@ func cmdInfo(args []string) { return } - // Human-readable + // Human-readable: health-style grouped layout. uptime := info["uptime_secs"].(float64) - hours := int(uptime) / 3600 - mins := (int(uptime) % 3600) / 60 - secs := int(uptime) % 60 bytesSent := uint64(info["bytes_sent"].(float64)) bytesRecv := uint64(info["bytes_recv"].(float64)) @@ -4329,111 +4768,102 @@ func cmdInfo(args []string) { encryptedPeers = int(ep) } - fmt.Printf("Pilot Protocol Daemon\n") + const labelW = 10 + label := func(s string) string { return " " + sDim(fmt.Sprintf("%-*s", labelW, s)) + " " } + cont := " " + strings.Repeat(" ", labelW) + " " + + // Headline: ● pilot-daemon v1.10.9 · up 56h12m + head := statusDot("ok") + " " + sBold("pilot-daemon") if v, ok := info["version"].(string); ok && v != "" { - fmt.Printf(" Version: %s\n", v) + head += " " + v } - fmt.Printf(" Node ID: %d\n", int(info["node_id"].(float64))) - fmt.Printf(" Address: %s\n", info["address"]) + fmt.Printf("%s %s\n\n", head, sDim("· up "+fmtDuration(time.Duration(uptime)*time.Second))) + + // identity — hostname · address · node id, then key/persistence/email. 
+ var idParts []string if hostname, ok := info["hostname"].(string); ok && hostname != "" { - fmt.Printf(" Hostname: %s\n", hostname) - } - fmt.Printf(" Uptime: %02d:%02d:%02d\n", hours, mins, secs) - fmt.Printf(" Connections: %d\n", int(info["connections"].(float64))) - fmt.Printf(" Ports: %d\n", int(info["ports"].(float64))) - totalPeers := int(info["peers"].(float64)) - relayPeers := 0 - if rp, ok := info["relay_peer_count"].(float64); ok { - relayPeers = int(rp) - } - directPeers := totalPeers - relayPeers - fmt.Printf(" Peers: %d (%d direct, %d via relay)\n", totalPeers, directPeers, relayPeers) - authenticatedPeers := 0 - if ap, ok := info["authenticated_peers"].(float64); ok { - authenticatedPeers = int(ap) - } - if encryptEnabled { - fmt.Printf(" Encryption: enabled (X25519 + AES-256-GCM), %d/%d peers encrypted, %d authenticated\n", - encryptedPeers, totalPeers, authenticatedPeers) - } else { - fmt.Printf(" Encryption: disabled\n") - } - if pending, ok := info["handshake_pending_count"].(float64); ok && int(pending) > 0 { - fmt.Printf(" Handshakes: %d pending approval\n", int(pending)) - } - if beacon, ok := info["beacon_addr"].(string); ok && beacon != "" { - fmt.Printf(" Beacon: %s\n", beacon) + idParts = append(idParts, sAccent(hostname)) } + idParts = append(idParts, sAccent(fmt.Sprint(info["address"]))) + idParts = append(idParts, fmt.Sprintf("node %d", int(info["node_id"].(float64)))) + fmt.Printf("%s%s\n", label("identity"), strings.Join(idParts, " · ")) + hasIdentity := false if id, ok := info["identity"].(bool); ok { hasIdentity = id } + var keyParts []string if hasIdentity { pubKey, _ := info["public_key"].(string) - fingerprint := pubKey - if len(fingerprint) > 16 { - fingerprint = fingerprint[:16] + "..." 
+ if len(pubKey) > 8 { + pubKey = pubKey[:8] + "…" } - fmt.Printf(" Identity: persistent (Ed25519 %s)\n", fingerprint) + keyParts = append(keyParts, "Ed25519 "+pubKey, "persistent") } else { - fmt.Printf(" Identity: ephemeral (not persisted)\n") + keyParts = append(keyParts, "ephemeral (not persisted)") } if email, ok := info["email"].(string); ok && email != "" { - // Tag synthetic emails so users (and agents inspecting the output) - // can tell at a glance whether this node has a real identity. // Synthetic emails are auto-derived from the public-key fingerprint - // and end with @nodes.pilotprotocol.network. To replace one, run - // `pilotctl set-email `. + // and end with @nodes.pilotprotocol.network — tag them so users see + // at a glance whether this node has a real identity. if strings.HasSuffix(email, "@nodes.pilotprotocol.network") { - fmt.Printf(" Email: %s (auto-generated; optional — `pilotctl set-email ` to set your own)\n", email) + keyParts = append(keyParts, email+" "+sDim("(auto-generated — set your own: pilotctl set-email )")) } else { - fmt.Printf(" Email: %s\n", email) + keyParts = append(keyParts, email) } } + fmt.Printf("%s%s\n", cont, strings.Join(keyParts, " · ")) + + // network — peers breakdown + pending, then beacon/connections/ports. 
+ totalPeers := int(info["peers"].(float64)) + relayPeers := 0 + if rp, ok := info["relay_peer_count"].(float64); ok { + relayPeers = int(rp) + } + directPeers := totalPeers - relayPeers + netLine := fmt.Sprintf("%d peers %s", totalPeers, + sDim(fmt.Sprintf("(%d encrypted · %d relay · %d direct)", encryptedPeers, relayPeers, directPeers))) + if !encryptEnabled { + netLine += " · " + sWarn("encryption disabled") + } + if pending, ok := info["handshake_pending_count"].(float64); ok && int(pending) > 0 { + netLine += " · " + sWarn(fmt.Sprintf("%d pending handshake(s)", int(pending))) + " " + sDim("— pilotctl pending") + } + fmt.Printf("%s%s\n", label("network"), netLine) + + var infraParts []string + if beacon, ok := info["beacon_addr"].(string); ok && beacon != "" { + infraParts = append(infraParts, "beacon "+sAccent(beacon)) + } + connList, _ := info["conn_list"].([]interface{}) + connsPart := fmt.Sprintf("%d connections", int(info["connections"].(float64))) + if len(connList) > 0 { + connsPart += " " + sDim("(details: pilotctl connections)") + } + infraParts = append(infraParts, connsPart) + infraParts = append(infraParts, fmt.Sprintf("%d ports", int(info["ports"].(float64)))) + fmt.Printf("%s%s\n", cont, strings.Join(infraParts, " · ")) + + // networks — joined overlay networks, one compact line. 
if nets, ok := info["networks"].([]interface{}); ok && len(nets) > 0 { - fmt.Printf(" Networks: %d\n", len(nets)) + var netParts []string for _, n := range nets { nm, _ := n.(map[string]interface{}) - netID := int(nm["network_id"].(float64)) addr, _ := nm["address"].(string) - fmt.Printf(" - network %d: %s\n", netID, addr) + netParts = append(netParts, sAccent(addr)) } + fmt.Printf("%s%d %s\n", label("networks"), len(nets), sDim("— ")+strings.Join(netParts, " · ")) } - fmt.Printf(" Traffic: %s sent / %s recv\n", formatBytes(bytesSent), formatBytes(bytesRecv)) - fmt.Printf(" Packets: %d sent / %d recv\n", pktsSent, pktsRecv) - printSkillInstallSummary() + // traffic — ↑ sent · ↓ received with compact packet counts. + fmt.Printf("%s↑ %s %s · ↓ %s %s\n", label("traffic"), + formatBytes(bytesSent), sDim(fmt.Sprintf("(%s pkts)", fmtCount(pktsSent))), + formatBytes(bytesRecv), sDim(fmt.Sprintf("(%s pkts)", fmtCount(pktsRecv)))) - connList, ok := info["conn_list"].([]interface{}) - if ok && len(connList) > 0 { - maxDisplay := 50 - fmt.Printf("\nActive connections: %d\n", len(connList)) - fmt.Printf(" %-4s %-6s %-22s %-6s %-11s %-8s %-8s %-6s\n", - "ID", "LOCAL", "REMOTE ADDR", "RPORT", "STATE", "CWND", "FLIGHT", "SRTT") - displayed := 0 - for _, c := range connList { - if displayed >= maxDisplay { - fmt.Printf("\n ... 
and %d more connections (showing first %d)\n", len(connList)-maxDisplay, maxDisplay) - break - } - displayed++ - conn := c.(map[string]interface{}) - recoveryStr := "" - if inRec, ok := conn["in_recovery"].(bool); ok && inRec { - recoveryStr = " [RECOVERY]" - } - fmt.Printf(" %-4d %-6d %-22s %-6d %-11s %-8s %-8s %.0fms%s\n", - int(conn["id"].(float64)), - int(conn["local_port"].(float64)), - conn["remote_addr"], - int(conn["remote_port"].(float64)), - conn["state"], - formatBytes(uint64(conn["cong_win"].(float64))), - formatBytes(uint64(conn["in_flight"].(float64))), - conn["srtt_ms"].(float64), - recoveryStr, - ) - } + // skills — tool names only; full paths live in `pilotctl skills`. + if tools := skillInstallTools(); len(tools) > 0 { + fmt.Printf("%sinstalled in %s %s\n", label("skills"), + strings.Join(tools, " · "), sDim("(details: pilotctl skills)")) } } @@ -4481,27 +4911,33 @@ func cmdHealth() { webhookDropped = uint64(wd) } - fmt.Printf("Daemon Health\n") - fmt.Printf(" Status: %s\n", health["status"]) - fmt.Printf(" Uptime: %02d:%02d:%02d\n", hours, mins, secs) - fmt.Printf(" Connections: %d\n", int(health["connections"].(float64))) - fmt.Printf(" Peers: %d (%d encrypted, %d via relay)\n", peers, encPeers, relayPeers) + status, _ := health["status"].(string) + dotState := "ok" + if status != "ok" { + dotState = "err" + } + fmt.Printf("%s %s %s\n", statusDot(dotState), sBold("pilot-daemon"), status) + fmt.Printf(" %s\n", sDim(fmt.Sprintf("uptime %02d:%02d:%02d · %d connection(s)", hours, mins, secs, int(health["connections"].(float64))))) + fmt.Printf(" peers %s %s\n", sBold(fmt.Sprintf("%d", peers)), sDim(fmt.Sprintf("(%d encrypted, %d via relay)", encPeers, relayPeers))) + fmt.Printf(" traffic %s\n", sDim(fmt.Sprintf("↑ %s ↓ %s", + formatBytes(uint64(health["bytes_sent"].(float64))), + formatBytes(uint64(health["bytes_recv"].(float64)))))) if pending > 0 { - fmt.Printf(" Handshakes: %d pending\n", pending) + fmt.Printf(" %s %s\n", sWarn(fmt.Sprintf("%d 
pending handshake(s)", pending)), sDim("— review with: pilotctl pending")) } - fmt.Printf(" Bytes Sent: %s\n", formatBytes(uint64(health["bytes_sent"].(float64)))) - fmt.Printf(" Bytes Recv: %s\n", formatBytes(uint64(health["bytes_recv"].(float64)))) if queueDrops > 0 { - fmt.Printf(" Queue Drops: %d (accept queue full — increase system limits if persistent)\n", queueDrops) + fmt.Printf(" %s %s\n", sWarn(fmt.Sprintf("%d accept-queue drop(s)", queueDrops)), sDim("— increase system limits if persistent")) } if webhookDropped > 0 { - fmt.Printf(" Webhook: %d events dropped\n", webhookDropped) + fmt.Printf(" %s\n", sWarn(fmt.Sprintf("%d webhook event(s) dropped", webhookDropped))) } } func cmdPeers(args []string) { flags, _ := parseFlags(args) search := flagString(flags, "search", "") + showAll := flagBool(flags, "all") + limit := flagInt(flags, "limit", 20) d := connectDriver() defer d.Close() @@ -4539,15 +4975,40 @@ func cmdPeers(args []string) { filtered = append(filtered, cp) } + // Summary counts. "secure" = encrypted AND authenticated; everything + // else is an exception worth surfacing. 
+ total := len(filtered) + encCount, secure, relayCount := 0, 0, 0 + var exceptions []map[string]interface{} + for _, p := range filtered { + peer := p.(map[string]interface{}) + enc, _ := peer["encrypted"].(bool) + auth, _ := peer["authenticated"].(bool) + rly, _ := peer["relay"].(bool) + if enc { + encCount++ + } + if rly { + relayCount++ + } + if enc && auth { + secure++ + } else { + exceptions = append(exceptions, peer) + } + } + direct := total - relayCount + if jsonOutput { output(map[string]interface{}{ - "peers": filtered, - "total": len(filtered), + "peers": filtered, + "total": total, + "encrypted": encCount, }) return } - if len(filtered) == 0 { + if total == 0 { if search != "" { fmt.Printf("no peers matching %q\n", search) } else { @@ -4557,32 +5018,78 @@ func cmdPeers(args []string) { return } - fmt.Printf("%-10s %-20s %-16s %-6s\n", "NODE ID", "ENCRYPTED", "AUTH", "PATH") - displayed := 0 - for _, p := range filtered { - if displayed >= 50 { - fmt.Printf("\n... and %d more peers (showing first 50)\n", len(filtered)-50) - break - } - displayed++ - peer := p.(map[string]interface{}) - encrypted, _ := peer["encrypted"].(bool) - authenticated, _ := peer["authenticated"].(bool) - relay, _ := peer["relay"].(bool) - encStr := "no" - if encrypted { - encStr = "yes (AES-256-GCM)" - } - authStr := "no" - if authenticated { - authStr = "yes (Ed25519)" - } - pathStr := "direct" - if relay { - pathStr = "relay" + noun := "peers" + if total == 1 { + noun = "peer" + } + dotState := "ok" + if len(exceptions) > 0 { + dotState = "warn" + } + fmt.Printf("%s %s %s\n", statusDot(dotState), + sBold(fmt.Sprintf("%d %s", total, noun)), + sDim(fmt.Sprintf("— %d encrypted+authenticated · %d relay · %d direct", secure, relayCount, direct))) + + if showAll { + fmt.Printf("\n%-10s %-10s %-10s %-6s\n", "NODE ID", "ENCRYPTED", "AUTH", "PATH") + shown := 0 + for _, p := range filtered { + if limit > 0 && shown >= limit { + fmt.Printf("%s\n", sDim(fmt.Sprintf(" … and %d more — show 
more: --limit %d (0 = all)", total-shown, total))) + break + } + shown++ + peer := p.(map[string]interface{}) + enc, _ := peer["encrypted"].(bool) + auth, _ := peer["authenticated"].(bool) + rly, _ := peer["relay"].(bool) + encStr := "yes" + if !enc { + encStr = sWarn(fmt.Sprintf("%-10s", "no")) + } else { + encStr = fmt.Sprintf("%-10s", encStr) + } + authStr := "yes" + if !auth { + authStr = sWarn(fmt.Sprintf("%-10s", "no")) + } else { + authStr = fmt.Sprintf("%-10s", authStr) + } + pathStr := sOK("direct") + if rly { + pathStr = sDim("relay") + } + fmt.Printf("%-10d %s %s %s\n", int(peer["node_id"].(float64)), encStr, authStr, pathStr) + } + } else if len(exceptions) > 0 { + fmt.Println() + shown := 0 + for _, peer := range exceptions { + if limit > 0 && shown >= limit { + fmt.Printf(" %s\n", sDim(fmt.Sprintf("… and %d more exception(s) — show more: --limit %d", len(exceptions)-shown, len(exceptions)))) + break + } + shown++ + enc, _ := peer["encrypted"].(bool) + auth, _ := peer["authenticated"].(bool) + rly, _ := peer["relay"].(bool) + var probs []string + if !enc { + probs = append(probs, "unencrypted") + } + if !auth { + probs = append(probs, "unauthenticated") + } + pathStr := "direct" + if rly { + pathStr = "relay" + } + fmt.Printf(" %s %s %s\n", statusDot("warn"), + sAccent(fmt.Sprintf("node %d", int(peer["node_id"].(float64)))), + sDim(fmt.Sprintf("— %s · %s", strings.Join(probs, " · "), pathStr))) } - fmt.Printf("%-10d %-20s %-16s %-6s\n", int(peer["node_id"].(float64)), encStr, authStr, pathStr) } + fmt.Printf("\n%s\n", sDim("list all: --all · search: --search · json: --json")) } func cmdPing(args []string) { @@ -4592,7 +5099,9 @@ func cmdPing(args []string) { } count := flagInt(flags, "count", 4) - timeout := flagDuration(flags, "timeout", 30*time.Second) + // Default 5s (not the 30s used elsewhere): ping is a reachability probe, + // and a fast verdict beats a patient one. Override with --timeout. 
+ timeout := flagDuration(flags, "timeout", 5*time.Second) // --trace (or PILOTCTL_TRACE_TIME=1) prints per-step timing to stderr: // startup overhead, IPC connect, hostname lookup, and per-packet @@ -4634,6 +5143,17 @@ func cmdPing(args []string) { fmt.Printf("PING %s\n", target) } + // On dial/echo failure, print one actionable hint (text mode only, + // once per invocation) mirroring the send-message dial-timeout hint. + hintPrinted := false + printFailHint := func() { + if jsonOutput || hintPrinted { + return + } + hintPrinted = true + fmt.Fprintln(os.Stderr, sDim("hint: peer may be relay-converging after a beacon roll (~30s) — check reachability: pilotctl peers · trust state: pilotctl trust")) + } + var results []map[string]interface{} overall := time.NewTimer(timeout) defer overall.Stop() @@ -4695,6 +5215,7 @@ func cmdPing(args []string) { if jsonOutput { output(map[string]interface{}{ "target": target.String(), + "to": target.String(), "results": results, "timeout": true, }) @@ -4710,16 +5231,24 @@ func cmdPing(args []string) { var dialElapsed time.Duration connReused := false + // Per-attempt progress line: dial+echo against a slow or ghost + // peer can silently block up to perAttempt (>=10s). Stopped + // before any stdout print so the animation never interleaves + // with seq=N result lines. stopProgress is idempotent. 
+ stopProgress := startWaitProgress(fmt.Sprintf("pinging %s", target)) + if reuseConn { if sharedConn == nil { var dialErr error sharedConn, dialElapsed, dialErr = dialOnce() if dialErr != nil { + stopProgress() r := map[string]interface{}{"seq": i, "error": dialErr.Error()} results = append(results, r) if !jsonOutput { fmt.Printf("seq=%d error: %v\n", i, dialErr) } + printFailHint() if i < count-1 { time.Sleep(time.Second) } @@ -4733,11 +5262,13 @@ func cmdPing(args []string) { var dialErr error conn, dialElapsed, dialErr = dialOnce() if dialErr != nil { + stopProgress() r := map[string]interface{}{"seq": i, "error": dialErr.Error()} results = append(results, r) if !jsonOutput { fmt.Printf("seq=%d error: %v\n", i, dialErr) } + printFailHint() if i < count-1 { time.Sleep(time.Second) } @@ -4766,6 +5297,7 @@ func cmdPing(args []string) { n, readErr := conn.Read(buf) recvAtNs := time.Now().UnixNano() echoElapsed := time.Since(echoStart) + stopProgress() if !reuseConn { conn.Close() @@ -4795,6 +5327,7 @@ func cmdPing(args []string) { if !jsonOutput { fmt.Printf("seq=%d error: %v\n", i, err) } + printFailHint() } else { r["bytes"] = n // Parse TRCE response: [TRCE][sent_at_ns][server_recv_at_ns] @@ -4858,6 +5391,7 @@ func cmdPing(args []string) { if jsonOutput { out := map[string]interface{}{ "target": target.String(), + "to": target.String(), "results": results, "timeout": false, } @@ -4900,10 +5434,15 @@ func cmdTraceroute(args []string) { connDone <- conn }() + // Tunnel negotiation against a slow or unreachable peer blocks here + // silently for up to --timeout; show elapsed progress on a TTY. 
+ stopProgress := startWaitProgress(fmt.Sprintf("tracing %s", target)) var conn *driver.Conn select { case conn = <-connDone: + stopProgress() case <-time.After(timeout): + stopProgress() fatalCode("timeout", "dial timeout") } @@ -5038,9 +5577,14 @@ func cmdBench(args []string) { } sendDuration := time.Since(start) + // Waiting for the full echo to come back is the silent half of the + // benchmark — on a slow path it can run to --timeout with no output. + stopProgress := startWaitProgress(fmt.Sprintf("benchmarking %s", target)) select { case <-recvDone: + stopProgress() case <-time.After(timeout): + stopProgress() if !jsonOutput { fmt.Printf("warning: receive timed out (got %s of %s)\n", formatBytes(uint64(recvTotal)), formatBytes(uint64(totalSize))) @@ -5203,8 +5747,22 @@ func cmdBroadcast(args []string) { // ===================== MAILBOX ===================== +// receivedFile is one entry in ~/.pilot/received/ — a file delivered by +// the daemon's data-exchange service (filenames carry no sender info). +type receivedFile struct { + name string + size int64 + mod time.Time + path string +} + // cmdReceived lists or clears files received via data exchange (port 1001). // Files are saved to ~/.pilot/received/ by the daemon's built-in service. +// +// Same agent-first shape as cmdInbox: newest-first, bounded by default +// (--limit 10), filterable by age (--since ), clearable with +// --clear [--before ]. JSON stays unbounded unless --limit is passed +// explicitly so existing agent invocations keep seeing the full set. 
func cmdReceived(args []string) { flags, _ := parseFlags(args) @@ -5214,35 +5772,26 @@ func cmdReceived(args []string) { } dir := filepath.Join(home, ".pilot", "received") - if flagBool(flags, "clear") { - entries, err := os.ReadDir(dir) - if err != nil { - if os.IsNotExist(err) { - fatalCode("not_found", "no received files") - } - fatalCode("internal", "read directory: %v", err) - } - count := 0 - for _, e := range entries { - if e.IsDir() { - continue - } - os.Remove(filepath.Join(dir, e.Name())) - count++ - } - if jsonOutput { - outputOK(map[string]interface{}{"cleared": count}) + // --since accepts a duration (5m, 1h) or an RFC3339 timestamp. + var sinceCutoff time.Time + if s := flagString(flags, "since", ""); s != "" { + if d, derr := time.ParseDuration(s); derr == nil { + sinceCutoff = time.Now().Add(-d) + } else if t, terr := time.Parse(time.RFC3339, s); terr == nil { + sinceCutoff = t } else { - fmt.Printf("cleared %d received file(s)\n", count) + fatalCode("invalid_argument", "--since must be a duration (5m, 1h) or an RFC3339 timestamp") } - return } entries, err := os.ReadDir(dir) if err != nil { if os.IsNotExist(err) { + if flagBool(flags, "clear") { + fatalCode("not_found", "no received files") + } if jsonOutput { - output(map[string]interface{}{"files": []interface{}{}, "total": 0}) + output(map[string]interface{}{"files": []interface{}{}, "total": 0, "shown": 0}) } else { fmt.Println("no received files") fmt.Println(" files appear here when someone sends: pilotctl send-file ") @@ -5252,45 +5801,115 @@ func cmdReceived(args []string) { fatalCode("internal", "read directory: %v", err) } - var files []map[string]interface{} + var all []receivedFile for _, e := range entries { if e.IsDir() { continue } - info, err := e.Info() - if err != nil { + info, ierr := e.Info() + if ierr != nil { continue } - files = append(files, map[string]interface{}{ - "name": e.Name(), - "bytes": info.Size(), - "modified": info.ModTime().Format(time.RFC3339), - "path": 
filepath.Join(dir, e.Name()), + all = append(all, receivedFile{ + name: e.Name(), + size: info.Size(), + mod: info.ModTime(), + path: filepath.Join(dir, e.Name()), }) } + // Newest first. + sort.Slice(all, func(i, j int) bool { return all[i].mod.After(all[j].mod) }) + + var filtered []receivedFile + for _, f := range all { + if !sinceCutoff.IsZero() && f.mod.Before(sinceCutoff) { + continue + } + filtered = append(filtered, f) + } + + // --clear deletes the matched set (everything if no filters given). + // --before restricts the clear to files older than the duration. + if flagBool(flags, "clear") { + var beforeCutoff time.Time + if b := flagString(flags, "before", ""); b != "" { + d, derr := time.ParseDuration(b) + if derr != nil { + fatalCode("invalid_argument", "--before must be a duration (24h, 30m)") + } + beforeCutoff = time.Now().Add(-d) + } + count := 0 + for _, f := range filtered { + if !beforeCutoff.IsZero() && !f.mod.Before(beforeCutoff) { + continue + } + if os.Remove(f.path) == nil { + count++ + } + } + if jsonOutput { + outputOK(map[string]interface{}{"cleared": count, "remaining": len(all) - count}) + } else { + fmt.Printf("cleared %d received file(s), %d remaining\n", count, len(all)-count) + } + return + } + + limit := flagInt(flags, "limit", 10) + _, limitExplicit := flags["limit"] + total := len(filtered) if jsonOutput { + // Back-compat: the list is only bounded when --limit is passed + // explicitly; "total" is always the pre-limit count. 
+ list := filtered + if limitExplicit && limit > 0 && len(list) > limit { + list = list[:limit] + } + files := make([]map[string]interface{}, 0, len(list)) + for _, f := range list { + files = append(files, map[string]interface{}{ + "name": f.name, + "bytes": f.size, + "modified": f.mod.Format(time.RFC3339), + "path": f.path, + }) + } output(map[string]interface{}{ "files": files, - "total": len(files), + "total": total, + "shown": len(files), "dir": dir, }) return } - if len(files) == 0 { + if total == 0 { + if !sinceCutoff.IsZero() { + fmt.Println("no received files match the filters") + return + } fmt.Println("no received files") fmt.Println(" files appear here when someone sends: pilotctl send-file ") return } - fmt.Printf("Received files (%s):\n\n", dir) - fmt.Printf(" %-40s %-10s %s\n", "NAME", "SIZE", "RECEIVED") - for _, f := range files { - fmt.Printf(" %-40s %-10s %s\n", - f["name"], formatBytes(uint64(f["bytes"].(int64))), f["modified"]) + shown := filtered + if limit > 0 && len(shown) > limit { + shown = shown[:limit] + } + qualifier := "" + if len(shown) < total { + qualifier = fmt.Sprintf(" · showing %d newest", len(shown)) + } + fmt.Printf("%s%s\n\n", sBold(fmt.Sprintf("Received files — %d", total)), sDim(qualifier+" · "+dir)) + now := time.Now() + for _, f := range shown { + fmt.Printf(" %s\n", sAccent(f.name)) + fmt.Printf(" %s\n\n", sDim(fmt.Sprintf("%s ago · %s", fmtDuration(now.Sub(f.mod)), formatBytes(uint64(f.size))))) } - fmt.Printf("\ntotal: %d\n", len(files)) + fmt.Println(sDim("filters: --since --limit (0 = all) · clear: --clear [--before 24h] · json: --json")) } // cmdInbox lists or clears messages received via data exchange (port 1001). 
@@ -5341,10 +5960,109 @@ func waitForInboxReply(agentHint string, cutoff time.Time, timeout time.Duration return nil, fmt.Errorf("no reply from %q within %s", agentHint, timeout) } +// inboxMessage is one parsed inbox entry plus its stable ID — the filename +// without .json, already unique: {TYPE}-{date}-{time.ms}-{seq}. +type inboxMessage struct { + id string + msg map[string]interface{} + name string // filename with extension, used by --clear +} + +// readInboxNewestFirst loads all inbox entries sorted newest-first. +// Inbox filenames are {TYPE}-{ts}-{seq}.json. Plain alpha order groups by +// type (BINARY nj + } + return ni[di:] > nj[dj:] + }) + var out []inboxMessage + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { + continue + } + data, err := os.ReadFile(filepath.Join(dir, e.Name())) + if err != nil { + continue + } + var m map[string]interface{} + if json.Unmarshal(data, &m) != nil { + continue + } + out = append(out, inboxMessage{ + id: strings.TrimSuffix(e.Name(), ".json"), + msg: m, + name: e.Name(), + }) + } + return out, nil +} + +// inboxPreview collapses whitespace and caps the body for one-line display. +// Rune-safe so multi-byte content isn't cut mid-character. +func inboxPreview(data string, max int) string { + collapsed := strings.Join(strings.Fields(data), " ") + r := []rune(collapsed) + if len(r) > max { + return string(r[:max]) + "…" + } + return collapsed +} + +// cmdInboxRead prints the full body of a single message by ID. 
+func cmdInboxRead(dir, id string) { + if id != filepath.Base(id) || strings.Contains(id, "..") { + fatalCode("invalid_argument", "invalid message id %q", id) + } + name := id + if !strings.HasSuffix(name, ".json") { + name += ".json" + } + data, err := os.ReadFile(filepath.Join(dir, name)) + if err != nil { + if os.IsNotExist(err) { + fatalCode("not_found", "no message %q — list ids with: pilotctl inbox", id) + } + fatalCode("internal", "read message: %v", err) + } + var m map[string]interface{} + if err := json.Unmarshal(data, &m); err != nil { + fatalCode("internal", "parse message: %v", err) + } + m["id"] = strings.TrimSuffix(name, ".json") + if jsonOutput { + output(m) + return + } + from, _ := m["from"].(string) + ts, _ := m["received_at"].(string) + msgType, _ := m["type"].(string) + bytes, _ := m["bytes"].(float64) + fmt.Printf("ID: %s\nFrom: %s\nWhen: %s\nType: %s\nBytes: %d\n\n", m["id"], from, ts, msgType, int(bytes)) + body, _ := m["data"].(string) + fmt.Println(body) +} + // Messages are saved to ~/.pilot/inbox/ by the daemon's built-in service. +// +// Agent-first design: newest-first, bounded by default (--limit 10), full +// bodies only on request (--latest, --full, read ), filterable by +// sender (--from) and age (--since). Every mode is non-interactive and +// stable under --json so agents can consume the output directly instead of +// scraping ~/.pilot/inbox/ with shell one-liners. func cmdInbox(args []string) { - flags, _ := parseFlags(args) - traceTime := flagBool(flags, "trace") + flags, pos := parseFlags(args) home, err := os.UserHomeDir() if err != nil { @@ -5352,35 +6070,48 @@ func cmdInbox(args []string) { } dir := filepath.Join(home, ".pilot", "inbox") - if flagBool(flags, "clear") { - entries, err := os.ReadDir(dir) - if err != nil { - if os.IsNotExist(err) { - fatalCode("not_found", "inbox is empty") - } - fatalCode("internal", "read directory: %v", err) + // Subcommand: inbox read — full body of one message. 
+ if len(pos) > 0 && pos[0] == "read" { + if len(pos) < 2 { + fatalCode("invalid_argument", "usage: pilotctl inbox read ") } - count := 0 - for _, e := range entries { - if e.IsDir() { - continue + cmdInboxRead(dir, pos[1]) + return + } + + // --from accepts an address or a hostname. Hostname resolution needs the + // daemon; an address-shaped filter works even with the daemon stopped + // (the inbox is just files on disk). + fromFilter := flagString(flags, "from", "") + if fromFilter != "" { + if _, perr := protocol.ParseAddr(fromFilter); perr != nil { + d := connectDriver() + addr, rerr := parseAddrOrHostname(d, fromFilter) + d.Close() + if rerr != nil { + fatalCode("not_found", "cannot resolve --from %q: %v", fromFilter, rerr) } - os.Remove(filepath.Join(dir, e.Name())) - count++ + fromFilter = addr.String() } - if jsonOutput { - outputOK(map[string]interface{}{"cleared": count}) + } + + // --since accepts a duration (5m, 1h) or an RFC3339 timestamp. + var sinceCutoff time.Time + if s := flagString(flags, "since", ""); s != "" { + if d, derr := time.ParseDuration(s); derr == nil { + sinceCutoff = time.Now().Add(-d) + } else if t, terr := time.Parse(time.RFC3339, s); terr == nil { + sinceCutoff = t } else { - fmt.Printf("cleared %d message(s)\n", count) + fatalCode("invalid_argument", "--since must be a duration (5m, 1h) or an RFC3339 timestamp") } - return } - entries, err := os.ReadDir(dir) + all, err := readInboxNewestFirst(dir) if err != nil { if os.IsNotExist(err) { if jsonOutput { - output(map[string]interface{}{"messages": []interface{}{}, "total": 0}) + output(map[string]interface{}{"messages": []interface{}{}, "total": 0, "shown": 0}) } else { fmt.Println("inbox is empty") fmt.Println(" messages appear here when someone sends: pilotctl send-message --data \"hello\"") @@ -5390,82 +6121,122 @@ func cmdInbox(args []string) { fatalCode("internal", "read directory: %v", err) } - // Inbox filenames are {type}-{ts-ms}-{seq}.json. 
Plain alpha order - // groups by type (binary additionally restricts the clear to messages older + // than the duration, so `--clear --before 24h` keeps today's messages. + if flagBool(flags, "clear") { + var beforeCutoff time.Time + if b := flagString(flags, "before", ""); b != "" { + d, derr := time.ParseDuration(b) + if derr != nil { + fatalCode("invalid_argument", "--before must be a duration (24h, 30m)") + } + beforeCutoff = time.Now().Add(-d) } - data, err := os.ReadFile(filepath.Join(dir, e.Name())) - if err != nil { - continue + count := 0 + for _, im := range filtered { + if !beforeCutoff.IsZero() { + ts, _ := im.msg["received_at"].(string) + t, terr := time.Parse(time.RFC3339Nano, ts) + if terr == nil && !t.Before(beforeCutoff) { + continue + } + } + if os.Remove(filepath.Join(dir, im.name)) == nil { + count++ + } } - var msg map[string]interface{} - if err := json.Unmarshal(data, &msg); err != nil { - continue + if jsonOutput { + outputOK(map[string]interface{}{"cleared": count, "remaining": len(all) - count}) + } else { + fmt.Printf("cleared %d message(s), %d remaining\n", count, len(all)-count) } - messages = append(messages, msg) + return + } + + // --latest = full body of the single newest (post-filter) message. 
+ full := flagBool(flags, "full") + limit := flagInt(flags, "limit", 10) + if flagBool(flags, "latest") { + limit, full = 1, true + } + total := len(filtered) + shown := filtered + if limit > 0 && len(shown) > limit { + shown = shown[:limit] } if jsonOutput { + msgs := make([]map[string]interface{}, 0, len(shown)) + for _, im := range shown { + m := map[string]interface{}{ + "id": im.id, + "from": im.msg["from"], + "received_at": im.msg["received_at"], + "type": im.msg["type"], + "bytes": im.msg["bytes"], + } + if body, _ := im.msg["data"].(string); full { + m["data"] = body + } else { + m["preview"] = inboxPreview(body, 120) + } + msgs = append(msgs, m) + } output(map[string]interface{}{ - "messages": messages, - "total": len(messages), + "messages": msgs, + "total": total, + "shown": len(msgs), "dir": dir, }) return } - if len(messages) == 0 { - fmt.Println("inbox is empty") - fmt.Println(" messages appear here when someone sends: pilotctl send-message --data \"hello\"") + if total == 0 { + fmt.Println("inbox is empty (no messages match the filters)") return } - fmt.Printf("Inbox (%d messages):\n\n", len(messages)) + fmt.Printf("%s %s\n\n", sBold(fmt.Sprintf("Inbox — %d message(s)", total)), sDim(fmt.Sprintf("· showing %d newest", len(shown)))) now := time.Now() - for _, m := range messages { - msgType, _ := m["type"].(string) - from, _ := m["from"].(string) - ts, _ := m["received_at"].(string) - data, _ := m["data"].(string) - bytes, _ := m["bytes"].(float64) - - var tsLine string - if traceTime { - t, err := time.Parse(time.RFC3339Nano, ts) - if err == nil { - ago := now.Sub(t) - tsLine = fmt.Sprintf("%s (%s ago, %d bytes)", ts, fmtDuration(ago), int(bytes)) - } else { - tsLine = ts - } + for _, im := range shown { + from, _ := im.msg["from"].(string) + ts, _ := im.msg["received_at"].(string) + msgType, _ := im.msg["type"].(string) + bytes, _ := im.msg["bytes"].(float64) + age := "" + if t, terr := time.Parse(time.RFC3339Nano, ts); terr == nil { + age = 
fmtDuration(now.Sub(t)) + " ago" + } + fmt.Printf(" %s\n", sAccent(im.id)) + fmt.Printf(" %s\n", sDim(fmt.Sprintf("%s · %s · %s · %s", from, msgType, age, formatBytes(uint64(bytes))))) + body, _ := im.msg["data"].(string) + if full { + fmt.Printf(" %s\n\n", body) } else { - tsLine = ts - } - - preview := data - if len(preview) > 80 { - preview = preview[:80] + "..." + fmt.Printf(" %s\n\n", inboxPreview(body, 120)) } - fmt.Printf(" [%s] from %s type=%s\n", tsLine, from, msgType) - fmt.Printf(" %s\n", preview) } - fmt.Printf("\nclear with: pilotctl inbox --clear\n") + fmt.Println(sDim("full body: pilotctl inbox read · --latest · filters: --from --since --limit · clear: --clear [--before 24h]")) } // --- Network commands --- @@ -5488,22 +6259,49 @@ func cmdNetworkList() { return } // Member counts are admin-only at the registry. Without admin_token - // the registry omits the `members` field; render "—" so the column - // stays aligned and it's clear the count is hidden by policy rather - // than broken. - fmt.Printf("%-8s %-30s %-10s %s\n", "ID", "NAME", "JOIN RULE", "MEMBERS") + // the registry omits the `members` field for every network — in that + // case the column would be a wall of "—" that just looks broken, so + // we drop it entirely and say why in a footnote. When at least one + // network carries a count, the column stays ("—" marks the hidden ones). 
+ memberCount := func(nm map[string]interface{}) (string, bool) { + if members, ok := nm["members"].([]interface{}); ok { + return fmt.Sprintf("%d", len(members)), true + } + if mc, ok := nm["members"].(float64); ok { + return fmt.Sprintf("%d", int(mc)), true + } + return "—", false + } + anyMembers := false + for _, n := range nets { + nm, _ := n.(map[string]interface{}) + if _, ok := memberCount(nm); ok { + anyMembers = true + break + } + } + if anyMembers { + fmt.Printf("%-8s %-30s %-10s %s\n", "ID", "NAME", "JOIN RULE", "MEMBERS") + } else { + fmt.Printf("%-8s %-30s %s\n", "ID", "NAME", "JOIN RULE") + } for _, n := range nets { nm, _ := n.(map[string]interface{}) id := uint16(nm["id"].(float64)) name, _ := nm["name"].(string) rule, _ := nm["join_rule"].(string) - memberStr := "—" - if members, ok := nm["members"].([]interface{}); ok { - memberStr = fmt.Sprintf("%d", len(members)) - } else if mc, ok := nm["members"].(float64); ok { - memberStr = fmt.Sprintf("%d", int(mc)) + // Pad before styling so ANSI escapes don't break column alignment. + nameCol := sAccent(fmt.Sprintf("%-30s", name)) + ruleCol := sDim(fmt.Sprintf("%-10s", rule)) + if anyMembers { + memberStr, _ := memberCount(nm) + fmt.Printf("%-8d %s %s %s\n", id, nameCol, ruleCol, memberStr) + } else { + fmt.Printf("%-8d %s %s\n", id, nameCol, ruleCol) } - fmt.Printf("%-8d %-30s %-10s %s\n", id, name, rule, memberStr) + } + if !anyMembers { + fmt.Printf("\n%s\n", sDim("member counts hidden — admin only")) } } diff --git a/cmd/pilotctl/skills.go b/cmd/pilotctl/skills.go index 932ec492..5d12e2d9 100644 --- a/cmd/pilotctl/skills.go +++ b/cmd/pilotctl/skills.go @@ -58,9 +58,11 @@ func runTick() (*skillinject.Report, error) { } // cmdSkillsStatus runs one tick (fetching the manifest + entrypoint over -// HTTPS) and prints what state each managed file is in and what action -// the daemon's next live tick would take. 
-func cmdSkillsStatus(_ []string) { +// HTTPS) and prints a per-tool summary line (statusDot + what, if anything, +// the next tick would change). Per-file detail lines are behind --verbose. +func cmdSkillsStatus(args []string) { + flags, _ := parseFlags(args) + verbose := flagBool(flags, "verbose") report, err := runTick() if err != nil { fatalCode("internal", "skills tick: %v", err) @@ -87,10 +89,8 @@ func cmdSkillsStatus(_ []string) { return } - fmt.Println("Pilot Protocol skill — install status") - fmt.Println("=====================================") - fmt.Printf("Reconcile cadence: every %s (default), plus once on daemon startup.\n", skillinject.DefaultInterval) - fmt.Println("All paths below are auto-managed by the daemon — edits are reverted on next tick.") + fmt.Println(sBold("Pilot Protocol skill — install status")) + fmt.Println(sDim(fmt.Sprintf("Reconcile cadence: every %s (default), plus once on daemon startup · paths are auto-managed — manual edits revert on next tick", skillinject.DefaultInterval))) fmt.Println() if len(report.Outcomes) == 0 { @@ -116,8 +116,28 @@ func cmdSkillsStatus(_ []string) { sort.Strings(tools) for _, tool := range tools { - fmt.Printf("[%s]\n", tool) - for _, o := range byTool[tool] { + outs := byTool[tool] + // One summary line per tool: ok when every managed file is + // identical; otherwise warn naming the first offending file and + // the action the daemon's next tick will take; err on tick errors. 
+ dot, summary := "ok", "skill+heartbeat ok" + for _, o := range outs { + if o.Err != "" { + dot = "err" + summary = fmt.Sprintf("%s — %s", filepath.Base(o.Path), o.Err) + break + } + if o.State != skillinject.StateIdentical && dot == "ok" { + dot = "warn" + summary = fmt.Sprintf("%s %s — next: %s", filepath.Base(o.Path), o.State, o.Action) + } + } + fmt.Printf("%s %s %s\n", statusDot(dot), sBold(tool), sDim(summary)) + + if !verbose { + continue + } + for _, o := range outs { label := "skill copy: " switch o.Kind { case skillinject.KindMarker: @@ -138,6 +158,9 @@ func cmdSkillsStatus(_ []string) { fmt.Println() } + if !verbose { + fmt.Printf("\n%s\n", sDim("per-file detail: --verbose · paths only: pilotctl skills paths · force a pass: pilotctl skills check")) + } if len(report.Skipped) > 0 { fmt.Printf("Not installed (skipped): %s\n", strings.Join(report.Skipped, ", ")) } @@ -361,18 +384,37 @@ func cmdSkillsEnable(args []string) { } } -// printSkillInstallSummary is called from cmdInfo to surface the agent -// skill install paths in the standard daemon diagnostic. Quiet (no header) -// when no agent tools are detected on the host. +// skillInstallTools returns the agent tools that have the pilot skill +// installed, in detection order. Empty when no agent tools are present +// on the host. Same data source as `pilotctl skills`, collapsed to one +// entry per tool. +func skillInstallTools() []string { + report, err := runTick() + if err != nil || report == nil || len(report.Outcomes) == 0 { + return nil + } + seen := map[string]bool{} + var order []string + for _, o := range report.Outcomes { + if o.Kind != skillinject.KindSkill { + continue + } + if !seen[o.Tool] { + seen[o.Tool] = true + order = append(order, o.Tool) + } + } + return order +} + +// printSkillInstallSummary surfaces the agent skill install paths. +// Quiet (no header) when no agent tools are detected on the host. 
func printSkillInstallSummary() { report, err := runTick() if err != nil || report == nil || len(report.Outcomes) == 0 { return } // Collapse to one path per tool: prefer the skill copy over the marker. - type entry struct { - Tool, Path string - } seen := map[string]string{} order := []string{} for _, o := range report.Outcomes { diff --git a/cmd/pilotctl/style.go b/cmd/pilotctl/style.go new file mode 100644 index 00000000..4470696d --- /dev/null +++ b/cmd/pilotctl/style.go @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +// style.go — ANSI styling for human-facing output. +// +// Rules: +// - Color only when stdout is a TTY and NO_COLOR / TERM=dumb are unset. +// - --json output is NEVER styled (agents parse it byte-for-byte). +// - Piped text output stays plain so grep/awk/cut keep working. +// +// The palette is semantic, not decorative: ok=green, warn=yellow, +// err=red, accent=cyan (ids, hostnames), dim=metadata. Use sparingly — +// a screen that is all color is as unreadable as one with none. + +package main + +import ( + "fmt" + "os" + "sync" + "time" +) + +// colorEnabled is computed once at startup. Tests capture stdout via a +// pipe, so styling is automatically off there and assertions on plain +// text stay stable. 
+var colorEnabled = func() bool { + if os.Getenv("NO_COLOR") != "" || os.Getenv("PILOT_NO_COLOR") != "" { + return false + } + if os.Getenv("TERM") == "dumb" { + return false + } + fi, err := os.Stdout.Stat() + if err != nil { + return false + } + return fi.Mode()&os.ModeCharDevice != 0 +}() + +func sc(code, s string) string { + if !colorEnabled { + return s + } + return "\x1b[" + code + "m" + s + "\x1b[0m" +} + +func sBold(s string) string { return sc("1", s) } +func sDim(s string) string { return sc("2", s) } +func sOK(s string) string { return sc("32", s) } +func sWarn(s string) string { return sc("33", s) } +func sErr(s string) string { return sc("31", s) } +func sAccent(s string) string { return sc("36", s) } + +// stderrIsTTY mirrors colorEnabled but for os.Stderr. It gates the +// animated wait-progress line: animation (cursor rewrites) only makes +// sense on an interactive terminal, and NO_COLOR / TERM=dumb users have +// asked for plain output, so they get silence instead of a spinner. +var stderrIsTTY = func() bool { + if os.Getenv("NO_COLOR") != "" || os.Getenv("PILOT_NO_COLOR") != "" { + return false + } + if os.Getenv("TERM") == "dumb" { + return false + } + fi, err := os.Stderr.Stat() + if err != nil { + return false + } + return fi.Mode()&os.ModeCharDevice != 0 +}() + +// startWaitProgress shows a single self-rewriting stderr line +// ("label… Ns") while a blocking wait is in flight, so commands like +// `send-message --wait` don't sit in 30 s of dead silence. Returns a +// stop func that erases the line and terminates the ticker goroutine. +// +// Safe by construction: +// - no-op when stderr is not a TTY or --json is set (agents and pipes +// never see control sequences); +// - stop() is idempotent (sync.Once), so it can be called on every +// exit path of a wait without bookkeeping; +// - stop() blocks until the goroutine has erased the line, so output +// printed right after stop() never interleaves with the animation. 
+// +// First draw happens at the first 500 ms tick — sub-500 ms waits finish +// without any flicker. +func startWaitProgress(label string) (stop func()) { + if !stderrIsTTY || jsonOutput { + return func() {} + } + done := make(chan struct{}) + finished := make(chan struct{}) + start := time.Now() + go func() { + defer close(finished) + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-done: + fmt.Fprint(os.Stderr, "\r\x1b[2K") + return + case <-ticker.C: + elapsed := int(time.Since(start).Seconds()) + fmt.Fprintf(os.Stderr, "\r\x1b[2K%s", + sDim(fmt.Sprintf("%s… %ds", label, elapsed))) + } + } + }() + var once sync.Once + return func() { + once.Do(func() { + close(done) + <-finished + }) + } +} + +// statusDot renders a colored ● for state lines: ok=green, warn=yellow, +// err=red. Falls back to plain symbols when color is off so piped output +// still distinguishes states. +func statusDot(state string) string { + switch state { + case "ok": + if colorEnabled { + return sOK("●") + } + return "●" + case "warn": + if colorEnabled { + return sWarn("●") + } + return "◐" + default: + if colorEnabled { + return sErr("●") + } + return "○" + } +} diff --git a/cmd/pilotctl/updates.go b/cmd/pilotctl/updates.go index 0b5426f2..33bf5295 100644 --- a/cmd/pilotctl/updates.go +++ b/cmd/pilotctl/updates.go @@ -114,24 +114,53 @@ func cmdUpdates(args []string) { date = t.Format("2006-01-02") } title := strings.TrimSpace(it.Title) - fmt.Printf("• %s %s\n", date, title) + fmt.Printf("• %s %s\n", sAccent(date), title) if len(it.Categories) > 0 { - fmt.Printf(" [%s]\n", strings.Join(it.Categories, ", ")) + fmt.Printf(" %s\n", sDim("["+strings.Join(it.Categories, ", ")+"]")) } if d := strings.TrimSpace(it.Description); d != "" { - d = collapseWhitespace(d) - if len(d) > 200 { - d = d[:197] + "..." 
- } - fmt.Printf(" %s\n", d) + fmt.Printf(" %s\n", wrapText(collapseWhitespace(d), 100, 4)) } if l := strings.TrimSpace(it.Link); l != "" { - fmt.Printf(" %s\n", l) + fmt.Printf(" %s\n", sDim(l)) } fmt.Println() } } +// wrapText word-wraps s at word boundaries so no line exceeds width +// columns (counting the indent), with a hanging indent: continuation +// lines are prefixed with `indent` spaces to line up under a first line +// the caller has already indented by the same amount. Words longer than +// a full line are emitted unbroken — never split mid-word. +func wrapText(s string, width, indent int) string { + words := strings.Fields(s) + if len(words) == 0 { + return "" + } + pad := strings.Repeat(" ", indent) + var b strings.Builder + lineLen := indent // caller prints the first line's indent + for i, w := range words { + wl := len([]rune(w)) + switch { + case i == 0: + b.WriteString(w) + lineLen += wl + case lineLen+1+wl > width: + b.WriteString("\n") + b.WriteString(pad) + b.WriteString(w) + lineLen = indent + wl + default: + b.WriteString(" ") + b.WriteString(w) + lineLen += 1 + wl + } + } + return b.String() +} + // filterAndTruncate applies the --scope category filter (case-insensitive, // match-any-category) and the --count cap to a list of feed items. 
Both // arguments are inert when zero/empty, so callers can pass scope="" or diff --git a/cmd/pilotctl/zz_cmds_daemon_test.go b/cmd/pilotctl/zz_cmds_daemon_test.go index 98b8563b..2c3e7e4a 100644 --- a/cmd/pilotctl/zz_cmds_daemon_test.go +++ b/cmd/pilotctl/zz_cmds_daemon_test.go @@ -40,9 +40,9 @@ func TestCmdInfoHumanOutput(t *testing.T) { out := captureStdout(t, func() { cmdInfo(nil) }) for _, want := range []string{ - "Pilot Protocol Daemon", "host-x", "Node ID: 42", - "Address: 0:0000.0000.002A", "Beacon: b.example:9001", - "Identity: persistent", + "pilot-daemon", "host-x", "node 42", + "0:0000.0000.002A", "beacon b.example:9001", + "persistent", "Ed25519 deadbeef…", } { if !strings.Contains(out, want) { t.Errorf("missing %q in:\n%s", want, out) @@ -88,7 +88,7 @@ func TestCmdHealthHumanOutput(t *testing.T) { jsonOutput = false out := captureStdout(t, func() { cmdHealth() }) - for _, want := range []string{"Daemon Health", "Status: ok", "Connections: 1"} { + for _, want := range []string{"pilot-daemon", "ok", "1 connection(s)", "peers"} { if !strings.Contains(out, want) { t.Errorf("missing %q in: %s", want, out) } @@ -154,11 +154,31 @@ func TestCmdPeersWithData(t *testing.T) { jsonOutput = false out := captureStdout(t, func() { cmdPeers(nil) }) - for _, want := range []string{"NODE ID", "99", "yes (AES-256-GCM)", "yes (Ed25519)", "direct"} { + // Encrypted+authenticated peers are the norm — only the summary shows. + for _, want := range []string{"1 peer", "1 encrypted+authenticated", "0 relay", "1 direct"} { if !strings.Contains(out, want) { t.Errorf("missing %q in: %s", want, out) } } + + // --all expands to the full table with the node id. The fake daemon + // serves a single connection, so spin up a fresh one. 
+ d2 := newFakeDaemon(t) + d2.useDaemon(t) + d2.onJSON(tdCmdInfo, tdCmdInfoOK, `{ + "node_id": 1, "address": "0:0000.0000.0001", + "uptime_secs": 0, "connections": 0, "ports": 0, "peers": 1, + "bytes_sent": 0, "bytes_recv": 0, "pkts_sent": 0, "pkts_recv": 0, + "peer_list": [ + {"node_id": 99, "encrypted": true, "authenticated": true, "relay": false} + ] + }`) + outAll := captureStdout(t, func() { cmdPeers([]string{"--all"}) }) + for _, want := range []string{"NODE ID", "99", "yes", "direct"} { + if !strings.Contains(outAll, want) { + t.Errorf("missing %q in --all output: %s", want, outAll) + } + } } func TestCmdPeersJSON(t *testing.T) { @@ -555,9 +575,12 @@ func TestCmdPendingHasOne(t *testing.T) { defer func() { jsonOutput = prev }() jsonOutput = false out := captureStdout(t, func() { cmdPending() }) - if !strings.Contains(out, "NODE ID") || !strings.Contains(out, "hi") { + if !strings.Contains(out, "node 7") || !strings.Contains(out, "hi") { t.Errorf("missing pending row: %s", out) } + if !strings.Contains(out, "pilotctl approve 7") { + t.Errorf("missing inline approve hint: %s", out) + } } func TestCmdTrustEmpty(t *testing.T) { @@ -567,7 +590,7 @@ func TestCmdTrustEmpty(t *testing.T) { prev := jsonOutput defer func() { jsonOutput = prev }() jsonOutput = false - out := captureStdout(t, func() { cmdTrust() }) + out := captureStdout(t, func() { cmdTrust(nil) }) if !strings.Contains(out, "no trusted peers") { t.Errorf("missing 'no trusted peers': %s", out) } @@ -581,12 +604,16 @@ func TestCmdTrustHasEntries(t *testing.T) { prev := jsonOutput defer func() { jsonOutput = prev }() jsonOutput = false - out := captureStdout(t, func() { cmdTrust() }) - for _, want := range []string{"NODE ID", "MUTUAL", "9", "yes"} { + out := captureStdout(t, func() { cmdTrust(nil) }) + for _, want := range []string{"Trusted peers — 1", "node 9", "net 1", "pilotctl untrust"} { if !strings.Contains(out, want) { t.Errorf("missing %q: %s", want, out) } } + // Mutual trust is the norm — it 
must NOT be tagged. + if strings.Contains(out, "one-way") { + t.Errorf("mutual peer wrongly tagged one-way: %s", out) + } } // --- find / disconnect --- diff --git a/cmd/pilotctl/zz_commands_test.go b/cmd/pilotctl/zz_commands_test.go index 2cdb093b..0308a16d 100644 --- a/cmd/pilotctl/zz_commands_test.go +++ b/cmd/pilotctl/zz_commands_test.go @@ -189,7 +189,7 @@ func TestCmdContextEmitsJSON(t *testing.T) { defer func() { jsonOutput = prev }() jsonOutput = true - out := captureStdout(t, cmdContext) + out := captureStdout(t, func() { cmdContext(nil) }) var env map[string]interface{} if err := json.Unmarshal([]byte(out), &env); err != nil { t.Fatalf("envelope: %v", err) @@ -218,7 +218,7 @@ func TestCmdContextPrettyText(t *testing.T) { defer func() { jsonOutput = prev }() jsonOutput = false - out := captureStdout(t, cmdContext) + out := captureStdout(t, func() { cmdContext(nil) }) // Pretty mode prints an indented JSON map; both modes should mention // some core commands. if !strings.Contains(out, "send-message") { @@ -276,3 +276,68 @@ func TestRedactPeerEndpointsDeepNesting(t *testing.T) { t.Errorf("l2.id preserved? got %v", l2["id"]) } } + +// --- context : single-entry lookup --- + +func TestCmdContextSingleCommand(t *testing.T) { + prev := jsonOutput + defer func() { jsonOutput = prev }() + jsonOutput = true + + out := captureStdout(t, func() { cmdContext([]string{"send-message"}) }) + var env map[string]interface{} + if err := json.Unmarshal([]byte(out), &env); err != nil { + t.Fatalf("envelope: %v\n%s", err, out) + } + data, ok := env["data"].(map[string]interface{}) + if !ok { + t.Fatalf("data not a map: %v", env) + } + // Only the one entry — args/description/returns, no catalog keys. 
+ for _, key := range []string{"args", "description", "returns"} { + if _, ok := data[key]; !ok { + t.Errorf("entry missing %q: %v", key, data) + } + } + for _, absent := range []string{"commands", "extras", "version"} { + if _, ok := data[absent]; ok { + t.Errorf("single-command lookup leaked catalog key %q", absent) + } + } +} + +func TestCmdContextMultiWordCommand(t *testing.T) { + prev := jsonOutput + defer func() { jsonOutput = prev }() + jsonOutput = true + + out := captureStdout(t, func() { cmdContext([]string{"daemon", "start"}) }) + var env map[string]interface{} + if err := json.Unmarshal([]byte(out), &env); err != nil { + t.Fatalf("envelope: %v\n%s", err, out) + } + data := env["data"].(map[string]interface{}) + desc, _ := data["description"].(string) + if !strings.Contains(desc, "daemon") { + t.Errorf("daemon start entry description = %q", desc) + } +} + +func TestCmdContextUnknownCommand(t *testing.T) { + t.Parallel() + stdout, stderr, code := runCLI(t, []string{"--json", "context", "no-such-cmd"}, nil) + if code == 0 { + t.Fatalf("expected non-zero exit, stdout=%s", stdout) + } + var env map[string]interface{} + if err := json.Unmarshal([]byte(stderr), &env); err != nil { + t.Fatalf("stderr not JSON: %v\n%s", err, stderr) + } + if env["code"] != "not_found" { + t.Errorf("code = %v, want not_found", env["code"]) + } + msg, _ := env["message"].(string) + if !strings.Contains(msg, "no-such-cmd") || !strings.Contains(msg, "send-message") { + t.Errorf("error should name the input and list valid commands: %s", msg) + } +} diff --git a/cmd/pilotctl/zz_daemon_info_health_test.go b/cmd/pilotctl/zz_daemon_info_health_test.go index aca8244a..a8e6c4ba 100644 --- a/cmd/pilotctl/zz_daemon_info_health_test.go +++ b/cmd/pilotctl/zz_daemon_info_health_test.go @@ -75,35 +75,43 @@ func TestCmdInfoText(t *testing.T) { out := captureStdout(t, func() { withText(func() { cmdInfo(nil) }) }) - if !strings.Contains(out, "Pilot Protocol Daemon") { + if !strings.Contains(out, 
"pilot-daemon") { t.Errorf("missing banner: %s", out) } - if !strings.Contains(out, "Version:") { - t.Errorf("missing Version line: %s", out) + if !strings.Contains(out, "v1.7.1") { + t.Errorf("missing version: %s", out) } - if !strings.Contains(out, "Node ID: 42") { - t.Errorf("missing Node ID: %s", out) + if !strings.Contains(out, "node 42") { + t.Errorf("missing node id: %s", out) } - if !strings.Contains(out, "Hostname: alice") { - t.Errorf("missing Hostname: %s", out) + if !strings.Contains(out, "alice") { + t.Errorf("missing hostname: %s", out) } - if !strings.Contains(out, "Uptime: 01:02:05") { + if !strings.Contains(out, "up 1h2m") { t.Errorf("missing uptime: %s", out) } - if !strings.Contains(out, "Encryption: enabled") { - t.Errorf("missing encryption line: %s", out) + if !strings.Contains(out, "5 peers (4 encrypted · 1 relay · 4 direct)") { + t.Errorf("missing peer breakdown: %s", out) + } + if !strings.Contains(out, "2 pending handshake(s)") || !strings.Contains(out, "pilotctl pending") { + t.Errorf("missing pending handshake warning with hint: %s", out) } - if !strings.Contains(out, "Beacon:") { + if !strings.Contains(out, "beacon 34.71.57.205:9001") { t.Errorf("missing beacon line: %s", out) } - if !strings.Contains(out, "Identity: persistent") { + if !strings.Contains(out, "Ed25519 01234567…") || !strings.Contains(out, "persistent") { t.Errorf("missing identity line: %s", out) } - if !strings.Contains(out, "Networks: 2") { - t.Errorf("missing networks count: %s", out) + if !strings.Contains(out, "networks") || !strings.Contains(out, "5:0000.0000.002A") { + t.Errorf("missing networks line: %s", out) } - if !strings.Contains(out, "ESTABLISHED") { - t.Errorf("missing conn list: %s", out) + // Connections are summarized, not dumped — detail lives in + // `pilotctl connections`. 
+ if !strings.Contains(out, "3 connections") || !strings.Contains(out, "pilotctl connections") { + t.Errorf("missing connections summary: %s", out) + } + if !strings.Contains(out, "↑ 1.0 MB") || !strings.Contains(out, "↓ 512.0 KB") { + t.Errorf("missing traffic line: %s", out) } } @@ -139,10 +147,10 @@ func TestCmdInfoEphemeralIdentity(t *testing.T) { out := captureStdout(t, func() { withText(func() { cmdInfo(nil) }) }) - if !strings.Contains(out, "Identity: ephemeral") { + if !strings.Contains(out, "ephemeral (not persisted)") { t.Errorf("missing ephemeral identity: %s", out) } - if !strings.Contains(out, "Encryption: disabled") { + if !strings.Contains(out, "encryption disabled") { t.Errorf("missing disabled encryption: %s", out) } } @@ -174,16 +182,16 @@ func TestCmdHealthText(t *testing.T) { out := captureStdout(t, func() { withText(func() { cmdHealth() }) }) - if !strings.Contains(out, "Daemon Health") { + if !strings.Contains(out, "pilot-daemon") { t.Errorf("missing banner: %s", out) } - if !strings.Contains(out, "Status: ok") { + if !strings.Contains(out, "ok") { t.Errorf("missing status: %s", out) } - if !strings.Contains(out, "Uptime: 01:02:05") { + if !strings.Contains(out, "uptime 01:02:05") { t.Errorf("missing uptime: %s", out) } - if !strings.Contains(out, "Handshakes: 2 pending") { + if !strings.Contains(out, "2 pending handshake(s)") { t.Errorf("missing handshake line: %s", out) } } @@ -208,10 +216,10 @@ func TestCmdHealthWithDrops(t *testing.T) { out := captureStdout(t, func() { withText(func() { cmdHealth() }) }) - if !strings.Contains(out, "Queue Drops: 5") { + if !strings.Contains(out, "5 accept-queue drop(s)") { t.Errorf("missing queue drops: %s", out) } - if !strings.Contains(out, "Webhook: 3 events dropped") { + if !strings.Contains(out, "3 webhook event(s) dropped") { t.Errorf("missing webhook drops: %s", out) } } @@ -253,15 +261,41 @@ func TestCmdPeersTextWithPeers(t *testing.T) { out := captureStdout(t, func() { withText(func() { 
cmdPeers(nil) }) }) - if !strings.Contains(out, "42") || !strings.Contains(out, "99") { - t.Errorf("missing peer IDs: %s", out) + // Headline summary + exceptions: node 99 is unencrypted so it is the + // only row; node 42 is healthy and stays in the summary count. + if !strings.Contains(out, "2 peers") { + t.Errorf("missing peer count: %s", out) } - if !strings.Contains(out, "relay") || !strings.Contains(out, "direct") { - t.Errorf("missing path types: %s", out) + if !strings.Contains(out, "1 encrypted+authenticated") { + t.Errorf("missing secure count: %s", out) + } + if !strings.Contains(out, "1 relay") || !strings.Contains(out, "1 direct") { + t.Errorf("missing path breakdown: %s", out) + } + if !strings.Contains(out, "node 99") || !strings.Contains(out, "unencrypted") { + t.Errorf("missing exception row for node 99: %s", out) + } + if strings.Contains(out, "node 42") { + t.Errorf("healthy peer should not be listed as exception: %s", out) } if strings.Contains(out, "1.2.3.4") { t.Errorf("endpoint should have been stripped: %s", out) } + + // --all shows every peer in the table. The fake daemon serves a single + // connection, so spin up a fresh one for the second invocation. 
+ sd2 := newStreamDaemon(t) + sd2.useDaemonNoRegistry(t) + sd2.onJSON(tdCmdInfo, tdCmdInfoOK, payload) + outAll := captureStdout(t, func() { + withText(func() { cmdPeers([]string{"--all"}) }) + }) + if !strings.Contains(outAll, "42") || !strings.Contains(outAll, "99") { + t.Errorf("--all missing peer IDs: %s", outAll) + } + if !strings.Contains(outAll, "relay") || !strings.Contains(outAll, "direct") { + t.Errorf("--all missing path types: %s", outAll) + } } func TestCmdPeersJSONR4(t *testing.T) { @@ -403,13 +437,21 @@ func TestCmdTrustWithPeers(t *testing.T) { ] }`) out := captureStdout(t, func() { - withText(func() { cmdTrust() }) + withText(func() { cmdTrust(nil) }) }) - if !strings.Contains(out, "42") || !strings.Contains(out, "99") { + if !strings.Contains(out, "node 42") || !strings.Contains(out, "node 99") { t.Errorf("missing peer IDs: %s", out) } - if !strings.Contains(out, "yes") { - t.Errorf("missing mutual yes: %s", out) + if !strings.Contains(out, "Trusted peers — 2") { + t.Errorf("missing header: %s", out) + } + // node 99 is mutual=false → tagged one-way; node 42 is mutual → untagged. + if !strings.Contains(out, "one-way") { + t.Errorf("missing one-way tag for asymmetric peer: %s", out) + } + // Newest first: 99 (1700000100) before 42 (1700000000). 
+ if strings.Index(out, "node 99") > strings.Index(out, "node 42") { + t.Errorf("not sorted newest-first: %s", out) } } @@ -418,7 +460,7 @@ func TestCmdTrustEmptyR4(t *testing.T) { sd.useDaemonNoRegistry(t) sd.onJSON(tdCmdHandshake, tdCmdHandshakeOK, `{"trusted":[]}`) out := captureStdout(t, func() { - withText(func() { cmdTrust() }) + withText(func() { cmdTrust(nil) }) }) if !strings.Contains(out, "no trusted peers") { t.Errorf("%s", out) @@ -430,11 +472,14 @@ func TestCmdTrustJSON(t *testing.T) { sd.useDaemonNoRegistry(t) sd.onJSON(tdCmdHandshake, tdCmdHandshakeOK, `{"trusted":[]}`) out := captureStdout(t, func() { - withJSON(func() { cmdTrust() }) + withJSON(func() { cmdTrust(nil) }) }) if !strings.Contains(out, `"trusted":[]`) { t.Errorf("%s", out) } + if !strings.Contains(out, `"total":0`) { + t.Errorf("missing total: %s", out) + } } // ---- cmdPending ---------------------------------------------------------- diff --git a/cmd/pilotctl/zz_extras_test.go b/cmd/pilotctl/zz_extras_test.go index 9615e627..3d68aa94 100644 --- a/cmd/pilotctl/zz_extras_test.go +++ b/cmd/pilotctl/zz_extras_test.go @@ -270,11 +270,19 @@ func TestCmdInboxMixedTypesOrdering(t *testing.T) { if len(msgs) != 3 { t.Fatalf("got %d msgs", len(msgs)) } - // First by timestamp suffix: "1st", "2nd", "3rd". - for i, want := range []string{"1st", "2nd", "3rd"} { + // Newest first by timestamp suffix: "3rd", "2nd", "1st". Default JSON + // output carries a bounded preview, not the full body — agents use + // --latest / --full / read for full bodies. 
+ for i, want := range []string{"3rd", "2nd", "1st"} { m := msgs[i].(map[string]interface{}) - if m["data"] != want { - t.Errorf("msgs[%d].data = %v, want %q", i, m["data"], want) + if m["preview"] != want { + t.Errorf("msgs[%d].preview = %v, want %q", i, m["preview"], want) + } + if _, hasData := m["data"]; hasData { + t.Errorf("msgs[%d] carries full data in default mode; want preview only", i) + } + if id, _ := m["id"].(string); id == "" { + t.Errorf("msgs[%d] missing stable id", i) } } } diff --git a/cmd/pilotctl/zz_inbox_test.go b/cmd/pilotctl/zz_inbox_test.go index 1ef54496..827a622e 100644 --- a/cmd/pilotctl/zz_inbox_test.go +++ b/cmd/pilotctl/zz_inbox_test.go @@ -166,7 +166,7 @@ func TestCmdReceivedWithFilesTextMode(t *testing.T) { defer func() { jsonOutput = prev }() jsonOutput = false out := captureStdout(t, func() { cmdReceived(nil) }) - for _, frag := range []string{"Received files", "report.pdf", "total"} { + for _, frag := range []string{"Received files — 1", "report.pdf", "--clear"} { if !strings.Contains(out, frag) { t.Errorf("missing %q in: %s", frag, out) } @@ -293,3 +293,136 @@ func TestCmdReceivedClear(t *testing.T) { t.Errorf("cleared = %v", data["cleared"]) } } + +// --- received: --limit / --since (text-mode bounding + age filter) --- + +// writeRecvFiles drops n files into ~/.pilot/received with mtimes spaced +// one hour apart, oldest first (f0 = oldest). Returns the dir. 
+func writeRecvFiles(t *testing.T, home string, n int) string { + t.Helper() + dir := filepath.Join(home, ".pilot", "received") + if err := os.MkdirAll(dir, 0o755); err != nil { + t.Fatal(err) + } + for i := 0; i < n; i++ { + p := filepath.Join(dir, "f"+string(rune('0'+i))+".bin") + if err := os.WriteFile(p, []byte("x"), 0o600); err != nil { + t.Fatal(err) + } + mt := time.Now().Add(-time.Duration(n-i) * time.Hour) + if err := os.Chtimes(p, mt, mt); err != nil { + t.Fatal(err) + } + } + return dir +} + +func TestCmdReceivedLimitTextMode(t *testing.T) { + tmp := withTempHomeFull(t) + writeRecvFiles(t, tmp, 4) + prev := jsonOutput + defer func() { jsonOutput = prev }() + jsonOutput = false + out := captureStdout(t, func() { cmdReceived([]string{"--limit", "2"}) }) + if !strings.Contains(out, "Received files — 4") { + t.Errorf("expected total 4 in header: %s", out) + } + if !strings.Contains(out, "showing 2 newest") { + t.Errorf("expected 'showing 2 newest': %s", out) + } + // Newest two are f3 and f2; oldest two must be elided. + for _, want := range []string{"f3.bin", "f2.bin"} { + if !strings.Contains(out, want) { + t.Errorf("missing %q: %s", want, out) + } + } + for _, not := range []string{"f1.bin", "f0.bin"} { + if strings.Contains(out, not) { + t.Errorf("unexpected %q (beyond limit): %s", not, out) + } + } +} + +// JSON stays unbounded unless --limit is explicit (back-compat, mirrors +// cmdTrust). "total"/"shown" are always present. 
+func TestCmdReceivedJSONUnboundedByDefault(t *testing.T) { + tmp := withTempHomeFull(t) + writeRecvFiles(t, tmp, 12) + prev := jsonOutput + defer func() { jsonOutput = prev }() + jsonOutput = true + out := captureStdout(t, func() { cmdReceived(nil) }) + var env map[string]interface{} + if err := json.Unmarshal([]byte(out), &env); err != nil { + t.Fatalf("parse: %v\n%s", err, out) + } + data := env["data"].(map[string]interface{}) + if data["total"] != float64(12) || data["shown"] != float64(12) { + t.Errorf("total=%v shown=%v, want 12/12 (JSON unbounded by default)", data["total"], data["shown"]) + } + // Explicit --limit bounds JSON too. + out = captureStdout(t, func() { cmdReceived([]string{"--limit", "3"}) }) + if err := json.Unmarshal([]byte(out), &env); err != nil { + t.Fatalf("parse: %v\n%s", err, out) + } + data = env["data"].(map[string]interface{}) + if data["total"] != float64(12) || data["shown"] != float64(3) { + t.Errorf("total=%v shown=%v, want 12/3 with explicit --limit", data["total"], data["shown"]) + } +} + +func TestCmdReceivedSince(t *testing.T) { + tmp := withTempHomeFull(t) + writeRecvFiles(t, tmp, 4) // mtimes: 4h, 3h, 2h, 1h ago + prev := jsonOutput + defer func() { jsonOutput = prev }() + jsonOutput = true + // 2.5h window → f2 (2h) and f3 (1h) match. 
+ out := captureStdout(t, func() { cmdReceived([]string{"--since", "150m"}) }) + var env map[string]interface{} + if err := json.Unmarshal([]byte(out), &env); err != nil { + t.Fatalf("parse: %v\n%s", err, out) + } + data := env["data"].(map[string]interface{}) + if data["total"] != float64(2) { + t.Errorf("total = %v, want 2 (files within 150m)", data["total"]) + } + files := data["files"].([]interface{}) + first := files[0].(map[string]interface{}) + if first["name"] != "f3.bin" { + t.Errorf("newest-first violated: first = %v", first["name"]) + } +} + +func TestCmdReceivedSinceInvalid(t *testing.T) { + t.Parallel() + _, stderr, code := runCLI(t, []string{"received", "--since", "not-a-time"}, nil) + if code == 0 { + t.Fatalf("expected non-zero exit for bad --since") + } + if !strings.Contains(stderr, "--since") { + t.Errorf("expected --since mention in stderr: %s", stderr) + } +} + +func TestCmdReceivedClearBefore(t *testing.T) { + tmp := withTempHomeFull(t) + dir := writeRecvFiles(t, tmp, 4) // 4h, 3h, 2h, 1h ago + prev := jsonOutput + defer func() { jsonOutput = prev }() + jsonOutput = true + // Only files older than 2.5h (f0, f1) get cleared. 
+ out := captureStdout(t, func() { cmdReceived([]string{"--clear", "--before", "150m"}) }) + var env map[string]interface{} + if err := json.Unmarshal([]byte(out), &env); err != nil { + t.Fatalf("parse: %v\n%s", err, out) + } + data := env["data"].(map[string]interface{}) + if data["cleared"] != float64(2) || data["remaining"] != float64(2) { + t.Errorf("cleared=%v remaining=%v, want 2/2", data["cleared"], data["remaining"]) + } + entries, _ := os.ReadDir(dir) + if len(entries) != 2 { + t.Errorf("expected 2 files left, got %d", len(entries)) + } +} diff --git a/cmd/pilotctl/zz_stream_daemon_test.go b/cmd/pilotctl/zz_stream_daemon_test.go index 26903d41..dc1d2085 100644 --- a/cmd/pilotctl/zz_stream_daemon_test.go +++ b/cmd/pilotctl/zz_stream_daemon_test.go @@ -487,6 +487,11 @@ func TestCmdSendMessageText(t *testing.T) { if data["target"] == nil { t.Errorf("missing target") } + // "to" carries the resolved destination address so --json consumers + // learn what a hostname argument resolved to. 
+ if to, _ := data["to"].(string); to != "0:0000.0000.002A" { + t.Errorf("to = %q, want resolved address 0:0000.0000.002A", to) + } } func TestCmdSendMessageJSON(t *testing.T) { @@ -610,6 +615,9 @@ func TestCmdPingJSON(t *testing.T) { if data["target"] == nil { t.Errorf("missing target: %v", data) } + if to, _ := data["to"].(string); to != "0:0000.0000.002A" { + t.Errorf("to = %q, want resolved address 0:0000.0000.002A", to) + } results := data["results"].([]interface{}) if len(results) != 1 { t.Errorf("results len = %d, want 1", len(results)) diff --git a/cmd/pilotctl/zz_style_progress_test.go b/cmd/pilotctl/zz_style_progress_test.go new file mode 100644 index 00000000..cddd506a --- /dev/null +++ b/cmd/pilotctl/zz_style_progress_test.go @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package main + +import ( + "io" + "os" + "strings" + "testing" + "time" +) + +// In the test environment stderr is not a character device, so +// stderrIsTTY must be false and startWaitProgress must be a no-op that +// writes nothing — agents and pipes never see ANSI rewrites. +func TestStderrIsTTYFalseUnderTests(t *testing.T) { + if stderrIsTTY { + t.Fatal("stderrIsTTY should be false when stderr is a pipe") + } +} + +func TestStartWaitProgressNoopNonTTY(t *testing.T) { + stop := startWaitProgress("waiting") + if stop == nil { + t.Fatal("stop func must never be nil") + } + // Double-stop must be safe on the no-op path too. + stop() + stop() +} + +func TestStartWaitProgressDrawAndStopIdempotent(t *testing.T) { + // Force the active path even without a TTY to exercise the + // goroutine lifecycle: start, tick once, stop twice. Capture + // stderr via a pipe so the animation does not pollute test output. 
+ origTTY, origJSON, origStderr := stderrIsTTY, jsonOutput, os.Stderr + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("pipe: %v", err) + } + stderrIsTTY, jsonOutput, os.Stderr = true, false, w + t.Cleanup(func() { stderrIsTTY, jsonOutput, os.Stderr = origTTY, origJSON, origStderr }) + + stop := startWaitProgress("test wait") + time.Sleep(600 * time.Millisecond) // let at least one tick draw + done := make(chan struct{}) + go func() { + stop() + stop() // second call must not panic or block + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("stop() did not return; goroutine leak or deadlock") + } + + w.Close() + out, _ := io.ReadAll(r) + got := string(out) + if !strings.Contains(got, "test wait") { + t.Errorf("progress line never drawn: %q", got) + } + if !strings.Contains(got, "\r\x1b[2K") { + t.Errorf("missing rewrite/erase sequence: %q", got) + } + if !strings.HasSuffix(got, "\r\x1b[2K") { + t.Errorf("stop() must erase the line last: %q", got) + } +} + +func TestStartWaitProgressNoopWhenJSON(t *testing.T) { + origTTY, origJSON := stderrIsTTY, jsonOutput + stderrIsTTY, jsonOutput = true, true + t.Cleanup(func() { stderrIsTTY, jsonOutput = origTTY, origJSON }) + + // --json must suppress the animation even on a TTY. The no-op stop + // returns immediately; if the active path were taken the draw would + // go to the real stderr, which the assertion below cannot see, so we + // assert on behavior: stop returns instantly and is double-safe. 
+ stop := startWaitProgress("waiting") + stop() + stop() +} diff --git a/docs/BUG-updater-version-skew.md b/docs/BUG-updater-version-skew.md new file mode 100644 index 00000000..f2dcdac5 --- /dev/null +++ b/docs/BUG-updater-version-skew.md @@ -0,0 +1,149 @@ +# BUG: pilot-updater stuck on v1.10.9 — three independent issues + +**Reported:** 2026-06-14 +**Re-audited:** 2026-06-14 against current code on `main` +**Severity:** medium (causes version skew); the `send-file` impact is a +separate, larger reliability bug — see `docs/PROPOSAL-reliable-file-transfer.md`. +**Components:** pilot-updater · release packaging · dataexchange / send-file + +## Summary + +The original bug report identified one phenomenon ("Node A pinned at v1.10.9 +while a fresh `install.sh` pulls v1.11.0") and proposed a single mechanism +(stale RSS changelog feed). Live audit against the code confirms the +phenomenon but disproves the mechanism. There are in fact **three +independent issues** wearing the same symptom: + +1. **The updater binary ships but is never started.** The install bundle + contains `pilot-updater` next to `pilot-daemon` and `pilotctl`, but + neither `install.sh` nor the daemon's normal startup launches it. +2. **`install.sh` references `pilot-gateway`, which is no longer in the + v1.11.0 bundle.** Causes a harmless-looking `cp: cannot stat … + pilot-gateway` line but signals stale install logic. +3. **`send-file` between any two nodes (skewed or not) has no reliability + primitives** — no ACK timeout, no integrity hash, no resume, no + progress. The 120s EOF in the original report is this bug, not version + skew. The skew is incidental; the same failure reproduces between two + v1.11.0 nodes. + +This document covers (1) and (2). (3) is the larger fix and lives in +`docs/PROPOSAL-reliable-file-transfer.md`. 
+ +## Issue 1 — Updater shipped, never started + +### What the original doc said + +> *"a long-running install does not auto-update to the latest version … +> the changelog/update feed the updater reads looks stale"* + +### What the code actually does + +`updater/updater.go:244-249` builds the polling URL directly against the +GitHub API: + +```go +if tag == "" { + url = fmt.Sprintf("https://api.github.com/repos/%s/releases/latest", u.config.Repo) +} else { + url = fmt.Sprintf("https://api.github.com/repos/%s/releases/tags/%s", u.config.Repo, tag) +} +``` + +The RSS feed at `teoslayer.github.io/pilot-changelog` is **only** read by +the user-facing `pilotctl updates` command (`updates.go`) — it has no role +in the auto-update decision. The feed being stale is irrelevant to the +updater. + +### The real cause + +On the affected machine (Node A, macOS arm64): + +- `pilot-updater` binary is installed at `~/.pilot/bin/pilot-updater` + (mtime 2026-06-08, i.e. unchanged since first install — as expected if + it was never invoked). +- `ps -axo command | grep pilot-updater` returns **nothing**. +- The running `pilot-daemon` command line is + `pilot-daemon -identity … -socket … -listen … -hostname … -log-level info` + — no flag enables an in-process updater either. + +So the updater binary is sitting on disk being garbage-collected by Time +Machine, not polling GitHub at all. The install never set up a launchd +service / systemd unit / cron / supervisor for it, and the daemon does +not embed it. + +### Fix + +Three reasonable options, in increasing order of "proper": + +1. **Document the deferral.** Add a clear note to `install.sh` output + that the updater is provided but not auto-started; users opt in by + running `pilotctl daemon start --enable-updater` (which already exists + in `web4/cmd/pilotctl/main.go` per the `Daemon lifecycle` help). +2. **Auto-start the updater on install** by writing a launchd plist / + systemd unit alongside the daemon plist. 
`install.sh` already writes + the daemon plist; the same mechanism, scoped to the updater binary, + takes ~30 lines. +3. **Embed the updater in the daemon** as an opt-in `--enable-updater` + flag that spawns a goroutine running `updater.Service.Run`. This is + the long-term right answer — one process, one PID, one log stream — + but it changes the daemon contract. + +(2) is shippable in one PR; (3) is the eventual direction. + +## Issue 2 — `pilot-gateway` missing from v1.11.0 bundle + +### Observed + +```text +$ curl -sL .../v1.11.0/pilot-linux-amd64.tar.gz | tar tzf - +./ +./updater +./daemon +./pilotctl +``` + +No `pilot-gateway`. The `install.sh` line that calls +`cp /tmp/pilot/pilot-gateway …` fails with the documented "cannot stat" +error, then proceeds. + +### Whose problem this is + +Either: + +- The release workflow (`release/`) dropped `pilot-gateway` from the + artifact list and `install.sh` wasn't updated. Fix: re-add to the + workflow or remove from `install.sh`, depending on intent. +- `pilot-gateway` was deliberately retired and `install.sh` is stale. Fix: + remove the line from `install.sh` and update operator docs. + +Inspecting `release/` and `install.sh` will resolve which. + +## Issue 3 — Send-file is unreliable by design + +See `docs/PROPOSAL-reliable-file-transfer.md`. Quick summary: the +"send-file hangs 120s then EOFs" failure in the original report is not +caused by version skew. It reproduces between two v1.11.0 nodes. The +underlying gaps are zero application-layer timeouts, no end-to-end +integrity hash, no resume protocol, atomic 256 MiB frames with no +streaming, and no progress reporting. The proposal lays out a +backward-compatible streaming protocol that addresses all of these. + +## Open questions to drive a fix PR + +- Is the intent for the updater to auto-run after install on all + platforms, or only when the operator opts in? (Answers whether option + 2 or 3 above is right.) +- Is `pilot-gateway` retired?
If not, why did the v1.11.0 release skip + it? + +## Environment (audit re-run) + +| | Node A (laptop) | Node B (bench VM) | +|---|---|---| +| OS / arch | macOS arm64 | Ubuntu 24.04 amd64 | +| pilot daemon | v1.10.9 | (fresh VM, no install yet) | +| install date | 2026-06-08 | n/a | +| `pilot-updater` process | **not running** | n/a | +| `~/.pilot/bin/.pilot-version` | `v1.10.9` | n/a | +| `~/.pilot/bin/pilot-updater` mtime | 2026-06-08 (unchanged) | n/a | +| latest GitHub release tag | `v1.11.0` (published 2026-06-10) | same | diff --git a/docs/PROPOSAL-reliable-file-transfer.md b/docs/PROPOSAL-reliable-file-transfer.md new file mode 100644 index 00000000..f8750242 --- /dev/null +++ b/docs/PROPOSAL-reliable-file-transfer.md @@ -0,0 +1,197 @@ +# Proposal: reliable file transfer for Pilot + +**Status:** design + initial implementation in progress +**Owner:** `dataexchange` package · `web4/cmd/pilotctl/main.go::cmdSendFile` +· `web4/pkg/daemon/...` (receiver path) +**Context:** the original BUG-updater-version-skew report's "send-file +hangs ~120s then EOFs" is not version skew; it is the unmodified send-file +behavior. This proposal lays out what reliable means and how to get there +without breaking compatibility with v1.10.x receivers. + +## Problem statement, in one paragraph + +`pilotctl send-file` today sends a file as a **single 256 MiB-capped +atomic frame** (`dataexchange/dataexchange.go:61`), reads the entire file +into memory on both sides (`os.ReadFile` on sender, `make([]byte, length) ++ io.ReadFull` on receiver), encrypts the whole thing once at the tunnel +layer, waits for a literal string ACK (`"ACK FILE N bytes"`), with **no +application-layer timeouts anywhere** — neither the sender's `client.Recv()` +nor the receiver's `ReadFrame()` has a deadline. The receiver writes the +file synchronously and only ACKs after the disk flush completes. There is +no end-to-end content hash, no resume protocol, no progress reporting, +and no per-chunk backpressure. 
The result is exactly what the bug report +saw: large transfers over any non-trivial path stall, and the sender hangs +until SO_KEEPALIVE finally trips at ~120s. + +## Goals (and non-goals) + +**Goals** + +1. Sender never hangs longer than a configurable timeout. +2. End-to-end integrity verified (not just per-tunnel AEAD). +3. Streaming on both sides — sender does not load the whole file into + memory, receiver writes incrementally. +4. Progress visible to the sender during transfer (for human and agent + UX). +5. Resume after a dropped transfer. +6. Backward compatible with v1.10.x receivers (which only understand + `TypeFile`). +7. Concurrent transfers to the same peer work without interfering. + +**Non-goals (this round)** + +- Compression. Pilot already runs over GCM; compressing inside that + is a separate optimization. +- Multipath / parallel-stream transfers. The overlay handles fan-out at + a lower layer. +- Files > 100 GiB. Real but a separate scope. + +## Design — three layers + +### Layer 1 — `TypeFileStream` frame (new wire type) + +Add a new dataexchange frame type alongside the existing `TypeFile`: + +```go +const ( + TypeText uint32 = 1 + TypeJSON uint32 = 2 + TypeBinary uint32 = 3 + TypeFile uint32 = 4 + TypeTrace uint32 = 5 + // TypeAutoAnswer is reserved (see Alex's reply-on-conn PR chain). + TypeFileStream uint32 = 7 // new — see below +) +``` + +`TypeFileStream` frames are **small**: a header, a control byte, and at +most 1 MiB of payload. 
The header carries: + +- `transfer_id` (16 bytes random) — disambiguates concurrent transfers +- `kind` (1 byte) — one of: + - `0x01 INIT`: filename, total size, SHA-256 of full content, sender's + declared chunk size + - `0x02 CHUNK`: offset (uint64), payload bytes, SHA-256 of this chunk + - `0x03 ACK`: highest contiguous offset received (uint64) + - `0x04 DONE`: end-of-stream marker; receiver verifies full SHA-256 + - `0x05 ABORT`: error code + human reason + - `0x06 RESUME`: receiver tells sender "I have through offset N for + transfer T" — only valid as a response to a re-INIT with same + `transfer_id` + +Wire format for each kind is straightforward fixed-prefix + payload. +Frames are still bounded by `MaxFrameSize` but in practice stay <1 MiB. + +### Layer 2 — sender + receiver state machines + +**Sender** (`pilotctl send-file` → `client.SendFileStream`): + +```text +1. Open file, compute total size, stream-hash SHA-256 of full content. +2. INIT frame with filename, size, full-hash, chunk_size = 256 KiB. +3. Loop: read chunk_size bytes; send CHUNK(offset, payload, chunk_hash). +4. Sliding window: keep at most `window` chunks unacked (default 16 → + 4 MiB in flight). Block reading next chunk if window full. +5. On receiving ACK(offset N): advance window's left edge to N, free + read-buffer space. +6. After last chunk: DONE. +7. Receive final ACK from receiver carrying full SHA-256 verification. +8. Timeouts at every Recv() — default 30s, exposed as `--timeout`. +``` + +**Receiver** (`dataexchange/service.go::handleConn` extended for +`TypeFileStream`): + +```text +1. INIT: open `~/.pilot/received/.partial/{transfer_id}` (atomic rename + on DONE). Start a per-transfer goroutine; register `transfer_id` in + an in-memory map. +2. CHUNK: verify chunk SHA-256; write to file at offset (pwrite); update + "highest contiguous offset" cursor. Send ACK with new cursor. +3. 
If a chunk arrives out of order: hold it in a small bounded buffer + (default 16 chunks = 4 MiB) and write when its predecessor lands. +4. DONE: hash-verify full file; rename .partial → final name; ACK with + success or ABORT with mismatch. +5. ABORT from sender: close transfer, keep .partial for inspection, + schedule cleanup after 1h. +6. Timeouts at every ReadFrame() — default 60s per chunk, transfer-wide + default 1h. Stalled transfers leave the .partial on disk for resume. +``` + +**Resume** — sender on retry sends INIT with the same `transfer_id`. If +receiver still has the `.partial`, it replies RESUME(offset N). Sender +seeks to N and continues from there. SHA-256 is recomputed on the +receiver side over the .partial file to detect corruption before resume. + +### Layer 3 — backward compatibility + +The sender starts every transfer by trying `TypeFileStream` first. If +the receiver is v1.10.x: + +- It receives a `TypeFileStream` frame, hits `default` in the frame-type + switch (`dataexchange/service.go:165` or thereabouts), and either drops + the connection or saves to inbox as unknown. +- Sender's INIT does not get an INIT-ACK within the negotiation timeout + (3 seconds, configurable via `--legacy-timeout`). +- Sender falls back to the old `TypeFile` atomic send, **but now with a + timeout on the ACK Recv** (the smallest concession we make to the old + path). + +This means v1.11.x→v1.10.x transfers still work (legacy path); 11.x→11.x +gets the full reliability story; 10.x→11.x continues to work (the old +sender only ever emits `TypeFile`, which the new receiver still handles +on the existing path). + +## Implementation milestones + +| | Scope | Surface | +|---|---|---| +| **M0** | Sender + receiver timeouts on the existing `TypeFile` path. No wire-format change. End-to-end SHA-256 sent as a sidecar in the ACK string ("ACK FILE N bytes sha256=…"). Sender verifies.
| `dataexchange/service.go`, `dataexchange/client.go`, `web4/cmd/pilotctl/main.go` | +| **M1** | New `TypeFileStream` wire type + sender state machine + receiver state machine. No resume. No progress UX yet. | `dataexchange/*`, `web4/cmd/pilotctl/main.go` | +| **M2** | Progress reporting via stderr (TTY-only) using the existing `startWaitProgress` helper — currently just elapsed; extend with bytes-sent/bytes-total. | `web4/cmd/pilotctl/style.go`, `main.go` | +| **M3** | Resume protocol. RESUME frame + receiver-side `.partial` files + sender-side seek-and-continue. | `dataexchange/*`, `web4/cmd/pilotctl/main.go` | +| **M4** | Streaming I/O end-to-end (no whole-file allocations on either side). | `dataexchange/*` | +| **M5** | Backward-compat negotiation (auto-fall-back to `TypeFile` when peer doesn't ACK INIT). | `web4/cmd/pilotctl/main.go` | +| **M6** | Tests (unit, integration with a fake-network jitter, concurrent transfers, resume after crash, large file) — on a controllable VM. | `dataexchange/zz_*_test.go`, `tests/` | + +## Open design questions + +- **Window size default** — 16 chunks × 256 KiB = 4 MiB in flight feels + right for relay paths; benchmarks on the VM will calibrate. +- **Chunk size negotiation** — sender's INIT proposes; should receiver + be allowed to counter-propose smaller? (Mobile receivers, e.g.) +- **`.partial` retention policy** — 1 hour for now, or "until sender + retries"? The latter risks orphan files; the former risks "I came + back tomorrow and resume doesn't work." +- **Integrity hash algorithm** — SHA-256 is the conservative pick. + BLAKE3 is faster on the wire-format hashing path; not free, and + Pilot has no other dependency on it. Picking SHA-256 unless a + benchmark says otherwise. + +## Verification + +Bench rig already provisioned: `pilot-sendfile-bench-1781448324` in +`us-central1-a` (e2-standard-4 / 15 GB RAM / 50 GB pd-ssd). It can +serve as Node B for paired tests against this laptop.
The bench +matrix: + +| Test | Size | Path | Pass criteria | +|---|---|---|---| +| Baseline TypeFile | 1 / 10 / 100 / 256 MiB | both nodes v1.10.x | unchanged behavior, no regressions | +| Stream happy path | 1 / 10 / 100 / 1024 / 4096 MiB | both v1.11.x stream | within 90% of raw `scp` throughput on the same path | +| Stream resume | 1024 MiB | kill receiver at 50%, restart, resume | completes with single SHA-256 verify, no re-transfer of received bytes | +| Concurrent | 5 × 100 MiB simultaneous | both v1.11.x | all five complete; aggregate throughput within 80% of one-at-a-time × 5 | +| Sender timeout | 100 MiB, `kill -9` receiver mid-transfer | mixed | sender exits within `--timeout` (default 30 s) with a clear error and `pilot-mcp doctor`-style hint | +| Backward compat | 100 MiB v1.11.x sender → v1.10.x receiver | mixed | falls back to TypeFile, completes, no error logs on either side | + +## Next steps + +1. Land M0 (timeouts + sidecar SHA-256) as a small PR against + `TeoSlayer/pilotprotocol`. This alone closes the "120s hang" report + without any wire-format change. +2. Open a tracking issue with this proposal linked. +3. Stand up M1 behind a `--stream` flag on `pilotctl send-file` so the + new path can be exercised against the bench VM without changing the + default behavior. +4. Once M1 is stable in the wild, flip the default and start landing + M2…M5. 
diff --git a/docs/cli-reference.md b/docs/cli-reference.md index be86b6f1..f8495825 100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -61,7 +61,7 @@ Management commands: pilotctl disconnect Mailbox: - pilotctl received [--clear] + pilotctl received [--limit ] [--since ] [--clear [--before ]] pilotctl inbox [--clear] Service Agents: diff --git a/go.mod b/go.mod index 2adb9a39..b0bf241f 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/coder/websocket v1.8.14 github.com/pilot-protocol/app-store v1.0.1-beta.1.0.20260609061942-8852c785a264 github.com/pilot-protocol/beacon v0.2.6 - github.com/pilot-protocol/common v0.4.8 - github.com/pilot-protocol/dataexchange v0.2.0 + github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6 + github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98 github.com/pilot-protocol/eventstream v0.2.2 github.com/pilot-protocol/handshake v0.2.1 github.com/pilot-protocol/nameserver v0.2.1 diff --git a/go.sum b/go.sum index 995da99e..2c02acd1 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,12 @@ github.com/pilot-protocol/beacon v0.2.6 h1:grxwaVyPRUT0W6coyjYfNkO0rpzOIrwrKn94S github.com/pilot-protocol/beacon v0.2.6/go.mod h1:I/UhEv097g1z/qtAVDZbEhf3R5tzM0Dp71vGHah52A4= github.com/pilot-protocol/common v0.4.8 h1:eS2Bc+XcZWJ/qhwwOZbXwIWhtNdOijuoEp716kQE+/c= github.com/pilot-protocol/common v0.4.8/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4= +github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6 h1:Us3qSMPTBHPDQXFPY07BoUanriw1rVzS6SAHcbddqzY= +github.com/pilot-protocol/common v0.4.9-0.20260615113553-d5cbbfb3e5b6/go.mod h1:yrAwPXGVMbXU+SADvOCmbdXjK/wJ3uA0KshyLvRlej4= github.com/pilot-protocol/dataexchange v0.2.0 h1:ldE6AyrES1uvdnn1NBl0KZ7C+SSWNtmeHHU3CQhwSCo= github.com/pilot-protocol/dataexchange v0.2.0/go.mod h1:JVy2+hr/IjzMPshxjExbGO/4SbJTs7ZJ7iYvT/ODF3Q= +github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98 
h1:Bqgnf4CZC7aZJyDzz/E7agwXotArJg2FvFlNDqouhLo= +github.com/pilot-protocol/dataexchange v0.2.1-beta.1.0.20260615113607-fac933edea98/go.mod h1:tM9eyyruBdnxhhUtViasUjnAElwF/r5PQvCYKLdlTLY= github.com/pilot-protocol/eventstream v0.2.2 h1:E0IjveK7K+dsIbE/5hD3N821FkHzxVsx1tiAORMzt8k= github.com/pilot-protocol/eventstream v0.2.2/go.mod h1:gUjoMEItW1SRJYEq39VlcIeDe2LcE5B18/4bcaUJNrs= github.com/pilot-protocol/handshake v0.2.0 h1:uLeV8iNHcsHbcVH+GZ9p7uuIbObA8BReDByF5XGjzB8= diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 1e71c85c..493e3031 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -196,8 +196,13 @@ const ( MaxZeroWindowProbes = 30 ) -// RelayProbeInterval is how often we probe relay-flagged peers for direct connectivity. -const RelayProbeInterval = 5 * time.Minute +// RelayProbeInterval is how often we attempt to upgrade relay-flagged peers +// to a direct path via coordinated hole-punching. Was 5m (far too slow — a +// peer stuck on relay stayed there for minutes); tightened to 15s so a peer +// that becomes direct-capable (or one that briefly flipped to relay under +// load) is re-punched promptly. The punch + probe is cheap and only runs for +// peers still flagged relay, so it self-limits once a peer is promoted. +const RelayProbeInterval = 15 * time.Second // EndpointCacheTTL is how long a cached endpoint is considered fresh. // After this, the entry is stale but still usable as a fallback. @@ -5204,32 +5209,89 @@ func (d *Daemon) relayProbeLoop() { case <-d.stopCh: return case <-ticker.C: - relayPeers := d.tunnels.RelayPeerIDs() - for _, nodeID := range relayPeers { - // P1-010 fix: send a targeted direct probe without flipping - // the relay flag. If the peer's direct path has recovered, the - // response will arrive on tm.sock from their real address; - // handleEncrypted auto-clears relay mode on a successful - // direct decrypt. 
Concurrent traffic (key exchange replies, - // retransmits) continues going via relay during the probe. - probe := &protocol.Packet{ - Version: protocol.Version, - Flags: protocol.FlagACK, - Protocol: protocol.ProtoControl, - Src: d.Addr(), - Dst: protocol.Addr{Network: 0, Node: nodeID}, - SrcPort: protocol.PortPing, - DstPort: protocol.PortPing, - Seq: 1, - } - if err := d.tunnels.SendDirectProbe(nodeID, probe); err != nil { - slog.Debug("relay direct-probe skipped", "node_id", nodeID, "error", err) - } + for _, nodeID := range d.tunnels.RelayPeerIDs() { + go d.tryDirectUpgrade(nodeID) } } } } +// tryDirectUpgrade attempts to promote a relay-flagged peer to a direct path +// via coordinated NAT hole-punching, then probes the peer's real address so +// the peer can promote the path; the outcome is not checked here. +// +// Why this exists: the old relayProbeLoop sent a single one-way SendDirectProbe +// and assumed the direct path "had recovered". Through a stateful firewall/NAT +// a one-way probe is dropped — there is no conntrack pinhole until BOTH peers +// send. So a relay tunnel never upgraded unless the peer was already publicly +// reachable. Here we (1) coordinate a simultaneous punch via the beacon to +// open the pinhole on both NATs, then (2) push several encrypted probes at the +// peer's REAL address (not the beacon placeholder) so the peer's +// ClearRelayOnDirect promotes the path (DirectClearsRequired=3 direct decrypts). +func (d *Daemon) tryDirectUpgrade(nodeID uint32) { + // Only act on peers we have authoritative resolve info for. Punching a + // relay-only peer would leak its real IP via the beacon PunchCommand + // (see establishConnection), so require resolve info that says the peer + // is NOT relay-only before we punch or probe. + resp, ok := d.cachedResolve(nodeID) + if !ok { + // A relay tunnel established via beacon discovery never populated + // the resolve cache. Resolve fresh so we can target the peer's real + // address; without this the upgrade can never start.
+ if d.regConn == nil { + return + } + r, err := d.regConn.Resolve(nodeID, d.NodeID()) + if err != nil { + return + } + d.cacheResolve(nodeID, r) + resp = r + } + if relayOnly, _ := resp["relay_only"].(bool); relayOnly { + return + } + realAddrStr, _ := resp["real_addr"].(string) + if realAddrStr == "" { + return + } + realAddr, err := net.ResolveUDPAddr("udp", realAddrStr) + if err != nil { + return + } + + // A prior blackhole flip may have PINNED the peer to relay; ClearRelayOnDirect + // will not promote a pinned peer. Unpin it — we've confirmed above it is not + // registry relay-only, so direct is allowed to win on its merits. + if d.tunnels.IsRelayPinned(nodeID) { + d.tunnels.SetRelayPeerPinned(nodeID, false) + } + + // Coordinate the punch (opens the conntrack pinhole on both NATs). + d.tunnels.RequestHolePunch(nodeID) + + probe := &protocol.Packet{ + Version: protocol.Version, + Flags: protocol.FlagACK, + Protocol: protocol.ProtoControl, + Src: d.Addr(), + Dst: protocol.Addr{Network: 0, Node: nodeID}, + SrcPort: protocol.PortPing, + DstPort: protocol.PortPing, + Seq: 1, + } + // Give the punch a moment to land, then probe the real address a few + // times so at least DirectClearsRequired (=3) direct decrypts arrive + // while the pinhole is open. + for i := 0; i < 5; i++ { + time.Sleep(200 * time.Millisecond) + if err := d.tunnels.SendDirectProbeTo(nodeID, realAddr, probe); err != nil { + slog.Debug("direct upgrade probe skipped", "node_id", nodeID, "error", err) + return + } + } +} + // --------------------------------------------------------------------------- // Network membership reconciliation: periodic delta detection vs the // registry's authoritative membership list. 
The reconciler is L7-only: diff --git a/pkg/daemon/ipc.go b/pkg/daemon/ipc.go index 0b4fb1b7..a0b21d8f 100644 --- a/pkg/daemon/ipc.go +++ b/pkg/daemon/ipc.go @@ -61,6 +61,15 @@ const ( CmdRotateKeyOK byte = 0x26 CmdBroadcast byte = 0x29 CmdBroadcastOK byte = 0x2A + // CmdPreferDirect asks the daemon to drop the existing tunnel and any + // sticky routing state for a peer, then re-resolve fresh — preferring + // a direct UDP path on the next dial. See routing/routing.go for the + // state being reset (relayPeers, relayPinned, blackholeMissCount, + // directClearCount, lastDirectRecv). The handler is best-effort: on + // the next dial the daemon will still fall back to relay if the + // direct path doesn't recover within the normal retry budget. + CmdPreferDirect byte = 0x2D + CmdPreferDirectOK byte = 0x2E // CmdCancel: driver → daemon, "abandon the in-flight request that // I sent under reqID X". The reqID embedded in the envelope header // IS NOT the reqID being cancelled — that's encoded in the body @@ -749,6 +758,8 @@ func (s *IPCServer) dispatch(conn *ipcConn, cmd byte, reqID uint64, payload []by s.handleManaged(conn, reqID, payload) case CmdRotateKey: s.handleRotateKey(conn, reqID) + case CmdPreferDirect: + s.handlePreferDirect(conn, reqID, payload) default: s.sendError(conn, reqID, fmt.Sprintf("unknown command: 0x%02X", cmd)) } @@ -1077,6 +1088,152 @@ func (s *IPCServer) handleHealth(conn *ipcConn, reqID uint64) { } } +// handlePreferDirect drops the tunnel + sticky routing state for a peer +// so the next dial attempts a fresh direct path. The most common stuck +// shape this addresses: pilotctl ping works (small UDP through +// the beacon relay) but pilotctl send-file hangs because the +// established relay-mediated tunnel can't sustain stream traffic and +// nothing in the dial loop ever retries direct. 
+// +// The reset is idempotent and best-effort: +// - existing connections to the peer are NOT torn down by this call; +// callers should retry the dial themselves after seeing OK +// - if the registry returns relay_only=true on the subsequent +// ensureTunnel, the peer simply re-pins to relay; the reply's +// was_relay_pinned field reports only the pre-reset pin state +// - if the peer is genuinely behind a symmetric NAT, the direct +// dial will fail again and the routing.go retry-and-flip logic +// will fall back to relay within the normal DialDirectRetries budget +// +// Reply payload (JSON): +// +// { +// "node_id": uint32, +// "had_tunnel": bool, // peer was active in TunnelManager +// "was_relay_active": bool, // relay flag was set +// "was_relay_pinned": bool, // relay flag was pinned (authoritative) +// } +// +// The reply also carries "pila_pushed" (bool) and "resolve_error" (string, +// empty on success). On reply, the calling driver can immediately retry its +// dial — the daemon's tunnel state is now ready to accept a fresh path probe. +func (s *IPCServer) handlePreferDirect(conn *ipcConn, reqID uint64, payload []byte) { + if len(payload) != 4 { + s.sendError(conn, reqID, "prefer_direct: payload must be 4 bytes (node_id)") + return + } + nodeID := binary.BigEndian.Uint32(payload) + + d := s.daemon + hadTunnel := d.tunnels.HasPeer(nodeID) + wasRelayActive := d.tunnels.IsRelayPeer(nodeID) + wasRelayPinned := d.tunnels.IsRelayPinned(nodeID) + + // Unpin relay so the next successful direct receive can promote the + // peer out via the routing layer's ClearRelayOnDirect path. We do NOT + // unset the active relay flag — that would cause our proactive PILA + // below to be sprayed at the peer's direct (NAT'd, unreachable) + // endpoint and lost. Keeping relay active means the PILA reaches + // the peer through the beacon, restores crypto state, and the + // routing layer can then promote the path on its own merits.
+ // + // Reproduces the Mac↔GCP-VM dual-NAT case where direct never works: + // unsetting relay there forced the next dial onto a dead direct + // path and stalled send-file at the dataexchange timeout (~17 s) + // before the routing layer's silence detector could re-flip. + if wasRelayPinned { + d.tunnels.SetRelayPeerPinned(nodeID, false) + } + + // Drop the cached resolve / endpoint so the next ensureTunnel hits + // the registry fresh. Without this, ensureTunnel would short-circuit + // on HasPeer=true and reuse the existing (relay-mediated) tunnel. + d.forgetPeerResolution(nodeID) + + // Drop the tunnel + per-peer state so ensureTunnel re-runs the full + // resolve + punch flow on the next dial. RemovePeer is safe under + // concurrent traffic — see TunnelManager.RemovePeer's docstring for the + // per-peer metadata it cleans up. + // + // SIDE EFFECT we have to undo: RemovePeer calls routing.RemovePeer, + // which wipes relayPeers and relayPinned. For a peer where the only + // working transport is relay (the Mac↔GCP-VM dual-NAT case), this is + // catastrophic — the proactive PILA we push at the end of this handler + // would then be sprayed at the peer's direct address and lost. We + // captured the original relay state above; re-apply it after the wipe + // so writeFrame still picks the working path. + if hadTunnel { + d.tunnels.RemovePeer(nodeID) + } + if wasRelayActive { + d.tunnels.SetRelayPeer(nodeID, true) + // Note: we deliberately do NOT restore relayPinned. The point of + // prefer-direct is to give the next direct receive a chance to + // promote the path out of relay; restoring the pin would defeat + // that. Relay active without pin = "use relay for now, but a + // direct packet from this peer will demote the relay flag". + } + + // Clear the per-peer cooldowns that survive RemovePeer. + // + // rekeyMu.lastRekeyReq lives on TunnelManager and is NOT touched by + // tunnels.RemovePeer. 
Without this, the next "encrypted packet but no + // key" from the peer hits the 3-second gate (rekeyRequestInterval) + // and silently skips the PILA reply. That blank window is exactly + // when send-file is about to dial — losing it means another retransmit + // cycle before recovery. + d.tunnels.ClearLastRekeyReq(nodeID) + // rekeyGaveUp similarly survives RemovePeer (kx.RemovePeer doesn't + // clear it because giving up doesn't imply the peer is gone). After + // an operator-initiated reset, treat the peer as fresh — let + // MarkPendingRekey arm a new retransmit cycle without the 5-second + // cooldown delay. + d.tunnels.ClearRekeyGaveUp(nodeID) + + // Proactively push a fresh PILA. Without this, recovery waits for the + // peer to send us their next packet, which can take up to the peer's + // own keepalive cadence (currently ~30s in the worst case). Doing it + // from here makes prefer-direct recovery deterministic: by the time + // the IPC reply lands, our PILA has already left the wire. + // + // Sequence: + // 1. ensureTunnel re-resolves the peer's address from the registry + // (RemovePeer just wiped tm.peers, so sendKeyExchangeToNode would + // otherwise have no destination and silently no-op). + // 2. sendKeyExchangeToNode builds the signed PILA and writes it + // out via tm.writeFrame, picking direct vs relay based on the + // freshly-set routing state. + // + // Both steps are best-effort. If the registry is unreachable or the + // peer has no public endpoint registered, we still report tunnel + // state was reset and the caller can fall back to relay-pinned + // behaviour on retry. 
+ pilaPushed := false + resolveErr := "" + if err := d.ensureTunnel(nodeID); err != nil { + resolveErr = err.Error() + } else { + d.tunnels.sendKeyExchangeToNode(nodeID) + pilaPushed = true + } + + data, err := json.Marshal(map[string]interface{}{ + "node_id": nodeID, + "had_tunnel": hadTunnel, + "was_relay_active": wasRelayActive, + "was_relay_pinned": wasRelayPinned, + "pila_pushed": pilaPushed, + "resolve_error": resolveErr, + }) + if err != nil { + s.sendError(conn, reqID, fmt.Sprintf("prefer_direct marshal: %v", err)) + return + } + if err := conn.writeReply(CmdPreferDirectOK, reqID, data); err != nil { + slog.Debug("IPC prefer_direct reply failed", "err", err) + } +} + func (s *IPCServer) handleResolveHostname(conn *ipcConn, reqID uint64, payload []byte) { hostname := string(payload) if hostname == "" { diff --git a/pkg/daemon/routing/writeframe.go b/pkg/daemon/routing/writeframe.go index 60b27d21..8044933a 100644 --- a/pkg/daemon/routing/writeframe.go +++ b/pkg/daemon/routing/writeframe.go @@ -82,3 +82,33 @@ func (m *Manager) WriteFrame(nodeID uint32, addr *net.UDPAddr, frame []byte, cou _, _ = m.HandleSendError(nodeID, err) return SendOutcome{}, err } + +// SendRelayFrame sends a frame to a peer via the beacon relay path +// unconditionally — ignoring the per-peer relay flag and the blackhole +// heuristic. Used by the key-exchange convergence path to guarantee a +// dual-NAT peer receives the PILA via relay even before its relay flag +// has been set. Without this, two peers both behind NAT only reconverge +// after slow blackhole detection flips the direct path to relay +// (measured 28s–3min on the Mac↔GCP-VM rig). Returns ErrNoAddress if no +// beacon is configured. Does not bump the per-peer outbound-send +// timestamp (this is an out-of-band copy, not the primary path). 
+func (m *Manager) SendRelayFrame(nodeID uint32, frame []byte) error { + m.mu.RLock() + bAddr := m.beaconAddr + sock := m.sock + m.mu.RUnlock() + if bAddr == nil { + return ErrNoAddress + } + if sock == nil { + return fmt.Errorf("routing: socket not set") + } + // MsgRelay: [0x05][senderNodeID(4)][destNodeID(4)][frame...] + msg := make([]byte, 1+4+4+len(frame)) + msg[0] = protocol.BeaconMsgRelay + binary.BigEndian.PutUint32(msg[1:5], m.localNodeID()) + binary.BigEndian.PutUint32(msg[5:9], nodeID) + copy(msg[9:], frame) + _, err := sock.Send(msg, bAddr) + return err +} diff --git a/pkg/daemon/tunnel.go b/pkg/daemon/tunnel.go index c7b864c2..3433f0ec 100644 --- a/pkg/daemon/tunnel.go +++ b/pkg/daemon/tunnel.go @@ -407,6 +407,25 @@ func (tm *TunnelManager) pruneRekeyBudgetLocked(now time.Time) bool { return len(tm.lastRekeyReq) < maxRekeyRequesters } +// ClearRekeyGaveUp lifts the per-peer give-up cooldown so a fresh rekey +// cycle can start immediately. Thin shim over keyexchange.Manager so the +// IPC layer (pkg/daemon/ipc.go handlePreferDirect) can reach it without +// importing the keyexchange package directly. +func (tm *TunnelManager) ClearRekeyGaveUp(peerNodeID uint32) { + tm.kx.ClearRekeyGaveUp(peerNodeID) +} + +// ClearLastRekeyReq drops the per-peer rate-limit timestamp recorded by +// maybeRequestRekey. After a forced reset (pilotctl prefer-direct) the +// next "encrypted packet but no key" event must be allowed to fire a +// fresh PILA immediately — without this, the 3-second gate can silently +// swallow the first packet that would otherwise re-establish the tunnel. +func (tm *TunnelManager) ClearLastRekeyReq(peerNodeID uint32) { + tm.rekeyMu.Lock() + delete(tm.lastRekeyReq, peerNodeID) + tm.rekeyMu.Unlock() +} + // maybeRequestRekey conditionally sends a key-exchange to a peer that sent us // an encrypted packet we can't decrypt. Rate-limited per peer. Returns true if // we actually sent one. 
@@ -1419,6 +1438,33 @@ func (tm *TunnelManager) deriveSecret(peerPubKeyBytes []byte) (*peerCrypto, erro // BOOTSTRAP-EXCEPTION marker — moved with the function). func (tm *TunnelManager) sendKeyExchangeToNode(peerNodeID uint32) { tm.kx.SendKeyExchangeToNode(peerNodeID) + + // Dual-NAT convergence fix: when a peer is not (yet) relay-flagged but + // a beacon is available, ALSO push the key-exchange via relay. For two + // peers both behind NAT the direct PILA never lands, and the tunnel + // only reconverges after slow blackhole detection flips the path to + // relay — measured 28s–3min on the Mac↔GCP-VM rig, far longer than the + // dial/send timeouts. The relay copy converges in ~1 RTT. This is a + // no-op once the peer is relay-flagged (the primary send already went + // via relay), and relayProbeLoop keeps probing direct so a genuine + // direct path still upgrades the peer out of relay. Best-effort: a + // failed relay copy just falls back to the existing slow path. + if tm.routing.IsRelayPeer(peerNodeID) || tm.routing.BeaconAddr() == nil { + return + } + var frame []byte + if tm.kx.HasIdentity() { + frame = tm.kx.BuildAuthFrame() + } + if frame == nil { + frame = tm.kx.BuildUnauthFrame() + } + if frame == nil { + return + } + if err := tm.routing.SendRelayFrame(peerNodeID, frame); err != nil { + slog.Debug("kx relay-copy send failed", "peer_node_id", peerNodeID, "error", err) + } } // markPendingRekey is the legacy shim for keyexchange.Manager.MarkPendingRekey. @@ -1627,6 +1673,47 @@ func (tm *TunnelManager) SendDirectProbe(nodeID uint32, pkt *protocol.Packet) er return werr } +// SendDirectProbeTo sends an encrypted probe to an EXPLICIT address rather +// than the stored peers[] entry. 
This is the relay→direct upgrade primitive: +// a relay-flagged peer's stored address is the beacon placeholder, so +// SendDirectProbe can't reach its real endpoint — but after a coordinated +// hole-punch we know the peer's real reflexive address (from the registry +// resolve) and the NAT conntrack pinhole is open. The encrypted probe that +// lands there triggers the peer's ClearRelayOnDirect, promoting the path to +// direct. Caller is responsible for having punched first (RequestHolePunch) +// so the pinhole is open. Best-effort; returns the send error. +func (tm *TunnelManager) SendDirectProbeTo(nodeID uint32, addr *net.UDPAddr, pkt *protocol.Packet) error { + if addr == nil { + return fmt.Errorf("nil addr for node %d", nodeID) + } + if tm.routing.IsFromBeacon(addr) { + return fmt.Errorf("addr for node %d is beacon placeholder", nodeID) + } + pc := tm.envelope.Get(nodeID) + data, err := pkt.Marshal() + if err != nil { + return fmt.Errorf("marshal: %w", err) + } + var frame []byte + if tm.encrypt { + if pc == nil || !pc.Ready { + return fmt.Errorf("no crypto for node %d", nodeID) + } + frame = tm.encryptFrame(pc, data) + } else { + frame = make([]byte, 4+len(data)) + copy(frame[0:4], protocol.TunnelMagic[:]) + copy(frame[4:], data) + } + n, werr := tm.sock.Send(frame, addr) + if werr == nil { + atomic.AddUint64(&tm.PktsSent, 1) + atomic.AddUint64(&tm.BytesSent, uint64(n)) + tm.routing.RecordOutboundSend(nodeID, time.Now()) + } + return werr +} + // clearRelayOnDirectLocked is the legacy shim for tests that drive // clear-on-direct directly. Delegates to routing.Manager.ClearRelayOnDirect. func (tm *TunnelManager) clearRelayOnDirectLocked(peerNodeID uint32, from *net.UDPAddr) bool {