package server import ( "context" "errors" "fmt" "io" "net" "regexp" "sync" "time" "github.com/dustin/go-humanize" "github.com/nareix/joy5/av" "github.com/nareix/joy5/format/rtmp" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "go.uber.org/zap" ) var ErrNoSuchItem error = fmt.Errorf("no such stream") type RTMPClient struct { c *rtmp.Conn nc net.Conn server *RTMPServer stream *Stream ctx context.Context ctxCancel context.CancelFunc timestamp time.Time keyframeSent bool bytesSent atomic.Uint64 droppedPackets atomic.Uint64 packetsChan chan av.Packet } func NewRTMPClient(rs *RTMPServer, stream *Stream, c *rtmp.Conn, nc net.Conn) *RTMPClient { ctx, cancel := context.WithCancel(stream.ctx) return &RTMPClient{ c: c, nc: nc, server: rs, stream: stream, packetsChan: make(chan av.Packet, 100), ctx: ctx, ctxCancel: cancel, timestamp: time.Now(), } } type Stream struct { Name string Clients []*RTMPClient clientsLock sync.Mutex server *RTMPServer nc net.Conn c *rtmp.Conn ctx context.Context ctxCancel context.CancelFunc startTime time.Time bytesReceived atomic.Uint64 bytesSent atomic.Uint64 metadataPacket *av.Packet h264DecoderConfigPacket *av.Packet aacDecoderConfigPacket *av.Packet } func (s *Stream) AddClient(client *RTMPClient) { s.clientsLock.Lock() defer s.clientsLock.Unlock() s.Clients = append(s.Clients, client) s.server.Logger.Infow("Added client to stream.", "stream_name", s.Name) } func (s *Stream) RemoveClient(client *RTMPClient) { for i, c := range s.Clients { if c == client { s.clientsLock.Lock() defer s.clientsLock.Unlock() copy(s.Clients[i:], s.Clients[i+1:]) s.Clients[len(s.Clients)-1] = nil // or the zero value of T s.Clients = s.Clients[:len(s.Clients)-1] continue } } } func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) { s.nc = nc s.c = c logger := s.server.Logger.Desugar() s.startTime = time.Now() // Ensure cleanup defer func() { // Ensure all clients have been removed ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() for len(s.Clients) > 0 { select { case <-ctx.Done(): // Timed out while waiting for clients to disconnect, log and remove stream. s.server.RemoveStream(s) s.server.Logger.Errorw("Timeout while waiting for clients to disconnect from stream.", "stream_name", s.Name, "stream_duration", time.Since(s.startTime), "bytes_received", humanize.Bytes(s.bytesReceived.Load())) return default: time.Sleep(200 * time.Millisecond) } } // Once all clients have been removed, remove stream from server s.server.RemoveStream(s) s.server.Logger.Infow("Stream has been successfully stopped.", "stream_name", s.Name, "stream_duration", time.Since(s.startTime), "bytes_received", humanize.Bytes(s.bytesReceived.Load())) }() for { nc.SetReadDeadline(time.Now().Add(5 * time.Second)) // Check if stream has been cancelled select { case <-s.ctx.Done(): // Stream is cancelled s.server.Logger.Debugw("Stream context has been cancelled.", "stream_name", s.Name) nc.Close() return default: } pkt, err := c.ReadPacket() if err != nil { // TODO: Moar better pls if err == io.EOF { // Stream ended nc.Close() // Cancel stream context, to ensure all clients are disconnected s.server.Logger.Infow("Stream disconnected cleanly.", "stream_name", s.Name) s.ctxCancel() } nc.Close() } // Increase received counter s.bytesReceived.Add(uint64(len(pkt.Data))) switch pkt.Type { case av.Metadata: s.server.Logger.Debugw("Got metadata packet for stream.", "stream_name", s.Name) s.metadataPacket = &pkt continue case av.AACDecoderConfig: s.server.Logger.Debugw("Got AAC decoder config packet for stream.", "stream_name", s.Name) s.aacDecoderConfigPacket = &pkt continue case av.H264DecoderConfig: s.server.Logger.Debugw("Got h264 decoder config packet for stream.", "stream_name", s.Name) s.h264DecoderConfigPacket = &pkt continue } // TODO: Write packet to buffer, to be used read by client handlers for _, client := range s.Clients { select { case client.packetsChan <- pkt: s.bytesSent.Add(uint64(len(pkt.Data))) continue default: // TODO: Should we stop trying to send packets until buffer is emptyish? dropped := client.droppedPackets.Inc() if dropped%1000 == 0 { logger.Info("Multiple packets to client have been dropped.", zap.String("client_addr", client.nc.RemoteAddr().String()), zap.Uint64("dropped_packets", dropped)) } } } } } func (client *RTMPClient) handleClient() { server := client.server stream := client.stream // ensure cleanup defer func() { client.nc.Close() stream.RemoveClient(client) server.Logger.Infow("Disconnected viewer.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name, "watch_duration", time.Since(client.timestamp), "bytes_sent", humanize.Bytes(client.bytesSent.Load()), "dropped_packets", client.droppedPackets.Load()) }() server.Logger.Infow("Viewer connected.", "stream", stream.Name, "client_addr", client.nc.RemoteAddr().String()) // First ensure that metadata, and decoder configs are set // TODO: Ensure stream isnt cancelled before sending metadata and decoder configs for stream.aacDecoderConfigPacket == nil || stream.metadataPacket == nil || stream.h264DecoderConfigPacket == nil { server.Logger.Debugw("Viewer is waiting for stream to receive metadata or decoder config packets.", client.nc.RemoteAddr().String()) time.Sleep(time.Second) } if err := client.c.WritePacket(*stream.metadataPacket); err != nil { server.Logger.Info("Error writing metadata-packet to viewer.", "err", err) } if err := client.c.WritePacket(*stream.aacDecoderConfigPacket); err != nil { server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err) } if err := client.c.WritePacket(*stream.h264DecoderConfigPacket); err != nil { server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err) } server.Logger.Debugw("Wrote metadata and decoder configs to viewer.", "client_addr", client.nc.RemoteAddr().String()) for { var errCount int select { case <-client.ctx.Done(): // Client has been cancelled, stop writing packets server.Logger.Infow("Client context cancelled.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) return default: } pkt := <-client.packetsChan // Wait for keyframe before sending video if !pkt.IsKeyFrame && !client.keyframeSent { continue } else if !client.keyframeSent { // Assume that the packet will get sent client.keyframeSent = true server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) } client.nc.SetWriteDeadline(time.Now().Add(1 * time.Second)) if err := client.c.WritePacket(pkt); err != nil { netErr, ok := err.(*net.OpError) if ok && netErr.Temporary() { // Temporary error, so we should retry? errCount++ time.Sleep(1 * time.Second) continue } if errCount > 9 { server.Logger.Warnw("Too many errors from viewer. Disconnecting.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name) return } // TODO: Handle client disconnections properly server.Logger.Debugw("Error while writing packet to viewer.", "client_addr", client.nc.RemoteAddr().String(), "error_message", err) return } client.bytesSent.Add(uint64(len(pkt.Data))) } } type RTMPServer struct { ListenAddr string Hostname string Logger *zap.SugaredLogger streamsLock sync.RWMutex streams map[string]*Stream ctx context.Context ctxCancel context.CancelFunc } func NewRTMPServer(ctx context.Context, addr string) *RTMPServer { serverCtx, cancel := context.WithCancel(ctx) rs := &RTMPServer{ Hostname: "localhost", ListenAddr: addr, Logger: zap.NewNop().Sugar(), streams: make(map[string]*Stream), ctx: serverCtx, ctxCancel: cancel, } return rs } func (rs *RTMPServer) AddStream(stream *Stream) { stream.server = rs rs.streamsLock.Lock() defer rs.streamsLock.Unlock() rs.streams[stream.Name] = stream rs.Logger.Infow("New stream added to server.", "stream_name", stream.Name) } func (rs *RTMPServer) RemoveStream(stream *Stream) { rs.streamsLock.Lock() defer rs.streamsLock.Unlock() delete(rs.streams, stream.Name) } func (rs *RTMPServer) Listen() error { listener, err := net.Listen("tcp", rs.ListenAddr) if err != nil { return err } defer listener.Close() s := rtmp.NewServer() s.HandleConn = rs.handleConn rs.Logger.Info("RTMP server listening for connections.") go func() { <-rs.ctx.Done() listener.Close() }() for { nc, err := listener.Accept() if err != nil { if errors.Is(err, net.ErrClosed) { // Wait for streams to disconnect for len(rs.streams) > 0 { time.Sleep(200 * time.Millisecond) } rs.Logger.Infow("RTMP server stopped.") return nil } // Connection not closed, wait and try again rs.Logger.Warnw("Error while accepting connection", "error", err) time.Sleep(time.Second) continue } go s.HandleNetConn(nc) } } func (rs *RTMPServer) EnableMetrics() { rs.Logger.Debug("Enabling metrics-collection for RTMP.") currentStreams := prometheus.NewGauge( prometheus.GaugeOpts{ Name: "dogtamer_current_streams", Help: "Currently running streams.", }, ) prometheus.MustRegister(currentStreams) currentClients := prometheus.NewGauge( prometheus.GaugeOpts{ Name: "dogtamer_current_clients", Help: "Currently connected clients.", }, ) prometheus.MustRegister(currentClients) bytesReceived := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "dogtamer_bytes_received", Help: "Bytes received from streams.", }, []string{"stream_name"}, ) prometheus.MustRegister(bytesReceived) bytesSent := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "dogtamer_bytes_sent", Help: "Bytes received sent to clients.", }, []string{"stream_name"}, ) prometheus.MustRegister(bytesSent) droppedPackets := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "dogtamer_dropped_packets", Help: "Packets which got dropped instead of sent to clients.", }, []string{"stream_name"}, ) prometheus.MustRegister(droppedPackets) go func() { ticker := time.NewTicker(5 * time.Second) rs.Logger.Debug("RTMP metric collection started.") for { select { case <-rs.ctx.Done(): rs.Logger.Debug("RTMP metric collection stopped.") return case <-ticker.C: } currentStreams.Set(float64(len(rs.streams))) var clientCount int rs.streamsLock.RLock() for _, stream := range rs.streams { clientCount += len(stream.Clients) bytesReceived.WithLabelValues(stream.Name).Set(float64(stream.bytesReceived.Load())) for _, client := range stream.Clients { bytesSent.WithLabelValues(stream.Name).Set(float64(client.bytesSent.Load())) droppedPackets.WithLabelValues(stream.Name).Set(float64(client.droppedPackets.Load())) } } rs.streamsLock.RUnlock() currentClients.Set(float64(clientCount)) } }() } func (rs *RTMPServer) handleConn(c *rtmp.Conn, nc net.Conn) { rs.Logger.Debugw("Incoming connection.", "remote_addr", nc.RemoteAddr(), "url", c.URL.String(), "publishing", c.Publishing) viewerPattern := `^/view/(.+)$` viewerRegexp := regexp.MustCompile(viewerPattern) viewerMatch := viewerRegexp.FindStringSubmatch(c.URL.Path) if viewerMatch != nil { name := viewerMatch[1] stream, ok := rs.streams[name] if ok { client := NewRTMPClient(rs, stream, c, nc) stream.AddClient(client) client.handleClient() return } // No such stream, disconnect viewer nc.Close() rs.Logger.Infow("Closed connection to non-existing stream.", "requested_stream", name) } // Get stream name streamPattern := `^/stream/(.+)$` streamRegexp := regexp.MustCompile(streamPattern) match := streamRegexp.FindStringSubmatch(c.URL.Path) if match != nil { name := match[1] _, ok := rs.streams[name] if ok { // Stream already exists, disconnect this nc.Close() rs.Logger.Warnw("Connection attempted to stream to stream-key which is already broadcasting.", "stream_key", name) return } streamCtx, cancel := context.WithCancel(rs.ctx) stream := &Stream{ Name: name, ctx: streamCtx, ctxCancel: cancel, } rs.AddStream(stream) stream.handleBroadcaster(c, nc) } // Stream URL is invalid, disconnect nc.Close() } type StreamInfo struct { Name string Path string } func (rs *RTMPServer) List() []*StreamInfo { var results []*StreamInfo _, port, _ := net.SplitHostPort(rs.ListenAddr) for _, stream := range rs.streams { results = append(results, &StreamInfo{Name: stream.Name, Path: fmt.Sprintf("rtmp://%s:%s/view/%s", rs.Hostname, port, stream.Name)}) } return results } func (rs *RTMPServer) GetInfo(name string) (*StreamInfo, error) { stream, ok := rs.streams[name] if !ok { return nil, ErrNoSuchItem } _, port, _ := net.SplitHostPort(rs.ListenAddr) return &StreamInfo{ Name: stream.Name, Path: fmt.Sprintf("rtmp://%s:%s/view/%s", rs.Hostname, port, stream.Name), }, nil }