Add various features

* HTTP Server
* Config files
This commit is contained in:
Torjus Håkestad 2021-09-02 02:49:11 +02:00
parent 1d6e4270ba
commit 80acc281b9
10 changed files with 153 additions and 55 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
dogtamer.toml

View File

@ -9,6 +9,7 @@ import (
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"github.uio.no/torjus/dogtamer" "github.uio.no/torjus/dogtamer"
"github.uio.no/torjus/dogtamer/config"
"github.uio.no/torjus/dogtamer/server" "github.uio.no/torjus/dogtamer/server"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -56,35 +57,51 @@ func main() {
} }
func ActionServe(c *cli.Context) error { func ActionServe(c *cli.Context) error {
logger := setupServerLogger()
cfg, err := config.FromDefaultLocations()
if err != nil {
logger.Warn("No config file found.")
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// Setup rtmp server // Setup RTMP-server
s := server.NewRTMPServer(ctx, ":5566") s := server.NewRTMPServer(ctx, cfg.RTMPListenAddr)
logger := setupServerLogger()
s.Logger = logger s.Logger = logger
// Setup web server // Setup web-server
ws := server.NewWebServer(ctx, s) webDone := make(chan struct{})
go func() { if cfg.HTTPServerEnable {
s.Logger.Info("Starting web server") ws := server.NewWebServer(ctx, s)
err := ws.Serve() ws.ListenAddr = cfg.HTTPListenAddr
if err != nil && err != http.ErrServerClosed { ws.Logger = logger
s.Logger.Infow("Web server shut down with error", "err", err) go func() {
} s.Logger.Info("Starting web-server.")
}() err := ws.Serve()
if err != nil && err != http.ErrServerClosed {
s.Logger.Infow("Web server shut down with error.", "err", err)
}
close(webDone)
}()
} else {
close(webDone)
}
// Listen for SIGINT // Listen for SIGINT
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt) signal.Notify(sigChan, os.Interrupt)
go func() { go func() {
<-sigChan <-sigChan
s.Logger.Info("Got shutdown signal.") s.Logger.Debug("Got shutdown signal.")
cancel() cancel()
}() }()
// Start RTMP-server
s.Logger.Info("Starting RTMP-server.")
if err := s.Listen(); err != nil { if err := s.Listen(); err != nil {
return cli.Exit(err, ExitGenericError) return cli.Exit(err, ExitGenericError)
} }
// Wait for webserver to exit, if started
<-webDone
s.Logger.Info("Server shut down.") s.Logger.Info("Server shut down.")
return nil return nil
} }
@ -97,7 +114,7 @@ func setupServerLogger() *zap.SugaredLogger {
logEncoderConfig.EncodeDuration = zapcore.StringDurationEncoder logEncoderConfig.EncodeDuration = zapcore.StringDurationEncoder
rootLoggerConfig := &zap.Config{ rootLoggerConfig := &zap.Config{
Level: zap.NewAtomicLevelAt(zap.InfoLevel), Level: zap.NewAtomicLevelAt(zap.DebugLevel),
OutputPaths: []string{"stdout"}, OutputPaths: []string{"stdout"},
ErrorOutputPaths: []string{"stdout"}, ErrorOutputPaths: []string{"stdout"},
Encoding: "console", Encoding: "console",

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"github.com/pelletier/go-toml" "github.com/pelletier/go-toml"
) )
@ -13,8 +14,11 @@ import (
var ErrNotFound = errors.New("no config file found") var ErrNotFound = errors.New("no config file found")
type Config struct { type Config struct {
ListenAddr string `toml:"ListenAddr"` RTMPListenAddr string `toml:"RTMPListenAddr"`
LogLevel string `toml:"LogLevel"` HTTPServerEnable bool `toml:"HTTPServerEnable"`
HTTPListenAddr string `toml:"HTTPListenAddr"`
HTTPAccessLogEnable bool `toml:"HTTPAccessLogEnable"`
LogLevel string `toml:"LogLevel"`
} }
type InvalidValueError struct { type InvalidValueError struct {
@ -28,7 +32,9 @@ func (ive *InvalidValueError) Error() string {
func FromReader(r io.Reader) (*Config, error) { func FromReader(r io.Reader) (*Config, error) {
var c Config var c Config
// Set some defaults // Set some defaults
c.ListenAddr = ":5566" c.RTMPListenAddr = ":5566"
c.HTTPServerEnable = false
c.HTTPListenAddr = ":8077"
c.LogLevel = "INFO" c.LogLevel = "INFO"
decoder := toml.NewDecoder(r) decoder := toml.NewDecoder(r)
@ -53,8 +59,27 @@ func (c *Config) UpdateFromEnv() error {
if loglevel, found := os.LookupEnv("DOGTAMER_LOGLEVEL"); found { if loglevel, found := os.LookupEnv("DOGTAMER_LOGLEVEL"); found {
c.LogLevel = loglevel c.LogLevel = loglevel
} }
if listenAddr, found := os.LookupEnv("DOGTAMER_LISTENADDR"); found {
c.ListenAddr = listenAddr if listenAddr, found := os.LookupEnv("DOGTAMER_RTMPLISTENADDR"); found {
c.RTMPListenAddr = listenAddr
}
if httpEnable, found := os.LookupEnv("DOGTAMER_HTTPSERVERENABLE"); found {
switch strings.ToUpper(httpEnable) {
case "TRUE", "YES", "ENABLE":
c.HTTPServerEnable = true
}
}
if httpListenAddr, found := os.LookupEnv("DOGTAMER_HTTPLISTENADDR"); found {
c.HTTPListenAddr = httpListenAddr
}
if httpAccessLogEnable, found := os.LookupEnv("DOGTAMER_HTTPACCESSLOGENABLE"); found {
switch strings.ToUpper(httpAccessLogEnable) {
case "TRUE", "YES", "ENABLE":
c.HTTPAccessLogEnable = true
}
} }
return c.Verify() return c.Verify()
@ -69,7 +94,7 @@ func FromFile(path string) (*Config, error) {
return FromReader(f) return FromReader(f)
} }
func FromDefaultLocations(path string) (*Config, error) { func FromDefaultLocations() (*Config, error) {
defaultLocations := []string{ defaultLocations := []string{
"dogtamer.toml", "dogtamer.toml",
"/etc/dogtamer.toml", "/etc/dogtamer.toml",
@ -94,5 +119,9 @@ func FromDefaultLocations(path string) (*Config, error) {
return cfg, cfg.UpdateFromEnv() return cfg, cfg.UpdateFromEnv()
} }
cfg, err := FromReader(strings.NewReader(""))
if err == nil {
return cfg, ErrNotFound
}
return nil, ErrNotFound return nil, ErrNotFound
} }

View File

@ -19,8 +19,8 @@ func TestConfig(t *testing.T) {
} }
// Ensure proper defaults are set // Ensure proper defaults are set
if c.ListenAddr != ":5566" { if c.RTMPListenAddr != ":5566" {
t.Errorf("Unexpected ListenAddr: %s", c.ListenAddr) t.Errorf("Unexpected RTMPListenAddr: %s", c.RTMPListenAddr)
} }
if c.LogLevel != "INFO" { if c.LogLevel != "INFO" {
t.Errorf("Unexpected LogLevel: %s", c.LogLevel) t.Errorf("Unexpected LogLevel: %s", c.LogLevel)
@ -32,7 +32,7 @@ func TestConfig(t *testing.T) {
# Random comment # Random comment
LogLevel = "DEBUG" LogLevel = "DEBUG"
ListenAddr = ":5555" RTMPListenAddr = ":5555"
` `
sr := strings.NewReader(configString) sr := strings.NewReader(configString)
@ -41,8 +41,8 @@ func TestConfig(t *testing.T) {
t.Fatalf("Error reading config: %s", err) t.Fatalf("Error reading config: %s", err)
} }
if c.ListenAddr != ":5555" { if c.RTMPListenAddr != ":5555" {
t.Errorf("Unexpected ListenAddr: %s", c.ListenAddr) t.Errorf("Unexpected RTMPListenAddr: %s", c.RTMPListenAddr)
} }
if c.LogLevel != "DEBUG" { if c.LogLevel != "DEBUG" {
@ -54,7 +54,7 @@ func TestConfig(t *testing.T) {
# Random comment # Random comment
LogLevel = "INVALID" LogLevel = "INVALID"
ListenAddr = ":5555" RTMPListenAddr = ":5555"
` `
sr := strings.NewReader(configString) sr := strings.NewReader(configString)
@ -71,8 +71,8 @@ func TestConfig(t *testing.T) {
os.Clearenv() os.Clearenv()
envValues := map[string]string{ envValues := map[string]string{
"DOGTAMER_LOGLEVEL": "DEBUG", "DOGTAMER_LOGLEVEL": "DEBUG",
"DOGTAMER_LISTENADDR": ":3333", "DOGTAMER_RTMPLISTENADDR": ":3333",
} }
for k, v := range envValues { for k, v := range envValues {
@ -92,8 +92,8 @@ func TestConfig(t *testing.T) {
t.Errorf("Error updating config with environment") t.Errorf("Error updating config with environment")
} }
if c.ListenAddr != envValues["DOGTAMER_LISTENADDR"] { if c.RTMPListenAddr != envValues["DOGTAMER_RTMPLISTENADDR"] {
t.Errorf("ListenAddr has wrong value: %s", c.ListenAddr) t.Errorf("RTMPListenAddr has wrong value: %s", c.RTMPListenAddr)
} }
if c.LogLevel != envValues["DOGTAMER_LOGLEVEL"] { if c.LogLevel != envValues["DOGTAMER_LOGLEVEL"] {
t.Errorf("LogLevel has wrong value: %s", c.LogLevel) t.Errorf("LogLevel has wrong value: %s", c.LogLevel)
@ -103,8 +103,8 @@ func TestConfig(t *testing.T) {
os.Clearenv() os.Clearenv()
envValues := map[string]string{ envValues := map[string]string{
"DOGTAMER_LOGLEVEL": "TEST", "DOGTAMER_LOGLEVEL": "TEST",
"DOGTAMER_LISTENADDR": ":3333", "DOGTAMER_RTMPLISTENADDR": ":3333",
} }
for k, v := range envValues { for k, v := range envValues {

View File

@ -1,10 +1,27 @@
# dogtamer example config file # dogtamer example config file
# Address to listen to for incoming connections # Address to listen to for incoming rtmp connections
# Default: ":5566" # Default: ":5566"
ListenAddr = ":5566" # ENV: DOGTAMER_RTMPLISTENADDR
RTMPListenAddr = ":5566"
# Log level # Log level
# Default: INFO # Default: INFO
# Possible values: "DEBUG", "INFO", "WARN", "ERROR" # Possible values: "DEBUG", "INFO", "WARN", "ERROR"
# ENV: DOGTAMER_LOGLEVEL
LogLevel = "INFO" LogLevel = "INFO"
# Enable HTTP-server
# Default: False
# ENV: DOGTAMER_HTTPSERVERENABLE
HTTPServerEnable = true
# HTTP Listen address
# Default: ":8077"
# ENV: DOGTAMER_HTTPLISTENADDR
HTTPListenAddr = ":8077"
# Enable HTTP access logging
# Default: true
# ENV: DOGTAMER_HTTPACCESSLOGENABLE
HTTPAccessLogEnable = true

1
go.mod
View File

@ -6,6 +6,7 @@ require github.com/urfave/cli/v2 v2.3.0
require ( require (
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590 // indirect github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect github.com/pelletier/go-toml v1.9.3 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect

2
go.sum
View File

@ -5,6 +5,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKY
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=

View File

@ -9,8 +9,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/dustin/go-humanize"
"github.com/nareix/joy5/av" "github.com/nareix/joy5/av"
"github.com/nareix/joy5/format/rtmp" "github.com/nareix/joy5/format/rtmp"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -21,9 +23,12 @@ type RTMPClient struct {
stream *Stream stream *Stream
ctx context.Context ctx context.Context
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
timestamp time.Time
keyframeSent bool keyframeSent bool
bytesSent atomic.Uint64
packetsChan chan av.Packet packetsChan chan av.Packet
} }
@ -34,22 +39,25 @@ func NewRTMPClient(rs *RTMPServer, stream *Stream, c *rtmp.Conn, nc net.Conn) *R
nc: nc, nc: nc,
server: rs, server: rs,
stream: stream, stream: stream,
packetsChan: make(chan av.Packet, 10), packetsChan: make(chan av.Packet, 100),
ctx: ctx, ctx: ctx,
ctxCancel: cancel, ctxCancel: cancel,
timestamp: time.Now(),
} }
} }
type Stream struct { type Stream struct {
Name string Name string
Clients []*RTMPClient Clients []*RTMPClient
clientsLock sync.Mutex clientsLock sync.Mutex
server *RTMPServer server *RTMPServer
nc net.Conn nc net.Conn
c *rtmp.Conn c *rtmp.Conn
ctx context.Context ctx context.Context
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
startTime time.Time startTime time.Time
bytesReceived atomic.Uint64
bytesSent atomic.Uint64
metadataPacket *av.Packet metadataPacket *av.Packet
h264DecoderConfigPacket *av.Packet h264DecoderConfigPacket *av.Packet
@ -91,7 +99,10 @@ func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) {
} }
// Once all clients have been removed, remove stream from server // Once all clients have been removed, remove stream from server
s.server.RemoveStream(s) s.server.RemoveStream(s)
s.server.Logger.Infow("Stream has been successfully stopped.", "stream_name", s.Name, "duration_seconds", time.Since(s.startTime)) s.server.Logger.Infow("Stream has been successfully stopped.",
"stream_name", s.Name,
"stream_duration", time.Since(s.startTime),
"bytes_received", humanize.Bytes(s.bytesReceived.Load()))
}() }()
for { for {
@ -119,6 +130,10 @@ func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) {
} }
nc.Close() nc.Close()
} }
// Increase received counter
s.bytesReceived.Add(uint64(len(pkt.Data)))
switch pkt.Type { switch pkt.Type {
case av.Metadata: case av.Metadata:
s.server.Logger.Debugw("Got metadata packet for stream.", "stream_name", s.Name) s.server.Logger.Debugw("Got metadata packet for stream.", "stream_name", s.Name)
@ -138,9 +153,10 @@ func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) {
for _, client := range s.Clients { for _, client := range s.Clients {
select { select {
case client.packetsChan <- pkt: case client.packetsChan <- pkt:
s.bytesSent.Add(uint64(len(pkt.Data)))
continue continue
default: default:
logger.Info("Client channel is blocking.") logger.Info("Client channel is blocking.", zap.String("client_addr", client.nc.RemoteAddr().String()))
} }
} }
} }
@ -154,10 +170,14 @@ func (client *RTMPClient) handleClient() {
defer func() { defer func() {
client.nc.Close() client.nc.Close()
stream.RemoveClient(client) stream.RemoveClient(client)
server.Logger.Infow("Disconnected viewer.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) server.Logger.Infow("Disconnected viewer.",
"client_addr", client.nc.RemoteAddr().String(),
"stream_name", stream.Name,
"watch_duration", time.Since(client.timestamp),
"bytes_sent", humanize.Bytes(client.bytesSent.Load()))
}() }()
server.Logger.Infow("Viewer connected.", "stream", stream.Name, "remote_addr", client.nc.RemoteAddr().String()) server.Logger.Infow("Viewer connected.", "stream", stream.Name, "client_addr", client.nc.RemoteAddr().String())
// First ensure that metadata, and decoder configs are set // First ensure that metadata, and decoder configs are set
// TODO: Ensure stream isnt cancelled before sending metadata and decoder configs // TODO: Ensure stream isnt cancelled before sending metadata and decoder configs
for stream.aacDecoderConfigPacket == nil || stream.metadataPacket == nil || stream.h264DecoderConfigPacket == nil { for stream.aacDecoderConfigPacket == nil || stream.metadataPacket == nil || stream.h264DecoderConfigPacket == nil {
@ -189,11 +209,11 @@ func (client *RTMPClient) handleClient() {
// Wait for keyframe before sending video // Wait for keyframe before sending video
if !pkt.IsKeyFrame && !client.keyframeSent { if !pkt.IsKeyFrame && !client.keyframeSent {
continue continue
} else { } else if !client.keyframeSent {
// Assume that the packet will get sent // Assume that the packet will get sent
client.keyframeSent = true client.keyframeSent = true
server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
} }
server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
client.nc.SetWriteDeadline(time.Now().Add(1 * time.Second)) client.nc.SetWriteDeadline(time.Now().Add(1 * time.Second))
if err := client.c.WritePacket(pkt); err != nil { if err := client.c.WritePacket(pkt); err != nil {
@ -212,6 +232,7 @@ func (client *RTMPClient) handleClient() {
server.Logger.Debugw("Error while writing packet to viewer.", "client_addr", client.nc.RemoteAddr().String(), "error_message", err) server.Logger.Debugw("Error while writing packet to viewer.", "client_addr", client.nc.RemoteAddr().String(), "error_message", err)
return return
} }
client.bytesSent.Add(uint64(len(pkt.Data)))
} }
} }
@ -274,7 +295,7 @@ func (rs *RTMPServer) Listen() error {
for len(rs.streams) > 0 { for len(rs.streams) > 0 {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
} }
rs.Logger.Infow("Server stopped.") rs.Logger.Infow("RTMP server stopped.")
return nil return nil
} }
// Connection not closed, wait and try again // Connection not closed, wait and try again

View File

@ -12,6 +12,7 @@ import (
type WebServer struct { type WebServer struct {
Logger *zap.SugaredLogger Logger *zap.SugaredLogger
ListenAddr string
ctx context.Context ctx context.Context
rtmpServer *RTMPServer rtmpServer *RTMPServer
httpServer *http.Server httpServer *http.Server
@ -22,23 +23,32 @@ func NewWebServer(ctx context.Context, rs *RTMPServer) *WebServer {
ctx: ctx, ctx: ctx,
rtmpServer: rs, rtmpServer: rs,
Logger: zap.NewNop().Sugar(), Logger: zap.NewNop().Sugar(),
ListenAddr: ":8077",
} }
} }
func (ws *WebServer) Serve() error { func (ws *WebServer) Serve() error {
ws.httpServer = &http.Server{ ws.httpServer = &http.Server{
Addr: ":8077", Addr: ws.ListenAddr,
Handler: http.HandlerFunc(ws.IndexHandler), Handler: http.HandlerFunc(ws.IndexHandler),
} }
go func() { go func() {
<-ws.ctx.Done() <-ws.ctx.Done()
ws.Logger.Debugw("HTTP shutdown signal received.")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
_ = ws.httpServer.Shutdown(shutdownCtx) _ = ws.httpServer.Shutdown(shutdownCtx)
}() }()
return ws.httpServer.ListenAndServe() err := ws.httpServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
ws.Logger.Warnw("HTTP Server stopped with error.", "error", err)
return err
}
ws.Logger.Info("HTTP server stopped.")
return nil
} }
func (ws *WebServer) IndexHandler(w http.ResponseWriter, r *http.Request) { func (ws *WebServer) IndexHandler(w http.ResponseWriter, r *http.Request) {

View File

@ -2,7 +2,7 @@ package dogtamer
import "fmt" import "fmt"
const Version string = "v0.1.2" const Version string = "v0.1.3"
var Build string var Build string