The CLI was incorrectly reporting "some deployments failed" even when deployments succeeded. This was because AllSucceeded() checked if every response had StatusCompleted, but the Responses slice contains all messages including intermediate ones like "started". Since started != completed, it returned false. Now AllSucceeded() only examines final responses (using IsFinal()) and checks that each host's final status is completed. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
235 lines
6.0 KiB
Go
235 lines
6.0 KiB
Go
package cli
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"git.t-juice.club/torjus/homelab-deploy/internal/messages"
|
|
"git.t-juice.club/torjus/homelab-deploy/internal/nats"
|
|
)
|
|
|
|
// DeployConfig holds configuration for a deploy operation.
|
|
type DeployConfig struct {
|
|
NATSUrl string
|
|
NKeyFile string
|
|
Subject string
|
|
Action messages.Action
|
|
Revision string
|
|
Timeout time.Duration
|
|
}
|
|
|
|
// DeployResult contains the aggregated results from a deployment.
|
|
type DeployResult struct {
|
|
Responses []*messages.DeployResponse
|
|
Errors []error
|
|
}
|
|
|
|
// AllSucceeded returns true if all hosts' final responses indicate success.
|
|
func (r *DeployResult) AllSucceeded() bool {
|
|
if len(r.Errors) > 0 {
|
|
return false
|
|
}
|
|
|
|
// Track the final status for each host
|
|
finalStatus := make(map[string]messages.Status)
|
|
for _, resp := range r.Responses {
|
|
if resp.Status.IsFinal() {
|
|
finalStatus[resp.Hostname] = resp.Status
|
|
}
|
|
}
|
|
|
|
// Need at least one host with a final status
|
|
if len(finalStatus) == 0 {
|
|
return false
|
|
}
|
|
|
|
// All final statuses must be completed
|
|
for _, status := range finalStatus {
|
|
if status != messages.StatusCompleted {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// HostCount returns the number of unique hosts that responded.
|
|
func (r *DeployResult) HostCount() int {
|
|
seen := make(map[string]bool)
|
|
for _, resp := range r.Responses {
|
|
seen[resp.Hostname] = true
|
|
}
|
|
return len(seen)
|
|
}
|
|
|
|
// Deploy executes a deployment to the specified subject and collects responses.
|
|
func Deploy(ctx context.Context, cfg DeployConfig, onResponse func(*messages.DeployResponse)) (*DeployResult, error) {
|
|
// Connect to NATS
|
|
client, err := nats.Connect(nats.Config{
|
|
URL: cfg.NATSUrl,
|
|
NKeyFile: cfg.NKeyFile,
|
|
Name: "homelab-deploy-cli",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Generate unique reply subject
|
|
requestID := uuid.New().String()
|
|
replySubject := fmt.Sprintf("deploy.responses.%s", requestID)
|
|
|
|
// Track responses by hostname to handle multiple messages per host
|
|
var mu sync.Mutex
|
|
result := &DeployResult{}
|
|
hostFinal := make(map[string]bool) // track which hosts have sent final status
|
|
hostSeen := make(map[string]bool) // track all hosts that have responded
|
|
lastResponse := time.Now()
|
|
|
|
// Subscribe to reply subject
|
|
sub, err := client.Subscribe(replySubject, func(subject string, data []byte) {
|
|
resp, err := messages.UnmarshalDeployResponse(data)
|
|
if err != nil {
|
|
mu.Lock()
|
|
result.Errors = append(result.Errors, fmt.Errorf("failed to unmarshal response: %w", err))
|
|
mu.Unlock()
|
|
return
|
|
}
|
|
|
|
mu.Lock()
|
|
result.Responses = append(result.Responses, resp)
|
|
hostSeen[resp.Hostname] = true
|
|
if resp.Status.IsFinal() {
|
|
hostFinal[resp.Hostname] = true
|
|
}
|
|
lastResponse = time.Now()
|
|
mu.Unlock()
|
|
|
|
if onResponse != nil {
|
|
onResponse(resp)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to subscribe to reply subject: %w", err)
|
|
}
|
|
defer func() { _ = sub.Unsubscribe() }()
|
|
|
|
// Build and send request
|
|
req := &messages.DeployRequest{
|
|
Action: cfg.Action,
|
|
Revision: cfg.Revision,
|
|
ReplyTo: replySubject,
|
|
}
|
|
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
|
}
|
|
|
|
if err := client.Publish(cfg.Subject, data); err != nil {
|
|
return nil, fmt.Errorf("failed to publish request: %w", err)
|
|
}
|
|
|
|
if err := client.Flush(); err != nil {
|
|
return nil, fmt.Errorf("failed to flush: %w", err)
|
|
}
|
|
|
|
// Wait for responses with timeout
|
|
// Use a dynamic timeout: wait for initial responses, then extend
|
|
// timeout after each response until no new responses or max timeout
|
|
deadline := time.Now().Add(cfg.Timeout)
|
|
idleTimeout := 30 * time.Second // wait this long after last response for new hosts
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return result, ctx.Err()
|
|
case <-time.After(1 * time.Second):
|
|
mu.Lock()
|
|
seenCount := len(hostSeen)
|
|
finalCount := len(hostFinal)
|
|
lastResponseTime := lastResponse
|
|
mu.Unlock()
|
|
|
|
now := time.Now()
|
|
|
|
// Check if we've exceeded the absolute deadline
|
|
if now.After(deadline) {
|
|
return result, nil
|
|
}
|
|
|
|
// If all hosts that responded have sent final status, we're done
|
|
// Add a short grace period for late arrivals from other hosts
|
|
if seenCount > 0 && seenCount == finalCount {
|
|
// Wait a bit for any other hosts to respond
|
|
if now.Sub(lastResponseTime) > 2*time.Second {
|
|
return result, nil
|
|
}
|
|
}
|
|
|
|
// If we have responses but waiting for more hosts, use idle timeout
|
|
if seenCount > 0 && now.Sub(lastResponseTime) > idleTimeout {
|
|
return result, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Discover sends a discovery request and collects host information.
|
|
func Discover(ctx context.Context, natsURL, nkeyFile, discoverSubject string, timeout time.Duration) ([]*messages.DiscoveryResponse, error) {
|
|
client, err := nats.Connect(nats.Config{
|
|
URL: natsURL,
|
|
NKeyFile: nkeyFile,
|
|
Name: "homelab-deploy-cli-discover",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
requestID := uuid.New().String()
|
|
replySubject := fmt.Sprintf("deploy.responses.discover-%s", requestID)
|
|
|
|
var mu sync.Mutex
|
|
var responses []*messages.DiscoveryResponse
|
|
|
|
sub, err := client.Subscribe(replySubject, func(subject string, data []byte) {
|
|
resp, err := messages.UnmarshalDiscoveryResponse(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
mu.Lock()
|
|
responses = append(responses, resp)
|
|
mu.Unlock()
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to subscribe: %w", err)
|
|
}
|
|
defer func() { _ = sub.Unsubscribe() }()
|
|
|
|
req := &messages.DiscoveryRequest{ReplyTo: replySubject}
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
|
}
|
|
|
|
if err := client.Publish(discoverSubject, data); err != nil {
|
|
return nil, fmt.Errorf("failed to publish: %w", err)
|
|
}
|
|
|
|
if err := client.Flush(); err != nil {
|
|
return nil, fmt.Errorf("failed to flush: %w", err)
|
|
}
|
|
|
|
// Wait for responses
|
|
select {
|
|
case <-ctx.Done():
|
|
return responses, ctx.Err()
|
|
case <-time.After(timeout):
|
|
return responses, nil
|
|
}
|
|
}
|