This repository has been archived on 2026-03-09. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
homelab-deploy/internal/cli/deploy.go
Torjus Håkestad 5f3cfc3d21 fix: add nixos-rebuild to PATH and fix CLI hanging after deploy failure
- Add nixos-rebuild to listener service PATH in NixOS module
- Fix CLI deploy command hanging after receiving final status by properly
  tracking lastResponse time and exiting when all hosts have responded

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-07 05:53:22 +01:00

217 lines
5.7 KiB
Go

package cli
import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"git.t-juice.club/torjus/homelab-deploy/internal/messages"
"git.t-juice.club/torjus/homelab-deploy/internal/nats"
)
// DeployConfig bundles everything Deploy needs to reach NATS and describe
// the deployment being requested.
type DeployConfig struct {
	// NATSUrl is the server URL handed to the NATS client.
	NATSUrl string
	// NKeyFile is the NKey file handed to the NATS client.
	NKeyFile string
	// Subject is the subject the deploy request is published on.
	Subject string
	// Action is copied verbatim into the outgoing deploy request.
	Action messages.Action
	// Revision is copied verbatim into the outgoing deploy request.
	Revision string
	// Timeout is the absolute upper bound on how long Deploy waits for
	// responses after the request has been sent.
	Timeout time.Duration
}
// DeployResult aggregates everything received while a deployment was
// being observed.
type DeployResult struct {
	// Responses holds every decoded response in arrival order. A single
	// host may contribute more than one entry.
	Responses []*messages.DeployResponse
	// Errors holds failures encountered while decoding response payloads.
	Errors []error
}
// AllSucceeded reports whether the deployment succeeded on every host that
// responded.
//
// Responses may contain several entries per host, because non-final progress
// updates are collected alongside final ones. A host counts as succeeded once
// it has reported messages.StatusCompleted; non-final updates from that host
// are ignored. The result is false when:
//   - no responses were collected,
//   - any decoding error was recorded in Errors,
//   - any response carries a final status other than StatusCompleted, or
//   - some host never reported StatusCompleted.
func (r *DeployResult) AllSucceeded() bool {
	if len(r.Responses) == 0 || len(r.Errors) > 0 {
		return false
	}

	// completed[host] becomes true once that host has reported
	// StatusCompleted; hosts only seen with progress updates stay false.
	completed := make(map[string]bool)
	for _, resp := range r.Responses {
		if resp.Status == messages.StatusCompleted {
			completed[resp.Hostname] = true
			continue
		}
		if resp.Status.IsFinal() {
			// A final status that is not "completed" is a failure.
			return false
		}
		if !completed[resp.Hostname] {
			// Register the host so a missing completion is detected below.
			completed[resp.Hostname] = false
		}
	}

	for _, done := range completed {
		if !done {
			return false
		}
	}
	return true
}
// HostCount reports how many distinct hostnames appear in the collected
// responses; a host that sent several responses is counted once.
func (r *DeployResult) HostCount() int {
	hosts := make(map[string]struct{}, len(r.Responses))
	for i := range r.Responses {
		hosts[r.Responses[i].Hostname] = struct{}{}
	}
	return len(hosts)
}
// Deploy publishes a deploy request on cfg.Subject and collects the responses
// that arrive on a reply subject unique to this request.
//
// onResponse, when non-nil, is invoked from the subscription callback for
// every successfully decoded response. Payloads that fail to decode are
// recorded in the result's Errors instead.
//
// The exit conditions are evaluated once per second. Deploy returns when:
//   - ctx is done (the partial result is returned together with ctx.Err()),
//   - cfg.Timeout has elapsed since the request was flushed,
//   - every host seen so far has reported a final status and no response has
//     arrived for more than two seconds, or
//   - at least one host has responded and no response has arrived for more
//     than 30 seconds.
//
// If connecting, subscribing, marshalling, publishing or flushing fails, the
// result is nil and the wrapped error is returned. Otherwise the returned
// result is a copy taken under the internal lock, so responses that arrive
// after Deploy has decided to return cannot mutate it.
func Deploy(ctx context.Context, cfg DeployConfig, onResponse func(*messages.DeployResponse)) (*DeployResult, error) {
	const (
		pollInterval = time.Second      // how often the exit conditions are re-evaluated
		finalGrace   = 2 * time.Second  // quiet period once all seen hosts are final
		idleTimeout  = 30 * time.Second // quiet period while some host is still non-final
	)

	client, err := nats.Connect(nats.Config{
		URL:      cfg.NATSUrl,
		NKeyFile: cfg.NKeyFile,
		Name:     "homelab-deploy-cli",
	})
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}
	defer client.Close()

	// Responses come back on a subject unique to this request.
	requestID := uuid.New().String()
	replySubject := fmt.Sprintf("deploy.responses.%s", requestID)

	// State shared with the subscription callback; all of it is guarded by mu.
	var mu sync.Mutex
	result := &DeployResult{}
	hostFinal := make(map[string]bool) // hosts that have sent a final status
	hostSeen := make(map[string]bool)  // every host that has responded
	lastResponse := time.Now()

	// snapshot copies the collected data under the lock so the caller never
	// shares slices with a callback that may still be running.
	snapshot := func() *DeployResult {
		mu.Lock()
		defer mu.Unlock()
		return &DeployResult{
			Responses: append([]*messages.DeployResponse(nil), result.Responses...),
			Errors:    append([]error(nil), result.Errors...),
		}
	}

	sub, err := client.Subscribe(replySubject, func(subject string, data []byte) {
		resp, err := messages.UnmarshalDeployResponse(data)
		if err != nil {
			mu.Lock()
			result.Errors = append(result.Errors, fmt.Errorf("failed to unmarshal response: %w", err))
			mu.Unlock()
			return
		}

		mu.Lock()
		result.Responses = append(result.Responses, resp)
		hostSeen[resp.Hostname] = true
		if resp.Status.IsFinal() {
			hostFinal[resp.Hostname] = true
		}
		lastResponse = time.Now()
		mu.Unlock()

		// Invoke the user callback outside the lock so it cannot stall
		// the bookkeeping above.
		if onResponse != nil {
			onResponse(resp)
		}
	})
	if err != nil {
		return nil, fmt.Errorf("failed to subscribe to reply subject: %w", err)
	}
	// Best-effort cleanup; the connection is closed right afterwards anyway.
	defer func() { _ = sub.Unsubscribe() }()

	// Build and send the request.
	req := &messages.DeployRequest{
		Action:   cfg.Action,
		Revision: cfg.Revision,
		ReplyTo:  replySubject,
	}
	data, err := req.Marshal()
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}
	if err := client.Publish(cfg.Subject, data); err != nil {
		return nil, fmt.Errorf("failed to publish request: %w", err)
	}
	if err := client.Flush(); err != nil {
		return nil, fmt.Errorf("failed to flush: %w", err)
	}

	// Poll the shared state until one of the exit conditions holds.
	deadline := time.Now().Add(cfg.Timeout)
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return snapshot(), ctx.Err()
		case <-ticker.C:
			mu.Lock()
			seenCount := len(hostSeen)
			finalCount := len(hostFinal)
			quietFor := time.Since(lastResponse)
			mu.Unlock()

			switch {
			case time.Now().After(deadline):
				// Absolute deadline exceeded.
				return snapshot(), nil
			case seenCount > 0 && seenCount == finalCount && quietFor > finalGrace:
				// Every host that responded is done; the grace period for
				// late arrivals from other hosts has passed.
				return snapshot(), nil
			case seenCount > 0 && quietFor > idleTimeout:
				// Some host never sent a final status and nothing has
				// arrived for a long time.
				return snapshot(), nil
			}
		}
	}
}
// Discover publishes a discovery request on discoverSubject and gathers the
// replies that arrive on a reply subject unique to this request.
//
// It waits for the full timeout, or until ctx is done, and then returns the
// replies collected so far; when ctx ends first, ctx.Err() is returned with
// them. Replies that cannot be decoded are dropped. If connecting,
// subscribing, marshalling, publishing or flushing fails, the slice is nil
// and the wrapped error is returned.
//
// The returned slice is a copy taken under the internal lock, so a reply
// that arrives while Discover is returning cannot race with the caller.
func Discover(ctx context.Context, natsURL, nkeyFile, discoverSubject string, timeout time.Duration) ([]*messages.DiscoveryResponse, error) {
	client, err := nats.Connect(nats.Config{
		URL:      natsURL,
		NKeyFile: nkeyFile,
		Name:     "homelab-deploy-cli-discover",
	})
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}
	defer client.Close()

	requestID := uuid.New().String()
	replySubject := fmt.Sprintf("deploy.responses.discover-%s", requestID)

	// responses is appended to by the subscription callback; guarded by mu.
	var mu sync.Mutex
	var responses []*messages.DiscoveryResponse

	sub, err := client.Subscribe(replySubject, func(subject string, data []byte) {
		resp, err := messages.UnmarshalDiscoveryResponse(data)
		if err != nil {
			// Best effort: a malformed reply is skipped and the remaining
			// replies are still collected.
			return
		}
		mu.Lock()
		responses = append(responses, resp)
		mu.Unlock()
	})
	if err != nil {
		return nil, fmt.Errorf("failed to subscribe: %w", err)
	}
	// Best-effort cleanup; the connection is closed right afterwards anyway.
	defer func() { _ = sub.Unsubscribe() }()

	req := &messages.DiscoveryRequest{ReplyTo: replySubject}
	data, err := req.Marshal()
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}
	if err := client.Publish(discoverSubject, data); err != nil {
		return nil, fmt.Errorf("failed to publish: %w", err)
	}
	if err := client.Flush(); err != nil {
		return nil, fmt.Errorf("failed to flush: %w", err)
	}

	// collected copies the replies under the lock; reading the slice
	// directly would race with the subscription callback.
	collected := func() []*messages.DiscoveryResponse {
		mu.Lock()
		defer mu.Unlock()
		return append([]*messages.DiscoveryResponse(nil), responses...)
	}

	// A stoppable timer releases its resources as soon as ctx wins the race.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return collected(), ctx.Err()
	case <-timer.C:
		return collected(), nil
	}
}