package collector

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/prometheus/client_golang/prometheus"
)

// shortRevLen is the number of leading characters of a revision that are
// exposed as metric label values.
const shortRevLen = 7

// nixosVersionPath is the absolute path of the nixos-version binary that is
// executed to discover the revision of the running system.
const nixosVersionPath = "/run/current-system/sw/bin/nixos-version"

// RevisionUpdate is the message format for NATS pub/sub.
type RevisionUpdate struct {
	// Rev is the flake revision observed by the publishing host.
	Rev string `json:"rev"`
	// URL is the flake URL the revision belongs to; receivers drop messages
	// whose URL differs from their own.
	URL string `json:"url"`
	// Timestamp is the Unix time (seconds) at which the update was published.
	Timestamp int64 `json:"timestamp"`
	// SourceHost is the hostname of the publisher; receivers drop their own
	// messages based on it.
	SourceHost string `json:"source_host"`
}

// FlakeCollectorConfig holds configuration for the FlakeCollector.
type FlakeCollectorConfig struct {
	FlakeURL      string
	CheckInterval time.Duration

	// NATS configuration
	NATSEnabled         bool
	NATSURL             string
	NATSSubject         string
	NATSCredentialsFile string
}

// nixosVersionInfo holds the parsed output of nixos-version --json.
type nixosVersionInfo struct {
	ConfigurationRevision string `json:"configurationRevision"`
	NixosVersion          string `json:"nixosVersion"`
	NixpkgsRevision       string `json:"nixpkgsRevision"`
}

// FlakeCollector is a Prometheus collector that reports the age and revision
// of flake inputs and whether the running system differs from the remote
// flake revision. Fetched metadata is cached for checkInterval and can
// optionally be refreshed by revision updates received over NATS.
type FlakeCollector struct {
	flakeURL      string
	checkInterval time.Duration

	inputAge       *prometheus.Desc
	inputInfo      *prometheus.Desc
	flakeInfo      *prometheus.Desc
	revisionBehind *prometheus.Desc

	// mu guards the cache fields and the NATS connection/subscription below.
	mu sync.RWMutex
	// cachedData is treated as an immutable snapshot: it is replaced, never
	// modified in place, because getFlakeData hands the pointer to callers
	// that read it after releasing mu.
	cachedData *flakeMetadata
	lastFetch  time.Time
	fetchError error

	// NATS fields
	natsEnabled bool
	natsConn    *nats.Conn
	natsSub     *nats.Subscription
	natsSubject string
	hostname    string
}

// flakeMetadata is the subset of `nix flake metadata --json` output that the
// collector decodes.
type flakeMetadata struct {
	Revision string     `json:"revision"`
	Locks    flakeLocks `json:"locks"`
}

// flakeLocks mirrors the "locks" object of the flake metadata.
type flakeLocks struct {
	Nodes map[string]flakeLockNode `json:"nodes"`
	Root  string                   `json:"root"`
}

// flakeLockNode is a single entry of the lock file's node map.
type flakeLockNode struct {
	Inputs   map[string]any `json:"inputs,omitempty"`
	Locked   *lockedInfo    `json:"locked,omitempty"`
	Original *originalInfo  `json:"original,omitempty"`
}

// lockedInfo describes the pinned state of an input.
type lockedInfo struct {
	LastModified int64  `json:"lastModified"`
	Rev          string `json:"rev"`
	Type         string `json:"type"`
}

// originalInfo describes the unpinned reference of an input.
type originalInfo struct {
	Type string `json:"type"`
}

// NewFlakeCollector creates a FlakeCollector without NATS integration.
func NewFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector {
	return newFlakeCollector(flakeURL, checkInterval)
}

// NewFlakeCollectorWithNATS creates a FlakeCollector with optional NATS
// integration.
//
// A failure to connect to NATS is logged as a warning and the collector is
// returned anyway, so the returned error is currently always nil.
func NewFlakeCollectorWithNATS(cfg FlakeCollectorConfig) (*FlakeCollector, error) {
	c := newFlakeCollector(cfg.FlakeURL, cfg.CheckInterval)
	if !cfg.NATSEnabled {
		return c, nil
	}

	hostname, err := os.Hostname()
	if err != nil {
		// The hostname is only used to tag and filter NATS messages.
		hostname = "unknown"
	}
	c.hostname = hostname
	c.natsEnabled = true
	c.natsSubject = cfg.NATSSubject

	if err := c.connectNATS(cfg.NATSURL, cfg.NATSCredentialsFile); err != nil {
		// Log warning but continue without NATS
		slog.Warn("Failed to connect to NATS, continuing without cache sharing", "error", err)
	}
	return c, nil
}

// newFlakeCollector builds the collector and its metric descriptors.
func newFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector {
	return &FlakeCollector{
		flakeURL:      flakeURL,
		checkInterval: checkInterval,
		inputAge: prometheus.NewDesc(
			"nixos_flake_input_age_seconds",
			"Age of flake input in seconds",
			[]string{"input"}, nil,
		),
		inputInfo: prometheus.NewDesc(
			"nixos_flake_input_info",
			"Info gauge with revision and type labels",
			[]string{"input", "rev", "type"}, nil,
		),
		flakeInfo: prometheus.NewDesc(
			"nixos_flake_info",
			"Info gauge with current and remote flake revisions",
			[]string{"current_rev", "remote_rev", "nixpkgs_rev", "nixos_version"}, nil,
		),
		revisionBehind: prometheus.NewDesc(
			"nixos_flake_revision_behind",
			"1 if current system revision differs from remote latest, 0 if match",
			nil, nil,
		),
	}
}

// Describe sends the descriptors of all metrics this collector can emit.
func (c *FlakeCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.inputAge
	ch <- c.inputInfo
	ch <- c.flakeInfo
	ch <- c.revisionBehind
}

// Collect emits the input and revision metrics. If no flake data can be
// obtained the error is logged and no metrics are emitted for this scrape.
func (c *FlakeCollector) Collect(ch chan<- prometheus.Metric) {
	data, err := c.getFlakeData()
	if err != nil {
		slog.Error("Failed to get flake data", "error", err)
		return
	}
	c.collectInputMetrics(ch, data)
	c.collectRevisionBehind(ch, data)
}

// getFlakeData returns the flake metadata, serving it from the cache while it
// is younger than checkInterval.
//
// The cache is bypassed when the running system's revision matches the cached
// remote revision, so a newer remote revision is noticed right after the
// system has caught up. If fetching fails and older data exists, the stale
// data is returned instead of an error.
//
// The returned value must be treated as read-only by callers.
func (c *FlakeCollector) getFlakeData() (*flakeMetadata, error) {
	c.mu.RLock()
	cached := c.cachedData
	cacheValid := cached != nil && time.Since(c.lastFetch) < c.checkInterval
	c.mu.RUnlock()

	// Smart cache: check if current revision matches cached remote revision.
	// If so, force a refresh even if cache hasn't expired.
	forceRefresh := false
	if cacheValid {
		info, err := getNixosVersionInfo()
		if err == nil && revisionsMatch(info.ConfigurationRevision, cached.Revision) {
			forceRefresh = true
			slog.Debug("Current revision matches cached remote, forcing refresh")
		}
	}

	if cacheValid && !forceRefresh {
		return cached, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Re-check after acquiring the write lock. A forced refresh is skipped
	// as well when another goroutine replaced the snapshot in the meantime,
	// which avoids running the (slow) fetch twice for concurrent scrapes.
	if c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval {
		if !forceRefresh || c.cachedData != cached {
			return c.cachedData, nil
		}
	}

	oldRevision := ""
	if c.cachedData != nil {
		oldRevision = c.cachedData.Revision
	}

	data, err := fetchFlakeMetadata(c.flakeURL)
	if err != nil {
		c.fetchError = err
		// Return cached data if available, even if stale
		if c.cachedData != nil {
			slog.Warn("Using stale flake data due to fetch error", "error", err)
			return c.cachedData, nil
		}
		return nil, err
	}

	c.cachedData = data
	c.lastFetch = time.Now()
	c.fetchError = nil

	// Publish update via NATS if revision changed
	if c.natsEnabled && data.Revision != oldRevision {
		c.publishRevisionUpdate(data.Revision)
	}

	return data, nil
}

// collectInputMetrics emits an age gauge and an info gauge for every locked
// input found in the flake lock data.
func (c *FlakeCollector) collectInputMetrics(ch chan<- prometheus.Metric, data *flakeMetadata) {
	now := time.Now().Unix()

	for name, node := range data.Locks.Nodes {
		// Skip the root node and nodes that carry no pinned revision.
		if name == "root" || node.Locked == nil {
			continue
		}

		// Input age
		age := float64(now - node.Locked.LastModified)
		ch <- prometheus.MustNewConstMetric(c.inputAge, prometheus.GaugeValue, age, name)

		// Input info
		ch <- prometheus.MustNewConstMetric(
			c.inputInfo, prometheus.GaugeValue, 1,
			name, shortRev(node.Locked.Rev), node.Locked.Type,
		)
	}
}

// collectRevisionBehind emits the flake info gauge and the "revision behind"
// gauge. If the running system's version cannot be determined the error is
// logged and neither metric is emitted.
func (c *FlakeCollector) collectRevisionBehind(ch chan<- prometheus.Metric, data *flakeMetadata) {
	versionInfo, err := getNixosVersionInfo()
	if err != nil {
		slog.Error("Failed to get NixOS version info", "error", err)
		return
	}

	fullCurrent := versionInfo.ConfigurationRevision
	currentLabel := "unknown"
	if fullCurrent != "" {
		currentLabel = shortRev(fullCurrent)
	}

	// Emit flake info metric with revisions
	ch <- prometheus.MustNewConstMetric(
		c.flakeInfo, prometheus.GaugeValue, 1,
		currentLabel,
		shortRev(data.Revision),
		shortRev(versionInfo.NixpkgsRevision),
		versionInfo.NixosVersion,
	)

	// Compare the full revisions rather than the shortened labels: two
	// different commits may share their first characters, which would
	// otherwise be reported as "up to date".
	behind := 0.0
	known := fullCurrent != "" && fullCurrent != "unknown" && data.Revision != ""
	if known && !revisionsMatch(fullCurrent, data.Revision) {
		behind = 1.0
	}

	ch <- prometheus.MustNewConstMetric(c.revisionBehind, prometheus.GaugeValue, behind)
}

// shortRev returns rev shortened to at most shortRevLen characters.
func shortRev(rev string) string {
	if len(rev) > shortRevLen {
		return rev[:shortRevLen]
	}
	return rev
}

// revisionsMatch reports whether current and remote denote the same revision.
// Two revisions match when one is a prefix of the other, which covers
// abbreviated hashes as well as a revision carrying a trailing suffix. An
// empty revision never matches anything.
func revisionsMatch(current, remote string) bool {
	if current == "" || remote == "" {
		return false
	}
	return strings.HasPrefix(remote, current) || strings.HasPrefix(current, remote)
}

// commandError turns the error of a failed external command into an error
// that names the command, wraps the original error and, when the process
// wrote to stderr, includes that output.
func commandError(command string, err error) error {
	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) {
		if stderr := strings.TrimSpace(string(exitErr.Stderr)); stderr != "" {
			return fmt.Errorf("%s failed: %w: %s", command, err, stderr)
		}
	}
	return fmt.Errorf("%s failed: %w", command, err)
}

// fetchFlakeMetadata runs `nix flake metadata --json` for flakeURL with a 30
// second timeout and decodes its output.
func fetchFlakeMetadata(flakeURL string) (*flakeMetadata, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "nix", "flake", "metadata", "--json", flakeURL)
	output, err := cmd.Output()
	if err != nil {
		return nil, commandError("nix flake metadata", err)
	}

	var data flakeMetadata
	if err := json.Unmarshal(output, &data); err != nil {
		return nil, fmt.Errorf("failed to parse flake metadata: %w", err)
	}
	return &data, nil
}

// getNixosVersionInfo runs `nixos-version --json` with a 5 second timeout and
// decodes its output.
func getNixosVersionInfo() (*nixosVersionInfo, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, nixosVersionPath, "--json")
	output, err := cmd.Output()
	if err != nil {
		return nil, commandError("nixos-version", err)
	}

	var info nixosVersionInfo
	if err := json.Unmarshal(output, &info); err != nil {
		return nil, fmt.Errorf("failed to parse nixos-version output: %w", err)
	}
	return &info, nil
}

// connectNATS establishes connection to NATS server with auto-reconnect and
// subscribes to the configured subject. The connection and subscription are
// stored on the collector only after both steps succeeded.
func (c *FlakeCollector) connectNATS(url, credentialsFile string) error {
	opts := []nats.Option{
		nats.MaxReconnects(-1), // Infinite reconnects
		nats.ReconnectWait(5 * time.Second),
		nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
			if err != nil {
				slog.Warn("NATS disconnected", "error", err)
			}
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			slog.Info("NATS reconnected")

			c.mu.Lock()
			defer c.mu.Unlock()

			// Only act for the connection this collector currently owns; it
			// is unset before connectNATS has finished and after Close.
			if c.natsConn != nc {
				return
			}

			// Resubscribe after reconnection.
			// NOTE(review): the NATS client may already restore subscriptions
			// on reconnect, which would make this redundant — confirm.
			if c.natsSub != nil {
				if err := c.natsSub.Unsubscribe(); err != nil {
					slog.Debug("Failed to unsubscribe old subscription", "error", err)
				}
				c.natsSub = nil
			}
			sub, err := nc.Subscribe(c.natsSubject, c.handleRevisionUpdate)
			if err != nil {
				slog.Error("Failed to resubscribe after reconnect", "error", err)
				return
			}
			c.natsSub = sub
		}),
	}

	if credentialsFile != "" {
		opts = append(opts, nats.UserCredentials(credentialsFile))
	}

	conn, err := nats.Connect(url, opts...)
	if err != nil {
		return fmt.Errorf("failed to connect to NATS: %w", err)
	}

	// Subscribe to revision updates
	sub, err := conn.Subscribe(c.natsSubject, c.handleRevisionUpdate)
	if err != nil {
		conn.Close()
		return fmt.Errorf("failed to subscribe to NATS subject: %w", err)
	}

	c.mu.Lock()
	c.natsConn = conn
	c.natsSub = sub
	c.mu.Unlock()

	slog.Info("Connected to NATS", "url", url, "subject", c.natsSubject)
	return nil
}

// handleRevisionUpdate processes incoming NATS messages. A valid update from
// another host for the same flake URL replaces the cached revision; the
// cached lock data is left as it was until the next regular fetch.
func (c *FlakeCollector) handleRevisionUpdate(msg *nats.Msg) {
	var update RevisionUpdate
	if err := json.Unmarshal(msg.Data, &update); err != nil {
		slog.Debug("Failed to parse NATS revision update", "error", err)
		return
	}

	// Ignore messages from self
	if update.SourceHost == c.hostname {
		return
	}

	// Ignore messages for different flake URLs
	if update.URL != c.flakeURL {
		return
	}

	// An update without a revision carries no information.
	if update.Rev == "" {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Ignore messages older than current cache
	if c.lastFetch.Unix() > update.Timestamp {
		return
	}

	// Nothing to patch without cached data or when the revision is unchanged.
	if c.cachedData == nil || c.cachedData.Revision == update.Rev {
		return
	}

	slog.Info("Received remote revision update via NATS",
		"from", update.SourceHost,
		"old_rev", c.cachedData.Revision,
		"new_rev", update.Rev)

	// Replace the snapshot instead of modifying it: the old pointer may still
	// be in use by a scrape that obtained it from getFlakeData.
	patched := *c.cachedData
	patched.Revision = update.Rev
	c.cachedData = &patched

	// Never move lastFetch into the future, otherwise a skewed sender clock
	// would keep the cache "fresh" for longer than checkInterval.
	stamp := time.Unix(update.Timestamp, 0)
	if now := time.Now(); stamp.After(now) {
		stamp = now
	}
	c.lastFetch = stamp
}

// publishRevisionUpdate broadcasts a revision update to other hosts. Failures
// are logged at debug level and otherwise ignored.
//
// It reads c.natsConn and is expected to be called with c.mu held, as
// getFlakeData does.
func (c *FlakeCollector) publishRevisionUpdate(rev string) {
	if c.natsConn == nil || !c.natsConn.IsConnected() {
		slog.Debug("NATS not connected, skipping revision publish")
		return
	}

	update := RevisionUpdate{
		Rev:        rev,
		URL:        c.flakeURL,
		Timestamp:  time.Now().Unix(),
		SourceHost: c.hostname,
	}

	data, err := json.Marshal(update)
	if err != nil {
		slog.Debug("Failed to marshal revision update", "error", err)
		return
	}

	if err := c.natsConn.Publish(c.natsSubject, data); err != nil {
		slog.Debug("Failed to publish revision update", "error", err)
		return
	}

	slog.Debug("Published revision update via NATS", "rev", rev)
}

// Close gracefully shuts down the NATS connection. It is safe to call on a
// collector without NATS and always returns nil.
func (c *FlakeCollector) Close() error {
	// Detach the handles under the lock, then release them outside of it so
	// that no network operation runs while mu is held.
	c.mu.Lock()
	sub, conn := c.natsSub, c.natsConn
	c.natsSub, c.natsConn = nil, nil
	c.mu.Unlock()

	if sub != nil {
		if err := sub.Unsubscribe(); err != nil {
			slog.Debug("Failed to unsubscribe from NATS", "error", err)
		}
	}
	if conn != nil {
		conn.Close()
	}
	return nil
}