package builder

import (
	"context"
	"fmt"
	"log/slog"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	"code.t-juice.club/torjus/homelab-deploy/internal/messages"
	"code.t-juice.club/torjus/homelab-deploy/internal/metrics"
	"code.t-juice.club/torjus/homelab-deploy/internal/nats"
)

// hostnameRegex validates hostnames from flake output.
// Allows: alphanumeric, dashes, underscores, dots.
var hostnameRegex = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)

// splitForTruncation splits output on "\n" and decides whether it needs
// truncating to keepLines lines at each end.
//
// When the output has more than 2*keepLines lines, it returns the first
// keepLines lines as head, the last keepLines lines as tail, the number of
// lines in between as omitted, and truncated == true. Otherwise it returns
// every line in head, a nil tail, and truncated == false.
//
// A negative keepLines is treated as 0 so that the slicing below cannot panic.
func splitForTruncation(output string, keepLines int) (head, tail []string, omitted int, truncated bool) {
	if keepLines < 0 {
		keepLines = 0
	}

	lines := strings.Split(output, "\n")
	if len(lines) <= keepLines*2 {
		return lines, nil, 0, false
	}

	head = lines[:keepLines]
	tail = lines[len(lines)-keepLines:]
	return head, tail, len(lines) - keepLines*2, true
}

// truncateOutputLines truncates output to the first and last N lines if it exceeds 2*N lines,
// returning the result as a slice of strings.
//
// When lines are dropped, a single "... (K lines omitted) ..." marker element
// is placed between the head and the tail. Output that is short enough is
// returned as its unmodified list of lines.
func truncateOutputLines(output string, keepLines int) []string {
	head, tail, omitted, truncated := splitForTruncation(output, keepLines)
	if !truncated {
		return head
	}

	result := make([]string, 0, len(head)+len(tail)+1)
	result = append(result, head...)
	result = append(result, fmt.Sprintf("... (%d lines omitted) ...", omitted))
	result = append(result, tail...)
	return result
}

// truncateOutput truncates output to the first and last N lines if it exceeds 2*N lines.
//
// When lines are dropped, the head and tail are joined around a
// "... (K lines omitted) ..." marker that is set off by a blank line on each
// side. Output that is short enough is returned unchanged.
func truncateOutput(output string, keepLines int) string {
	head, tail, omitted, truncated := splitForTruncation(output, keepLines)
	if !truncated {
		return output
	}

	return strings.Join(head, "\n") +
		fmt.Sprintf("\n\n... (%d lines omitted) ...\n\n", omitted) +
		strings.Join(tail, "\n")
}

// BuilderConfig holds the configuration for the builder.
type BuilderConfig struct {
	// NATSUrl and NKeyFile are handed to nats.Connect by Builder.Run.
	NATSUrl  string
	NKeyFile string

	// ConfigFile is not read by the code in this file.
	// NOTE(review): presumably consumed by whoever loads the repo Config — verify.
	ConfigFile string

	// Timeout is passed to NewExecutor when the builder is created.
	Timeout time.Duration

	// MetricsEnabled turns on the metrics server and build collector;
	// MetricsAddr is the address given to that server.
	MetricsEnabled bool
	MetricsAddr    string
}

// Builder handles build requests from NATS.
type Builder struct { cfg BuilderConfig repoCfg *Config client *nats.Client executor *Executor lock sync.Mutex busy bool logger *slog.Logger // metrics server and collector (nil if metrics disabled) metricsServer *metrics.Server metrics *metrics.BuildCollector } // New creates a new builder with the given configuration. func New(cfg BuilderConfig, repoCfg *Config, logger *slog.Logger) *Builder { if logger == nil { logger = slog.Default() } b := &Builder{ cfg: cfg, repoCfg: repoCfg, executor: NewExecutor(cfg.Timeout), logger: logger, } if cfg.MetricsEnabled { b.metricsServer = metrics.NewServer(metrics.ServerConfig{ Addr: cfg.MetricsAddr, Logger: logger, }) b.metrics = metrics.NewBuildCollector(b.metricsServer.Registry()) } return b } // Run starts the builder and blocks until the context is cancelled. func (b *Builder) Run(ctx context.Context) error { // Start metrics server if enabled if b.metricsServer != nil { if err := b.metricsServer.Start(); err != nil { return fmt.Errorf("failed to start metrics server: %w", err) } defer func() { shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = b.metricsServer.Shutdown(shutdownCtx) }() } // Connect to NATS b.logger.Info("connecting to NATS", "url", b.cfg.NATSUrl) client, err := nats.Connect(nats.Config{ URL: b.cfg.NATSUrl, NKeyFile: b.cfg.NKeyFile, Name: "homelab-deploy-builder", }) if err != nil { return fmt.Errorf("failed to connect to NATS: %w", err) } b.client = client defer b.client.Close() b.logger.Info("connected to NATS") // Subscribe to build subjects for each repo for repoName := range b.repoCfg.Repos { // Subscribe to build..all and build.. 
allSubject := fmt.Sprintf("build.%s.*", repoName) b.logger.Info("subscribing to build subject", "subject", allSubject) if _, err := b.client.Subscribe(allSubject, b.handleBuildRequest); err != nil { return fmt.Errorf("failed to subscribe to %s: %w", allSubject, err) } } b.logger.Info("builder started", "repos", len(b.repoCfg.Repos)) // Wait for context cancellation <-ctx.Done() b.logger.Info("shutting down builder") return nil } func (b *Builder) handleBuildRequest(subject string, data []byte) { req, err := messages.UnmarshalBuildRequest(data) if err != nil { b.logger.Error("failed to unmarshal build request", "subject", subject, "error", err, ) return } b.logger.Info("received build request", "subject", subject, "repo", req.Repo, "target", req.Target, "branch", req.Branch, "reply_to", req.ReplyTo, ) // Validate request if err := req.Validate(); err != nil { b.logger.Warn("invalid build request", "error", err) b.sendResponse(req.ReplyTo, messages.NewBuildResponse( messages.BuildStatusRejected, err.Error(), )) return } // Get repo config repo, err := b.repoCfg.GetRepo(req.Repo) if err != nil { b.logger.Warn("unknown repo", "repo", req.Repo) b.sendResponse(req.ReplyTo, messages.NewBuildResponse( messages.BuildStatusRejected, fmt.Sprintf("unknown repo: %s", req.Repo), )) return } // Try to acquire lock b.lock.Lock() if b.busy { b.lock.Unlock() b.logger.Warn("build already in progress") b.sendResponse(req.ReplyTo, messages.NewBuildResponse( messages.BuildStatusRejected, "another build is already in progress", )) return } b.busy = true b.lock.Unlock() defer func() { b.lock.Lock() b.busy = false b.lock.Unlock() }() // Use default branch if not specified branch := req.Branch if branch == "" { branch = repo.DefaultBranch } // Determine hosts to build var hosts []string if req.Target == "all" { // List hosts from flake b.sendResponse(req.ReplyTo, messages.NewBuildResponse( messages.BuildStatusStarted, "discovering hosts...", )) hosts, err = 
b.executor.ListHosts(context.Background(), repo.URL, branch) if err != nil { b.logger.Error("failed to list hosts", "error", err) b.sendResponse(req.ReplyTo, messages.NewBuildResponse( messages.BuildStatusFailed, fmt.Sprintf("failed to list hosts: %v", err), ).WithError(err.Error())) if b.metrics != nil { b.metrics.RecordBuildFailure(req.Repo, "") } return } // Filter out hostnames with invalid characters (security: prevent injection) validHosts := make([]string, 0, len(hosts)) for _, host := range hosts { if hostnameRegex.MatchString(host) { validHosts = append(validHosts, host) } else { b.logger.Warn("skipping hostname with invalid characters", "hostname", host) } } hosts = validHosts // Sort hosts for consistent ordering sort.Strings(hosts) } else { hosts = []string{req.Target} } if len(hosts) == 0 { b.sendResponse(req.ReplyTo, messages.NewBuildResponse( messages.BuildStatusFailed, "no hosts to build", )) return } // Send started response b.sendResponse(req.ReplyTo, &messages.BuildResponse{ Status: messages.BuildStatusStarted, Message: fmt.Sprintf("building %d host(s)", len(hosts)), HostsTotal: len(hosts), }) // Build each host sequentially startTime := time.Now() results := make([]messages.BuildHostResult, 0, len(hosts)) succeeded := 0 failed := 0 for i, host := range hosts { hostStart := time.Now() b.logger.Info("building host", "host", host, "repo", req.Repo, "rev", branch, "progress", fmt.Sprintf("%d/%d", i+1, len(hosts)), "command", b.executor.BuildCommand(repo.URL, branch, host), ) result := b.executor.Build(context.Background(), repo.URL, branch, host) hostDuration := time.Since(hostStart).Seconds() hostResult := messages.BuildHostResult{ Host: host, Success: result.Success, DurationSeconds: hostDuration, } if !result.Success { if result.Error != nil { hostResult.Error = result.Error.Error() } if result.Stderr != "" { hostResult.Output = truncateOutput(result.Stderr, 50) } } results = append(results, hostResult) if result.Success { succeeded++ 
b.logger.Info("host build succeeded", "host", host, "repo", req.Repo, "rev", branch, "duration", hostDuration) if b.metrics != nil { b.metrics.RecordHostBuildSuccess(req.Repo, host, hostDuration) } } else { failed++ b.logger.Error("host build failed", "host", host, "repo", req.Repo, "rev", branch, "error", hostResult.Error) if result.Stderr != "" { for _, line := range truncateOutputLines(result.Stderr, 50) { b.logger.Warn("build output", "host", host, "repo", req.Repo, "line", line) } } if b.metrics != nil { b.metrics.RecordHostBuildFailure(req.Repo, host, hostDuration) } } // Send progress update success := result.Success b.sendResponse(req.ReplyTo, &messages.BuildResponse{ Status: messages.BuildStatusProgress, Host: host, HostSuccess: &success, HostsCompleted: i + 1, HostsTotal: len(hosts), }) } totalDuration := time.Since(startTime).Seconds() // Send final response status := messages.BuildStatusCompleted message := fmt.Sprintf("built %d/%d hosts successfully", succeeded, len(hosts)) if failed > 0 { status = messages.BuildStatusFailed message = fmt.Sprintf("build failed: %d/%d hosts failed", failed, len(hosts)) } b.sendResponse(req.ReplyTo, &messages.BuildResponse{ Status: status, Message: message, Results: results, TotalDurationSeconds: totalDuration, Succeeded: succeeded, Failed: failed, }) // Record overall build metrics if b.metrics != nil { if failed == 0 { b.metrics.RecordBuildSuccess(req.Repo) } else { b.metrics.RecordBuildFailure(req.Repo, "") } } } func (b *Builder) sendResponse(replyTo string, resp *messages.BuildResponse) { data, err := resp.Marshal() if err != nil { b.logger.Error("failed to marshal build response", "error", err) return } if err := b.client.Publish(replyTo, data); err != nil { b.logger.Error("failed to publish build response", "reply_to", replyTo, "error", err, ) } // Flush to ensure response is sent immediately if err := b.client.Flush(); err != nil { b.logger.Error("failed to flush", "error", err) } }