From 5cc0e7eadd55852fe747242e00094c7f19957a45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Sun, 8 Feb 2026 23:05:52 +0100 Subject: [PATCH] feat: add NATS cache sharing and smart cache refresh Add two complementary features to reduce remote revision cache staleness: 1. Smart local cache: When current system revision matches cached remote revision, force an immediate cache refresh to check for newer revisions. 2. NATS integration: Share cache updates across hosts via NATS pub/sub. Hosts publish revision updates when they fetch new data, and subscribe to receive updates from other hosts. Features include: - Auto-reconnect with infinite retries - Graceful fallback when NATS unavailable - Filtering by flake URL and hostname New CLI flags: --flake.nats.enable --flake.nats.url --flake.nats.subject --flake.nats.credentials-file New NixOS module options under services.prometheus.exporters.nixos.flake.nats Bumps version to 0.3.0. Co-Authored-By: Claude Opus 4.5 --- collector/flake.go | 227 +++++++++++++++++++++++++++++++++++++++++++-- config/config.go | 16 ++++ flake.nix | 2 +- go.mod | 14 ++- go.sum | 16 +++- main.go | 26 +++++- module.nix | 32 +++++++ 7 files changed, 315 insertions(+), 18 deletions(-) diff --git a/collector/flake.go b/collector/flake.go index 497ac53..96249b5 100644 --- a/collector/flake.go +++ b/collector/flake.go @@ -5,14 +5,36 @@ import ( "encoding/json" "fmt" "log/slog" + "os" "os/exec" "strings" "sync" "time" + "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" ) +// RevisionUpdate is the message format for NATS pub/sub +type RevisionUpdate struct { + Rev string `json:"rev"` + URL string `json:"url"` + Timestamp int64 `json:"timestamp"` + SourceHost string `json:"source_host"` +} + +// FlakeCollectorConfig holds configuration for the FlakeCollector +type FlakeCollectorConfig struct { + FlakeURL string + CheckInterval time.Duration + + // NATS configuration + NATSEnabled bool + NATSURL string + NATSSubject string + NATSCredentialsFile string +} + // nixosVersionInfo holds the parsed output of nixos-version --json type nixosVersionInfo struct { ConfigurationRevision string `json:"configurationRevision"` @@ -33,6 +55,13 @@ type FlakeCollector struct { cachedData *flakeMetadata lastFetch time.Time fetchError error + + // NATS fields + natsEnabled bool + natsConn *nats.Conn + natsSub *nats.Subscription + natsSubject string + hostname string } type flakeMetadata struct { @@ -62,6 +91,32 @@ type originalInfo struct { } func NewFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector { + return newFlakeCollector(flakeURL, checkInterval) +} + +// NewFlakeCollectorWithNATS creates a FlakeCollector with optional NATS integration +func NewFlakeCollectorWithNATS(cfg FlakeCollectorConfig) (*FlakeCollector, error) { + c := newFlakeCollector(cfg.FlakeURL, cfg.CheckInterval) + + if cfg.NATSEnabled { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + c.hostname = hostname + c.natsEnabled = true + c.natsSubject = cfg.NATSSubject + + if err := c.connectNATS(cfg.NATSURL, cfg.NATSCredentialsFile); err != nil { + // Log warning but continue without NATS + slog.Warn("Failed to connect to NATS, continuing without cache sharing", "error", err) + } + } + + return c, nil +} + +func newFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector { return &FlakeCollector{ flakeURL: flakeURL, checkInterval: checkInterval, @@ -108,21 +163,43 @@ func (c *FlakeCollector) Collect(ch chan<- prometheus.Metric) { func (c *FlakeCollector) getFlakeData() (*flakeMetadata, error) { c.mu.RLock() - if c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval { - data := c.cachedData - c.mu.RUnlock() - return data, nil - } + cacheValid := c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval + cachedData := c.cachedData c.mu.RUnlock() + // Smart cache: check if current revision matches cached remote revision + // If not, force a refresh even if cache hasn't expired + forceRefresh := false + if cacheValid && cachedData != nil { + versionInfo, err := getNixosVersionInfo() + if err == nil && versionInfo.ConfigurationRevision != "" { + currentRev := versionInfo.ConfigurationRevision + cachedRemoteRev := cachedData.Revision + // If current system rev now matches cached remote rev, refresh to get latest + if currentRev == cachedRemoteRev || strings.HasPrefix(cachedRemoteRev, currentRev) { + forceRefresh = true + slog.Debug("Current revision matches cached remote, forcing refresh") + } + } + } + + if cacheValid && !forceRefresh { + return cachedData, nil + } + c.mu.Lock() defer c.mu.Unlock() // Double-check after acquiring write lock - if c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval { + if !forceRefresh && c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval { return c.cachedData, nil } + oldRevision := "" + if c.cachedData != nil { + oldRevision = c.cachedData.Revision + } + data, err := fetchFlakeMetadata(c.flakeURL) if err != nil { c.fetchError = err @@ -137,6 +214,12 @@ func (c *FlakeCollector) getFlakeData() (*flakeMetadata, error) { c.cachedData = data c.lastFetch = time.Now() c.fetchError = nil + + // Publish update via NATS if revision changed + if c.natsEnabled && data.Revision != oldRevision { + c.publishRevisionUpdate(data.Revision) + } + return data, nil } @@ -248,3 +331,135 @@ func getNixosVersionInfo() (*nixosVersionInfo, error) { return &info, nil } + +// connectNATS establishes connection to NATS server with auto-reconnect +func (c *FlakeCollector) connectNATS(url, credentialsFile string) error { + opts := []nats.Option{ + nats.MaxReconnects(-1), // Infinite reconnects + nats.ReconnectWait(5 * time.Second), + nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { + if err != nil { + slog.Warn("NATS disconnected", "error", err) + } + }), + nats.ReconnectHandler(func(_ *nats.Conn) { + slog.Info("NATS reconnected") + // Resubscribe after reconnection + c.mu.Lock() + defer c.mu.Unlock() + if c.natsSub != nil { + if err := c.natsSub.Unsubscribe(); err != nil { + slog.Debug("Failed to unsubscribe old subscription", "error", err) + } + } + sub, err := c.natsConn.Subscribe(c.natsSubject, c.handleRevisionUpdate) + if err != nil { + slog.Error("Failed to resubscribe after reconnect", "error", err) + return + } + c.natsSub = sub + }), + } + + if credentialsFile != "" { + opts = append(opts, nats.UserCredentials(credentialsFile)) + } + + nc, err := nats.Connect(url, opts...) + if err != nil { + return fmt.Errorf("failed to connect to NATS: %w", err) + } + + c.natsConn = nc + + // Subscribe to revision updates + sub, err := nc.Subscribe(c.natsSubject, c.handleRevisionUpdate) + if err != nil { + nc.Close() + return fmt.Errorf("failed to subscribe to NATS subject: %w", err) + } + c.natsSub = sub + + slog.Info("Connected to NATS", "url", url, "subject", c.natsSubject) + return nil +} + +// handleRevisionUpdate processes incoming NATS messages +func (c *FlakeCollector) handleRevisionUpdate(msg *nats.Msg) { + var update RevisionUpdate + if err := json.Unmarshal(msg.Data, &update); err != nil { + slog.Debug("Failed to parse NATS revision update", "error", err) + return + } + + // Ignore messages from self + if update.SourceHost == c.hostname { + return + } + + // Ignore messages for different flake URLs + if update.URL != c.flakeURL { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + + // Ignore messages older than current cache + if c.lastFetch.Unix() > update.Timestamp { + return + } + + // Update cache with new revision + if c.cachedData != nil && c.cachedData.Revision != update.Rev { + slog.Info("Received remote revision update via NATS", + "from", update.SourceHost, + "old_rev", c.cachedData.Revision, + "new_rev", update.Rev) + c.cachedData.Revision = update.Rev + c.lastFetch = time.Unix(update.Timestamp, 0) + } +} + +// publishRevisionUpdate broadcasts a revision update to other hosts +func (c *FlakeCollector) publishRevisionUpdate(rev string) { + if c.natsConn == nil || !c.natsConn.IsConnected() { + slog.Debug("NATS not connected, skipping revision publish") + return + } + + update := RevisionUpdate{ + Rev: rev, + URL: c.flakeURL, + Timestamp: time.Now().Unix(), + SourceHost: c.hostname, + } + + data, err := json.Marshal(update) + if err != nil { + slog.Debug("Failed to marshal revision update", "error", err) + return + } + + if err := c.natsConn.Publish(c.natsSubject, data); err != nil { + slog.Debug("Failed to publish revision update", "error", err) + return + } + + slog.Debug("Published revision update via NATS", "rev", rev) +} + +// Close gracefully shuts down the NATS connection +func (c *FlakeCollector) Close() error { + if c.natsSub != nil { + if err := c.natsSub.Unsubscribe(); err != nil { + slog.Debug("Failed to unsubscribe from NATS", "error", err) + } + } + + if c.natsConn != nil { + c.natsConn.Close() + } + + return nil +} diff --git a/config/config.go b/config/config.go index 9d357a5..342cd6b 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,12 @@ type Config struct { FlakeCollector bool FlakeURL string FlakeCheckInterval time.Duration + + // NATS configuration + FlakeNATSEnable bool + FlakeNATSURL string + FlakeNATSSubject string + FlakeNATSCredentialsFile string } func Parse() (*Config, error) { @@ -21,11 +27,21 @@ func Parse() (*Config, error) { flag.StringVar(&cfg.FlakeURL, "flake.url", "", "Flake URL for revision comparison (required if flake collector enabled)") flag.DurationVar(&cfg.FlakeCheckInterval, "flake.check-interval", time.Hour, "Interval between remote flake checks") + // NATS flags + flag.BoolVar(&cfg.FlakeNATSEnable, "flake.nats.enable", false, "Enable NATS cache sharing") + flag.StringVar(&cfg.FlakeNATSURL, "flake.nats.url", "nats://localhost:4222", "NATS server URL") + flag.StringVar(&cfg.FlakeNATSSubject, "flake.nats.subject", "nixos-exporter.remote-rev", "NATS subject for revision updates") + flag.StringVar(&cfg.FlakeNATSCredentialsFile, "flake.nats.credentials-file", "", "NATS credentials file (optional)") + flag.Parse() if cfg.FlakeCollector && cfg.FlakeURL == "" { return nil, fmt.Errorf("--flake.url is required when --collector.flake is enabled") } + if cfg.FlakeNATSEnable && !cfg.FlakeCollector { + return nil, fmt.Errorf("--flake.nats.enable requires --collector.flake to be enabled") + } + return cfg, nil } diff --git a/flake.nix b/flake.nix index 47c9696..42f1532 100644 --- a/flake.nix +++ b/flake.nix @@ -26,7 +26,7 @@ pname = "nixos-exporter"; inherit version; src = ./.; - vendorHash = "sha256-NnvB20rORPS5QF5enbb5KpWaKZ70ybSgfd7wjk21/Cg="; + vendorHash = "sha256-AVtGXeAmbc6ePLZgayWiqnGxxgg0ON1uG5ZzTjLjdwQ="; meta = with pkgs.lib; { description = "Prometheus exporter for NixOS-specific metrics"; diff --git a/go.mod b/go.mod index c29452b..5e7a5ea 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,23 @@ module git.t-juice.club/torjus/nixos-exporter -go 1.23 +go 1.23.0 -require github.com/prometheus/client_golang v1.20.5 +require ( + github.com/nats-io/nats.go v1.48.0 + github.com/prometheus/client_golang v1.20.5 +) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - golang.org/x/sys v0.22.0 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sys v0.32.0 // indirect google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index d5318cf..5de57a5 100644 --- a/go.sum +++ b/go.sum @@ -4,12 +4,18 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= +github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= @@ -18,7 +24,9 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/main.go b/main.go index c88011a..2c09899 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -const version = "0.2.3" +const version = "0.3.0" func main() { cfg, err := config.Parse() @@ -30,10 +30,23 @@ func main() { slog.Info("Registered generation collector") // Register flake collector if enabled + var flakeCollector *collector.FlakeCollector if cfg.FlakeCollector { - flakeCollector := collector.NewFlakeCollector(cfg.FlakeURL, cfg.FlakeCheckInterval) + var err error + flakeCollector, err = collector.NewFlakeCollectorWithNATS(collector.FlakeCollectorConfig{ + FlakeURL: cfg.FlakeURL, + CheckInterval: cfg.FlakeCheckInterval, + NATSEnabled: cfg.FlakeNATSEnable, + NATSURL: cfg.FlakeNATSURL, + NATSSubject: cfg.FlakeNATSSubject, + NATSCredentialsFile: cfg.FlakeNATSCredentialsFile, + }) + if err != nil { + slog.Error("Failed to create flake collector", "error", err) + os.Exit(1) + } prometheus.MustRegister(flakeCollector) - slog.Info("Registered flake collector", "url", cfg.FlakeURL, "check_interval", cfg.FlakeCheckInterval) + slog.Info("Registered flake collector", "url", cfg.FlakeURL, "check_interval", cfg.FlakeCheckInterval, "nats_enabled", cfg.FlakeNATSEnable) } mux := http.NewServeMux() @@ -71,6 +84,13 @@ func main() { <-ctx.Done() slog.Info("Shutting down server") + // Close flake collector (NATS connection) + if flakeCollector != nil { + if err := flakeCollector.Close(); err != nil { + slog.Error("Failed to close flake collector", "error", err) + } + } + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/module.nix b/module.nix index 6edb97b..1a0e3d6 100644 --- a/module.nix +++ b/module.nix @@ -37,6 +37,28 @@ in default = "1h"; description = "Interval between remote flake checks."; }; + + nats = { + enable = lib.mkEnableOption "NATS cache sharing"; + + url = lib.mkOption { + type = lib.types.str; + default = "nats://localhost:4222"; + description = "NATS server URL."; + }; + + subject = lib.mkOption { + type = lib.types.str; + default = "nixos-exporter.remote-rev"; + description = "NATS subject for revision updates."; + }; + + credentialsFile = lib.mkOption { + type = lib.types.nullOr lib.types.path; + default = null; + description = "Path to NATS credentials file."; + }; + }; }; openFirewall = lib.mkOption { @@ -70,6 +92,10 @@ in assertion = cfg.flake.enable -> cfg.flake.url != ""; message = "services.prometheus.exporters.nixos.flake.url must be set when flake collector is enabled"; } + { + assertion = cfg.flake.nats.enable -> cfg.flake.enable; + message = "services.prometheus.exporters.nixos.flake.nats.enable requires flake collector to be enabled"; + } ]; users.users.${cfg.user} = { @@ -101,6 +127,12 @@ in "--collector.flake" "--flake.url=${cfg.flake.url}" "--flake.check-interval=${cfg.flake.checkInterval}" + ] ++ lib.optionals cfg.flake.nats.enable [ + "--flake.nats.enable" + "--flake.nats.url=${cfg.flake.nats.url}" + "--flake.nats.subject=${cfg.flake.nats.subject}" + ] ++ lib.optionals (cfg.flake.nats.enable && cfg.flake.nats.credentialsFile != null) [ + "--flake.nats.credentials-file=${cfg.flake.nats.credentialsFile}" ]); Restart = "on-failure"; RestartSec = "5s";