// Package nats provides a NATS client wrapper with NKey authentication. package nats import ( "fmt" "os" "strings" "time" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" ) // Config holds the configuration for a NATS connection. type Config struct { URL string // NATS server URL NKeyFile string // Path to NKey seed file Name string // Client name for identification } // Client wraps a NATS connection with NKey authentication. type Client struct { conn *nats.Conn } // Connect establishes a connection to NATS using NKey authentication. func Connect(cfg Config) (*Client, error) { // Verify NKey file has secure permissions (no group/other access) info, err := os.Stat(cfg.NKeyFile) if err != nil { return nil, fmt.Errorf("failed to stat nkey file: %w", err) } if perm := info.Mode().Perm(); perm&0o077 != 0 { return nil, fmt.Errorf("nkey file has insecure permissions %04o: must not be accessible by group or others", perm) } seed, err := os.ReadFile(cfg.NKeyFile) if err != nil { return nil, fmt.Errorf("failed to read nkey file: %w", err) } // Trim any whitespace from the seed seedStr := strings.TrimSpace(string(seed)) kp, err := nkeys.FromSeed([]byte(seedStr)) if err != nil { return nil, fmt.Errorf("failed to parse nkey seed: %w", err) } pubKey, err := kp.PublicKey() if err != nil { return nil, fmt.Errorf("failed to get public key: %w", err) } opts := []nats.Option{ nats.Name(cfg.Name), nats.Nkey(pubKey, func(nonce []byte) ([]byte, error) { return kp.Sign(nonce) }), nats.ReconnectWait(2 * time.Second), nats.MaxReconnects(-1), // Unlimited reconnects nats.ReconnectBufSize(8 * 1024 * 1024), } nc, err := nats.Connect(cfg.URL, opts...) if err != nil { return nil, fmt.Errorf("failed to connect to NATS: %w", err) } return &Client{conn: nc}, nil } // Subscription represents a NATS subscription. type Subscription struct { sub *nats.Subscription } // MessageHandler is a callback for received messages. type MessageHandler func(subject string, data []byte) // Subscribe subscribes to a subject and calls the handler for each message. func (c *Client) Subscribe(subject string, handler MessageHandler) (*Subscription, error) { sub, err := c.conn.Subscribe(subject, func(msg *nats.Msg) { handler(msg.Subject, msg.Data) }) if err != nil { return nil, fmt.Errorf("failed to subscribe to %s: %w", subject, err) } return &Subscription{sub: sub}, nil } // QueueSubscribe subscribes to a subject with a queue group. func (c *Client) QueueSubscribe(subject, queue string, handler MessageHandler) (*Subscription, error) { sub, err := c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) { handler(msg.Subject, msg.Data) }) if err != nil { return nil, fmt.Errorf("failed to queue subscribe to %s: %w", subject, err) } return &Subscription{sub: sub}, nil } // Unsubscribe removes the subscription. func (s *Subscription) Unsubscribe() error { if s.sub != nil { return s.sub.Unsubscribe() } return nil } // Publish sends a message to a subject. func (c *Client) Publish(subject string, data []byte) error { if err := c.conn.Publish(subject, data); err != nil { return fmt.Errorf("failed to publish to %s: %w", subject, err) } return nil } // Request sends a request and waits for a response. func (c *Client) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) { msg, err := c.conn.Request(subject, data, timeout) if err != nil { return nil, fmt.Errorf("request to %s failed: %w", subject, err) } return msg.Data, nil } // Flush flushes the connection, ensuring all published messages have been sent. func (c *Client) Flush() error { return c.conn.Flush() } // Close closes the NATS connection. func (c *Client) Close() { if c.conn != nil { c.conn.Close() } } // IsConnected returns true if the client is connected. func (c *Client) IsConnected() bool { return c.conn != nil && c.conn.IsConnected() } // Status returns the connection status. func (c *Client) Status() nats.Status { if c.conn == nil { return nats.DISCONNECTED } return c.conn.Status() }