commit 72c84dff1e4b7add62d6db23cd5c62a5f760d755 Author: = Date: Sat Aug 21 16:15:29 2021 +0200 Initial commit diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1d1fe94 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +Dockerfile \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..158ae8a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM golang:latest as builder +WORKDIR /app +COPY go.mod /app/go.mod +COPY go.sum /app/go.sum +RUN go mod download +COPY . /app +RUN CGO_ENABLED=0 go build -o dogtamer cmd/dogtamer.go + + +FROM alpine:latest +WORKDIR /app +COPY --from=builder /app/dogtamer /app/dogtamer +EXPOSE 5566 +CMD ["/app/dogtamer", "serve"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e0c96a4 --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ +# dogtamer + +A small rtmp relay. + +## Plans + +* Add web-interface for viewing streams +* Add tests + +## Usage + +```text +$ go build -o dogtamer cmd/dogtamer.go +$ ./dogtamer serve +``` + + +## TODO + +* Handle client errors better \ No newline at end of file diff --git a/cmd/dogtamer.go b/cmd/dogtamer.go new file mode 100644 index 0000000..1a1d91b --- /dev/null +++ b/cmd/dogtamer.go @@ -0,0 +1,101 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + + "github.com/urfave/cli/v2" + "github.uio.no/torjus/dogtamer" + "github.uio.no/torjus/dogtamer/server" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + ExitSuccess = 0 + ExitGenericError = 1 +) + +func main() { + cli.VersionPrinter = func(c *cli.Context) { + fmt.Println(dogtamer.GetVersion()) + } + + app := &cli.App{ + Name: "dogtamer", + Version: dogtamer.Version, + Authors: []*cli.Author{ + { + Name: "Torjus HÃ¥kestad", + Email: "torjus@usit.uio.no", + }, + }, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Usage: "Path to config-file", + Aliases: []string{"c"}, + }, + }, + Commands: []*cli.Command{ + { + Name: "serve", + Usage: "Start dogtamer server", + Action: ActionServe, + }, + }, + } + err := app.Run(os.Args) + if err != nil { + fmt.Printf("Error: %s\n", err) + os.Exit(ExitGenericError) + } +} + +func ActionServe(c *cli.Context) error { + + ctx, cancel := context.WithCancel(context.Background()) + s := server.NewRTMPServer(ctx, ":5566") + logger := setupServerLogger() + s.Logger = logger + + // Listen for SIGINT + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + go func() { + <-sigChan + s.Logger.Info("Got shutdown signal.") + cancel() + }() + + if err := s.Listen(); err != nil { + return cli.Exit(err, ExitGenericError) + } + s.Logger.Info("Server shut down.") + return nil +} + +func setupServerLogger() *zap.SugaredLogger { + logEncoderConfig := zap.NewProductionEncoderConfig() + logEncoderConfig.EncodeCaller = zapcore.ShortCallerEncoder + logEncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder + logEncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + logEncoderConfig.EncodeDuration = zapcore.StringDurationEncoder + + rootLoggerConfig := &zap.Config{ + Level: zap.NewAtomicLevelAt(zap.InfoLevel), + OutputPaths: []string{"stdout"}, + ErrorOutputPaths: []string{"stdout"}, + Encoding: "console", + EncoderConfig: logEncoderConfig, + } + + rootLogger, err := rootLoggerConfig.Build() + if err != nil { + panic(err) + } + + return rootLogger.Named("RTMP").WithOptions().Sugar() +} diff --git a/deploy/dogtamer.service b/deploy/dogtamer.service new file mode 100644 index 0000000..a048854 --- /dev/null +++ b/deploy/dogtamer.service @@ -0,0 +1,15 @@ +[Unit] +Description=Dogtamer Container +After=docker.service +Requires=docker.service + +[Service] +TimeoutStartSec=0 +Restart=always +ExecStartPre=-/usr/bin/docker stop dogtamer +ExecStartPre=-/usr/bin/docker rm dogtamer +ExecStart=/usr/bin/docker run -p 5566:5566 \ + --name dogtamer dogtamer:latest + +[Install] +WantedBy=multi-user.target diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e28e752 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.uio.no/torjus/dogtamer + +go 1.17 + +require github.com/urfave/cli/v2 v2.3.0 + +require ( + github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect + github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.7.0 // indirect + go.uber.org/zap v1.19.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7d26cfe --- /dev/null +++ b/go.sum @@ -0,0 +1,53 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= +github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590 h1:PnxRU8L8Y2q82vFC2QdNw23Dm2u6WrjecIdpXjiYbXM= +github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590/go.mod h1:XmAOs6UJXpNXRwKk+KY/nv5kL6xXYXyellk+A1pTlko= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/spf13/cobra v0.0.4-0.20190109003409-7547e83b2d85/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/pflag v1.0.4-0.20181223182923-24fa6976df40/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/zap v1.19.0 h1:mZQZefskPPCMIBCSEH0v2/iUqqLrYtaeqwD6FUGUnFE= +go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/rtmp.go b/server/rtmp.go new file mode 100644 index 0000000..18857e9 --- /dev/null +++ b/server/rtmp.go @@ -0,0 +1,339 @@ +package server + +import ( + "context" + "errors" + "io" + "net" + "regexp" + "sync" + "time" + + "github.com/nareix/joy5/av" + "github.com/nareix/joy5/format/rtmp" + "go.uber.org/zap" +) + +type RTMPClient struct { + c *rtmp.Conn + nc net.Conn + server *RTMPServer + stream *Stream + ctx context.Context + ctxCancel context.CancelFunc + + keyframeSent bool + + packetsChan chan av.Packet +} + +func NewRTMPClient(rs *RTMPServer, stream *Stream, c *rtmp.Conn, nc net.Conn) *RTMPClient { + ctx, cancel := context.WithCancel(stream.ctx) + return &RTMPClient{ + c: c, + nc: nc, + server: rs, + stream: stream, + packetsChan: make(chan av.Packet, 10), + ctx: ctx, + ctxCancel: cancel, + } +} + +type Stream struct { + Name string + Clients []*RTMPClient + clientsLock sync.Mutex + server *RTMPServer + nc net.Conn + c *rtmp.Conn + ctx context.Context + ctxCancel context.CancelFunc + startTime time.Time + + metadataPacket *av.Packet + h264DecoderConfigPacket *av.Packet + aacDecoderConfigPacket *av.Packet +} + +func (s *Stream) AddClient(client *RTMPClient) { + s.clientsLock.Lock() + defer s.clientsLock.Unlock() + s.Clients = append(s.Clients, client) + s.server.Logger.Infow("Added client to stream.", "stream_name", s.Name) +} + +func (s *Stream) RemoveClient(client *RTMPClient) { + for i, c := range s.Clients { + if c == client { + s.clientsLock.Lock() + defer s.clientsLock.Unlock() + copy(s.Clients[i:], s.Clients[i+1:]) + s.Clients[len(s.Clients)-1] = nil // or the zero value of T + s.Clients = s.Clients[:len(s.Clients)-1] + continue + } + } +} + +func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) { + s.nc = nc + s.c = c + + logger := s.server.Logger.Desugar() + s.startTime = time.Now() + + // Ensure cleanup + defer func() { + // Ensure all clients have been removed + for len(s.Clients) > 0 { + time.Sleep(200 * time.Millisecond) + } + // Once all clients have been removed, remove stream from server + s.server.RemoveStream(s) + s.server.Logger.Infow("Stream has been successfully stopped.", "stream_name", s.Name, "duration_seconds", time.Since(s.startTime)) + }() + + for { + nc.SetReadDeadline(time.Now().Add(5 * time.Second)) + + // Check if stream has been cancelled + select { + case <-s.ctx.Done(): + // Stream is cancelled + s.server.Logger.Debugw("Stream context has been cancelled.", "stream_name", s.Name) + nc.Close() + return + default: + } + + pkt, err := c.ReadPacket() + if err != nil { + // TODO: Moar better pls + if err == io.EOF { + // Stream ended + nc.Close() + // Cancel stream context, to ensure all clients are disconnected + s.server.Logger.Infow("Stream disconnected cleanly.", "stream_name", s.Name) + s.ctxCancel() + } + nc.Close() + } + switch pkt.Type { + case av.Metadata: + s.server.Logger.Debugw("Got metadata packet for stream.", "stream_name", s.Name) + s.metadataPacket = &pkt + continue + case av.AACDecoderConfig: + s.server.Logger.Debugw("Got AAC decoder config packet for stream.", "stream_name", s.Name) + s.aacDecoderConfigPacket = &pkt + continue + case av.H264DecoderConfig: + s.server.Logger.Debugw("Got h264 decoder config packet for stream.", "stream_name", s.Name) + s.h264DecoderConfigPacket = &pkt + continue + } + + // TODO: Write packet to buffer, to be used read by client handlers + for _, client := range s.Clients { + select { + case client.packetsChan <- pkt: + continue + default: + logger.Info("Client channel is blocking.") + } + } + } +} + +func (client *RTMPClient) handleClient() { + server := client.server + stream := client.stream + + // ensure cleanup + defer func() { + client.nc.Close() + stream.RemoveClient(client) + server.Logger.Infow("Disconnected viewer.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) + }() + + server.Logger.Infow("Viewer connected.", "stream", stream.Name, "remote_addr", client.nc.RemoteAddr().String()) + // First ensure that metadata, and decoder configs are set + // TODO: Ensure stream isnt cancelled before sending metadata and decoder configs + for stream.aacDecoderConfigPacket == nil || stream.metadataPacket == nil || stream.h264DecoderConfigPacket == nil { + server.Logger.Debugw("Viewer is waiting for stream to receive metadata or decoder config packets.", client.nc.RemoteAddr().String()) + time.Sleep(time.Second) + } + if err := client.c.WritePacket(*stream.metadataPacket); err != nil { + server.Logger.Info("Error writing metadata-packet to viewer.", "err", err) + } + if err := client.c.WritePacket(*stream.aacDecoderConfigPacket); err != nil { + server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err) + } + if err := client.c.WritePacket(*stream.h264DecoderConfigPacket); err != nil { + server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err) + } + server.Logger.Debugw("Wrote metadata and decoder configs to viewer.", "client_addr", client.nc.RemoteAddr().String()) + + for { + var errCount int + select { + case <-client.ctx.Done(): + // Client has been cancelled, stop writing packets + server.Logger.Infow("Client context cancelled.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) + return + default: + } + pkt := <-client.packetsChan + + // Wait for keyframe before sending video + if !pkt.IsKeyFrame && !client.keyframeSent { + continue + } else { + // Assume that the packet will get sent + client.keyframeSent = true + } + server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) + + client.nc.SetWriteDeadline(time.Now().Add(1 * time.Second)) + if err := client.c.WritePacket(pkt); err != nil { + netErr, ok := err.(*net.OpError) + if ok && netErr.Temporary() { + // Temporary error, so we should retry? + errCount++ + time.Sleep(1 * time.Second) + continue + } + if errCount > 9 { + server.Logger.Warnw("Too many errors from viewer. Disconnecting.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) + return + } + // TODO: Handle client disconnections properly + server.Logger.Debugw("Error while writing packet to viewer.", "client_addr", client.nc.RemoteAddr().String(), "error_message", err) + return + } + } +} + +type RTMPServer struct { + ListenAddr string + Logger *zap.SugaredLogger + streamsLock sync.Mutex + streams map[string]*Stream + ctx context.Context + ctxCancel context.CancelFunc +} + +func NewRTMPServer(ctx context.Context, addr string) *RTMPServer { + serverCtx, cancel := context.WithCancel(ctx) + rs := &RTMPServer{ + ListenAddr: addr, + Logger: zap.NewNop().Sugar(), + streams: make(map[string]*Stream), + ctx: serverCtx, + ctxCancel: cancel, + } + return rs +} + +func (rs *RTMPServer) AddStream(stream *Stream) { + stream.server = rs + rs.streamsLock.Lock() + defer rs.streamsLock.Unlock() + rs.streams[stream.Name] = stream + rs.Logger.Infow("New stream added to server.", "stream_name", stream.Name) +} + +func (rs *RTMPServer) RemoveStream(stream *Stream) { + rs.streamsLock.Lock() + defer rs.streamsLock.Unlock() + delete(rs.streams, stream.Name) +} + +func (rs *RTMPServer) Listen() error { + listener, err := net.Listen("tcp", rs.ListenAddr) + if err != nil { + return err + } + defer listener.Close() + + s := rtmp.NewServer() + s.HandleConn = rs.handleConn + + rs.Logger.Info("RTMP server listening for connections.") + go func() { + <-rs.ctx.Done() + listener.Close() + }() + + for { + nc, err := listener.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + // Wait for streams to disconnect + for len(rs.streams) > 0 { + time.Sleep(200 * time.Millisecond) + } + rs.Logger.Infow("Server stopped.") + return nil + } + // Connection not closed, wait and try again + rs.Logger.Warnw("Error while accepting connection", "error", err) + time.Sleep(time.Second) + continue + } + + go s.HandleNetConn(nc) + } +} + +func (rs *RTMPServer) handleConn(c *rtmp.Conn, nc net.Conn) { + rs.Logger.Debugw("Incoming connection.", "remote_addr", nc.RemoteAddr(), "url", c.URL.String(), "publishing", c.Publishing) + viewerPattern := `^/view/(.+)$` + viewerRegexp := regexp.MustCompile(viewerPattern) + + viewerMatch := viewerRegexp.FindStringSubmatch(c.URL.Path) + if viewerMatch != nil { + name := viewerMatch[1] + stream, ok := rs.streams[name] + if ok { + client := NewRTMPClient(rs, stream, c, nc) + stream.AddClient(client) + client.handleClient() + return + } + + // No such stream, disconnect viewer + nc.Close() + rs.Logger.Infow("Closed connection to non-existing stream.", "requested_stream", name) + } + + // Get stream name + streamPattern := `^/stream/(.+)$` + streamRegexp := regexp.MustCompile(streamPattern) + match := streamRegexp.FindStringSubmatch(c.URL.Path) + if match != nil { + name := match[1] + + _, ok := rs.streams[name] + if ok { + // Stream already exists, disconnect this + nc.Close() + rs.Logger.Warnw("Connection attempted to stream to stream-key which is already broadcasting.", "stream_key", name) + return + } + + streamCtx, cancel := context.WithCancel(rs.ctx) + + stream := &Stream{ + Name: name, + ctx: streamCtx, + ctxCancel: cancel, + } + rs.AddStream(stream) + stream.handleBroadcaster(c, nc) + } + + // Stream URL is invalid, disconnect + nc.Close() +} diff --git a/version.go b/version.go new file mode 100644 index 0000000..4be3f56 --- /dev/null +++ b/version.go @@ -0,0 +1,33 @@ +package dogtamer + +import "fmt" + +const Version string = "v0.1.2" + +var Build string + +func GetVersion() string { + if Build == "" { + return fmt.Sprintf("Dogtamer Version %s", Version) + } else { + return fmt.Sprintf("Dogtamer Version %s (%s)", Version, Build) + } +} + +func GetShortVersion() string { + var s string + if Build == "" { + s = Version + } else { + s = fmt.Sprintf("%s-%s", Version, Build) + } + return s +} + +func GetHeaderVersion() string { + if Build == "" { + return fmt.Sprintf("Dogtamer/%s", Version) + } else { + return fmt.Sprintf("Dogtamer/%s-%s", Version, Build) + } +}