Initial commit
This commit is contained in:
86
server/server.go
Normal file
86
server/server.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.t-juice.club/torjus/natstonotify/bus"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
const DefaultSubject = "home2rjusnet.notifications"
|
||||
|
||||
type Server struct {
|
||||
conn *nats.Conn
|
||||
bus *bus.NotifyBus
|
||||
Subject string
|
||||
}
|
||||
|
||||
func New(nc *nats.Conn, b *bus.NotifyBus) (*Server, error) {
|
||||
return &Server{conn: nc, Subject: DefaultSubject, bus: b}, nil
|
||||
}
|
||||
|
||||
func (s *Server) Close() {
|
||||
s.conn.Drain()
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
js, err := jetstream.New(s.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
stream, err := js.CreateOrUpdateStream(timeoutCtx, jetstream.StreamConfig{
|
||||
Name: "notifications",
|
||||
Subjects: []string{s.Subject},
|
||||
MaxBytes: 1024 * 1024 * 5,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cons, err := stream.CreateOrUpdateConsumer(timeoutCtx, jetstream.ConsumerConfig{
|
||||
Name: "notifications-consumer",
|
||||
Durable: "notifications-consumer",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
consCtx, err := cons.Consume(func(msg jetstream.Msg) {
|
||||
slog.Info("Got message")
|
||||
|
||||
bn := bus.BusNotification{}
|
||||
|
||||
if err := json.Unmarshal(msg.Data(), &bn); err != nil {
|
||||
slog.Warn("Error unmarshalling message", "error", err)
|
||||
msg.TermWithReason("failed to decode json")
|
||||
return
|
||||
}
|
||||
|
||||
id, err := s.bus.Notify(bn)
|
||||
if err != nil {
|
||||
slog.Warn("Error sending notification", "error", err)
|
||||
msg.NakWithDelay(1 * time.Minute)
|
||||
return
|
||||
}
|
||||
slog.Info("Sent notification", "id", id)
|
||||
|
||||
if err := msg.DoubleAck(context.Background()); err != nil {
|
||||
slog.Error("Error acking message", "error", err)
|
||||
}
|
||||
slog.Debug("Acked message")
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
<-consCtx.Closed()
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user