diff --git a/go.mod b/go.mod index b937ea7..bcdfbbd 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/uuid v1.4.0 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.8 // indirect github.com/pion/ice/v3 v3.0.2 // indirect diff --git a/go.sum b/go.sum index a6b1dff..68c8fd7 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042 github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew8= github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= diff --git a/main.go b/main.go index 6eaa582..654bb34 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,10 @@ package main import ( "context" + "errors" "fmt" + "log/slog" + "net/http" "os" "git.t-juice.club/torjus/ministream/server" @@ -11,20 +14,30 @@ import ( const Version = "v0.1.1" +type ctxKey string + +const ( + ctxKeyConfig ctxKey = "config" +) + func main() { app := cli.App{ Name: "ministream", Usage: "Small livestreaming platform.", Commands: []*cli.Command{ { - Name: "serve", - Usage: "Start livestreaming server.", - Action: func(ctx *cli.Context) error { - store := server.NewUserStore() + Name: "serve", + Usage: "Start livestreaming server.", + Before: loadConfig, + Action: func(c *cli.Context) error { + cfg := configFromCtx(c.Context) - srv := server.NewServer(store) - srv.Addr = ":8080" - srv.ListenAndServe() + srv := server.NewServer(cfg) + srv.Addr = cfg.HTTP.ListenAddr + slog.Info("Starting HTTP-server", "addr", srv.Addr, "cfg", cfg) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } return nil }, }, @@ -37,3 +50,27 @@ func main() { fmt.Println(err) } } + +func loadConfig(c *cli.Context) error { + cfg, err := server.ConfigFromDefault() + if err != nil { + return err + } + + c.Context = context.WithValue(c.Context, ctxKeyConfig, cfg) + return nil +} + +func configFromCtx(ctx context.Context) *server.Config { + value := ctx.Value(ctxKeyConfig) + if value == nil { + panic("unable to load config") + } + + config, ok := value.(*server.Config) + if !ok { + panic("config type assertion failed") + } + + return config +} diff --git a/ministream.toml b/ministream.toml new file mode 100644 index 0000000..798956c --- /dev/null +++ b/ministream.toml @@ -0,0 +1,21 @@ +# SiteName is used for displaying title of site on frontend. +# Default: "ministream" +# Env: MINISTREAM_SITENAME +SiteName = "stream.example.org" + +[HTTP] +# HTTPListenAddr is which port the HTTP server will listen on. +# Default: ":8080" +# Env: MINISTREAM_HTTP_LISTENADDR +ListenAddr = ":8080" + +[WebRTC] +# UDPMin is the minimum used for ephemeral ports. +# Default: 50000 +# Env: MINISTREAM_WEBRTC_UDPMIN +UDPMin = 50000 + +# UDPMax is the maximum used for ephemeral ports. +# Default: 50050 +# Env: MINISTREAM_WEBRTC_UDPMAX +UDPMAX = 50050 diff --git a/server/config.go b/server/config.go new file mode 100644 index 0000000..017f4c9 --- /dev/null +++ b/server/config.go @@ -0,0 +1,105 @@ +package server + +import ( + "io" + "os" + "strconv" + + "github.com/pelletier/go-toml/v2" +) + +type Config struct { + SiteName string `toml:"siteName"` + HTTP ConfigHTTP `toml:"http"` + WebRTC ConfigWebRTC `toml:"WebRTC"` +} + +type ConfigHTTP struct { + ListenAddr string `json:"ListenAddr" toml:"ListenAddr"` +} + +type ConfigWebRTC struct { + UDPMin int `toml:"UDPMin"` + UDPMax int `toml:"UDPMax"` +} + +func DefaultConfig() *Config { + return &Config{ + SiteName: "ministream", + HTTP: ConfigHTTP{ + ListenAddr: ":8080", + }, + WebRTC: ConfigWebRTC{ + UDPMin: 50000, + UDPMax: 50050, + }, + } +} + +func (c *Config) OverrideFromEnv() { + if siteName, ok := os.LookupEnv("MINISTREAM_SITENAME"); ok { + c.SiteName = siteName + } + + if httpAddr, ok := os.LookupEnv("MINISTREAM_HTTP_LISTENADDR"); ok { + c.HTTP.ListenAddr = httpAddr + } + + if value, ok := os.LookupEnv("MINISTREAM_WEBRTC_UDPMIN"); ok { + min, err := strconv.Atoi(value) + if err != nil { + panic("MINISTREAM_WEBRTC_UDPMIN is invalid") + } + c.WebRTC.UDPMin = min + } + if value, ok := os.LookupEnv("MINISTREAM_WEBRTC_UDPMAX"); ok { + max, err := strconv.Atoi(value) + if err != nil { + panic("MINISTREAM_WEBRTC_UDPMAX is invalid") + } + c.WebRTC.UDPMin = max + } +} + +func ConfigFromReader(r io.Reader) (*Config, error) { + var c Config + + err := toml.NewDecoder(r).Decode(&c) + if err != nil { + return nil, err + } + + return &c, nil +} + +func ConfigFromFile(path string) (*Config, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + return ConfigFromReader(f) +} + +func ConfigFromDefault() (*Config, error) { + var config *Config + defaultPaths := []string{ + "ministream.toml", + } + + for _, p := range defaultPaths { + c, err := ConfigFromFile(p) + if err != nil { + continue + } + config = c + break + } + + if config == nil { + config = DefaultConfig() + } + + return config, nil +} diff --git a/server/config_test.go b/server/config_test.go new file mode 100644 index 0000000..5e839ce --- /dev/null +++ b/server/config_test.go @@ -0,0 +1,37 @@ +package server_test + +import ( + "strings" + "testing" + + "git.t-juice.club/torjus/ministream/server" +) + +func TestConfig(t *testing.T) { + t.Run("FromReader", func(t *testing.T) { + configString := `SiteName = "ministream.example.org"` + expectedSiteName := "ministream.example.org" + r := strings.NewReader(configString) + + c, err := server.ConfigFromReader(r) + if err != nil { + t.Fatalf("Error reading config: %s", err) + } + + if c.SiteName != expectedSiteName { + t.Errorf("SiteName incorrect. Got %s want %s", c.SiteName, expectedSiteName) + } + }) + t.Run("OverrideFromEnv", func(t *testing.T) { + c := server.DefaultConfig() + expectedSiteName := "ms.example.org" + + t.Setenv("MINISTREAM_SITENAME", expectedSiteName) + + c.OverrideFromEnv() + + if c.SiteName != expectedSiteName { + t.Errorf("SiteName incorrect. Got %s want %s", c.SiteName, expectedSiteName) + } + }) +} diff --git a/server/server.go b/server/server.go index bb8e588..653af4e 100644 --- a/server/server.go +++ b/server/server.go @@ -19,15 +19,15 @@ import ( var static embed.FS type Server struct { - users *UserStore streams *StreamStore + config *Config http.Server } -func NewServer(store *UserStore) *Server { +func NewServer(config *Config) *Server { srv := &Server{ - users: store, - streams: NewStreamStore(), + streams: NewStreamStore(config), + config: config, } r := chi.NewRouter() @@ -41,6 +41,7 @@ func NewServer(store *UserStore) *Server { r.Patch("/whip/{streamKey}", srv.PatchHandler) r.Post("/whip/{streamKey}", srv.PostOfferHandler) r.Get("/stats", srv.streams.StatsHandler) + r.Get("/api/siteinfo", srv.InfoHandler) srv.Handler = r @@ -55,6 +56,17 @@ func corsMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(fn) } +func (s *Server) InfoHandler(w http.ResponseWriter, r *http.Request) { + var infoResponse struct { + SiteName string `json:"siteName"` + } + infoResponse.SiteName = s.config.SiteName + + if err := json.NewEncoder(w).Encode(&infoResponse); err != nil { + slog.Warn("Error writing info response") + } +} + func (s *Server) OptionsHandler(w http.ResponseWriter, r *http.Request) { slog.Info("Got OPTIONS") } diff --git a/server/stream.go b/server/stream.go index 5a2957a..6337791 100644 --- a/server/stream.go +++ b/server/stream.go @@ -19,20 +19,24 @@ import ( var ErrNoSuchStream error = fmt.Errorf("no such stream") type StreamStore struct { - Streams map[string]*Stream + Streams map[string]*Stream + + config *Config webRTCConfig webrtc.Configuration mu sync.Mutex } -func NewStreamStore() *StreamStore { +func NewStreamStore(config *Config) *StreamStore { s := &StreamStore{ Streams: make(map[string]*Stream), + config: config, } return s } type Stream struct { + store *StreamStore peerConnection *webrtc.PeerConnection peerConnectionStats map[string]*stats.Stats peerConnectionStatsMu sync.Mutex @@ -96,7 +100,10 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc //i.Add(statsInterceptorFactory) se := webrtc.SettingEngine{} - _ = se.SetEphemeralUDPPortRange(50000, 50050) + _ = se.SetEphemeralUDPPortRange( + uint16(s.store.config.WebRTC.UDPMin), + uint16(s.store.config.WebRTC.UDPMax), + ) webRTCConfig := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ @@ -179,6 +186,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web go func() { stream := &Stream{ + store: s, lastUpdate: time.Now(), peerConnectionStats: make(map[string]*stats.Stats), } @@ -226,7 +234,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web }, } se := webrtc.SettingEngine{} - _ = se.SetEphemeralUDPPortRange(50000, 50050) + _ = se.SetEphemeralUDPPortRange(uint16(s.config.WebRTC.UDPMin), uint16(s.config.WebRTC.UDPMax)) // se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser { // buf := packetio.NewBuffer()