Gitea to Forgejo host migration — update Go module path and all import references. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
235 lines
6.0 KiB
Go
235 lines
6.0 KiB
Go
package cli
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"code.t-juice.club/torjus/homelab-deploy/internal/messages"
|
|
"code.t-juice.club/torjus/homelab-deploy/internal/nats"
|
|
)
|
|
|
|
// DeployConfig holds configuration for a deploy operation.
|
|
type DeployConfig struct {
|
|
NATSUrl string
|
|
NKeyFile string
|
|
Subject string
|
|
Action messages.Action
|
|
Revision string
|
|
Timeout time.Duration
|
|
}
|
|
|
|
// DeployResult contains the aggregated results from a deployment.
|
|
type DeployResult struct {
|
|
Responses []*messages.DeployResponse
|
|
Errors []error
|
|
}
|
|
|
|
// AllSucceeded returns true if all hosts' final responses indicate success.
|
|
func (r *DeployResult) AllSucceeded() bool {
|
|
if len(r.Errors) > 0 {
|
|
return false
|
|
}
|
|
|
|
// Track the final status for each host
|
|
finalStatus := make(map[string]messages.Status)
|
|
for _, resp := range r.Responses {
|
|
if resp.Status.IsFinal() {
|
|
finalStatus[resp.Hostname] = resp.Status
|
|
}
|
|
}
|
|
|
|
// Need at least one host with a final status
|
|
if len(finalStatus) == 0 {
|
|
return false
|
|
}
|
|
|
|
// All final statuses must be completed
|
|
for _, status := range finalStatus {
|
|
if status != messages.StatusCompleted {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// HostCount returns the number of unique hosts that responded.
|
|
func (r *DeployResult) HostCount() int {
|
|
seen := make(map[string]bool)
|
|
for _, resp := range r.Responses {
|
|
seen[resp.Hostname] = true
|
|
}
|
|
return len(seen)
|
|
}
|
|
|
|
// Deploy executes a deployment to the specified subject and collects responses.
|
|
func Deploy(ctx context.Context, cfg DeployConfig, onResponse func(*messages.DeployResponse)) (*DeployResult, error) {
|
|
// Connect to NATS
|
|
client, err := nats.Connect(nats.Config{
|
|
URL: cfg.NATSUrl,
|
|
NKeyFile: cfg.NKeyFile,
|
|
Name: "homelab-deploy-cli",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Generate unique reply subject
|
|
requestID := uuid.New().String()
|
|
replySubject := fmt.Sprintf("deploy.responses.%s", requestID)
|
|
|
|
// Track responses by hostname to handle multiple messages per host
|
|
var mu sync.Mutex
|
|
result := &DeployResult{}
|
|
hostFinal := make(map[string]bool) // track which hosts have sent final status
|
|
hostSeen := make(map[string]bool) // track all hosts that have responded
|
|
lastResponse := time.Now()
|
|
|
|
// Subscribe to reply subject
|
|
sub, err := client.Subscribe(replySubject, func(subject string, data []byte) {
|
|
resp, err := messages.UnmarshalDeployResponse(data)
|
|
if err != nil {
|
|
mu.Lock()
|
|
result.Errors = append(result.Errors, fmt.Errorf("failed to unmarshal response: %w", err))
|
|
mu.Unlock()
|
|
return
|
|
}
|
|
|
|
mu.Lock()
|
|
result.Responses = append(result.Responses, resp)
|
|
hostSeen[resp.Hostname] = true
|
|
if resp.Status.IsFinal() {
|
|
hostFinal[resp.Hostname] = true
|
|
}
|
|
lastResponse = time.Now()
|
|
mu.Unlock()
|
|
|
|
if onResponse != nil {
|
|
onResponse(resp)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to subscribe to reply subject: %w", err)
|
|
}
|
|
defer func() { _ = sub.Unsubscribe() }()
|
|
|
|
// Build and send request
|
|
req := &messages.DeployRequest{
|
|
Action: cfg.Action,
|
|
Revision: cfg.Revision,
|
|
ReplyTo: replySubject,
|
|
}
|
|
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
|
}
|
|
|
|
if err := client.Publish(cfg.Subject, data); err != nil {
|
|
return nil, fmt.Errorf("failed to publish request: %w", err)
|
|
}
|
|
|
|
if err := client.Flush(); err != nil {
|
|
return nil, fmt.Errorf("failed to flush: %w", err)
|
|
}
|
|
|
|
// Wait for responses with timeout
|
|
// Use a dynamic timeout: wait for initial responses, then extend
|
|
// timeout after each response until no new responses or max timeout
|
|
deadline := time.Now().Add(cfg.Timeout)
|
|
idleTimeout := 30 * time.Second // wait this long after last response for new hosts
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return result, ctx.Err()
|
|
case <-time.After(1 * time.Second):
|
|
mu.Lock()
|
|
seenCount := len(hostSeen)
|
|
finalCount := len(hostFinal)
|
|
lastResponseTime := lastResponse
|
|
mu.Unlock()
|
|
|
|
now := time.Now()
|
|
|
|
// Check if we've exceeded the absolute deadline
|
|
if now.After(deadline) {
|
|
return result, nil
|
|
}
|
|
|
|
// If all hosts that responded have sent final status, we're done
|
|
// Add a short grace period for late arrivals from other hosts
|
|
if seenCount > 0 && seenCount == finalCount {
|
|
// Wait a bit for any other hosts to respond
|
|
if now.Sub(lastResponseTime) > 2*time.Second {
|
|
return result, nil
|
|
}
|
|
}
|
|
|
|
// If we have responses but waiting for more hosts, use idle timeout
|
|
if seenCount > 0 && now.Sub(lastResponseTime) > idleTimeout {
|
|
return result, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Discover sends a discovery request and collects host information.
|
|
func Discover(ctx context.Context, natsURL, nkeyFile, discoverSubject string, timeout time.Duration) ([]*messages.DiscoveryResponse, error) {
|
|
client, err := nats.Connect(nats.Config{
|
|
URL: natsURL,
|
|
NKeyFile: nkeyFile,
|
|
Name: "homelab-deploy-cli-discover",
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
requestID := uuid.New().String()
|
|
replySubject := fmt.Sprintf("deploy.responses.discover-%s", requestID)
|
|
|
|
var mu sync.Mutex
|
|
var responses []*messages.DiscoveryResponse
|
|
|
|
sub, err := client.Subscribe(replySubject, func(subject string, data []byte) {
|
|
resp, err := messages.UnmarshalDiscoveryResponse(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
mu.Lock()
|
|
responses = append(responses, resp)
|
|
mu.Unlock()
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to subscribe: %w", err)
|
|
}
|
|
defer func() { _ = sub.Unsubscribe() }()
|
|
|
|
req := &messages.DiscoveryRequest{ReplyTo: replySubject}
|
|
data, err := req.Marshal()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
|
}
|
|
|
|
if err := client.Publish(discoverSubject, data); err != nil {
|
|
return nil, fmt.Errorf("failed to publish: %w", err)
|
|
}
|
|
|
|
if err := client.Flush(); err != nil {
|
|
return nil, fmt.Errorf("failed to flush: %w", err)
|
|
}
|
|
|
|
// Wait for responses
|
|
select {
|
|
case <-ctx.Done():
|
|
return responses, ctx.Err()
|
|
case <-time.After(timeout):
|
|
return responses, nil
|
|
}
|
|
}
|