Allow authentication to NATS using NKey seed files as an alternative to credentials files. NKeys use Ed25519 key pairs for authentication. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
473 lines
12 KiB
Go
473 lines
12 KiB
Go
package collector
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
// RevisionUpdate is the JSON payload exchanged over NATS to announce a remote
// flake revision observed by one host to the others.
type RevisionUpdate struct {
	Rev        string `json:"rev"`         // remote flake revision
	URL        string `json:"url"`         // flake URL the revision belongs to
	Timestamp  int64  `json:"timestamp"`   // Unix seconds at publish time
	SourceHost string `json:"source_host"` // hostname of the publishing host
}
|
|
|
|
// FlakeCollectorConfig is the input to NewFlakeCollectorWithNATS.
type FlakeCollectorConfig struct {
	FlakeURL      string        // flake reference passed to `nix flake metadata`
	CheckInterval time.Duration // how long fetched remote metadata is considered fresh

	// NATS settings; consulted only when NATSEnabled is true.
	NATSEnabled         bool
	NATSURL             string
	NATSSubject         string // subject used both to publish and to receive revision updates
	NATSCredentialsFile string // credentials file; takes precedence over NATSNkeySeedFile
	NATSNkeySeedFile    string // NKey seed file, used only when no credentials file is set
}
|
|
|
|
// nixosVersionInfo declares the subset of `nixos-version --json` output that
// this collector reads.
type nixosVersionInfo struct {
	ConfigurationRevision string `json:"configurationRevision"` // revision of the deployed configuration
	NixosVersion          string `json:"nixosVersion"`
	NixpkgsRevision       string `json:"nixpkgsRevision"`
}
|
|
|
|
type FlakeCollector struct {
|
|
flakeURL string
|
|
checkInterval time.Duration
|
|
|
|
inputAge *prometheus.Desc
|
|
inputInfo *prometheus.Desc
|
|
flakeInfo *prometheus.Desc
|
|
revisionBehind *prometheus.Desc
|
|
|
|
mu sync.RWMutex
|
|
cachedData *flakeMetadata
|
|
lastFetch time.Time
|
|
fetchError error
|
|
|
|
// NATS fields
|
|
natsEnabled bool
|
|
natsConn *nats.Conn
|
|
natsSub *nats.Subscription
|
|
natsSubject string
|
|
hostname string
|
|
}
|
|
|
|
// Types decoded from `nix flake metadata --json`. Only the fields the
// collector reads are declared; everything else is ignored by encoding/json.
type (
	// flakeMetadata is the top-level metadata document.
	flakeMetadata struct {
		Revision string     `json:"revision"`
		Locks    flakeLocks `json:"locks"`
	}

	// flakeLocks is the lock graph: nodes keyed by name, plus the name of
	// the root node.
	flakeLocks struct {
		Nodes map[string]flakeLockNode `json:"nodes"`
		Root  string                   `json:"root"`
	}

	// flakeLockNode is one node of the lock graph.
	flakeLockNode struct {
		Inputs   map[string]any `json:"inputs,omitempty"`
		Locked   *lockedInfo    `json:"locked,omitempty"`
		Original *originalInfo  `json:"original,omitempty"`
	}

	// lockedInfo holds the pinned attributes of a node.
	lockedInfo struct {
		LastModified int64  `json:"lastModified"`
		Rev          string `json:"rev"`
		Type         string `json:"type"`
	}

	// originalInfo holds the node's reference as originally written.
	originalInfo struct {
		Type string `json:"type"`
	}
)
|
|
|
|
func NewFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector {
|
|
return newFlakeCollector(flakeURL, checkInterval)
|
|
}
|
|
|
|
// NewFlakeCollectorWithNATS creates a FlakeCollector with optional NATS integration
|
|
func NewFlakeCollectorWithNATS(cfg FlakeCollectorConfig) (*FlakeCollector, error) {
|
|
c := newFlakeCollector(cfg.FlakeURL, cfg.CheckInterval)
|
|
|
|
if cfg.NATSEnabled {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "unknown"
|
|
}
|
|
c.hostname = hostname
|
|
c.natsEnabled = true
|
|
c.natsSubject = cfg.NATSSubject
|
|
|
|
if err := c.connectNATS(cfg.NATSURL, cfg.NATSCredentialsFile, cfg.NATSNkeySeedFile); err != nil {
|
|
// Log warning but continue without NATS
|
|
slog.Warn("Failed to connect to NATS, continuing without cache sharing", "error", err)
|
|
}
|
|
}
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func newFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector {
|
|
return &FlakeCollector{
|
|
flakeURL: flakeURL,
|
|
checkInterval: checkInterval,
|
|
inputAge: prometheus.NewDesc(
|
|
"nixos_flake_input_age_seconds",
|
|
"Age of flake input in seconds",
|
|
[]string{"input"}, nil,
|
|
),
|
|
inputInfo: prometheus.NewDesc(
|
|
"nixos_flake_input_info",
|
|
"Info gauge with revision and type labels",
|
|
[]string{"input", "rev", "type"}, nil,
|
|
),
|
|
flakeInfo: prometheus.NewDesc(
|
|
"nixos_flake_info",
|
|
"Info gauge with current and remote flake revisions",
|
|
[]string{"current_rev", "remote_rev", "nixpkgs_rev", "nixos_version"}, nil,
|
|
),
|
|
revisionBehind: prometheus.NewDesc(
|
|
"nixos_flake_revision_behind",
|
|
"1 if current system revision differs from remote latest, 0 if match",
|
|
nil, nil,
|
|
),
|
|
}
|
|
}
|
|
|
|
func (c *FlakeCollector) Describe(ch chan<- *prometheus.Desc) {
|
|
ch <- c.inputAge
|
|
ch <- c.inputInfo
|
|
ch <- c.flakeInfo
|
|
ch <- c.revisionBehind
|
|
}
|
|
|
|
func (c *FlakeCollector) Collect(ch chan<- prometheus.Metric) {
|
|
data, err := c.getFlakeData()
|
|
if err != nil {
|
|
slog.Error("Failed to get flake data", "error", err)
|
|
return
|
|
}
|
|
|
|
c.collectInputMetrics(ch, data)
|
|
c.collectRevisionBehind(ch, data)
|
|
}
|
|
|
|
func (c *FlakeCollector) getFlakeData() (*flakeMetadata, error) {
|
|
c.mu.RLock()
|
|
cacheValid := c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval
|
|
cachedData := c.cachedData
|
|
c.mu.RUnlock()
|
|
|
|
// Smart cache: check if current revision matches cached remote revision
|
|
// If not, force a refresh even if cache hasn't expired
|
|
forceRefresh := false
|
|
if cacheValid && cachedData != nil {
|
|
versionInfo, err := getNixosVersionInfo()
|
|
if err == nil && versionInfo.ConfigurationRevision != "" {
|
|
currentRev := versionInfo.ConfigurationRevision
|
|
cachedRemoteRev := cachedData.Revision
|
|
// If current system rev now matches cached remote rev, refresh to get latest
|
|
if currentRev == cachedRemoteRev || strings.HasPrefix(cachedRemoteRev, currentRev) {
|
|
forceRefresh = true
|
|
slog.Debug("Current revision matches cached remote, forcing refresh")
|
|
}
|
|
}
|
|
}
|
|
|
|
if cacheValid && !forceRefresh {
|
|
return cachedData, nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Double-check after acquiring write lock
|
|
if !forceRefresh && c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval {
|
|
return c.cachedData, nil
|
|
}
|
|
|
|
oldRevision := ""
|
|
if c.cachedData != nil {
|
|
oldRevision = c.cachedData.Revision
|
|
}
|
|
|
|
data, err := fetchFlakeMetadata(c.flakeURL)
|
|
if err != nil {
|
|
c.fetchError = err
|
|
// Return cached data if available, even if stale
|
|
if c.cachedData != nil {
|
|
slog.Warn("Using stale flake data due to fetch error", "error", err)
|
|
return c.cachedData, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
c.cachedData = data
|
|
c.lastFetch = time.Now()
|
|
c.fetchError = nil
|
|
|
|
// Publish update via NATS if revision changed
|
|
if c.natsEnabled && data.Revision != oldRevision {
|
|
c.publishRevisionUpdate(data.Revision)
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
func (c *FlakeCollector) collectInputMetrics(ch chan<- prometheus.Metric, data *flakeMetadata) {
|
|
now := time.Now().Unix()
|
|
|
|
for name, node := range data.Locks.Nodes {
|
|
// Skip the root node
|
|
if name == "root" {
|
|
continue
|
|
}
|
|
|
|
if node.Locked == nil {
|
|
continue
|
|
}
|
|
|
|
// Input age
|
|
age := float64(now - node.Locked.LastModified)
|
|
ch <- prometheus.MustNewConstMetric(c.inputAge, prometheus.GaugeValue, age, name)
|
|
|
|
// Input info
|
|
rev := node.Locked.Rev
|
|
if len(rev) > 7 {
|
|
rev = rev[:7]
|
|
}
|
|
inputType := node.Locked.Type
|
|
ch <- prometheus.MustNewConstMetric(c.inputInfo, prometheus.GaugeValue, 1, name, rev, inputType)
|
|
}
|
|
}
|
|
|
|
func (c *FlakeCollector) collectRevisionBehind(ch chan<- prometheus.Metric, data *flakeMetadata) {
|
|
versionInfo, err := getNixosVersionInfo()
|
|
if err != nil {
|
|
slog.Error("Failed to get NixOS version info", "error", err)
|
|
return
|
|
}
|
|
|
|
currentRev := versionInfo.ConfigurationRevision
|
|
if currentRev == "" {
|
|
currentRev = "unknown"
|
|
} else if len(currentRev) > 7 {
|
|
currentRev = currentRev[:7]
|
|
}
|
|
|
|
remoteRev := data.Revision
|
|
if len(remoteRev) > 7 {
|
|
remoteRev = remoteRev[:7]
|
|
}
|
|
|
|
nixpkgsRev := versionInfo.NixpkgsRevision
|
|
if len(nixpkgsRev) > 7 {
|
|
nixpkgsRev = nixpkgsRev[:7]
|
|
}
|
|
|
|
// Emit flake info metric with revisions
|
|
ch <- prometheus.MustNewConstMetric(c.flakeInfo, prometheus.GaugeValue, 1,
|
|
currentRev, remoteRev, nixpkgsRev, versionInfo.NixosVersion)
|
|
|
|
behind := 0.0
|
|
if currentRev != "unknown" && data.Revision != "" {
|
|
if currentRev != remoteRev && !strings.HasPrefix(data.Revision, currentRev) {
|
|
behind = 1.0
|
|
}
|
|
}
|
|
|
|
ch <- prometheus.MustNewConstMetric(c.revisionBehind, prometheus.GaugeValue, behind)
|
|
}
|
|
|
|
func fetchFlakeMetadata(flakeURL string) (*flakeMetadata, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
cmd := exec.CommandContext(ctx, "nix", "flake", "metadata", "--json", flakeURL)
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
return nil, fmt.Errorf("nix flake metadata failed: %s", strings.TrimSpace(string(exitErr.Stderr)))
|
|
}
|
|
return nil, fmt.Errorf("nix flake metadata failed: %w", err)
|
|
}
|
|
|
|
var data flakeMetadata
|
|
if err := json.Unmarshal(output, &data); err != nil {
|
|
return nil, fmt.Errorf("failed to parse flake metadata: %w", err)
|
|
}
|
|
|
|
return &data, nil
|
|
}
|
|
|
|
const nixosVersionPath = "/run/current-system/sw/bin/nixos-version"
|
|
|
|
func getNixosVersionInfo() (*nixosVersionInfo, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
cmd := exec.CommandContext(ctx, nixosVersionPath, "--json")
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
return nil, fmt.Errorf("nixos-version failed: %s", strings.TrimSpace(string(exitErr.Stderr)))
|
|
}
|
|
return nil, fmt.Errorf("nixos-version failed: %w", err)
|
|
}
|
|
|
|
var info nixosVersionInfo
|
|
if err := json.Unmarshal(output, &info); err != nil {
|
|
return nil, fmt.Errorf("failed to parse nixos-version output: %w", err)
|
|
}
|
|
|
|
return &info, nil
|
|
}
|
|
|
|
// connectNATS establishes connection to NATS server with auto-reconnect
|
|
func (c *FlakeCollector) connectNATS(url, credentialsFile, nkeySeedFile string) error {
|
|
opts := []nats.Option{
|
|
nats.MaxReconnects(-1), // Infinite reconnects
|
|
nats.ReconnectWait(5 * time.Second),
|
|
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
|
if err != nil {
|
|
slog.Warn("NATS disconnected", "error", err)
|
|
}
|
|
}),
|
|
nats.ReconnectHandler(func(_ *nats.Conn) {
|
|
slog.Info("NATS reconnected")
|
|
// Resubscribe after reconnection
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.natsSub != nil {
|
|
if err := c.natsSub.Unsubscribe(); err != nil {
|
|
slog.Debug("Failed to unsubscribe old subscription", "error", err)
|
|
}
|
|
}
|
|
sub, err := c.natsConn.Subscribe(c.natsSubject, c.handleRevisionUpdate)
|
|
if err != nil {
|
|
slog.Error("Failed to resubscribe after reconnect", "error", err)
|
|
return
|
|
}
|
|
c.natsSub = sub
|
|
}),
|
|
}
|
|
|
|
if credentialsFile != "" {
|
|
opts = append(opts, nats.UserCredentials(credentialsFile))
|
|
} else if nkeySeedFile != "" {
|
|
opt, err := nats.NkeyOptionFromSeed(nkeySeedFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load NKey seed file: %w", err)
|
|
}
|
|
opts = append(opts, opt)
|
|
}
|
|
|
|
nc, err := nats.Connect(url, opts...)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to NATS: %w", err)
|
|
}
|
|
|
|
c.natsConn = nc
|
|
|
|
// Subscribe to revision updates
|
|
sub, err := nc.Subscribe(c.natsSubject, c.handleRevisionUpdate)
|
|
if err != nil {
|
|
nc.Close()
|
|
return fmt.Errorf("failed to subscribe to NATS subject: %w", err)
|
|
}
|
|
c.natsSub = sub
|
|
|
|
slog.Info("Connected to NATS", "url", url, "subject", c.natsSubject)
|
|
return nil
|
|
}
|
|
|
|
// handleRevisionUpdate processes incoming NATS messages
|
|
func (c *FlakeCollector) handleRevisionUpdate(msg *nats.Msg) {
|
|
var update RevisionUpdate
|
|
if err := json.Unmarshal(msg.Data, &update); err != nil {
|
|
slog.Debug("Failed to parse NATS revision update", "error", err)
|
|
return
|
|
}
|
|
|
|
// Ignore messages from self
|
|
if update.SourceHost == c.hostname {
|
|
return
|
|
}
|
|
|
|
// Ignore messages for different flake URLs
|
|
if update.URL != c.flakeURL {
|
|
return
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Ignore messages older than current cache
|
|
if c.lastFetch.Unix() > update.Timestamp {
|
|
return
|
|
}
|
|
|
|
// Update cache with new revision
|
|
if c.cachedData != nil && c.cachedData.Revision != update.Rev {
|
|
slog.Info("Received remote revision update via NATS",
|
|
"from", update.SourceHost,
|
|
"old_rev", c.cachedData.Revision,
|
|
"new_rev", update.Rev)
|
|
c.cachedData.Revision = update.Rev
|
|
c.lastFetch = time.Unix(update.Timestamp, 0)
|
|
}
|
|
}
|
|
|
|
// publishRevisionUpdate broadcasts a revision update to other hosts
|
|
func (c *FlakeCollector) publishRevisionUpdate(rev string) {
|
|
if c.natsConn == nil || !c.natsConn.IsConnected() {
|
|
slog.Debug("NATS not connected, skipping revision publish")
|
|
return
|
|
}
|
|
|
|
update := RevisionUpdate{
|
|
Rev: rev,
|
|
URL: c.flakeURL,
|
|
Timestamp: time.Now().Unix(),
|
|
SourceHost: c.hostname,
|
|
}
|
|
|
|
data, err := json.Marshal(update)
|
|
if err != nil {
|
|
slog.Debug("Failed to marshal revision update", "error", err)
|
|
return
|
|
}
|
|
|
|
if err := c.natsConn.Publish(c.natsSubject, data); err != nil {
|
|
slog.Debug("Failed to publish revision update", "error", err)
|
|
return
|
|
}
|
|
|
|
slog.Debug("Published revision update via NATS", "rev", rev)
|
|
}
|
|
|
|
// Close gracefully shuts down the NATS connection
|
|
func (c *FlakeCollector) Close() error {
|
|
if c.natsSub != nil {
|
|
if err := c.natsSub.Unsubscribe(); err != nil {
|
|
slog.Debug("Failed to unsubscribe from NATS", "error", err)
|
|
}
|
|
}
|
|
|
|
if c.natsConn != nil {
|
|
c.natsConn.Close()
|
|
}
|
|
|
|
return nil
|
|
}
|