package cli import ( "context" "fmt" "sync" "time" "github.com/google/uuid" "git.t-juice.club/torjus/homelab-deploy/internal/messages" "git.t-juice.club/torjus/homelab-deploy/internal/nats" ) // DeployConfig holds configuration for a deploy operation. type DeployConfig struct { NATSUrl string NKeyFile string Subject string Action messages.Action Revision string Timeout time.Duration } // DeployResult contains the aggregated results from a deployment. type DeployResult struct { Responses []*messages.DeployResponse Errors []error } // AllSucceeded returns true if all responses indicate success. func (r *DeployResult) AllSucceeded() bool { for _, resp := range r.Responses { if resp.Status != messages.StatusCompleted { return false } } return len(r.Responses) > 0 && len(r.Errors) == 0 } // HostCount returns the number of unique hosts that responded. func (r *DeployResult) HostCount() int { seen := make(map[string]bool) for _, resp := range r.Responses { seen[resp.Hostname] = true } return len(seen) } // Deploy executes a deployment to the specified subject and collects responses. func Deploy(ctx context.Context, cfg DeployConfig, onResponse func(*messages.DeployResponse)) (*DeployResult, error) { // Connect to NATS client, err := nats.Connect(nats.Config{ URL: cfg.NATSUrl, NKeyFile: cfg.NKeyFile, Name: "homelab-deploy-cli", }) if err != nil { return nil, fmt.Errorf("failed to connect to NATS: %w", err) } defer client.Close() // Generate unique reply subject requestID := uuid.New().String() replySubject := fmt.Sprintf("deploy.responses.%s", requestID) // Track responses by hostname to handle multiple messages per host var mu sync.Mutex result := &DeployResult{} hostFinal := make(map[string]bool) // track which hosts have sent final status hostSeen := make(map[string]bool) // track all hosts that have responded lastResponse := time.Now() // Subscribe to reply subject sub, err := client.Subscribe(replySubject, func(subject string, data []byte) { resp, err := messages.UnmarshalDeployResponse(data) if err != nil { mu.Lock() result.Errors = append(result.Errors, fmt.Errorf("failed to unmarshal response: %w", err)) mu.Unlock() return } mu.Lock() result.Responses = append(result.Responses, resp) hostSeen[resp.Hostname] = true if resp.Status.IsFinal() { hostFinal[resp.Hostname] = true } lastResponse = time.Now() mu.Unlock() if onResponse != nil { onResponse(resp) } }) if err != nil { return nil, fmt.Errorf("failed to subscribe to reply subject: %w", err) } defer func() { _ = sub.Unsubscribe() }() // Build and send request req := &messages.DeployRequest{ Action: cfg.Action, Revision: cfg.Revision, ReplyTo: replySubject, } data, err := req.Marshal() if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } if err := client.Publish(cfg.Subject, data); err != nil { return nil, fmt.Errorf("failed to publish request: %w", err) } if err := client.Flush(); err != nil { return nil, fmt.Errorf("failed to flush: %w", err) } // Wait for responses with timeout // Use a dynamic timeout: wait for initial responses, then extend // timeout after each response until no new responses or max timeout deadline := time.Now().Add(cfg.Timeout) idleTimeout := 30 * time.Second // wait this long after last response for new hosts for { select { case <-ctx.Done(): return result, ctx.Err() case <-time.After(1 * time.Second): mu.Lock() seenCount := len(hostSeen) finalCount := len(hostFinal) lastResponseTime := lastResponse mu.Unlock() now := time.Now() // Check if we've exceeded the absolute deadline if now.After(deadline) { return result, nil } // If all hosts that responded have sent final status, we're done // Add a short grace period for late arrivals from other hosts if seenCount > 0 && seenCount == finalCount { // Wait a bit for any other hosts to respond if now.Sub(lastResponseTime) > 2*time.Second { return result, nil } } // If we have responses but waiting for more hosts, use idle timeout if seenCount > 0 && now.Sub(lastResponseTime) > idleTimeout { return result, nil } } } } // Discover sends a discovery request and collects host information. func Discover(ctx context.Context, natsURL, nkeyFile, discoverSubject string, timeout time.Duration) ([]*messages.DiscoveryResponse, error) { client, err := nats.Connect(nats.Config{ URL: natsURL, NKeyFile: nkeyFile, Name: "homelab-deploy-cli-discover", }) if err != nil { return nil, fmt.Errorf("failed to connect to NATS: %w", err) } defer client.Close() requestID := uuid.New().String() replySubject := fmt.Sprintf("deploy.responses.discover-%s", requestID) var mu sync.Mutex var responses []*messages.DiscoveryResponse sub, err := client.Subscribe(replySubject, func(subject string, data []byte) { resp, err := messages.UnmarshalDiscoveryResponse(data) if err != nil { return } mu.Lock() responses = append(responses, resp) mu.Unlock() }) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } defer func() { _ = sub.Unsubscribe() }() req := &messages.DiscoveryRequest{ReplyTo: replySubject} data, err := req.Marshal() if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } if err := client.Publish(discoverSubject, data); err != nil { return nil, fmt.Errorf("failed to publish: %w", err) } if err := client.Flush(); err != nil { return nil, fmt.Errorf("failed to flush: %w", err) } // Wait for responses select { case <-ctx.Done(): return responses, ctx.Err() case <-time.After(timeout): return responses, nil } }