// Package server consumes notification messages from a NATS JetStream
// stream and forwards them to a notify bus.
package server

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"git.t-juice.club/torjus/natstonotify/bus"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

// DefaultSubject is the subject a Server created by New listens on.
const DefaultSubject = "home2rjusnet.notifications"

// Server reads notifications from JetStream and hands them to a NotifyBus.
type Server struct {
	conn *nats.Conn
	bus  *bus.NotifyBus

	// Subject is the subject the "notifications" stream is bound to when
	// Start is called. New sets it to DefaultSubject.
	Subject string
}

// New returns a Server that uses nc for NATS access and b for delivering
// notifications. The returned error is currently always nil.
func New(nc *nats.Conn, b *bus.NotifyBus) (*Server, error) {
	return &Server{conn: nc, Subject: DefaultSubject, bus: b}, nil
}

// Close drains the underlying NATS connection and returns the error
// reported by Drain, if any.
func (s *Server) Close() error {
	return s.conn.Drain()
}

// Start creates (or updates) the "notifications" stream and its durable
// consumer, begins consuming messages, and then blocks until the consume
// context reports that it is closed.
//
// Stream and consumer setup share a single 10 second timeout. Any setup
// error is returned wrapped with a description of the failing step; once
// consumption has ended Start returns nil.
func (s *Server) Start() error {
	js, err := jetstream.New(s.conn)
	if err != nil {
		return fmt.Errorf("creating jetstream context: %w", err)
	}

	timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := js.CreateOrUpdateStream(timeoutCtx, jetstream.StreamConfig{
		Name:     "notifications",
		Subjects: []string{s.Subject},
		MaxBytes: 1024 * 1024 * 5, // 5 MiB
	})
	if err != nil {
		return fmt.Errorf("creating or updating stream: %w", err)
	}

	cons, err := stream.CreateOrUpdateConsumer(timeoutCtx, jetstream.ConsumerConfig{
		Name:    "notifications-consumer",
		Durable: "notifications-consumer",
	})
	if err != nil {
		return fmt.Errorf("creating or updating consumer: %w", err)
	}

	consCtx, err := cons.Consume(s.handleMessage)
	if err != nil {
		return fmt.Errorf("starting consumer: %w", err)
	}

	<-consCtx.Closed()
	return nil
}

// handleMessage processes a single JetStream message: it decodes the
// payload as a bus.BusNotification, forwards it to the bus, and signals the
// outcome back to the server.
//
//   - Undecodable payloads are terminated with a reason, since retrying the
//     same bytes cannot succeed.
//   - Delivery failures are nak'ed with a one minute delay.
//   - Successful deliveries are double-acked.
//
// Failures are logged rather than returned because the consume callback has
// no error return.
func (s *Server) handleMessage(msg jetstream.Msg) {
	slog.Info("Got message")

	bn := bus.BusNotification{}
	if err := json.Unmarshal(msg.Data(), &bn); err != nil {
		slog.Warn("Error unmarshalling message", "error", err)
		if err := msg.TermWithReason("failed to decode json"); err != nil {
			slog.Error("Error terminating message", "error", err)
		}
		return
	}

	id, err := s.bus.Notify(bn)
	if err != nil {
		slog.Warn("Error sending notification", "error", err)
		if err := msg.NakWithDelay(1 * time.Minute); err != nil {
			slog.Error("Error naking message", "error", err)
		}
		return
	}
	slog.Info("Sent notification", "id", id)

	if err := msg.DoubleAck(context.Background()); err != nil {
		slog.Error("Error acking message", "error", err)
		// Do not report the message as acked when the ack failed.
		return
	}
	slog.Debug("Acked message")
}