feat: add Prometheus metrics to listener service

Add an optional Prometheus metrics HTTP endpoint to the listener for
monitoring deployment operations. Includes four metrics:

- homelab_deploy_deployments_total (counter with status/action/error_code)
- homelab_deploy_deployment_duration_seconds (histogram with action/success)
- homelab_deploy_deployment_in_progress (gauge)
- homelab_deploy_info (gauge with hostname/tier/role/version)

New CLI flags: --metrics-enabled, --metrics-addr (default :9972)
New NixOS options: metrics.enable, metrics.address, metrics.openFirewall

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-07 07:58:22 +01:00
parent 56365835c7
commit 79db119d1c
10 changed files with 613 additions and 9 deletions

View File

@@ -8,6 +8,7 @@ import (
"git.t-juice.club/torjus/homelab-deploy/internal/deploy"
"git.t-juice.club/torjus/homelab-deploy/internal/messages"
"git.t-juice.club/torjus/homelab-deploy/internal/metrics"
"git.t-juice.club/torjus/homelab-deploy/internal/nats"
)
@@ -22,6 +23,9 @@ type Config struct {
Timeout time.Duration
DeploySubjects []string
DiscoverSubject string
MetricsEnabled bool
MetricsAddr string
Version string
}
// Listener handles deployment requests from NATS.
@@ -38,6 +42,10 @@ type Listener struct {
// restartCh signals that the listener should exit for restart
// (e.g., after a successful switch deployment)
restartCh chan struct{}
// metrics server and collector (nil if metrics disabled)
metricsServer *metrics.Server
metrics *metrics.Collector
}
// New creates a new listener with the given configuration.
@@ -46,17 +54,42 @@ func New(cfg Config, logger *slog.Logger) *Listener {
logger = slog.Default()
}
return &Listener{
l := &Listener{
cfg: cfg,
executor: deploy.NewExecutor(cfg.FlakeURL, cfg.Hostname, cfg.Timeout),
lock: deploy.NewLock(),
logger: logger,
restartCh: make(chan struct{}, 1),
}
if cfg.MetricsEnabled {
l.metricsServer = metrics.NewServer(metrics.ServerConfig{
Addr: cfg.MetricsAddr,
Logger: logger,
})
l.metrics = l.metricsServer.Collector()
}
return l
}
// Run starts the listener and blocks until the context is cancelled.
func (l *Listener) Run(ctx context.Context) error {
// Start metrics server if enabled
if l.metricsServer != nil {
if err := l.metricsServer.Start(); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = l.metricsServer.Shutdown(shutdownCtx)
}()
// Set instance info metric
l.metrics.SetInfo(l.cfg.Hostname, l.cfg.Tier, l.cfg.Role, l.cfg.Version)
}
// Connect to NATS
l.logger.Info("connecting to NATS",
"url", l.cfg.NATSUrl,
@@ -136,6 +169,9 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
messages.StatusRejected,
err.Error(),
).WithError(messages.ErrorInvalidAction))
if l.metrics != nil {
l.metrics.RecordRejection(req.Action, messages.ErrorInvalidAction)
}
return
}
@@ -150,6 +186,9 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
messages.StatusRejected,
"another deployment is already in progress",
).WithError(messages.ErrorAlreadyRunning))
if l.metrics != nil {
l.metrics.RecordRejection(req.Action, messages.ErrorAlreadyRunning)
}
return
}
defer l.lock.Release()
@@ -161,6 +200,12 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
fmt.Sprintf("starting deployment: %s", l.executor.BuildCommand(req.Action, req.Revision)),
))
// Record deployment start for metrics
if l.metrics != nil {
l.metrics.RecordDeploymentStart()
}
startTime := time.Now()
// Validate revision
ctx := context.Background()
if err := l.executor.ValidateRevision(ctx, req.Revision); err != nil {
@@ -173,6 +218,10 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
messages.StatusFailed,
fmt.Sprintf("revision validation failed: %v", err),
).WithError(messages.ErrorInvalidRevision))
if l.metrics != nil {
duration := time.Since(startTime).Seconds()
l.metrics.RecordDeploymentFailure(req.Action, messages.ErrorInvalidRevision, duration)
}
return
}
@@ -184,6 +233,7 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
)
result := l.executor.Execute(ctx, req.Action, req.Revision)
duration := time.Since(startTime).Seconds()
if result.Success {
l.logger.Info("deployment completed successfully",
@@ -194,6 +244,9 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
messages.StatusCompleted,
"deployment completed successfully",
))
if l.metrics != nil {
l.metrics.RecordDeploymentEnd(req.Action, true, duration)
}
// After a successful switch, signal restart so we pick up any new version
if req.Action == messages.ActionSwitch {
@@ -220,6 +273,9 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
messages.StatusFailed,
fmt.Sprintf("deployment failed (exit code %d): %s", result.ExitCode, result.Stderr),
).WithError(errorCode))
if l.metrics != nil {
l.metrics.RecordDeploymentFailure(req.Action, errorCode, duration)
}
}
}

View File

@@ -0,0 +1,98 @@
// Package metrics provides Prometheus metrics for the homelab-deploy listener.
package metrics
import (
"git.t-juice.club/torjus/homelab-deploy/internal/messages"
"github.com/prometheus/client_golang/prometheus"
)
// Collector holds all Prometheus metrics for the listener.
type Collector struct {
deploymentsTotal *prometheus.CounterVec
deploymentDuration *prometheus.HistogramVec
deploymentInProgress prometheus.Gauge
info *prometheus.GaugeVec
}
// NewCollector creates a new metrics collector and registers it with the given registerer.
func NewCollector(reg prometheus.Registerer) *Collector {
c := &Collector{
deploymentsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "homelab_deploy_deployments_total",
Help: "Total deployment requests processed",
},
[]string{"status", "action", "error_code"},
),
deploymentDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "homelab_deploy_deployment_duration_seconds",
Help: "Deployment execution time",
// Bucket boundaries for typical NixOS build times
Buckets: []float64{30, 60, 120, 300, 600, 900, 1200, 1800},
},
[]string{"action", "success"},
),
deploymentInProgress: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "homelab_deploy_deployment_in_progress",
Help: "1 if deployment running, 0 otherwise",
},
),
info: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "homelab_deploy_info",
Help: "Static instance metadata",
},
[]string{"hostname", "tier", "role", "version"},
),
}
reg.MustRegister(c.deploymentsTotal)
reg.MustRegister(c.deploymentDuration)
reg.MustRegister(c.deploymentInProgress)
reg.MustRegister(c.info)
return c
}
// SetInfo sets the static instance metadata.
func (c *Collector) SetInfo(hostname, tier, role, version string) {
c.info.WithLabelValues(hostname, tier, role, version).Set(1)
}
// RecordDeploymentStart marks the start of a deployment.
func (c *Collector) RecordDeploymentStart() {
c.deploymentInProgress.Set(1)
}
// RecordDeploymentEnd records the completion of a deployment.
func (c *Collector) RecordDeploymentEnd(action messages.Action, success bool, durationSeconds float64) {
c.deploymentInProgress.Set(0)
successLabel := "false"
if success {
successLabel = "true"
}
c.deploymentDuration.WithLabelValues(string(action), successLabel).Observe(durationSeconds)
status := "completed"
if !success {
status = "failed"
}
c.deploymentsTotal.WithLabelValues(status, string(action), "").Inc()
}
// RecordDeploymentFailure records a deployment failure with an error code.
func (c *Collector) RecordDeploymentFailure(action messages.Action, errorCode messages.ErrorCode, durationSeconds float64) {
c.deploymentInProgress.Set(0)
c.deploymentDuration.WithLabelValues(string(action), "false").Observe(durationSeconds)
c.deploymentsTotal.WithLabelValues("failed", string(action), string(errorCode)).Inc()
}
// RecordRejection records a rejected deployment request.
func (c *Collector) RecordRejection(action messages.Action, errorCode messages.ErrorCode) {
c.deploymentsTotal.WithLabelValues("rejected", string(action), string(errorCode)).Inc()
}

View File

@@ -0,0 +1,210 @@
package metrics
import (
"context"
"io"
"net/http"
"strings"
"testing"
"time"
"git.t-juice.club/torjus/homelab-deploy/internal/messages"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
)
func TestCollector_SetInfo(t *testing.T) {
reg := prometheus.NewRegistry()
c := NewCollector(reg)
c.SetInfo("testhost", "test", "web", "1.0.0")
expected := `
# HELP homelab_deploy_info Static instance metadata
# TYPE homelab_deploy_info gauge
homelab_deploy_info{hostname="testhost",role="web",tier="test",version="1.0.0"} 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "homelab_deploy_info"); err != nil {
t.Errorf("unexpected metrics: %v", err)
}
}
func TestCollector_RecordDeploymentStart(t *testing.T) {
reg := prometheus.NewRegistry()
c := NewCollector(reg)
c.RecordDeploymentStart()
expected := `
# HELP homelab_deploy_deployment_in_progress 1 if deployment running, 0 otherwise
# TYPE homelab_deploy_deployment_in_progress gauge
homelab_deploy_deployment_in_progress 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "homelab_deploy_deployment_in_progress"); err != nil {
t.Errorf("unexpected metrics: %v", err)
}
}
func TestCollector_RecordDeploymentEnd_Success(t *testing.T) {
reg := prometheus.NewRegistry()
c := NewCollector(reg)
c.RecordDeploymentStart()
c.RecordDeploymentEnd(messages.ActionSwitch, true, 120.5)
// Check in_progress is 0
inProgressExpected := `
# HELP homelab_deploy_deployment_in_progress 1 if deployment running, 0 otherwise
# TYPE homelab_deploy_deployment_in_progress gauge
homelab_deploy_deployment_in_progress 0
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(inProgressExpected), "homelab_deploy_deployment_in_progress"); err != nil {
t.Errorf("unexpected in_progress metrics: %v", err)
}
// Check counter incremented
counterExpected := `
# HELP homelab_deploy_deployments_total Total deployment requests processed
# TYPE homelab_deploy_deployments_total counter
homelab_deploy_deployments_total{action="switch",error_code="",status="completed"} 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(counterExpected), "homelab_deploy_deployments_total"); err != nil {
t.Errorf("unexpected counter metrics: %v", err)
}
}
func TestCollector_RecordDeploymentEnd_Failure(t *testing.T) {
reg := prometheus.NewRegistry()
c := NewCollector(reg)
c.RecordDeploymentStart()
c.RecordDeploymentEnd(messages.ActionBoot, false, 60.0)
counterExpected := `
# HELP homelab_deploy_deployments_total Total deployment requests processed
# TYPE homelab_deploy_deployments_total counter
homelab_deploy_deployments_total{action="boot",error_code="",status="failed"} 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(counterExpected), "homelab_deploy_deployments_total"); err != nil {
t.Errorf("unexpected counter metrics: %v", err)
}
}
func TestCollector_RecordDeploymentFailure(t *testing.T) {
reg := prometheus.NewRegistry()
c := NewCollector(reg)
c.RecordDeploymentStart()
c.RecordDeploymentFailure(messages.ActionSwitch, messages.ErrorBuildFailed, 300.0)
counterExpected := `
# HELP homelab_deploy_deployments_total Total deployment requests processed
# TYPE homelab_deploy_deployments_total counter
homelab_deploy_deployments_total{action="switch",error_code="build_failed",status="failed"} 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(counterExpected), "homelab_deploy_deployments_total"); err != nil {
t.Errorf("unexpected counter metrics: %v", err)
}
}
func TestCollector_RecordRejection(t *testing.T) {
reg := prometheus.NewRegistry()
c := NewCollector(reg)
c.RecordRejection(messages.ActionSwitch, messages.ErrorAlreadyRunning)
expected := `
# HELP homelab_deploy_deployments_total Total deployment requests processed
# TYPE homelab_deploy_deployments_total counter
homelab_deploy_deployments_total{action="switch",error_code="already_running",status="rejected"} 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "homelab_deploy_deployments_total"); err != nil {
t.Errorf("unexpected metrics: %v", err)
}
}
func TestServer_StartShutdown(t *testing.T) {
srv := NewServer(ServerConfig{
Addr: ":0", // Let OS pick a free port
})
if err := srv.Start(); err != nil {
t.Fatalf("failed to start server: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
t.Errorf("failed to shutdown server: %v", err)
}
}
func TestServer_Endpoints(t *testing.T) {
srv := NewServer(ServerConfig{
Addr: "127.0.0.1:19972", // Use a fixed port for testing
})
if err := srv.Start(); err != nil {
t.Fatalf("failed to start server: %v", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = srv.Shutdown(ctx)
}()
// Give server time to start
time.Sleep(50 * time.Millisecond)
t.Run("health endpoint", func(t *testing.T) {
resp, err := http.Get("http://127.0.0.1:19972/health")
if err != nil {
t.Fatalf("failed to get health endpoint: %v", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status 200, got %d", resp.StatusCode)
}
body, _ := io.ReadAll(resp.Body)
if string(body) != "ok" {
t.Errorf("expected body 'ok', got %q", string(body))
}
})
t.Run("metrics endpoint", func(t *testing.T) {
// Set some info to have metrics to display
srv.Collector().SetInfo("testhost", "test", "web", "1.0.0")
resp, err := http.Get("http://127.0.0.1:19972/metrics")
if err != nil {
t.Fatalf("failed to get metrics endpoint: %v", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status 200, got %d", resp.StatusCode)
}
body, _ := io.ReadAll(resp.Body)
bodyStr := string(body)
if !strings.Contains(bodyStr, "homelab_deploy_info") {
t.Error("expected metrics to contain homelab_deploy_info")
}
})
}
func TestServer_Collector(t *testing.T) {
srv := NewServer(ServerConfig{
Addr: ":0",
})
collector := srv.Collector()
if collector == nil {
t.Error("expected non-nil collector")
}
}

View File

@@ -0,0 +1,84 @@
package metrics
import (
"context"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// ServerConfig holds configuration for the metrics server.
type ServerConfig struct {
Addr string
Logger *slog.Logger
}
// Server serves Prometheus metrics over HTTP.
type Server struct {
httpServer *http.Server
registry *prometheus.Registry
collector *Collector
logger *slog.Logger
}
// NewServer creates a new metrics server.
func NewServer(cfg ServerConfig) *Server {
logger := cfg.Logger
if logger == nil {
logger = slog.Default()
}
registry := prometheus.NewRegistry()
collector := NewCollector(registry)
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{
Registry: registry,
}))
mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
return &Server{
httpServer: &http.Server{
Addr: cfg.Addr,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
},
registry: registry,
collector: collector,
logger: logger,
}
}
// Collector returns the metrics collector.
func (s *Server) Collector() *Collector {
return s.collector
}
// Start starts the HTTP server in a goroutine.
func (s *Server) Start() error {
s.logger.Info("starting metrics server", "addr", s.httpServer.Addr)
go func() {
if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
s.logger.Error("metrics server error", "error", err)
}
}()
return nil
}
// Shutdown gracefully shuts down the server.
func (s *Server) Shutdown(ctx context.Context) error {
s.logger.Info("shutting down metrics server")
if err := s.httpServer.Shutdown(ctx); err != nil {
return fmt.Errorf("failed to shutdown metrics server: %w", err)
}
return nil
}