package cli import ( "context" "fmt" "sync" "time" "github.com/google/uuid" "git.t-juice.club/torjus/homelab-deploy/internal/messages" "git.t-juice.club/torjus/homelab-deploy/internal/nats" ) // DeployConfig holds configuration for a deploy operation. type DeployConfig struct { NATSUrl string NKeyFile string Subject string Action messages.Action Revision string Timeout time.Duration } // DeployResult contains the aggregated results from a deployment. type DeployResult struct { Responses []*messages.DeployResponse Errors []error } // AllSucceeded returns true if all responses indicate success. func (r *DeployResult) AllSucceeded() bool { for _, resp := range r.Responses { if resp.Status != messages.StatusCompleted { return false } } return len(r.Responses) > 0 && len(r.Errors) == 0 } // HostCount returns the number of unique hosts that responded. func (r *DeployResult) HostCount() int { seen := make(map[string]bool) for _, resp := range r.Responses { seen[resp.Hostname] = true } return len(seen) } // Deploy executes a deployment to the specified subject and collects responses. func Deploy(ctx context.Context, cfg DeployConfig, onResponse func(*messages.DeployResponse)) (*DeployResult, error) { // Connect to NATS client, err := nats.Connect(nats.Config{ URL: cfg.NATSUrl, NKeyFile: cfg.NKeyFile, Name: "homelab-deploy-cli", }) if err != nil { return nil, fmt.Errorf("failed to connect to NATS: %w", err) } defer client.Close() // Generate unique reply subject requestID := uuid.New().String() replySubject := fmt.Sprintf("deploy.responses.%s", requestID) // Track responses by hostname to handle multiple messages per host var mu sync.Mutex result := &DeployResult{} hostFinal := make(map[string]bool) // track which hosts have sent final status // Subscribe to reply subject sub, err := client.Subscribe(replySubject, func(subject string, data []byte) { resp, err := messages.UnmarshalDeployResponse(data) if err != nil { mu.Lock() result.Errors = append(result.Errors, fmt.Errorf("failed to unmarshal response: %w", err)) mu.Unlock() return } mu.Lock() result.Responses = append(result.Responses, resp) if resp.Status.IsFinal() { hostFinal[resp.Hostname] = true } mu.Unlock() if onResponse != nil { onResponse(resp) } }) if err != nil { return nil, fmt.Errorf("failed to subscribe to reply subject: %w", err) } defer func() { _ = sub.Unsubscribe() }() // Build and send request req := &messages.DeployRequest{ Action: cfg.Action, Revision: cfg.Revision, ReplyTo: replySubject, } data, err := req.Marshal() if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } if err := client.Publish(cfg.Subject, data); err != nil { return nil, fmt.Errorf("failed to publish request: %w", err) } if err := client.Flush(); err != nil { return nil, fmt.Errorf("failed to flush: %w", err) } // Wait for responses with timeout // Use a dynamic timeout: wait for initial responses, then extend // timeout after each response until no new responses or max timeout deadline := time.Now().Add(cfg.Timeout) lastResponse := time.Now() idleTimeout := 30 * time.Second // wait this long after last response for { select { case <-ctx.Done(): return result, ctx.Err() case <-time.After(1 * time.Second): mu.Lock() responseCount := len(result.Responses) mu.Unlock() now := time.Now() // Check if we've exceeded the absolute deadline if now.After(deadline) { return result, nil } // If we have responses, use idle timeout if responseCount > 0 { mu.Lock() lastResponseTime := lastResponse // Update lastResponse time if we got new responses if responseCount > 0 { // Simple approximation - in practice you'd track this more precisely lastResponseTime = now } mu.Unlock() if now.Sub(lastResponseTime) > idleTimeout { return result, nil } } } } } // Discover sends a discovery request and collects host information. func Discover(ctx context.Context, natsURL, nkeyFile, discoverSubject string, timeout time.Duration) ([]*messages.DiscoveryResponse, error) { client, err := nats.Connect(nats.Config{ URL: natsURL, NKeyFile: nkeyFile, Name: "homelab-deploy-cli-discover", }) if err != nil { return nil, fmt.Errorf("failed to connect to NATS: %w", err) } defer client.Close() requestID := uuid.New().String() replySubject := fmt.Sprintf("deploy.responses.discover-%s", requestID) var mu sync.Mutex var responses []*messages.DiscoveryResponse sub, err := client.Subscribe(replySubject, func(subject string, data []byte) { resp, err := messages.UnmarshalDiscoveryResponse(data) if err != nil { return } mu.Lock() responses = append(responses, resp) mu.Unlock() }) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } defer func() { _ = sub.Unsubscribe() }() req := &messages.DiscoveryRequest{ReplyTo: replySubject} data, err := req.Marshal() if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } if err := client.Publish(discoverSubject, data); err != nil { return nil, fmt.Errorf("failed to publish: %w", err) } if err := client.Flush(); err != nil { return nil, fmt.Errorf("failed to flush: %w", err) } // Wait for responses select { case <-ctx.Done(): return responses, ctx.Err() case <-time.After(timeout): return responses, nil } }