feat: add NATS cache sharing and smart cache refresh

Add two complementary features to reduce remote revision cache staleness:

1. Smart local cache: When current system revision matches cached remote
   revision, force an immediate cache refresh to check for newer revisions.

2. NATS integration: Share cache updates across hosts via NATS pub/sub.
   Hosts publish revision updates when they fetch new data, and subscribe
   to receive updates from other hosts. Features include:
   - Auto-reconnect with infinite retries
   - Graceful fallback when NATS unavailable
   - Filtering by flake URL and hostname

New CLI flags:
  --flake.nats.enable
  --flake.nats.url
  --flake.nats.subject
  --flake.nats.credentials-file

New NixOS module options under services.prometheus.exporters.nixos.flake.nats

Bumps version to 0.3.0.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-08 23:05:52 +01:00
parent cf0ce85899
commit 5cc0e7eadd
7 changed files with 315 additions and 18 deletions

View File

@@ -5,14 +5,36 @@ import (
"encoding/json"
"fmt"
"log/slog"
"os"
"os/exec"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
)
// RevisionUpdate is the message format for NATS pub/sub
type RevisionUpdate struct {
Rev string `json:"rev"`
URL string `json:"url"`
Timestamp int64 `json:"timestamp"`
SourceHost string `json:"source_host"`
}
// FlakeCollectorConfig holds configuration for the FlakeCollector
type FlakeCollectorConfig struct {
FlakeURL string
CheckInterval time.Duration
// NATS configuration
NATSEnabled bool
NATSURL string
NATSSubject string
NATSCredentialsFile string
}
// nixosVersionInfo holds the parsed output of nixos-version --json
type nixosVersionInfo struct {
ConfigurationRevision string `json:"configurationRevision"`
@@ -33,6 +55,13 @@ type FlakeCollector struct {
cachedData *flakeMetadata
lastFetch time.Time
fetchError error
// NATS fields
natsEnabled bool
natsConn *nats.Conn
natsSub *nats.Subscription
natsSubject string
hostname string
}
type flakeMetadata struct {
@@ -62,6 +91,32 @@ type originalInfo struct {
}
func NewFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector {
return newFlakeCollector(flakeURL, checkInterval)
}
// NewFlakeCollectorWithNATS creates a FlakeCollector with optional NATS integration
func NewFlakeCollectorWithNATS(cfg FlakeCollectorConfig) (*FlakeCollector, error) {
c := newFlakeCollector(cfg.FlakeURL, cfg.CheckInterval)
if cfg.NATSEnabled {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
c.hostname = hostname
c.natsEnabled = true
c.natsSubject = cfg.NATSSubject
if err := c.connectNATS(cfg.NATSURL, cfg.NATSCredentialsFile); err != nil {
// Log warning but continue without NATS
slog.Warn("Failed to connect to NATS, continuing without cache sharing", "error", err)
}
}
return c, nil
}
func newFlakeCollector(flakeURL string, checkInterval time.Duration) *FlakeCollector {
return &FlakeCollector{
flakeURL: flakeURL,
checkInterval: checkInterval,
@@ -108,21 +163,43 @@ func (c *FlakeCollector) Collect(ch chan<- prometheus.Metric) {
func (c *FlakeCollector) getFlakeData() (*flakeMetadata, error) {
c.mu.RLock()
if c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval {
data := c.cachedData
c.mu.RUnlock()
return data, nil
}
cacheValid := c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval
cachedData := c.cachedData
c.mu.RUnlock()
// Smart cache: check if current revision matches cached remote revision
// If not, force a refresh even if cache hasn't expired
forceRefresh := false
if cacheValid && cachedData != nil {
versionInfo, err := getNixosVersionInfo()
if err == nil && versionInfo.ConfigurationRevision != "" {
currentRev := versionInfo.ConfigurationRevision
cachedRemoteRev := cachedData.Revision
// If current system rev now matches cached remote rev, refresh to get latest
if currentRev == cachedRemoteRev || strings.HasPrefix(cachedRemoteRev, currentRev) {
forceRefresh = true
slog.Debug("Current revision matches cached remote, forcing refresh")
}
}
}
if cacheValid && !forceRefresh {
return cachedData, nil
}
c.mu.Lock()
defer c.mu.Unlock()
// Double-check after acquiring write lock
if c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval {
if !forceRefresh && c.cachedData != nil && time.Since(c.lastFetch) < c.checkInterval {
return c.cachedData, nil
}
oldRevision := ""
if c.cachedData != nil {
oldRevision = c.cachedData.Revision
}
data, err := fetchFlakeMetadata(c.flakeURL)
if err != nil {
c.fetchError = err
@@ -137,6 +214,12 @@ func (c *FlakeCollector) getFlakeData() (*flakeMetadata, error) {
c.cachedData = data
c.lastFetch = time.Now()
c.fetchError = nil
// Publish update via NATS if revision changed
if c.natsEnabled && data.Revision != oldRevision {
c.publishRevisionUpdate(data.Revision)
}
return data, nil
}
@@ -248,3 +331,135 @@ func getNixosVersionInfo() (*nixosVersionInfo, error) {
return &info, nil
}
// connectNATS establishes connection to NATS server with auto-reconnect
func (c *FlakeCollector) connectNATS(url, credentialsFile string) error {
opts := []nats.Option{
nats.MaxReconnects(-1), // Infinite reconnects
nats.ReconnectWait(5 * time.Second),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
if err != nil {
slog.Warn("NATS disconnected", "error", err)
}
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
slog.Info("NATS reconnected")
// Resubscribe after reconnection
c.mu.Lock()
defer c.mu.Unlock()
if c.natsSub != nil {
if err := c.natsSub.Unsubscribe(); err != nil {
slog.Debug("Failed to unsubscribe old subscription", "error", err)
}
}
sub, err := c.natsConn.Subscribe(c.natsSubject, c.handleRevisionUpdate)
if err != nil {
slog.Error("Failed to resubscribe after reconnect", "error", err)
return
}
c.natsSub = sub
}),
}
if credentialsFile != "" {
opts = append(opts, nats.UserCredentials(credentialsFile))
}
nc, err := nats.Connect(url, opts...)
if err != nil {
return fmt.Errorf("failed to connect to NATS: %w", err)
}
c.natsConn = nc
// Subscribe to revision updates
sub, err := nc.Subscribe(c.natsSubject, c.handleRevisionUpdate)
if err != nil {
nc.Close()
return fmt.Errorf("failed to subscribe to NATS subject: %w", err)
}
c.natsSub = sub
slog.Info("Connected to NATS", "url", url, "subject", c.natsSubject)
return nil
}
// handleRevisionUpdate processes incoming NATS messages
func (c *FlakeCollector) handleRevisionUpdate(msg *nats.Msg) {
var update RevisionUpdate
if err := json.Unmarshal(msg.Data, &update); err != nil {
slog.Debug("Failed to parse NATS revision update", "error", err)
return
}
// Ignore messages from self
if update.SourceHost == c.hostname {
return
}
// Ignore messages for different flake URLs
if update.URL != c.flakeURL {
return
}
c.mu.Lock()
defer c.mu.Unlock()
// Ignore messages older than current cache
if c.lastFetch.Unix() > update.Timestamp {
return
}
// Update cache with new revision
if c.cachedData != nil && c.cachedData.Revision != update.Rev {
slog.Info("Received remote revision update via NATS",
"from", update.SourceHost,
"old_rev", c.cachedData.Revision,
"new_rev", update.Rev)
c.cachedData.Revision = update.Rev
c.lastFetch = time.Unix(update.Timestamp, 0)
}
}
// publishRevisionUpdate broadcasts a revision update to other hosts
func (c *FlakeCollector) publishRevisionUpdate(rev string) {
if c.natsConn == nil || !c.natsConn.IsConnected() {
slog.Debug("NATS not connected, skipping revision publish")
return
}
update := RevisionUpdate{
Rev: rev,
URL: c.flakeURL,
Timestamp: time.Now().Unix(),
SourceHost: c.hostname,
}
data, err := json.Marshal(update)
if err != nil {
slog.Debug("Failed to marshal revision update", "error", err)
return
}
if err := c.natsConn.Publish(c.natsSubject, data); err != nil {
slog.Debug("Failed to publish revision update", "error", err)
return
}
slog.Debug("Published revision update via NATS", "rev", rev)
}
// Close gracefully shuts down the NATS connection
func (c *FlakeCollector) Close() error {
if c.natsSub != nil {
if err := c.natsSub.Unsubscribe(); err != nil {
slog.Debug("Failed to unsubscribe from NATS", "error", err)
}
}
if c.natsConn != nil {
c.natsConn.Close()
}
return nil
}