diff --git a/cmd/pilotctl/appstore.go b/cmd/pilotctl/appstore.go index eac13755..ad8931d3 100644 --- a/cmd/pilotctl/appstore.go +++ b/cmd/pilotctl/appstore.go @@ -32,6 +32,7 @@ import ( "strings" "time" + "github.com/TeoSlayer/pilotprotocol/pkg/telemetry" "github.com/pilot-protocol/app-store/pkg/ipc" "github.com/pilot-protocol/app-store/pkg/manifest" "github.com/pilot-protocol/common/crypto" @@ -1209,6 +1210,36 @@ func cmdAppStoreInstall(args []string) { Reason: reason, }) + // Emit a telemetry event for the successful install (consent-gated — + // no-op when PILOT_TELEMETRY_URL is empty or identity.json is absent). + // Best-effort: a send failure is logged but not fatal — the install + // itself already succeeded on disk. + { + url := os.Getenv("PILOT_TELEMETRY_URL") + if url == "" { + url = telemetry.DefaultEndpoint + } + sourceStr := "catalogue" + if source == installSourceLocal { + sourceStr = "local" + } + payload, _ := json.Marshal(map[string]string{ + "app_id": m.ID, + "version": m.AppVersion, + "source": sourceStr, + }) + identityPath := configDir() + "/identity.json" + client := telemetry.NewClientFromIdentity(url, identityPath, 0) + err := client.Send(telemetry.Event{ + Kind: "app_installed", + TS: time.Now().UTC().Format(time.RFC3339), + Payload: payload, + }) + if err != nil { + slog.Warn("telemetry send failed, install still successful", "app", m.ID, "err", err) + } + } + report := installReport{ AppID: m.ID, AppVersion: m.AppVersion, diff --git a/pkg/telemetry/client.go b/pkg/telemetry/client.go new file mode 100644 index 00000000..7cc1e3c1 --- /dev/null +++ b/pkg/telemetry/client.go @@ -0,0 +1,195 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +// Package telemetry provides a consent-gated telemetry client that signs +// event POSTs with the node's Ed25519 identity and sends them to a +// configured telemetry endpoint. When the consent flag is off (empty URL), +// the client is a hard no-op: no dial, no buffering, no goroutines. +package telemetry + +import ( + "bytes" + "crypto/ed25519" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/pilot-protocol/common/crypto" +) + +// DefaultEndpoint is the production telemetry ingestion URL. +const DefaultEndpoint = "https://telemetry.pilotprotocol.network/v1/events" + +// Canonical signing header names, matching the telemetry server's +// internal/sig contract. +const ( + HeaderTimestamp = "X-Pilot-Timestamp" + HeaderPubKey = "X-Pilot-Public-Key" + HeaderSignature = "X-Pilot-Signature" +) + +// Event is the wire shape sent to the telemetry endpoint. +type Event struct { + EventID string `json:"event_id"` + Kind string `json:"kind"` + TS string `json:"ts"` // RFC3339; empty = server defaults to receive time + NodeID int64 `json:"node_id,omitempty"` + Payload json.RawMessage `json:"payload"` +} + +// Client is a consent-gated telemetry sender. Zero value is a no-op. +type Client struct { + mu sync.Mutex + url string // empty = no-op + nodeID int64 // node ID included in events + sign signFunc // ed25519 signer (set via SetSigner) + pubKeyB string // base64-encoded public key + once sync.Once // lazy init guard + initErr error // capture init failures + disabled bool // true when url is empty +} + +type signFunc func(msg []byte) []byte + +// New creates a consent-gated telemetry client. +// When url is empty the client is a permanent no-op. +func New(url string, nodeID int64) *Client { + return &Client{ + url: strings.TrimSpace(url), + nodeID: nodeID, + disabled: strings.TrimSpace(url) == "", + } +} + +// NewClientFromIdentity creates a consent-gated telemetry client from an +// Ed25519 identity file on disk and a telemetry URL. When url is empty +// the client is a permanent no-op. Returns nil if the identity file does +// not exist (first run). +func NewClientFromIdentity(url, identityPath string, nodeID int64) *Client { + c := New(url, nodeID) + if c.disabled || url == "" { + return c + } + + id, err := crypto.LoadIdentity(identityPath) + if err != nil { + slog.Warn("telemetry: can't load identity, staying disabled", "path", identityPath, "err", err) + return c + } + if id == nil { + slog.Debug("telemetry: no identity file yet, staying disabled", "path", identityPath) + return c + } + + slog.Debug("telemetry: identity loaded, enabling client", "path", identityPath, + "pubkey", crypto.EncodePublicKey(id.PublicKey)) + c.SetSigner(id.Sign, crypto.EncodePublicKey(id.PublicKey)) + return c +} + +// SetSigner installs the Ed25519 signing function and the corresponding +// base64-encoded public key. Must be called before the first Send call. +// When signer is nil the client stays disabled. +func (c *Client) SetSigner(sign signFunc, pubKeyB64 string) { + c.mu.Lock() + defer c.mu.Unlock() + if sign == nil || pubKeyB64 == "" { + c.sign = nil + c.pubKeyB = "" + return + } + c.sign = sign + c.pubKeyB = pubKeyB64 +} + +// Send POSTs one or more events to the telemetry endpoint. Returns +// immediately (no-op) when the client is disabled (no consent) or +// no signer is configured. +// +// The request is Ed25519-signed with the node's identity, following +// the telemetry server's signing contract: +// - X-Pilot-Timestamp: unix seconds (decimal string) +// - X-Pilot-Public-Key: base64(std) of the 32-byte Ed25519 public key +// - X-Pilot-Signature: base64(std) of the Ed25519 signature over +// (timestamp + "\n" + body) +func (c *Client) Send(events ...Event) error { + c.mu.Lock() + disabled := c.disabled + url := c.url + sign := c.sign + pubKeyB := c.pubKeyB + c.mu.Unlock() + + if disabled || url == "" { + slog.Debug("telemetry: consent off, dropping events", "count", len(events)) + return nil + } + if sign == nil { + slog.Debug("telemetry: no signer configured, dropping events", "count", len(events)) + return nil + } + + if len(events) == 0 { + return nil + } + + body, err := json.Marshal(events) + if err != nil { + return fmt.Errorf("telemetry marshal: %w", err) + } + + ts := strconv.FormatInt(time.Now().Unix(), 10) + // Build signed message as ts + newline + body. + message := make([]byte, 0, len(ts)+1+len(body)) + message = append(message, ts...) + message = append(message, '\n') + message = append(message, body...) + sigB64 := base64.StdEncoding.EncodeToString(sign(message)) + + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("telemetry new request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set(HeaderTimestamp, ts) + req.Header.Set(HeaderPubKey, pubKeyB) + req.Header.Set(HeaderSignature, sigB64) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("telemetry post: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("telemetry server %s: %s", resp.Status, strings.TrimSpace(string(respBody))) + } + + // Drain body so the connection can be reused + _, _ = io.Copy(io.Discard, resp.Body) + return nil +} + +// SignMessage implements the signing contract directly, without an HTTP +// POST. Useful for tests and for components that want to sign arbitrary +// byte payloads. Returns (timestamp, pubKeyB64, signatureB64, error). +func SignMessage(priv ed25519.PrivateKey, body []byte) (ts, pubB64, sigB64 string, err error) { + if len(body) == 0 { + return "", "", "", fmt.Errorf("telemetry: cannot sign empty body") + } + pub := priv.Public().(ed25519.PublicKey) + ts = strconv.FormatInt(time.Now().Unix(), 10) + message := make([]byte, 0, len(ts)+1+len(body)) + message = append(message, ts...) + message = append(message, '\n') + message = append(message, body...) + sig := ed25519.Sign(priv, message) + return ts, base64.StdEncoding.EncodeToString(pub), base64.StdEncoding.EncodeToString(sig), nil +} diff --git a/pkg/telemetry/client_test.go b/pkg/telemetry/client_test.go new file mode 100644 index 00000000..42f6a85c --- /dev/null +++ b/pkg/telemetry/client_test.go @@ -0,0 +1,243 @@ +package telemetry + +import ( + "crypto/ed25519" + "crypto/rand" + "encoding/base64" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" +) + +// TestDisabledNoOp verifies that a client with an empty URL is a hard no-op. +func TestDisabledNoOp(t *testing.T) { + c := New("", 0) + if !c.disabled { + t.Fatal("expected disabled client") + } + if err := c.Send(Event{Kind: "test"}); err != nil { + t.Fatalf("Send on disabled client should not error: %v", err) + } +} + +// TestNewClientFromIdentityEmptyURL verifies that empty URL produces a no-op. +func TestNewClientFromIdentityEmptyURL(t *testing.T) { + c := NewClientFromIdentity("", "/nonexistent", 0) + if !c.disabled { + t.Fatal("expected disabled client with empty URL") + } +} + +// TestNewClientFromIdentityNoIdentity verifies that missing identity file +// produces a client without signer (first-run grace). +func TestNewClientFromIdentityNoIdentity(t *testing.T) { + dir := t.TempDir() + identityPath := filepath.Join(dir, "identity.json") + c := NewClientFromIdentity("https://example.com/telemetry", identityPath, 42) + if c.disabled { + t.Fatal("expected non-disabled client even without identity") + } + // Without a signer, Send should be a no-op + if err := c.Send(Event{Kind: "test"}); err != nil { + t.Fatalf("Send without signer should not error: %v", err) + } +} + +// TestNewClientFromIdentityBadPerms verifies that permissive file perms +// do not prevent identity loading (identity loading is best-effort). +func TestNewClientFromIdentityBadPerms(t *testing.T) { + dir := t.TempDir() + identityPath := filepath.Join(dir, "identity.json") + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatal(err) + } + id := map[string]any{ + "node_id": 42, + "public_key_base64": base64.StdEncoding.EncodeToString(pub), + "private_key_base64": base64.StdEncoding.EncodeToString(priv), + } + data, _ := json.Marshal(id) + if err := os.WriteFile(identityPath, data, 0644); err != nil { + t.Fatal(err) + } + _ = NewClientFromIdentity("https://example.com/telemetry", identityPath, 42) +} + +func edSign(priv ed25519.PrivateKey) func([]byte) []byte { + return func(msg []byte) []byte { return ed25519.Sign(priv, msg) } +} + +// TestSendWithSigner verifies that a configured client sends signed events. +func TestSendWithSigner(t *testing.T) { + var mu sync.Mutex + var receivedBody []byte + var receivedTS, receivedPub, receivedSig string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + receivedBody, _ = io.ReadAll(r.Body) + receivedTS = r.Header.Get(HeaderTimestamp) + receivedPub = r.Header.Get(HeaderPubKey) + receivedSig = r.Header.Get(HeaderSignature) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatal(err) + } + + c := New(server.URL, 42) + c.SetSigner(edSign(priv), base64.StdEncoding.EncodeToString(pub)) + + payload, _ := json.Marshal(map[string]string{"app_id": "io.test.app", "version": "1.0.0", "source": "catalogue"}) + err = c.Send(Event{ + Kind: "app_installed", + TS: time.Now().UTC().Format(time.RFC3339), + Payload: payload, + }) + if err != nil { + t.Fatalf("Send failed: %v", err) + } + + mu.Lock() + defer mu.Unlock() + if receivedTS == "" { + t.Fatal("expected timestamp header") + } + if receivedPub == "" { + t.Fatal("expected public key header") + } + if receivedSig == "" { + t.Fatal("expected signature header") + } + if len(receivedBody) == 0 { + t.Fatal("expected body") + } +} + +// TestSendDropsEventsWhenNoSigner verifies that Send is a no-op without signer. +func TestSendDropsEventsWhenNoSigner(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not receive request — no signer configured") + })) + defer server.Close() + + c := New(server.URL, 42) + // No SetSigner call + if err := c.Send(Event{Kind: "test"}); err != nil { + t.Fatalf("Send should be no-op without signer: %v", err) + } +} + +// TestSignMessage verifies the signing contract. +func TestSignMessage(t *testing.T) { + _, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatal(err) + } + body := []byte(`{"test":"data"}`) + ts, pubB64, sigB64, err := SignMessage(priv, body) + if err != nil { + t.Fatalf("SignMessage failed: %v", err) + } + if ts == "" { + t.Fatal("expected non-empty timestamp") + } + if pubB64 == "" { + t.Fatal("expected non-empty public key") + } + if sigB64 == "" { + t.Fatal("expected non-empty signature") + } + pubBytes, err := base64.StdEncoding.DecodeString(pubB64) + if err != nil { + t.Fatalf("decode pubkey: %v", err) + } + pub := ed25519.PublicKey(pubBytes) + sigBytes, err := base64.StdEncoding.DecodeString(sigB64) + if err != nil { + t.Fatalf("decode sig: %v", err) + } + message := append([]byte(ts), '\n') + message = append(message, body...) + if !ed25519.Verify(pub, message, sigBytes) { + t.Fatal("signature verification failed") + } +} + +// TestMultipleEvents verifies sending multiple events in one POST. +func TestMultipleEvents(t *testing.T) { + var callCount int + var mu sync.Mutex + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + callCount++ + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatal(err) + } + + c := New(server.URL, 42) + c.SetSigner(edSign(priv), base64.StdEncoding.EncodeToString(pub)) + + events := []Event{ + {Kind: "app_installed", TS: time.Now().UTC().Format(time.RFC3339)}, + {Kind: "app_uninstalled", TS: time.Now().UTC().Format(time.RFC3339)}, + } + if err := c.Send(events...); err != nil { + t.Fatalf("Send failed: %v", err) + } + if callCount != 1 { + t.Fatalf("expected 1 HTTP call, got %d", callCount) + } +} + +// TestSetSignerNil disables signing. +func TestSetSignerNil(t *testing.T) { + c := New("https://example.com", 0) + c.SetSigner(nil, "") + if err := c.Send(Event{Kind: "test"}); err != nil { + t.Fatalf("Send should be no-op after nil signer: %v", err) + } +} + +// TestServerError propagates. +func TestServerError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("internal error")) + })) + defer server.Close() + + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatal(err) + } + + c := New(server.URL, 42) + c.SetSigner(edSign(priv), base64.StdEncoding.EncodeToString(pub)) + + err = c.Send(Event{Kind: "test", TS: time.Now().UTC().Format(time.RFC3339)}) + if err == nil { + t.Fatal("expected error from server 500") + } + if !strings.Contains(err.Error(), "500") { + t.Fatalf("expected error to mention status, got: %v", err) + } +}