Reject NKey files that are readable by group or others (permissions more permissive than 0600). This prevents accidental exposure of private keys through overly permissive file permissions. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
152 lines
4.0 KiB
Go
152 lines
4.0 KiB
Go
// Package nats provides a NATS client wrapper with NKey authentication.
|
|
package nats
|
|
|
|
import (
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nkeys"
)
|
|
|
|
// Config carries the settings used to open a NATS connection.
type Config struct {
	// URL is the NATS server URL.
	URL string
	// NKeyFile is the path to the NKey seed file.
	NKeyFile string
	// Name is the client name used for identification.
	Name string
}
|
|
|
|
// Client wraps a NATS connection with NKey authentication.
|
|
type Client struct {
|
|
conn *nats.Conn
|
|
}
|
|
|
|
// Connect establishes a connection to NATS using NKey authentication.
|
|
func Connect(cfg Config) (*Client, error) {
|
|
// Verify NKey file has secure permissions (no group/other access)
|
|
info, err := os.Stat(cfg.NKeyFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to stat nkey file: %w", err)
|
|
}
|
|
if perm := info.Mode().Perm(); perm&0o077 != 0 {
|
|
return nil, fmt.Errorf("nkey file has insecure permissions %04o: must not be accessible by group or others", perm)
|
|
}
|
|
|
|
seed, err := os.ReadFile(cfg.NKeyFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read nkey file: %w", err)
|
|
}
|
|
|
|
// Trim any whitespace from the seed
|
|
seedStr := strings.TrimSpace(string(seed))
|
|
|
|
kp, err := nkeys.FromSeed([]byte(seedStr))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse nkey seed: %w", err)
|
|
}
|
|
|
|
pubKey, err := kp.PublicKey()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get public key: %w", err)
|
|
}
|
|
|
|
opts := []nats.Option{
|
|
nats.Name(cfg.Name),
|
|
nats.Nkey(pubKey, func(nonce []byte) ([]byte, error) {
|
|
return kp.Sign(nonce)
|
|
}),
|
|
nats.ReconnectWait(2 * time.Second),
|
|
nats.MaxReconnects(-1), // Unlimited reconnects
|
|
nats.ReconnectBufSize(8 * 1024 * 1024),
|
|
}
|
|
|
|
nc, err := nats.Connect(cfg.URL, opts...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
|
|
}
|
|
|
|
return &Client{conn: nc}, nil
|
|
}
|
|
|
|
// Subscription represents a NATS subscription.
|
|
type Subscription struct {
|
|
sub *nats.Subscription
|
|
}
|
|
|
|
// MessageHandler is the callback type accepted by Subscribe and
// QueueSubscribe. It is invoked with the subject and the payload of each
// received message.
type MessageHandler func(subject string, data []byte)
|
|
|
|
// Subscribe subscribes to a subject and calls the handler for each message.
|
|
func (c *Client) Subscribe(subject string, handler MessageHandler) (*Subscription, error) {
|
|
sub, err := c.conn.Subscribe(subject, func(msg *nats.Msg) {
|
|
handler(msg.Subject, msg.Data)
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to subscribe to %s: %w", subject, err)
|
|
}
|
|
return &Subscription{sub: sub}, nil
|
|
}
|
|
|
|
// QueueSubscribe subscribes to a subject with a queue group.
|
|
func (c *Client) QueueSubscribe(subject, queue string, handler MessageHandler) (*Subscription, error) {
|
|
sub, err := c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
|
|
handler(msg.Subject, msg.Data)
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to queue subscribe to %s: %w", subject, err)
|
|
}
|
|
return &Subscription{sub: sub}, nil
|
|
}
|
|
|
|
// Unsubscribe removes the subscription.
|
|
func (s *Subscription) Unsubscribe() error {
|
|
if s.sub != nil {
|
|
return s.sub.Unsubscribe()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Publish sends a message to a subject.
|
|
func (c *Client) Publish(subject string, data []byte) error {
|
|
if err := c.conn.Publish(subject, data); err != nil {
|
|
return fmt.Errorf("failed to publish to %s: %w", subject, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Request sends a request and waits for a response.
|
|
func (c *Client) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
|
|
msg, err := c.conn.Request(subject, data, timeout)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("request to %s failed: %w", subject, err)
|
|
}
|
|
return msg.Data, nil
|
|
}
|
|
|
|
// Flush flushes the connection, ensuring all published messages have been sent.
|
|
func (c *Client) Flush() error {
|
|
return c.conn.Flush()
|
|
}
|
|
|
|
// Close closes the NATS connection.
|
|
func (c *Client) Close() {
|
|
if c.conn != nil {
|
|
c.conn.Close()
|
|
}
|
|
}
|
|
|
|
// IsConnected returns true if the client is connected.
|
|
func (c *Client) IsConnected() bool {
|
|
return c.conn != nil && c.conn.IsConnected()
|
|
}
|
|
|
|
// Status returns the connection status.
|
|
func (c *Client) Status() nats.Status {
|
|
if c.conn == nil {
|
|
return nats.DISCONNECTED
|
|
}
|
|
return c.conn.Status()
|
|
}
|