From 1cd56f15250ff172b65fa96f030985168687971a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Sat, 8 Feb 2025 22:25:34 +0100 Subject: [PATCH] Initial commit --- bus/bus.go | 76 ++++++++++++++++++++ flake.lock | 27 +++++++ flake.nix | 67 ++++++++++++++++++ go.mod | 15 ++++ go.sum | 18 +++++ main.go | 178 +++++++++++++++++++++++++++++++++++++++++++++++ server/server.go | 86 +++++++++++++++++++++++ 7 files changed, 467 insertions(+) create mode 100644 bus/bus.go create mode 100644 flake.lock create mode 100644 flake.nix create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 server/server.go diff --git a/bus/bus.go b/bus/bus.go new file mode 100644 index 0000000..9c98458 --- /dev/null +++ b/bus/bus.go @@ -0,0 +1,76 @@ +package bus + +import ( + "time" + + "github.com/godbus/dbus/v5" +) + +type NotifyBus struct { + conn *dbus.Conn +} + +type BusNotification struct { + ID uint32 `json:"id,omitempty"` + Summary string `json:"summary"` + Body string `json:"body,omitempty"` + Timeout time.Duration `json:"timeout,omitempty"` +} + +type NotifyServerInfo struct { + Name string + Vendor string + Version string + SpecVersion string +} + +func NewNotifyBus() (*NotifyBus, error) { + conn, err := dbus.ConnectSessionBus() + if err != nil { + return nil, err + } + return &NotifyBus{conn: conn}, nil +} + +func (n *NotifyBus) Close() { + n.conn.Close() +} + +func (n *NotifyBus) ServerInfo() (*NotifyServerInfo, error) { + obj := n.conn.Object("org.freedesktop.Notifications", "/org/freedesktop/Notifications") + call := obj.Call( + "org.freedesktop.Notifications.GetServerInformation", // Method + 0, // Flags + ) + if call.Err != nil { + return nil, call.Err + } + + srvInfo := &NotifyServerInfo{} + call.Store(&srvInfo.Name, &srvInfo.Vendor, &srvInfo.Version, &srvInfo.SpecVersion) + return srvInfo, nil +} + +func (n *NotifyBus) Notify(notification BusNotification) (uint32, error) { + obj := n.conn.Object("org.freedesktop.Notifications", "/org/freedesktop/Notifications") + var ret uint32 + call := obj.Call( + "org.freedesktop.Notifications.Notify", // Method + 0, // Flags + "natstonotify", // App name + notification.ID, // Notification ID + "", // Icon + notification.Summary, // Summary + notification.Body, // Body + []string{}, // Actions + map[string]dbus.Variant{}, // Hints + int32(notification.Timeout.Milliseconds()), // Timeout + ) + if call.Err != nil { + return ret, call.Err + } + + call.Store(&ret) + + return ret, nil +} diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..6ca2bd0 --- /dev/null +++ b/flake.lock @@ -0,0 +1,27 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1739020877, + "narHash": "sha256-mIvECo/NNdJJ/bXjNqIh8yeoSjVLAuDuTUzAo7dzs8Y=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "a79cfe0ebd24952b580b1cf08cd906354996d547", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..24f1530 --- /dev/null +++ b/flake.nix @@ -0,0 +1,67 @@ +{ + description = "NATS to notification service"; + + inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + + outputs = + { self, nixpkgs }: + let + allSystems = [ + "x86_64-linux" + "aarch64-linux" + "x86_64-darwin" + "aarch64-darwin" + ]; + forAllSystems = + f: + nixpkgs.lib.genAttrs allSystems ( + system: + f { + pkgs = import nixpkgs { inherit system; }; + } + ); + in + { + overlays.default = final: prev: { + natstonotify = self.packages.${prev.system}.default; + }; + + packages = forAllSystems ( + { pkgs }: + { + default = + let + src = pkgs.lib.sourceFilesBySuffices ./. [ + "go.mod" + "go.sum" + ".go" + ]; + version = pkgs.lib.strings.removePrefix "v" ( + builtins.elemAt (pkgs.lib.strings.split "\"" ( + pkgs.lib.lists.findFirst (x: pkgs.lib.strings.hasInfix "Version" x) null ( + pkgs.lib.strings.splitString "\n" (builtins.readFile ./main.go) + ) + )) 2 + ); + in + pkgs.buildGoModule { + version = version; + pname = "natstonotify"; + src = src; + vendorHash = pkgs.lib.fakeHash; + }; + } + ); + devShells = forAllSystems ( + { pkgs }: + { + default = pkgs.mkShell { + packages = with pkgs; [ + go + golangci-lint + ]; + }; + } + ); + }; +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3787c39 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module git.t-juice.club/torjus/natstonotify + +go 1.23.4 + +require ( + github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/nats-io/nats.go v1.39.0 // indirect + github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/urfave/cli/v3 v3.0.0-beta1 // indirect + golang.org/x/crypto v0.33.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..751852d --- /dev/null +++ b/go.sum @@ -0,0 +1,18 @@ +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI= +github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/urfave/cli/v3 v3.0.0-beta1 h1:6DTaaUarcM0wX7qj5Hcvs+5Dm3dyUTBbEwIWAjcw9Zg= +github.com/urfave/cli/v3 v3.0.0-beta1/go.mod h1:FnIeEMYu+ko8zP1F9Ypr3xkZMIDqW3DR92yUtY39q1Y= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= diff --git a/main.go b/main.go new file mode 100644 index 0000000..254d416 --- /dev/null +++ b/main.go @@ -0,0 +1,178 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "log/slog" + "os" + "os/signal" + + "git.t-juice.club/torjus/natstonotify/bus" + "git.t-juice.club/torjus/natstonotify/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" + "github.com/urfave/cli/v3" +) + +const Version = "v0.1.0" + +func connectNats() (*nats.Conn, error) { + natsURL, ok := os.LookupEnv("NATS_URL") + if !ok { + return nil, fmt.Errorf("NATS_URL not set") + } + nkey, ok := os.LookupEnv("NATS_NKEY") + if !ok { + return nil, fmt.Errorf("NATS_NKEY not set") + } + + kp, err := nkeys.FromSeed([]byte(nkey)) + if err != nil { + return nil, err + } + + pub, err := kp.PublicKey() + if err != nil { + return nil, err + } + + opt := nats.Nkey(pub, kp.Sign) + + nc, err := nats.Connect(natsURL, opt) + if err != nil { + return nil, err + } + return nc, nil +} + +func main() { + // Setup logging + logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + + slog.SetDefault(logger) + + cmd := &cli.Command{ + EnableShellCompletion: true, + Usage: "NATS-powered notification service", + Commands: []*cli.Command{ + { + Name: "server", + Usage: "Start the server", + Action: func(ctx context.Context, cmd *cli.Command) error { + b, err := bus.NewNotifyBus() + if err != nil { + fmt.Println("Error creating notification bus: ", err) + return cli.Exit(err, 1) + } + + info, err := b.ServerInfo() + if err != nil { + return cli.Exit(err, 1) + } + + logger.Info("Connected to notification bus", "bus_name", info.Name, "bus_version", info.Version) + + defer b.Close() + + nc, err := connectNats() + if err != nil { + logger.Error("Error connecting to NATS", "error", err) + return cli.Exit(err, 1) + } + srv, err := server.New(nc, b) + if err != nil { + return cli.Exit(err, 1) + } + + go func() { + <-ctx.Done() + srv.Close() + }() + + slog.Info("Server started", "version", Version) + if err := srv.Start(); err != nil { + slog.Info("Server exited with error", "error", err) + return cli.Exit(err, 2) + } + + slog.Info("Server stopped") + + return nil + }, + }, + { + Name: "notify", + Aliases: []string{"n"}, + Usage: "Send a notification", + ArgsUsage: "SUMMARY [BODY]", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "local", + Usage: "Send a local notification", + }, + }, + Action: func(ctx context.Context, cmd *cli.Command) error { + if cmd.NArg() < 1 { + return cli.Exit("notify requires exactly one argument", 1) + } + summary := cmd.Args().Get(0) + body := cmd.Args().Get(1) + + bn := bus.BusNotification{ + Summary: summary, + Body: body, + } + + local := cmd.Bool("local") + if local { + b, err := bus.NewNotifyBus() + if err != nil { + fmt.Println("Error creating notification bus: ", err) + return cli.Exit(err, 1) + } + defer b.Close() + + _, err = b.Notify(bn) + if err != nil { + fmt.Println("Sending message: ", err) + return cli.Exit(err, 1) + } + } else { + nc, err := connectNats() + if err != nil { + fmt.Printf("Error connecting to NATS: %s\n", err) + return cli.Exit(err, 1) + } + defer nc.Drain() + + data, err := json.Marshal(bn) + if err != nil { + return cli.Exit(err, 1) + } + + if err := nc.PublishMsg(&nats.Msg{ + Subject: server.DefaultSubject, + Data: data, + }); err != nil { + fmt.Printf("Error sending message: %s\n", err) + } + nc.Flush() + fmt.Printf("Published message to %s: %s", server.DefaultSubject, data) + } + return nil + }, + }, + }, + } + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + if err := cmd.Run(ctx, os.Args); err != nil { + log.Fatal(err) + } +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..fbbeaab --- /dev/null +++ b/server/server.go @@ -0,0 +1,86 @@ +package server + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + "git.t-juice.club/torjus/natstonotify/bus" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +const DefaultSubject = "home2rjusnet.notifications" + +type Server struct { + conn *nats.Conn + bus *bus.NotifyBus + Subject string +} + +func New(nc *nats.Conn, b *bus.NotifyBus) (*Server, error) { + return &Server{conn: nc, Subject: DefaultSubject, bus: b}, nil +} + +func (s *Server) Close() { + s.conn.Drain() +} + +func (s *Server) Start() error { + js, err := jetstream.New(s.conn) + if err != nil { + return err + } + + timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + stream, err := js.CreateOrUpdateStream(timeoutCtx, jetstream.StreamConfig{ + Name: "notifications", + Subjects: []string{s.Subject}, + MaxBytes: 1024 * 1024 * 5, + }) + if err != nil { + return err + } + + cons, err := stream.CreateOrUpdateConsumer(timeoutCtx, jetstream.ConsumerConfig{ + Name: "notifications-consumer", + Durable: "notifications-consumer", + }) + if err != nil { + return err + } + + consCtx, err := cons.Consume(func(msg jetstream.Msg) { + slog.Info("Got message") + + bn := bus.BusNotification{} + + if err := json.Unmarshal(msg.Data(), &bn); err != nil { + slog.Warn("Error unmarshalling message", "error", err) + msg.TermWithReason("failed to decode json") + return + } + + id, err := s.bus.Notify(bn) + if err != nil { + slog.Warn("Error sending notification", "error", err) + msg.NakWithDelay(1 * time.Minute) + return + } + slog.Info("Sent notification", "id", id) + + if err := msg.DoubleAck(context.Background()); err != nil { + slog.Error("Error acking message", "error", err) + } + slog.Debug("Acked message") + }) + if err != nil { + return err + } + + <-consCtx.Closed() + + return nil +}