This repository has been archived on 2026-03-09. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
homelab-deploy/internal/cli/deploy.go
Torjus Håkestad 2c97b6140c fix: check only final responses in AllSucceeded to determine deployment success
The CLI was incorrectly reporting "some deployments failed" even when
deployments succeeded. This was because AllSucceeded() checked if every
response had StatusCompleted, but the Responses slice contains all
messages including intermediate ones like "started". Since started !=
completed, it returned false.

Now AllSucceeded() only examines final responses (using IsFinal()) and
checks that each host's final status is completed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-07 06:19:08 +01:00

235 lines
6.0 KiB
Go

package cli
import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"git.t-juice.club/torjus/homelab-deploy/internal/messages"
"git.t-juice.club/torjus/homelab-deploy/internal/nats"
)
// DeployConfig carries everything Deploy needs for one deploy operation:
// how to reach NATS, which subject to publish the request on, what the
// request asks for, and how long to wait for replies.
type DeployConfig struct {
	// NATSUrl is handed to nats.Config.URL when connecting.
	NATSUrl string

	// NKeyFile is handed to nats.Config.NKeyFile when connecting.
	NKeyFile string

	// Subject is the NATS subject the deploy request is published on.
	Subject string

	// Action is copied into the published DeployRequest.
	Action messages.Action

	// Revision is copied into the published DeployRequest.
	Revision string

	// Timeout is the absolute upper bound, measured from when the request is
	// sent, on how long Deploy keeps waiting for responses.
	Timeout time.Duration
}
// DeployResult gathers everything received while waiting on a deployment.
type DeployResult struct {
	// Responses holds every decoded response in arrival order. A host may
	// contribute several entries (intermediate progress as well as its final
	// status).
	Responses []*messages.DeployResponse

	// Errors holds problems encountered while collecting responses, such as
	// replies that could not be decoded.
	Errors []error
}
// AllSucceeded reports whether the deployment succeeded on every host that
// responded.
//
// Responses contains every message received, including intermediate
// (non-final) ones, so only final statuses decide a host's outcome. When a
// host sends more than one final status, the last one received wins.
//
// It returns false when:
//   - any error was recorded while collecting responses,
//   - no host responded at all,
//   - a host responded but never sent a final status (for example it was
//     still running when Deploy stopped waiting), or
//   - a host's final status is anything other than StatusCompleted.
func (r *DeployResult) AllSucceeded() bool {
	if len(r.Errors) > 0 {
		return false
	}

	// seen records every host that sent anything; finalStatus records the
	// latest final status per host. A host in seen but missing from
	// finalStatus never finished, which must not count as success.
	seen := make(map[string]bool)
	finalStatus := make(map[string]messages.Status)
	for _, resp := range r.Responses {
		seen[resp.Hostname] = true
		if resp.Status.IsFinal() {
			finalStatus[resp.Hostname] = resp.Status
		}
	}

	// Need at least one responding host to call it a success.
	if len(seen) == 0 {
		return false
	}

	for host := range seen {
		status, ok := finalStatus[host]
		if !ok || status != messages.StatusCompleted {
			return false
		}
	}
	return true
}
// HostCount reports how many distinct hostnames appear in Responses,
// counting each host once no matter how many messages it sent or whether
// it ever reached a final status.
func (r *DeployResult) HostCount() int {
	hosts := make(map[string]struct{}, len(r.Responses))
	for i := range r.Responses {
		hosts[r.Responses[i].Hostname] = struct{}{}
	}
	return len(hosts)
}
// Deploy publishes a deploy request on cfg.Subject and collects the replies
// that hosts send back on a unique, per-request reply subject.
//
// A host may send several messages (intermediate progress followed by a
// final status). onResponse, if non-nil, is invoked for every successfully
// decoded response. It is called from inside the subscription handler, which
// may run on a different goroutine than the caller. Replies that cannot be
// decoded are recorded in DeployResult.Errors instead.
//
// Deploy stops waiting at the first of:
//   - ctx being cancelled (the partial result is returned with ctx.Err()),
//   - cfg.Timeout having elapsed since the request was sent,
//   - every host that responded having sent a final status, with no new
//     message for settleDelay,
//   - at least one host having responded, with no new message for
//     idleTimeout.
//
// The returned DeployResult is a snapshot taken when Deploy stops waiting.
// Messages arriving after that point are discarded and do not reach
// onResponse, so the caller never shares memory with the subscription
// handler. Connection, subscription, marshal, publish and flush failures are
// returned as an error with a nil result.
func Deploy(ctx context.Context, cfg DeployConfig, onResponse func(*messages.DeployResponse)) (*DeployResult, error) {
	const (
		pollInterval = 1 * time.Second
		settleDelay  = 2 * time.Second  // quiet period once all seen hosts are final
		idleTimeout  = 30 * time.Second // quiet period while waiting for more hosts
	)

	client, err := nats.Connect(nats.Config{
		URL:      cfg.NATSUrl,
		NKeyFile: cfg.NKeyFile,
		Name:     "homelab-deploy-cli",
	})
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}
	defer client.Close()

	// A unique reply subject keeps replies for this request separate.
	requestID := uuid.New().String()
	replySubject := fmt.Sprintf("deploy.responses.%s", requestID)

	// State shared with the subscription handler; all of it is guarded by mu.
	var (
		mu           sync.Mutex
		result       = &DeployResult{}
		hostFinal    = make(map[string]bool) // hosts that have sent a final status
		hostSeen     = make(map[string]bool) // hosts that have sent anything
		lastResponse = time.Now()
		stopped      bool // set once the returned snapshot has been taken
	)

	// snapshot stops accepting messages and returns a copy of what was
	// collected. Returning a copy (rather than result itself) avoids a data
	// race with a handler invocation that is still in flight.
	snapshot := func() *DeployResult {
		mu.Lock()
		defer mu.Unlock()
		stopped = true
		return &DeployResult{
			Responses: append([]*messages.DeployResponse(nil), result.Responses...),
			Errors:    append([]error(nil), result.Errors...),
		}
	}

	sub, err := client.Subscribe(replySubject, func(_ string, data []byte) {
		resp, err := messages.UnmarshalDeployResponse(data)

		mu.Lock()
		if stopped {
			// Deploy has already returned its result; drop late messages.
			mu.Unlock()
			return
		}
		if err != nil {
			result.Errors = append(result.Errors, fmt.Errorf("failed to unmarshal response: %w", err))
			mu.Unlock()
			return
		}
		result.Responses = append(result.Responses, resp)
		hostSeen[resp.Hostname] = true
		if resp.Status.IsFinal() {
			hostFinal[resp.Hostname] = true
		}
		lastResponse = time.Now()
		mu.Unlock()

		// Invoke the user callback outside the lock so a slow callback cannot
		// stall collection.
		if onResponse != nil {
			onResponse(resp)
		}
	})
	if err != nil {
		return nil, fmt.Errorf("failed to subscribe to reply subject: %w", err)
	}
	defer func() { _ = sub.Unsubscribe() }()

	req := &messages.DeployRequest{
		Action:   cfg.Action,
		Revision: cfg.Revision,
		ReplyTo:  replySubject,
	}
	data, err := req.Marshal()
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}
	if err := client.Publish(cfg.Subject, data); err != nil {
		return nil, fmt.Errorf("failed to publish request: %w", err)
	}
	if err := client.Flush(); err != nil {
		return nil, fmt.Errorf("failed to flush: %w", err)
	}

	// The number of responding hosts is not known up front, so poll
	// periodically and stop once replies have gone quiet or the absolute
	// deadline has passed.
	deadline := time.Now().Add(cfg.Timeout)
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return snapshot(), ctx.Err()
		case <-ticker.C:
		}

		mu.Lock()
		seenCount := len(hostSeen)
		finalCount := len(hostFinal)
		lastResponseTime := lastResponse
		mu.Unlock()

		now := time.Now()
		quiet := now.Sub(lastResponseTime)

		switch {
		case now.After(deadline):
			return snapshot(), nil
		case seenCount > 0 && seenCount == finalCount && quiet > settleDelay:
			// Every host that answered has finished; the settle delay gave
			// any slower hosts a chance to announce themselves.
			return snapshot(), nil
		case seenCount > 0 && quiet > idleTimeout:
			return snapshot(), nil
		}
	}
}
// Discover publishes a discovery request on discoverSubject and gathers the
// DiscoveryResponse replies that arrive on a unique reply subject. It stops
// when timeout elapses or ctx is cancelled.
//
// Replies that cannot be decoded are skipped (best effort) rather than
// failing the whole discovery. On cancellation, the responses gathered so far
// are returned together with ctx.Err(). The returned slice is a copy, so
// replies that arrive after Discover returns never modify it.
func Discover(ctx context.Context, natsURL, nkeyFile, discoverSubject string, timeout time.Duration) ([]*messages.DiscoveryResponse, error) {
	client, err := nats.Connect(nats.Config{
		URL:      natsURL,
		NKeyFile: nkeyFile,
		Name:     "homelab-deploy-cli-discover",
	})
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}
	defer client.Close()

	requestID := uuid.New().String()
	replySubject := fmt.Sprintf("deploy.responses.discover-%s", requestID)

	// responses is shared with the subscription handler and guarded by mu.
	var (
		mu        sync.Mutex
		responses []*messages.DiscoveryResponse
	)

	// collected copies the responses under the lock, so the caller does not
	// race with a handler invocation that may still be appending.
	collected := func() []*messages.DiscoveryResponse {
		mu.Lock()
		defer mu.Unlock()
		return append([]*messages.DiscoveryResponse(nil), responses...)
	}

	sub, err := client.Subscribe(replySubject, func(_ string, data []byte) {
		resp, err := messages.UnmarshalDiscoveryResponse(data)
		if err != nil {
			// Best effort: ignore replies that cannot be decoded.
			return
		}
		mu.Lock()
		responses = append(responses, resp)
		mu.Unlock()
	})
	if err != nil {
		return nil, fmt.Errorf("failed to subscribe: %w", err)
	}
	defer func() { _ = sub.Unsubscribe() }()

	req := &messages.DiscoveryRequest{ReplyTo: replySubject}
	data, err := req.Marshal()
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}
	if err := client.Publish(discoverSubject, data); err != nil {
		return nil, fmt.Errorf("failed to publish: %w", err)
	}
	if err := client.Flush(); err != nil {
		return nil, fmt.Errorf("failed to flush: %w", err)
	}

	// Collect replies for the full timeout window; the number of hosts is
	// not known in advance.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return collected(), ctx.Err()
	case <-timer.C:
		return collected(), nil
	}
}