From e1e96e1e19621678f22e8dbef717f5e3bbca9c7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Tue, 5 Dec 2023 00:45:43 +0100 Subject: [PATCH 1/4] Add minimal config --- go.mod | 1 + go.sum | 2 ++ ministream.toml | 9 ++++++ server/config.go | 73 +++++++++++++++++++++++++++++++++++++++++++ server/config_test.go | 37 ++++++++++++++++++++++ 5 files changed, 122 insertions(+) create mode 100644 ministream.toml create mode 100644 server/config.go create mode 100644 server/config_test.go diff --git a/go.mod b/go.mod index b937ea7..bcdfbbd 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/uuid v1.4.0 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.8 // indirect github.com/pion/ice/v3 v3.0.2 // indirect diff --git a/go.sum b/go.sum index a6b1dff..68c8fd7 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042 github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew8= github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= diff --git a/ministream.toml b/ministream.toml new file mode 100644 index 0000000..57d11b6 --- /dev/null +++ b/ministream.toml @@ -0,0 +1,9 @@ +# SiteName is used for displaying title of site on frontend. +# Default: "ministream" +# Env: MINISTREAM_SITENAME +SiteName = "stream.example.org" + +# HTTPListenAddr is which port the HTTP server will listen on. +# Default: ":8080" +# Env: MINISTREAM_HTTPLISTENADDR +HTTPListenAddr = ":8080" diff --git a/server/config.go b/server/config.go new file mode 100644 index 0000000..a52c6bc --- /dev/null +++ b/server/config.go @@ -0,0 +1,73 @@ +package server + +import ( + "io" + "os" + + "github.com/pelletier/go-toml/v2" +) + +type Config struct { + SiteName string `json:"siteName" toml:"siteName"` + HTTPListenAddr string `json:"httpListenAddr" toml:"HTTPListenAddr"` +} + +func DefaultConfig() *Config { + return &Config{ + SiteName: "ministream", + HTTPListenAddr: ":8080", + } +} + +func (c *Config) OverrideFromEnv() { + if siteName, ok := os.LookupEnv("MINISTREAM_SITENAME"); ok { + c.SiteName = siteName + } + + if httpAddr, ok := os.LookupEnv("MINISTREAM_HTTPLISTENADDR"); ok { + c.HTTPListenAddr = httpAddr + } +} + +func ConfigFromReader(r io.Reader) (*Config, error) { + var c Config + + err := toml.NewDecoder(r).Decode(&c) + if err != nil { + return nil, err + } + + return &c, nil +} + +func ConfigFromFile(path string) (*Config, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + return ConfigFromReader(f) +} + +func ConfigFromDefault() (*Config, error) { + var config *Config + defaultPaths := []string{ + "ministream.toml", + } + + for _, p := range defaultPaths { + c, err := ConfigFromFile(p) + if err != nil { + continue + } + config = c + break + } + + if config == nil { + config = DefaultConfig() + } + + return config, nil +} diff --git a/server/config_test.go b/server/config_test.go new file mode 100644 index 0000000..5e839ce --- /dev/null +++ b/server/config_test.go @@ -0,0 +1,37 @@ +package server_test + +import ( + "strings" + "testing" + + "git.t-juice.club/torjus/ministream/server" +) + +func TestConfig(t *testing.T) { + t.Run("FromReader", func(t *testing.T) { + configString := `SiteName = "ministream.example.org"` + expectedSiteName := "ministream.example.org" + r := strings.NewReader(configString) + + c, err := server.ConfigFromReader(r) + if err != nil { + t.Fatalf("Error reading config: %s", err) + } + + if c.SiteName != expectedSiteName { + t.Errorf("SiteName incorrect. Got %s want %s", c.SiteName, expectedSiteName) + } + }) + t.Run("OverrideFromEnv", func(t *testing.T) { + c := server.DefaultConfig() + expectedSiteName := "ms.example.org" + + t.Setenv("MINISTREAM_SITENAME", expectedSiteName) + + c.OverrideFromEnv() + + if c.SiteName != expectedSiteName { + t.Errorf("SiteName incorrect. Got %s want %s", c.SiteName, expectedSiteName) + } + }) +} -- 2.45.2 From 8cb0ede5701088c74edf4720121451ccc8103067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Tue, 5 Dec 2023 00:46:04 +0100 Subject: [PATCH 2/4] Use config in server --- main.go | 49 +++++++++++++++++++++++++++++++++++++++++------- server/server.go | 18 +++++++++++++++--- 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/main.go b/main.go index 6eaa582..b6a5f2e 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,9 @@ package main import ( "context" + "errors" "fmt" + "net/http" "os" "git.t-juice.club/torjus/ministream/server" @@ -11,20 +13,29 @@ import ( const Version = "v0.1.1" +type ctxKey string + +const ( + ctxKeyConfig ctxKey = "config" +) + func main() { app := cli.App{ Name: "ministream", Usage: "Small livestreaming platform.", Commands: []*cli.Command{ { - Name: "serve", - Usage: "Start livestreaming server.", - Action: func(ctx *cli.Context) error { - store := server.NewUserStore() + Name: "serve", + Usage: "Start livestreaming server.", + Before: loadConfig, + Action: func(c *cli.Context) error { + cfg := configFromCtx(c.Context) - srv := server.NewServer(store) - srv.Addr = ":8080" - srv.ListenAndServe() + srv := server.NewServer(cfg) + srv.Addr = cfg.HTTPListenAddr + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } return nil }, }, @@ -37,3 +48,27 @@ func main() { fmt.Println(err) } } + +func loadConfig(c *cli.Context) error { + cfg, err := server.ConfigFromDefault() + if err != nil { + return err + } + + c.Context = context.WithValue(c.Context, ctxKeyConfig, cfg) + return nil +} + +func configFromCtx(ctx context.Context) *server.Config { + value := ctx.Value(ctxKeyConfig) + if value == nil { + panic("unable to load config") + } + + config, ok := value.(*server.Config) + if !ok { + panic("config type assertion failed") + } + + return config +} diff --git a/server/server.go b/server/server.go index bb8e588..f1608fe 100644 --- a/server/server.go +++ b/server/server.go @@ -19,15 +19,15 @@ import ( var static embed.FS type Server struct { - users *UserStore streams *StreamStore + config *Config http.Server } -func NewServer(store *UserStore) *Server { +func NewServer(config *Config) *Server { srv := &Server{ - users: store, streams: NewStreamStore(), + config: config, } r := chi.NewRouter() @@ -41,6 +41,7 @@ func NewServer(store *UserStore) *Server { r.Patch("/whip/{streamKey}", srv.PatchHandler) r.Post("/whip/{streamKey}", srv.PostOfferHandler) r.Get("/stats", srv.streams.StatsHandler) + r.Get("/api/siteinfo", srv.InfoHandler) srv.Handler = r @@ -55,6 +56,17 @@ func corsMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(fn) } +func (s *Server) InfoHandler(w http.ResponseWriter, r *http.Request) { + var infoResponse struct { + SiteName string `json:"siteName"` + } + infoResponse.SiteName = s.config.SiteName + + if err := json.NewEncoder(w).Encode(&infoResponse); err != nil { + slog.Warn("Error writing info response") + } +} + func (s *Server) OptionsHandler(w http.ResponseWriter, r *http.Request) { slog.Info("Got OPTIONS") } -- 2.45.2 From fd18ba6527bf29e534ae12f7c88bcfbd69d2841c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Tue, 5 Dec 2023 01:15:09 +0100 Subject: [PATCH 3/4] Change http config to sub element --- main.go | 2 +- ministream.toml | 3 ++- server/config.go | 18 ++++++++++++------ 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index b6a5f2e..04fd1f9 100644 --- a/main.go +++ b/main.go @@ -32,7 +32,7 @@ func main() { cfg := configFromCtx(c.Context) srv := server.NewServer(cfg) - srv.Addr = cfg.HTTPListenAddr + srv.Addr = cfg.HTTP.ListenAddr if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { return err } diff --git a/ministream.toml b/ministream.toml index 57d11b6..80b7322 100644 --- a/ministream.toml +++ b/ministream.toml @@ -3,7 +3,8 @@ # Env: MINISTREAM_SITENAME SiteName = "stream.example.org" +[HTTP] # HTTPListenAddr is which port the HTTP server will listen on. # Default: ":8080" -# Env: MINISTREAM_HTTPLISTENADDR +# Env: MINISTREAM_HTTP_LISTENADDR HTTPListenAddr = ":8080" diff --git a/server/config.go b/server/config.go index a52c6bc..a10c7c6 100644 --- a/server/config.go +++ b/server/config.go @@ -8,14 +8,20 @@ import ( ) type Config struct { - SiteName string `json:"siteName" toml:"siteName"` - HTTPListenAddr string `json:"httpListenAddr" toml:"HTTPListenAddr"` + SiteName string `toml:"siteName"` + HTTP ConfigHTTP `toml:"http"` +} + +type ConfigHTTP struct { + ListenAddr string `json:"ListenAddr" toml:"ListenAddr"` } func DefaultConfig() *Config { return &Config{ - SiteName: "ministream", - HTTPListenAddr: ":8080", + SiteName: "ministream", + HTTP: ConfigHTTP{ + ListenAddr: ":8080", + }, } } @@ -24,8 +30,8 @@ func (c *Config) OverrideFromEnv() { c.SiteName = siteName } - if httpAddr, ok := os.LookupEnv("MINISTREAM_HTTPLISTENADDR"); ok { - c.HTTPListenAddr = httpAddr + if httpAddr, ok := os.LookupEnv("MINISTREAM_HTTP_LISTENADDR"); ok { + c.HTTP.ListenAddr = httpAddr } } -- 2.45.2 From 4b171b849f50b333956ebd48b5b3afd6359c3c45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Tue, 5 Dec 2023 01:25:17 +0100 Subject: [PATCH 4/4] Make ephemeral udp ports configurable --- main.go | 2 ++ ministream.toml | 13 ++++++++++++- server/config.go | 30 ++++++++++++++++++++++++++++-- server/server.go | 2 +- server/stream.go | 16 ++++++++++++---- 5 files changed, 55 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index 04fd1f9..654bb34 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "net/http" "os" @@ -33,6 +34,7 @@ func main() { srv := server.NewServer(cfg) srv.Addr = cfg.HTTP.ListenAddr + slog.Info("Starting HTTP-server", "addr", srv.Addr, "cfg", cfg) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { return err } diff --git a/ministream.toml b/ministream.toml index 80b7322..798956c 100644 --- a/ministream.toml +++ b/ministream.toml @@ -7,4 +7,15 @@ SiteName = "stream.example.org" # HTTPListenAddr is which port the HTTP server will listen on. # Default: ":8080" # Env: MINISTREAM_HTTP_LISTENADDR -HTTPListenAddr = ":8080" +ListenAddr = ":8080" + +[WebRTC] +# UDPMin is the minimum used for ephemeral ports. +# Default: 50000 +# Env: MINISTREAM_WEBRTC_UDPMIN +UDPMin = 50000 + +# UDPMax is the maximum used for ephemeral ports. +# Default: 50050 +# Env: MINISTREAM_WEBRTC_UDPMAX +UDPMAX = 50050 diff --git a/server/config.go b/server/config.go index a10c7c6..017f4c9 100644 --- a/server/config.go +++ b/server/config.go @@ -3,25 +3,36 @@ package server import ( "io" "os" + "strconv" "github.com/pelletier/go-toml/v2" ) type Config struct { - SiteName string `toml:"siteName"` - HTTP ConfigHTTP `toml:"http"` + SiteName string `toml:"siteName"` + HTTP ConfigHTTP `toml:"http"` + WebRTC ConfigWebRTC `toml:"WebRTC"` } type ConfigHTTP struct { ListenAddr string `json:"ListenAddr" toml:"ListenAddr"` } +type ConfigWebRTC struct { + UDPMin int `toml:"UDPMin"` + UDPMax int `toml:"UDPMax"` +} + func DefaultConfig() *Config { return &Config{ SiteName: "ministream", HTTP: ConfigHTTP{ ListenAddr: ":8080", }, + WebRTC: ConfigWebRTC{ + UDPMin: 50000, + UDPMax: 50050, + }, } } @@ -33,6 +44,21 @@ func (c *Config) OverrideFromEnv() { if httpAddr, ok := os.LookupEnv("MINISTREAM_HTTP_LISTENADDR"); ok { c.HTTP.ListenAddr = httpAddr } + + if value, ok := os.LookupEnv("MINISTREAM_WEBRTC_UDPMIN"); ok { + min, err := strconv.Atoi(value) + if err != nil { + panic("MINISTREAM_WEBRTC_UDPMIN is invalid") + } + c.WebRTC.UDPMin = min + } + if value, ok := os.LookupEnv("MINISTREAM_WEBRTC_UDPMAX"); ok { + max, err := strconv.Atoi(value) + if err != nil { + panic("MINISTREAM_WEBRTC_UDPMAX is invalid") + } + c.WebRTC.UDPMin = max + } } func ConfigFromReader(r io.Reader) (*Config, error) { diff --git a/server/server.go b/server/server.go index f1608fe..653af4e 100644 --- a/server/server.go +++ b/server/server.go @@ -26,7 +26,7 @@ type Server struct { func NewServer(config *Config) *Server { srv := &Server{ - streams: NewStreamStore(), + streams: NewStreamStore(config), config: config, } diff --git a/server/stream.go b/server/stream.go index 5a2957a..6337791 100644 --- a/server/stream.go +++ b/server/stream.go @@ -19,20 +19,24 @@ import ( var ErrNoSuchStream error = fmt.Errorf("no such stream") type StreamStore struct { - Streams map[string]*Stream + Streams map[string]*Stream + + config *Config webRTCConfig webrtc.Configuration mu sync.Mutex } -func NewStreamStore() *StreamStore { +func NewStreamStore(config *Config) *StreamStore { s := &StreamStore{ Streams: make(map[string]*Stream), + config: config, } return s } type Stream struct { + store *StreamStore peerConnection *webrtc.PeerConnection peerConnectionStats map[string]*stats.Stats peerConnectionStatsMu sync.Mutex @@ -96,7 +100,10 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc //i.Add(statsInterceptorFactory) se := webrtc.SettingEngine{} - _ = se.SetEphemeralUDPPortRange(50000, 50050) + _ = se.SetEphemeralUDPPortRange( + uint16(s.store.config.WebRTC.UDPMin), + uint16(s.store.config.WebRTC.UDPMax), + ) webRTCConfig := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ @@ -179,6 +186,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web go func() { stream := &Stream{ + store: s, lastUpdate: time.Now(), peerConnectionStats: make(map[string]*stats.Stats), } @@ -226,7 +234,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web }, } se := webrtc.SettingEngine{} - _ = se.SetEphemeralUDPPortRange(50000, 50050) + _ = se.SetEphemeralUDPPortRange(uint16(s.config.WebRTC.UDPMin), uint16(s.config.WebRTC.UDPMax)) // se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser { // buf := packetio.NewBuffer() -- 2.45.2