diff --git a/cmd/daemon/appstore_adapter.go b/cmd/daemon/appstore_adapter.go index 9cbcdc8f..3aa29f1d 100644 --- a/cmd/daemon/appstore_adapter.go +++ b/cmd/daemon/appstore_adapter.go @@ -27,6 +27,7 @@ type appstoreAdapter struct { svc *appstore.Service telemetryURL string identityPath string + getNodeID func() int64 // called at emit time, after daemon has registered } // telemetryEmitter wraps the consent-gated telemetry client to satisfy @@ -34,7 +35,8 @@ type appstoreAdapter struct { // "app_usage" kind with the supervisor-provided fields as payload. // Best-effort: send errors are logged but never block the caller. type telemetryEmitter struct { - client *telemetry.Client + client *telemetry.Client + getNodeID func() int64 // called lazily; valid after daemon has registered } func (e *telemetryEmitter) Emit(ev appstore.TelemetryEvent) { @@ -45,9 +47,14 @@ func (e *telemetryEmitter) Emit(ev appstore.TelemetryEvent) { if err != nil { return } + var nodeID int64 + if e.getNodeID != nil { + nodeID = e.getNodeID() + } _ = e.client.Send(telemetry.Event{ Kind: "app_usage", TS: time.Now().UTC().Format(time.RFC3339), + NodeID: nodeID, Payload: payload, }) } @@ -59,7 +66,7 @@ func (a *appstoreAdapter) Start(ctx context.Context, deps coreapi.Deps) error { // When the URL is empty or identity is absent the client is a // permanent no-op — the emitter never sends anything. client := telemetry.NewClientFromIdentity(a.telemetryURL, a.identityPath, 0) - emitter := &telemetryEmitter{client: client} + emitter := &telemetryEmitter{client: client, getNodeID: a.getNodeID} return a.svc.Start(ctx, appstore.Deps{ Streams: deps.Streams, diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index 66f63e55..a17a6ea8 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -364,6 +364,7 @@ func main() { }), telemetryURL: *telemetryURL, identityPath: idPath, + getNodeID: func() int64 { return int64(d.NodeID()) }, }); err != nil { log.Fatalf("register appstore: %v", err) } diff --git a/cmd/pilotctl/appstore.go b/cmd/pilotctl/appstore.go index f601a599..193509a0 100644 --- a/cmd/pilotctl/appstore.go +++ b/cmd/pilotctl/appstore.go @@ -1233,7 +1233,7 @@ func cmdAppStoreInstall(args []string) { "source": sourceStr, }) identityPath := configDir() + "/identity.json" - client := telemetry.NewClientFromIdentity(url, identityPath, 0) + client := telemetry.NewClientFromIdentity(url, identityPath, nodeIDFromDaemon()) err := client.Send(telemetry.Event{ Kind: "app_installed", TS: time.Now().UTC().Format(time.RFC3339), diff --git a/cmd/pilotctl/appstore_catalogue.go b/cmd/pilotctl/appstore_catalogue.go index 7d2be8f3..40fa1245 100644 --- a/cmd/pilotctl/appstore_catalogue.go +++ b/cmd/pilotctl/appstore_catalogue.go @@ -246,10 +246,11 @@ func cmdAppStoreCatalogue(_ []string) { url = telemetry.DefaultEndpoint } identityPath := configDir() + "/identity.json" - client := telemetry.NewClientFromIdentity(url, identityPath, 0) + client := telemetry.NewClientFromIdentity(url, identityPath, nodeIDFromDaemon()) err := client.Send(telemetry.Event{ - Kind: "catalogue_viewed", - TS: time.Now().UTC().Format(time.RFC3339), + Kind: "catalogue_viewed", + TS: time.Now().UTC().Format(time.RFC3339), + Payload: json.RawMessage(`{"surface":"catalogue"}`), }) if err != nil { slog.Warn("telemetry send failed, catalogue still rendered", "err", err) diff --git a/cmd/pilotctl/appstore_view.go b/cmd/pilotctl/appstore_view.go index 618de385..1c9ca53b 100644 --- a/cmd/pilotctl/appstore_view.go +++ b/cmd/pilotctl/appstore_view.go @@ -194,7 +194,7 @@ func cmdAppStoreView(args []string) { "app_id": appID, }) identityPath := configDir() + "/identity.json" - client := telemetry.NewClientFromIdentity(url, identityPath, 0) + client := telemetry.NewClientFromIdentity(url, identityPath, nodeIDFromDaemon()) err := client.Send(telemetry.Event{ Kind: "appstore_view", TS: time.Now().UTC().Format(time.RFC3339), diff --git a/cmd/pilotctl/main.go b/cmd/pilotctl/main.go index b11d5eba..366502d9 100644 --- a/cmd/pilotctl/main.go +++ b/cmd/pilotctl/main.go @@ -468,6 +468,22 @@ func connectDriver() *driver.Driver { return d } +// nodeIDFromDaemon returns the daemon's registered node ID for use in +// telemetry events. Returns 0 silently if the daemon is unreachable. +func nodeIDFromDaemon() int64 { + d, err := driver.Connect(getSocket()) + if err != nil { + return 0 + } + defer d.Close() + info, err := d.Info() + if err != nil { + return 0 + } + nid, _ := info["node_id"].(float64) + return int64(nid) +} + func connectRegistry() *registry.Client { addr := getRegistry() rc, err := registry.Dial(addr) diff --git a/cmd/pilotctl/review.go b/cmd/pilotctl/review.go index c12faa9b..87de8ab1 100644 --- a/cmd/pilotctl/review.go +++ b/cmd/pilotctl/review.go @@ -117,7 +117,7 @@ func cmdReview(args []string) { payload["text"] = reviewText } payloadBytes, _ := json.Marshal(payload) - client := telemetry.NewClientFromIdentity(url, identityPath, 0) + client := telemetry.NewClientFromIdentity(url, identityPath, nodeIDFromDaemon()) if err := client.Send(telemetry.Event{ Kind: "review", TS: time.Now().UTC().Format(time.RFC3339), diff --git a/pkg/daemon/zz_broadcast_test.go b/pkg/daemon/zz_broadcast_test.go index b6e46050..d3f130a4 100644 --- a/pkg/daemon/zz_broadcast_test.go +++ b/pkg/daemon/zz_broadcast_test.go @@ -4,6 +4,8 @@ package daemon import ( "net" + "os" + "path/filepath" "testing" "time" @@ -302,6 +304,35 @@ func TestBroadcastDatagramNetworkZeroForbidden(t *testing.T) { } } +// TestBroadcastDatagramConsentOff verifies the consent gate: when the user +// has set consent.broadcasts=false in config.json, BroadcastDatagram drops +// the datagram silently (returns nil) and no peer receives a packet. +func TestBroadcastDatagramConsentOff(t *testing.T) { + // Cannot use t.Parallel — t.Setenv mutates a process-wide env var. + fx := setupBroadcastFixture(t, 1) + fx.d.config.AdminToken = "secret" + + // Point HOME at a temp dir and write a config that opts out of broadcasts. + home := t.TempDir() + t.Setenv("HOME", home) + pilotDir := filepath.Join(home, ".pilot") + if err := os.MkdirAll(pilotDir, 0700); err != nil { + t.Fatalf("mkdir .pilot: %v", err) + } + cfg := []byte(`{"consent":{"broadcasts":false}}`) + if err := os.WriteFile(filepath.Join(pilotDir, "config.json"), cfg, 0600); err != nil { + t.Fatalf("write config: %v", err) + } + + err := fx.d.BroadcastDatagram(fx.netID, 80, []byte("should-be-dropped"), "secret") + if err != nil { + t.Fatalf("BroadcastDatagram with consent off: want nil, got %v", err) + } + if pkt := readPacket(t, fx.peerConns[0], 150*time.Millisecond); pkt != nil { + t.Fatalf("packet delivered despite broadcasts consent=false: %s", pkt.Payload) + } +} + // Non-member broadcasts are denied even with a valid admin token. func TestBroadcastDatagramNonMemberDenied(t *testing.T) { t.Parallel() diff --git a/pkg/telemetry/client.go b/pkg/telemetry/client.go index 0f3526f7..84f98443 100644 --- a/pkg/telemetry/client.go +++ b/pkg/telemetry/client.go @@ -98,6 +98,7 @@ func (c *Client) Send(events ...Event) error { url := c.url sign := c.sign pubKeyB := c.pubKeyB + nodeID := c.nodeID c.mu.Unlock() if disabled || url == "" { @@ -113,6 +114,15 @@ func (c *Client) Send(events ...Event) error { return nil } + // Inject node ID into events that don't supply their own. + if nodeID != 0 { + for i := range events { + if events[i].NodeID == 0 { + events[i].NodeID = nodeID + } + } + } + body, err := json.Marshal(events) if err != nil { return fmt.Errorf("telemetry marshal: %w", err)