Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pilot-protocol/webhook"

"github.com/TeoSlayer/pilotprotocol/internal/catalogtrust"
"github.com/TeoSlayer/pilotprotocol/pkg/telemetry"
)

var version = "dev"
Expand Down Expand Up @@ -100,6 +101,9 @@ func main() {
logFormat := flag.String("log-format", "text", "log format (text, json)")
motdFeedURL := flag.String("motd-feed-url", motd.DefaultFeedURL, "message-of-the-day feed URL (empty to disable); overridden by $PILOT_MOTD_URL")
motdInterval := flag.Duration("motd-interval", 0, "message-of-the-day poll interval (default 15m)")
telemetryURL := flag.String("telemetry-url", os.Getenv("PILOT_TELEMETRY_URL"),
"telemetry endpoint URL (empty = consent off, hard no-op). "+
"Env: PILOT_TELEMETRY_URL. Default: "+telemetry.DefaultEndpoint+".")
flag.Parse()
if *adminToken == "" {
if v := os.Getenv("PILOT_ADMIN_TOKEN"); v != "" {
Expand Down Expand Up @@ -209,6 +213,7 @@ func main() {
CompatTLSTrust: *tlsTrust,
MOTDFeedURL: *motdFeedURL,
MOTDInterval: *motdInterval,
TelemetryURL: *telemetryURL,
})

// L11 plugin lifecycle (T7.1): composition root owns the
Expand Down
32 changes: 32 additions & 0 deletions cmd/pilotctl/appstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/pilot-protocol/app-store/pkg/ipc"
"github.com/pilot-protocol/app-store/pkg/manifest"
"github.com/pilot-protocol/common/crypto"

"github.com/TeoSlayer/pilotprotocol/pkg/telemetry"
)

// cryptoSHA256 is named so the sha256 import isn't ambiguous-looking.
Expand Down Expand Up @@ -1209,6 +1211,36 @@ func cmdAppStoreInstall(args []string) {
Reason: reason,
})

// Emit a telemetry event for the successful install (consent-gated —
// no-op when PILOT_TELEMETRY_URL is empty or identity.json is absent).
// Best-effort: a send failure is logged but not fatal — the install
// itself already succeeded on disk.
{
url := os.Getenv("PILOT_TELEMETRY_URL")
if url == "" {
url = telemetry.DefaultEndpoint
}
sourceStr := "catalogue"
if source == installSourceLocal {
sourceStr = "local"
}
payload, _ := json.Marshal(map[string]string{
"app_id": m.ID,
"version": m.AppVersion,
"source": sourceStr,
})
identityPath := configDir() + "/identity.json"
client := telemetry.NewClientFromIdentity(url, identityPath, 0)
err := client.Send(telemetry.Event{
Kind: "app_installed",
TS: time.Now().UTC().Format(time.RFC3339),
Payload: payload,
})
if err != nil {
slog.Warn("telemetry send failed, install still successful", "app", m.ID, "err", err)
}
}

report := installReport{
AppID: m.ID,
AppVersion: m.AppVersion,
Expand Down
28 changes: 28 additions & 0 deletions cmd/pilotctl/appstore_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ package main
import (
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"

"github.com/pilot-protocol/app-store/pkg/manifest"

"github.com/TeoSlayer/pilotprotocol/pkg/telemetry"
)

// installedAppFacts is the verified, local-only band of `view` — derived
Expand Down Expand Up @@ -175,6 +179,30 @@ func cmdAppStoreView(args []string) {
"app %q not found in catalogue or install root", appID)
}

// Emit a telemetry event for the detail view (consent-gated —
// no-op when PILOT_TELEMETRY_URL is empty or identity.json is absent).
// Best-effort: a send failure is logged but not fatal — the view
// itself already resolved and rendered below.
{
url := os.Getenv("PILOT_TELEMETRY_URL")
if url == "" {
url = telemetry.DefaultEndpoint
}
payload, _ := json.Marshal(map[string]string{
"app_id": appID,
})
identityPath := configDir() + "/identity.json"
client := telemetry.NewClientFromIdentity(url, identityPath, 0)
err := client.Send(telemetry.Event{
Kind: "appstore_view",
TS: time.Now().UTC().Format(time.RFC3339),
Payload: payload,
})
if err != nil {
slog.Warn("telemetry send failed, view still shown", "app", appID, "err", err)
}
}

report := buildAppViewReport(appID, entry, meta, facts)
if jsonOutput {
_ = json.NewEncoder(os.Stdout).Encode(report)
Expand Down
6 changes: 6 additions & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ type Config struct {
// Feature flags — ablation testing. All default false (current behavior).
BeaconRTTProbe bool // probe beacon RTT; override hash pick when >2× slower than best

// Telemetry consent gate. When set to the telemetry endpoint URL,
// the daemon initialises a telemetry client that emits signed events
// (install, usage, view, review). When empty (default), the client
// is a hard no-op: no dial, no buffering, no goroutines.
TelemetryURL string

// Compat-mode transport. Default empty ("" or "udp") = today's
// behavior: bind a UDP socket via udpio.Listen. Set "compat" to
// dial WSS to BeaconURL instead (for daemons in UDP-blocked
Expand Down
194 changes: 194 additions & 0 deletions pkg/telemetry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

// Package telemetry provides a consent-gated telemetry client that signs
// event POSTs with the node's Ed25519 identity and sends them to a
// configured telemetry endpoint. When the consent flag is off (empty URL),
// the client is a hard no-op: no dial, no buffering, no goroutines.
package telemetry

import (
"bytes"
"crypto/ed25519"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/pilot-protocol/common/crypto"
)

// DefaultEndpoint is the production telemetry ingestion URL.
const DefaultEndpoint = "https://telemetry.pilotprotocol.network/v1/events"

// Canonical signing header names, matching the telemetry server's
// internal/sig contract.
const (
HeaderTimestamp = "X-Pilot-Timestamp"
HeaderPubKey = "X-Pilot-Public-Key"
HeaderSignature = "X-Pilot-Signature"
)

// Event is the wire shape sent to the telemetry endpoint.
type Event struct {
EventID string `json:"event_id"`
Kind string `json:"kind"`
TS string `json:"ts"` // RFC3339; empty = server defaults to receive time
NodeID int64 `json:"node_id,omitempty"`
Payload json.RawMessage `json:"payload"`
}

// Client is a consent-gated telemetry sender. Zero value is a no-op.
type Client struct {
mu sync.Mutex
url string // empty = no-op
nodeID int64 // node ID included in events
sign signFunc // ed25519 signer (set via SetSigner)
pubKeyB string // base64-encoded public key
once sync.Once // lazy init guard
initErr error // capture init failures
disabled bool // true when url is empty
}

type signFunc func(msg []byte) []byte

// New creates a consent-gated telemetry client.
// When url is empty the client is a permanent no-op.
func New(url string, nodeID int64) *Client {
return &Client{
url: strings.TrimSpace(url),
nodeID: nodeID,
disabled: strings.TrimSpace(url) == "",
}
}

// SetSigner installs the Ed25519 signing function and the corresponding
// base64-encoded public key. Must be called before the first Send call.
// When signer is nil the client stays disabled.
func (c *Client) SetSigner(sign signFunc, pubKeyB64 string) {
c.mu.Lock()
defer c.mu.Unlock()
if sign == nil || pubKeyB64 == "" {
c.sign = nil
c.pubKeyB = ""
return
}
c.sign = sign
c.pubKeyB = pubKeyB64
}

// Send POSTs one or more events to the telemetry endpoint. Returns
// immediately (no-op) when the client is disabled (no consent) or
// no signer is configured.
//
// The request is Ed25519-signed with the node's identity, following
// the telemetry server's signing contract:
// - X-Pilot-Timestamp: unix seconds (decimal string)
// - X-Pilot-Public-Key: base64(std) of the 32-byte Ed25519 public key
// - X-Pilot-Signature: base64(std) of the Ed25519 signature over
// (timestamp + "\n" + body)
func (c *Client) Send(events ...Event) error {
c.mu.Lock()
disabled := c.disabled
url := c.url
sign := c.sign
pubKeyB := c.pubKeyB
c.mu.Unlock()

if disabled || url == "" {
slog.Debug("telemetry: consent off, dropping events", "count", len(events))
return nil
}
if sign == nil {
slog.Debug("telemetry: no signer configured, dropping events", "count", len(events))
return nil
}

if len(events) == 0 {
return nil
}

body, err := json.Marshal(events)
if err != nil {
return fmt.Errorf("telemetry marshal: %w", err)
}

ts := strconv.FormatInt(time.Now().Unix(), 10)
message := make([]byte, 0, len(ts)+1)
message = append(message, ts...)
message = append(message, '\n')
message = append(message, body...)
sigB64 := base64.StdEncoding.EncodeToString(sign(message))

req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("telemetry new request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set(HeaderTimestamp, ts)
req.Header.Set(HeaderPubKey, pubKeyB)
req.Header.Set(HeaderSignature, sigB64)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("telemetry post: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("telemetry server %s: %s", resp.Status, strings.TrimSpace(string(respBody)))
}

// Drain body so the connection can be reused
_, _ = io.Copy(io.Discard, resp.Body)
return nil
}

// NewClientFromIdentity creates a consent-gated telemetry client from an
// Ed25519 identity file on disk and a telemetry URL. When url is empty
// the client is a permanent no-op. Returns nil if the identity file does
// not exist (first run).
func NewClientFromIdentity(url, identityPath string, nodeID int64) *Client {
c := New(url, nodeID)
if c.disabled || url == "" {
return c
}

id, err := crypto.LoadIdentity(identityPath)
if err != nil {
slog.Warn("telemetry: can't load identity, staying disabled", "path", identityPath, "err", err)
return c
}
if id == nil {
slog.Debug("telemetry: no identity file yet, staying disabled", "path", identityPath)
return c
}

slog.Debug("telemetry: identity loaded, enabling client", "path", identityPath,
"pubkey", crypto.EncodePublicKey(id.PublicKey))
c.SetSigner(id.Sign, crypto.EncodePublicKey(id.PublicKey))
return c
}

// SignMessage implements the signing contract directly, without an HTTP
// POST. Useful for tests and for components that want to sign arbitrary
// byte payloads. Returns (timestamp, pubKeyB64, signatureB64, error).
func SignMessage(priv ed25519.PrivateKey, body []byte) (ts, pubB64, sigB64 string, err error) {
if len(body) == 0 {
return "", "", "", fmt.Errorf("telemetry: cannot sign empty body")
}
pub := priv.Public().(ed25519.PublicKey)
ts = strconv.FormatInt(time.Now().Unix(), 10)
message := make([]byte, 0, len(ts)+1)
message = append(message, ts...)
message = append(message, '\n')
message = append(message, body...)
sig := ed25519.Sign(priv, message)
return ts, base64.StdEncoding.EncodeToString(pub), base64.StdEncoding.EncodeToString(sig), nil
}
Loading
Loading