feat: add heartbeat status updates during deployment
Send periodic "running" status messages while nixos-rebuild executes, preventing the idle timeout from triggering before deployments complete. This fixes false "Some deployments failed" warnings in MCP when builds take longer than 30 seconds.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -16,7 +16,7 @@ import (
|
||||
"github.com/urfave/cli/v3"
|
||||
)
|
||||
|
||||
const version = "0.1.8"
|
||||
const version = "0.1.9"
|
||||
|
||||
func main() {
|
||||
app := &cli.Command{
|
||||
@@ -99,6 +99,11 @@ func listenerCommand() *cli.Command {
|
||||
Usage: "Address for Prometheus metrics HTTP server",
|
||||
Value: ":9972",
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "heartbeat-interval",
|
||||
Usage: "Interval in seconds for sending status updates during deployment (0 to disable)",
|
||||
Value: 15,
|
||||
},
|
||||
},
|
||||
Action: func(ctx context.Context, c *cli.Command) error {
|
||||
tier := c.String("tier")
|
||||
@@ -107,18 +112,19 @@ func listenerCommand() *cli.Command {
|
||||
}
|
||||
|
||||
cfg := listener.Config{
|
||||
Hostname: c.String("hostname"),
|
||||
Tier: tier,
|
||||
Role: c.String("role"),
|
||||
NATSUrl: c.String("nats-url"),
|
||||
NKeyFile: c.String("nkey-file"),
|
||||
FlakeURL: c.String("flake-url"),
|
||||
Timeout: time.Duration(c.Int("timeout")) * time.Second,
|
||||
DeploySubjects: c.StringSlice("deploy-subject"),
|
||||
DiscoverSubject: c.String("discover-subject"),
|
||||
MetricsEnabled: c.Bool("metrics-enabled"),
|
||||
MetricsAddr: c.String("metrics-addr"),
|
||||
Version: version,
|
||||
Hostname: c.String("hostname"),
|
||||
Tier: tier,
|
||||
Role: c.String("role"),
|
||||
NATSUrl: c.String("nats-url"),
|
||||
NKeyFile: c.String("nkey-file"),
|
||||
FlakeURL: c.String("flake-url"),
|
||||
Timeout: time.Duration(c.Int("timeout")) * time.Second,
|
||||
HeartbeatInterval: time.Duration(c.Int("heartbeat-interval")) * time.Second,
|
||||
DeploySubjects: c.StringSlice("deploy-subject"),
|
||||
DiscoverSubject: c.String("discover-subject"),
|
||||
MetricsEnabled: c.Bool("metrics-enabled"),
|
||||
MetricsAddr: c.String("metrics-addr"),
|
||||
Version: version,
|
||||
}
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
|
||||
|
||||
@@ -35,6 +35,15 @@ type Result struct {
|
||||
Error error
|
||||
}
|
||||
|
||||
// ExecuteOptions contains optional settings for ExecuteWithOptions.
// A nil *ExecuteOptions is valid and disables the heartbeat; Execute passes nil.
|
||||
type ExecuteOptions struct {
|
||||
// HeartbeatInterval is how often to call the heartbeat callback.
|
||||
// If zero or negative, no heartbeat is sent.
|
||||
HeartbeatInterval time.Duration
|
||||
// HeartbeatCallback is called periodically with elapsed time while the command runs.
// It is only used when HeartbeatInterval is positive and the callback is non-nil.
// The callback runs on a goroutine started by ExecuteWithOptions, not on the
// caller's goroutine, and elapsed is measured from just before the command starts.
// NOTE(review): the heartbeat goroutine is signalled to stop once the command
// finishes, but the visible code does not wait for it to exit, so a callback
// already in flight may complete after ExecuteWithOptions returns — confirm.
|
||||
HeartbeatCallback func(elapsed time.Duration)
|
||||
}
|
||||
|
||||
// ValidateRevision checks if a revision exists in the remote repository.
|
||||
// It uses git ls-remote to verify the ref exists.
|
||||
func (e *Executor) ValidateRevision(ctx context.Context, revision string) error {
|
||||
@@ -65,6 +74,11 @@ func (e *Executor) ValidateRevision(ctx context.Context, revision string) error
|
||||
|
||||
// Execute runs nixos-rebuild with the specified action and revision.
// It delegates to ExecuteWithOptions with nil options, so no heartbeat
// callbacks are made.
|
||||
func (e *Executor) Execute(ctx context.Context, action messages.Action, revision string) *Result {
|
||||
return e.ExecuteWithOptions(ctx, action, revision, nil)
|
||||
}
|
||||
|
||||
// ExecuteWithOptions runs nixos-rebuild with the specified action, revision, and options.
|
||||
func (e *Executor) ExecuteWithOptions(ctx context.Context, action messages.Action, revision string, opts *ExecuteOptions) *Result {
|
||||
ctx, cancel := context.WithTimeout(ctx, e.timeout)
|
||||
defer cancel()
|
||||
|
||||
@@ -77,7 +91,41 @@ func (e *Executor) Execute(ctx context.Context, action messages.Action, revision
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
err := cmd.Run()
|
||||
// Start the command
|
||||
startTime := time.Now()
|
||||
if err := cmd.Start(); err != nil {
|
||||
return &Result{
|
||||
Success: false,
|
||||
ExitCode: -1,
|
||||
Error: fmt.Errorf("failed to start command: %w", err),
|
||||
}
|
||||
}
|
||||
|
||||
// Set up heartbeat if configured
|
||||
var heartbeatDone chan struct{}
|
||||
if opts != nil && opts.HeartbeatInterval > 0 && opts.HeartbeatCallback != nil {
|
||||
heartbeatDone = make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(opts.HeartbeatInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-heartbeatDone:
|
||||
return
|
||||
case <-ticker.C:
|
||||
opts.HeartbeatCallback(time.Since(startTime))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for command to complete
|
||||
err := cmd.Wait()
|
||||
|
||||
// Stop heartbeat goroutine
|
||||
if heartbeatDone != nil {
|
||||
close(heartbeatDone)
|
||||
}
|
||||
|
||||
result := &Result{
|
||||
Stdout: stdout.String(),
|
||||
|
||||
@@ -14,18 +14,19 @@ import (
|
||||
|
||||
// Config holds the configuration for the listener.
// The listener command fills it in from its CLI flags.
|
||||
type Config struct {
|
||||
Hostname string
|
||||
Tier string
|
||||
Role string
|
||||
NATSUrl string
|
||||
NKeyFile string
|
||||
FlakeURL string
|
||||
Timeout time.Duration
|
||||
DeploySubjects []string
|
||||
DiscoverSubject string
|
||||
MetricsEnabled bool
|
||||
MetricsAddr string
|
||||
Version string
|
||||
Hostname string
|
||||
Tier string
|
||||
Role string
|
||||
NATSUrl string
|
||||
NKeyFile string
|
||||
FlakeURL string
|
||||
Timeout time.Duration
|
||||
HeartbeatInterval time.Duration // interval between "running" status updates during a deployment; values <= 0 disable them
|
||||
DeploySubjects []string
|
||||
DiscoverSubject string
|
||||
MetricsEnabled bool
|
||||
MetricsAddr string
|
||||
Version string
|
||||
}
|
||||
|
||||
// Listener handles deployment requests from NATS.
|
||||
@@ -232,7 +233,22 @@ func (l *Listener) handleDeployRequest(subject string, data []byte) {
|
||||
"command", l.executor.BuildCommand(req.Action, req.Revision),
|
||||
)
|
||||
|
||||
result := l.executor.Execute(ctx, req.Action, req.Revision)
|
||||
// Set up heartbeat options to send periodic status updates
|
||||
var opts *deploy.ExecuteOptions
|
||||
if l.cfg.HeartbeatInterval > 0 {
|
||||
opts = &deploy.ExecuteOptions{
|
||||
HeartbeatInterval: l.cfg.HeartbeatInterval,
|
||||
HeartbeatCallback: func(elapsed time.Duration) {
|
||||
l.sendResponse(req.ReplyTo, messages.NewDeployResponse(
|
||||
l.cfg.Hostname,
|
||||
messages.StatusRunning,
|
||||
fmt.Sprintf("deployment in progress (%s elapsed)", elapsed.Round(time.Second)),
|
||||
))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
result := l.executor.ExecuteWithOptions(ctx, req.Action, req.Revision, opts)
|
||||
duration := time.Since(startTime).Seconds()
|
||||
|
||||
if result.Success {
|
||||
|
||||
@@ -35,6 +35,7 @@ const (
|
||||
StatusAccepted Status = "accepted"
|
||||
StatusRejected Status = "rejected"
|
||||
StatusStarted Status = "started"
|
||||
StatusRunning Status = "running"
|
||||
StatusCompleted Status = "completed"
|
||||
StatusFailed Status = "failed"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user