From 80acc281b97703bae5a0fd3086dd6a6d162fc05b Mon Sep 17 00:00:00 2001 From: = Date: Thu, 2 Sep 2021 02:49:11 +0200 Subject: [PATCH] Add various features * HTTP Server * Config files --- .gitignore | 1 + cmd/dogtamer.go | 47 ++++++++++++++++++++++++------------ config/config.go | 41 +++++++++++++++++++++++++++----- config/config_test.go | 24 +++++++++---------- dogtamer.example.toml | 21 +++++++++++++++-- go.mod | 1 + go.sum | 2 ++ server/rtmp.go | 55 ++++++++++++++++++++++++++++++------------- server/web.go | 14 +++++++++-- version.go | 2 +- 10 files changed, 153 insertions(+), 55 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a8db187 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +dogtamer.toml \ No newline at end of file diff --git a/cmd/dogtamer.go b/cmd/dogtamer.go index ec9f1d8..3dd6e78 100644 --- a/cmd/dogtamer.go +++ b/cmd/dogtamer.go @@ -9,6 +9,7 @@ import ( "github.com/urfave/cli/v2" "github.uio.no/torjus/dogtamer" + "github.uio.no/torjus/dogtamer/config" "github.uio.no/torjus/dogtamer/server" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -56,35 +57,51 @@ func main() { } func ActionServe(c *cli.Context) error { + logger := setupServerLogger() + cfg, err := config.FromDefaultLocations() + if err != nil { + logger.Warn("No config file found.") + } ctx, cancel := context.WithCancel(context.Background()) - // Setup rtmp server - s := server.NewRTMPServer(ctx, ":5566") - logger := setupServerLogger() + // Setup RTMP-server + s := server.NewRTMPServer(ctx, cfg.RTMPListenAddr) s.Logger = logger - // Setup web server - ws := server.NewWebServer(ctx, s) - go func() { - s.Logger.Info("Starting web server") - err := ws.Serve() - if err != nil && err != http.ErrServerClosed { - s.Logger.Infow("Web server shut down with error", "err", err) - } - }() - + // Setup web-server + webDone := make(chan struct{}) + if cfg.HTTPServerEnable { + ws := server.NewWebServer(ctx, s) + ws.ListenAddr = cfg.HTTPListenAddr + ws.Logger = logger + go func() { + s.Logger.Info("Starting web-server.") + err := ws.Serve() + if err != nil && err != http.ErrServerClosed { + s.Logger.Infow("Web server shut down with error.", "err", err) + } + close(webDone) + }() + } else { + close(webDone) + } // Listen for SIGINT sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt) go func() { <-sigChan - s.Logger.Info("Got shutdown signal.") + s.Logger.Debug("Got shutdown signal.") cancel() }() + // Start RTMP-server + s.Logger.Info("Starting RTMP-server.") if err := s.Listen(); err != nil { return cli.Exit(err, ExitGenericError) } + + // Wait for webserver to exit, if started + <-webDone s.Logger.Info("Server shut down.") return nil } @@ -97,7 +114,7 @@ func setupServerLogger() *zap.SugaredLogger { logEncoderConfig.EncodeDuration = zapcore.StringDurationEncoder rootLoggerConfig := &zap.Config{ - Level: zap.NewAtomicLevelAt(zap.InfoLevel), + Level: zap.NewAtomicLevelAt(zap.DebugLevel), OutputPaths: []string{"stdout"}, ErrorOutputPaths: []string{"stdout"}, Encoding: "console", diff --git a/config/config.go b/config/config.go index 05b9b72..3012174 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "strings" "github.com/pelletier/go-toml" ) @@ -13,8 +14,11 @@ import ( var ErrNotFound = errors.New("no config file found") type Config struct { - ListenAddr string `toml:"ListenAddr"` - LogLevel string `toml:"LogLevel"` + RTMPListenAddr string `toml:"RTMPListenAddr"` + HTTPServerEnable bool `toml:"HTTPServerEnable"` + HTTPListenAddr string `toml:"HTTPListenAddr"` + HTTPAccessLogEnable bool `toml:"HTTPAccessLogEnable"` + LogLevel string `toml:"LogLevel"` } type InvalidValueError struct { @@ -28,7 +32,9 @@ func (ive *InvalidValueError) Error() string { func FromReader(r io.Reader) (*Config, error) { var c Config // Set some defaults - c.ListenAddr = ":5566" + c.RTMPListenAddr = ":5566" + c.HTTPServerEnable = false + c.HTTPListenAddr = ":8077" c.LogLevel = "INFO" decoder := toml.NewDecoder(r) @@ -53,8 +59,27 @@ func (c *Config) UpdateFromEnv() error { if loglevel, found := os.LookupEnv("DOGTAMER_LOGLEVEL"); found { c.LogLevel = loglevel } - if listenAddr, found := os.LookupEnv("DOGTAMER_LISTENADDR"); found { - c.ListenAddr = listenAddr + + if listenAddr, found := os.LookupEnv("DOGTAMER_RTMPLISTENADDR"); found { + c.RTMPListenAddr = listenAddr + } + + if httpEnable, found := os.LookupEnv("DOGTAMER_HTTPSERVERENABLE"); found { + switch strings.ToUpper(httpEnable) { + case "TRUE", "YES", "ENABLE": + c.HTTPServerEnable = true + } + } + + if httpListenAddr, found := os.LookupEnv("DOGTAMER_HTTPLISTENADDR"); found { + c.HTTPListenAddr = httpListenAddr + } + + if httpAccessLogEnable, found := os.LookupEnv("DOGTAMER_HTTPACCESSLOGENABLE"); found { + switch strings.ToUpper(httpAccessLogEnable) { + case "TRUE", "YES", "ENABLE": + c.HTTPAccessLogEnable = true + } } return c.Verify() @@ -69,7 +94,7 @@ func FromFile(path string) (*Config, error) { return FromReader(f) } -func FromDefaultLocations(path string) (*Config, error) { +func FromDefaultLocations() (*Config, error) { defaultLocations := []string{ "dogtamer.toml", "/etc/dogtamer.toml", @@ -94,5 +119,9 @@ func FromDefaultLocations(path string) (*Config, error) { return cfg, cfg.UpdateFromEnv() } + cfg, err := FromReader(strings.NewReader("")) + if err == nil { + return cfg, ErrNotFound + } return nil, ErrNotFound } diff --git a/config/config_test.go b/config/config_test.go index 81a4aec..2ddd9d0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -19,8 +19,8 @@ func TestConfig(t *testing.T) { } // Ensure proper defaults are set - if c.ListenAddr != ":5566" { - t.Errorf("Unexpected ListenAddr: %s", c.ListenAddr) + if c.RTMPListenAddr != ":5566" { + t.Errorf("Unexpected RTMPListenAddr: %s", c.RTMPListenAddr) } if c.LogLevel != "INFO" { t.Errorf("Unexpected LogLevel: %s", c.LogLevel) @@ -32,7 +32,7 @@ func TestConfig(t *testing.T) { # Random comment LogLevel = "DEBUG" - ListenAddr = ":5555" + RTMPListenAddr = ":5555" ` sr := strings.NewReader(configString) @@ -41,8 +41,8 @@ func TestConfig(t *testing.T) { t.Fatalf("Error reading config: %s", err) } - if c.ListenAddr != ":5555" { - t.Errorf("Unexpected ListenAddr: %s", c.ListenAddr) + if c.RTMPListenAddr != ":5555" { + t.Errorf("Unexpected RTMPListenAddr: %s", c.RTMPListenAddr) } if c.LogLevel != "DEBUG" { @@ -54,7 +54,7 @@ func TestConfig(t *testing.T) { # Random comment LogLevel = "INVALID" - ListenAddr = ":5555" + RTMPListenAddr = ":5555" ` sr := strings.NewReader(configString) @@ -71,8 +71,8 @@ func TestConfig(t *testing.T) { os.Clearenv() envValues := map[string]string{ - "DOGTAMER_LOGLEVEL": "DEBUG", - "DOGTAMER_LISTENADDR": ":3333", + "DOGTAMER_LOGLEVEL": "DEBUG", + "DOGTAMER_RTMPLISTENADDR": ":3333", } for k, v := range envValues { @@ -92,8 +92,8 @@ func TestConfig(t *testing.T) { t.Errorf("Error updating config with environment") } - if c.ListenAddr != envValues["DOGTAMER_LISTENADDR"] { - t.Errorf("ListenAddr has wrong value: %s", c.ListenAddr) + if c.RTMPListenAddr != envValues["DOGTAMER_RTMPLISTENADDR"] { + t.Errorf("RTMPListenAddr has wrong value: %s", c.RTMPListenAddr) } if c.LogLevel != envValues["DOGTAMER_LOGLEVEL"] { t.Errorf("LogLevel has wrong value: %s", c.LogLevel) @@ -103,8 +103,8 @@ func TestConfig(t *testing.T) { os.Clearenv() envValues := map[string]string{ - "DOGTAMER_LOGLEVEL": "TEST", - "DOGTAMER_LISTENADDR": ":3333", + "DOGTAMER_LOGLEVEL": "TEST", + "DOGTAMER_RTMPLISTENADDR": ":3333", } for k, v := range envValues { diff --git a/dogtamer.example.toml b/dogtamer.example.toml index 3c03129..909957a 100644 --- a/dogtamer.example.toml +++ b/dogtamer.example.toml @@ -1,10 +1,27 @@ # dogtamer example config file -# Address to listen to for incoming connections +# Address to listen to for incoming rtmp connections # Default: ":5566" -ListenAddr = ":5566" +# ENV: DOGTAMER_RTMPLISTENADDR +RTMPListenAddr = ":5566" # Log level # Default: INFO # Possible values: "DEBUG", "INFO", "WARN", "ERROR" +# ENV: DOGTAMER_LOGLEVEL LogLevel = "INFO" + +# Enable HTTP-server +# Default: False +# ENV: DOGTAMER_HTTPSERVERENABLE +HTTPServerEnable = true + +# HTTP Listen address +# Default: ":8077" +# ENV: DOGTAMER_HTTPLISTENADDR +HTTPListenAddr = ":8077" + +# Enable HTTP access logging +# Default: true +# ENV: DOGTAMER_HTTPACCESSLOGENABLE +HTTPAccessLogEnable = true \ No newline at end of file diff --git a/go.mod b/go.mod index aae0ddc..94fd72d 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require github.com/urfave/cli/v2 v2.3.0 require ( github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590 // indirect github.com/pelletier/go-toml v1.9.3 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index 17ecdd4..6d64f17 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKY github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= diff --git a/server/rtmp.go b/server/rtmp.go index 18857e9..d74f798 100644 --- a/server/rtmp.go +++ b/server/rtmp.go @@ -9,8 +9,10 @@ import ( "sync" "time" + "github.com/dustin/go-humanize" "github.com/nareix/joy5/av" "github.com/nareix/joy5/format/rtmp" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -21,9 +23,12 @@ type RTMPClient struct { stream *Stream ctx context.Context ctxCancel context.CancelFunc + timestamp time.Time keyframeSent bool + bytesSent atomic.Uint64 + packetsChan chan av.Packet } @@ -34,22 +39,25 @@ func NewRTMPClient(rs *RTMPServer, stream *Stream, c *rtmp.Conn, nc net.Conn) *R nc: nc, server: rs, stream: stream, - packetsChan: make(chan av.Packet, 10), + packetsChan: make(chan av.Packet, 100), ctx: ctx, ctxCancel: cancel, + timestamp: time.Now(), } } type Stream struct { - Name string - Clients []*RTMPClient - clientsLock sync.Mutex - server *RTMPServer - nc net.Conn - c *rtmp.Conn - ctx context.Context - ctxCancel context.CancelFunc - startTime time.Time + Name string + Clients []*RTMPClient + clientsLock sync.Mutex + server *RTMPServer + nc net.Conn + c *rtmp.Conn + ctx context.Context + ctxCancel context.CancelFunc + startTime time.Time + bytesReceived atomic.Uint64 + bytesSent atomic.Uint64 metadataPacket *av.Packet h264DecoderConfigPacket *av.Packet @@ -91,7 +99,10 @@ func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) { } // Once all clients have been removed, remove stream from server s.server.RemoveStream(s) - s.server.Logger.Infow("Stream has been successfully stopped.", "stream_name", s.Name, "duration_seconds", time.Since(s.startTime)) + s.server.Logger.Infow("Stream has been successfully stopped.", + "stream_name", s.Name, + "stream_duration", time.Since(s.startTime), + "bytes_received", humanize.Bytes(s.bytesReceived.Load())) }() for { @@ -119,6 +130,10 @@ func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) { } nc.Close() } + + // Increase received counter + s.bytesReceived.Add(uint64(len(pkt.Data))) + switch pkt.Type { case av.Metadata: s.server.Logger.Debugw("Got metadata packet for stream.", "stream_name", s.Name) @@ -138,9 +153,10 @@ func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) { for _, client := range s.Clients { select { case client.packetsChan <- pkt: + s.bytesSent.Add(uint64(len(pkt.Data))) continue default: - logger.Info("Client channel is blocking.") + logger.Info("Client channel is blocking.", zap.String("client_addr", client.nc.RemoteAddr().String())) } } } @@ -154,10 +170,14 @@ func (client *RTMPClient) handleClient() { defer func() { client.nc.Close() stream.RemoveClient(client) - server.Logger.Infow("Disconnected viewer.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) + server.Logger.Infow("Disconnected viewer.", + "client_addr", client.nc.RemoteAddr().String(), + "stream_name", stream.Name, + "watch_duration", time.Since(client.timestamp), + "bytes_sent", humanize.Bytes(client.bytesSent.Load())) }() - server.Logger.Infow("Viewer connected.", "stream", stream.Name, "remote_addr", client.nc.RemoteAddr().String()) + server.Logger.Infow("Viewer connected.", "stream", stream.Name, "client_addr", client.nc.RemoteAddr().String()) // First ensure that metadata, and decoder configs are set // TODO: Ensure stream isnt cancelled before sending metadata and decoder configs for stream.aacDecoderConfigPacket == nil || stream.metadataPacket == nil || stream.h264DecoderConfigPacket == nil { @@ -189,11 +209,11 @@ func (client *RTMPClient) handleClient() { // Wait for keyframe before sending video if !pkt.IsKeyFrame && !client.keyframeSent { continue - } else { + } else if !client.keyframeSent { // Assume that the packet will get sent client.keyframeSent = true + server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) } - server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) client.nc.SetWriteDeadline(time.Now().Add(1 * time.Second)) if err := client.c.WritePacket(pkt); err != nil { @@ -212,6 +232,7 @@ func (client *RTMPClient) handleClient() { server.Logger.Debugw("Error while writing packet to viewer.", "client_addr", client.nc.RemoteAddr().String(), "error_message", err) return } + client.bytesSent.Add(uint64(len(pkt.Data))) } } @@ -274,7 +295,7 @@ func (rs *RTMPServer) Listen() error { for len(rs.streams) > 0 { time.Sleep(200 * time.Millisecond) } - rs.Logger.Infow("Server stopped.") + rs.Logger.Infow("RTMP server stopped.") return nil } // Connection not closed, wait and try again diff --git a/server/web.go b/server/web.go index ae0cc71..297aa59 100644 --- a/server/web.go +++ b/server/web.go @@ -12,6 +12,7 @@ import ( type WebServer struct { Logger *zap.SugaredLogger + ListenAddr string ctx context.Context rtmpServer *RTMPServer httpServer *http.Server @@ -22,23 +23,32 @@ func NewWebServer(ctx context.Context, rs *RTMPServer) *WebServer { ctx: ctx, rtmpServer: rs, Logger: zap.NewNop().Sugar(), + ListenAddr: ":8077", } } func (ws *WebServer) Serve() error { ws.httpServer = &http.Server{ - Addr: ":8077", + Addr: ws.ListenAddr, Handler: http.HandlerFunc(ws.IndexHandler), } go func() { <-ws.ctx.Done() + ws.Logger.Debugw("HTTP shutdown signal received.") shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = ws.httpServer.Shutdown(shutdownCtx) }() - return ws.httpServer.ListenAndServe() + err := ws.httpServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + ws.Logger.Warnw("HTTP Server stopped with error.", "error", err) + return err + } + ws.Logger.Info("HTTP server stopped.") + + return nil } func (ws *WebServer) IndexHandler(w http.ResponseWriter, r *http.Request) { diff --git a/version.go b/version.go index 4be3f56..942137a 100644 --- a/version.go +++ b/version.go @@ -2,7 +2,7 @@ package dogtamer import "fmt" -const Version string = "v0.1.2" +const Version string = "v0.1.3" var Build string