91 lines
1.9 KiB
Go

package server
import (
"context"
"encoding/json"
"log/slog"
"time"
"git.t-juice.club/torjus/natstonotify/bus"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
const DefaultSubject = "home2rjusnet.notifications"
type Server struct {
conn *nats.Conn
bus *bus.NotifyBus
Subject string
}
func New(nc *nats.Conn, b *bus.NotifyBus) (*Server, error) {
return &Server{conn: nc, Subject: DefaultSubject, bus: b}, nil
}
func (s *Server) Close() error {
return s.conn.Drain()
}
func (s *Server) Start() error {
js, err := jetstream.New(s.conn)
if err != nil {
return err
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := js.CreateOrUpdateStream(timeoutCtx, jetstream.StreamConfig{
Name: "notifications",
Subjects: []string{s.Subject},
MaxBytes: 1024 * 1024 * 5,
})
if err != nil {
return err
}
cons, err := stream.CreateOrUpdateConsumer(timeoutCtx, jetstream.ConsumerConfig{
Name: "notifications-consumer",
Durable: "notifications-consumer",
})
if err != nil {
return err
}
consCtx, err := cons.Consume(func(msg jetstream.Msg) {
slog.Info("Got message")
bn := bus.BusNotification{}
if err := json.Unmarshal(msg.Data(), &bn); err != nil {
slog.Warn("Error unmarshalling message", "error", err)
if err := msg.TermWithReason("failed to decode json"); err != nil {
slog.Error("Error terminating message", "error", err)
}
return
}
id, err := s.bus.Notify(bn)
if err != nil {
slog.Warn("Error sending notification", "error", err)
if err := msg.NakWithDelay(1 * time.Minute); err != nil {
slog.Error("Error naking message", "error", err)
}
return
}
slog.Info("Sent notification", "id", id)
if err := msg.DoubleAck(context.Background()); err != nil {
slog.Error("Error acking message", "error", err)
}
slog.Debug("Acked message")
})
if err != nil {
return err
}
<-consCtx.Closed()
return nil
}