dogtamer/server/rtmp.go

417 lines
11 KiB
Go

package server
import (
"context"
"errors"
"fmt"
"io"
"net"
"regexp"
"sync"
"time"
"github.com/dustin/go-humanize"
"github.com/nareix/joy5/av"
"github.com/nareix/joy5/format/rtmp"
"go.uber.org/atomic"
"go.uber.org/zap"
)
var ErrNoSuchItem error = fmt.Errorf("no such stream")
type RTMPClient struct {
c *rtmp.Conn
nc net.Conn
server *RTMPServer
stream *Stream
ctx context.Context
ctxCancel context.CancelFunc
timestamp time.Time
keyframeSent bool
bytesSent atomic.Uint64
droppedPackets atomic.Uint64
packetsChan chan av.Packet
}
func NewRTMPClient(rs *RTMPServer, stream *Stream, c *rtmp.Conn, nc net.Conn) *RTMPClient {
ctx, cancel := context.WithCancel(stream.ctx)
return &RTMPClient{
c: c,
nc: nc,
server: rs,
stream: stream,
packetsChan: make(chan av.Packet, 100),
ctx: ctx,
ctxCancel: cancel,
timestamp: time.Now(),
}
}
type Stream struct {
Name string
Clients []*RTMPClient
clientsLock sync.Mutex
server *RTMPServer
nc net.Conn
c *rtmp.Conn
ctx context.Context
ctxCancel context.CancelFunc
startTime time.Time
bytesReceived atomic.Uint64
bytesSent atomic.Uint64
metadataPacket *av.Packet
h264DecoderConfigPacket *av.Packet
aacDecoderConfigPacket *av.Packet
}
func (s *Stream) AddClient(client *RTMPClient) {
s.clientsLock.Lock()
defer s.clientsLock.Unlock()
s.Clients = append(s.Clients, client)
s.server.Logger.Infow("Added client to stream.", "stream_name", s.Name)
}
func (s *Stream) RemoveClient(client *RTMPClient) {
for i, c := range s.Clients {
if c == client {
s.clientsLock.Lock()
defer s.clientsLock.Unlock()
copy(s.Clients[i:], s.Clients[i+1:])
s.Clients[len(s.Clients)-1] = nil // or the zero value of T
s.Clients = s.Clients[:len(s.Clients)-1]
continue
}
}
}
func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) {
s.nc = nc
s.c = c
logger := s.server.Logger.Desugar()
s.startTime = time.Now()
// Ensure cleanup
defer func() {
// Ensure all clients have been removed
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
for len(s.Clients) > 0 {
select {
case <-ctx.Done():
// Timed out while waiting for clients to disconnect, log and remove stream.
s.server.RemoveStream(s)
s.server.Logger.Errorw("Timeout while waiting for clients to disconnect from stream.",
"stream_name", s.Name,
"stream_duration", time.Since(s.startTime),
"bytes_received", humanize.Bytes(s.bytesReceived.Load()))
return
default:
time.Sleep(200 * time.Millisecond)
}
}
// Once all clients have been removed, remove stream from server
s.server.RemoveStream(s)
s.server.Logger.Infow("Stream has been successfully stopped.",
"stream_name", s.Name,
"stream_duration", time.Since(s.startTime),
"bytes_received", humanize.Bytes(s.bytesReceived.Load()))
}()
for {
nc.SetReadDeadline(time.Now().Add(5 * time.Second))
// Check if stream has been cancelled
select {
case <-s.ctx.Done():
// Stream is cancelled
s.server.Logger.Debugw("Stream context has been cancelled.", "stream_name", s.Name)
nc.Close()
return
default:
}
pkt, err := c.ReadPacket()
if err != nil {
// TODO: Moar better pls
if err == io.EOF {
// Stream ended
nc.Close()
// Cancel stream context, to ensure all clients are disconnected
s.server.Logger.Infow("Stream disconnected cleanly.", "stream_name", s.Name)
s.ctxCancel()
}
nc.Close()
}
// Increase received counter
s.bytesReceived.Add(uint64(len(pkt.Data)))
switch pkt.Type {
case av.Metadata:
s.server.Logger.Debugw("Got metadata packet for stream.", "stream_name", s.Name)
s.metadataPacket = &pkt
continue
case av.AACDecoderConfig:
s.server.Logger.Debugw("Got AAC decoder config packet for stream.", "stream_name", s.Name)
s.aacDecoderConfigPacket = &pkt
continue
case av.H264DecoderConfig:
s.server.Logger.Debugw("Got h264 decoder config packet for stream.", "stream_name", s.Name)
s.h264DecoderConfigPacket = &pkt
continue
}
// TODO: Write packet to buffer, to be used read by client handlers
for _, client := range s.Clients {
select {
case client.packetsChan <- pkt:
s.bytesSent.Add(uint64(len(pkt.Data)))
continue
default:
// TODO: Should we stop trying to send packets until buffer is emptyish?
dropped := client.droppedPackets.Inc()
if dropped%1000 == 0 {
logger.Info("Multiple packets to client have been dropped.",
zap.String("client_addr", client.nc.RemoteAddr().String()),
zap.Uint64("dropped_packets", dropped))
}
}
}
}
}
func (client *RTMPClient) handleClient() {
server := client.server
stream := client.stream
// ensure cleanup
defer func() {
client.nc.Close()
stream.RemoveClient(client)
server.Logger.Infow("Disconnected viewer.",
"client_addr", client.nc.RemoteAddr().String(),
"stream_name", stream.Name,
"watch_duration", time.Since(client.timestamp),
"bytes_sent", humanize.Bytes(client.bytesSent.Load()),
"dropped_packets", client.droppedPackets.Load())
}()
server.Logger.Infow("Viewer connected.", "stream", stream.Name, "client_addr", client.nc.RemoteAddr().String())
// First ensure that metadata, and decoder configs are set
// TODO: Ensure stream isnt cancelled before sending metadata and decoder configs
for stream.aacDecoderConfigPacket == nil || stream.metadataPacket == nil || stream.h264DecoderConfigPacket == nil {
server.Logger.Debugw("Viewer is waiting for stream to receive metadata or decoder config packets.", client.nc.RemoteAddr().String())
time.Sleep(time.Second)
}
if err := client.c.WritePacket(*stream.metadataPacket); err != nil {
server.Logger.Info("Error writing metadata-packet to viewer.", "err", err)
}
if err := client.c.WritePacket(*stream.aacDecoderConfigPacket); err != nil {
server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err)
}
if err := client.c.WritePacket(*stream.h264DecoderConfigPacket); err != nil {
server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err)
}
server.Logger.Debugw("Wrote metadata and decoder configs to viewer.", "client_addr", client.nc.RemoteAddr().String())
for {
var errCount int
select {
case <-client.ctx.Done():
// Client has been cancelled, stop writing packets
server.Logger.Infow("Client context cancelled.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
return
default:
}
pkt := <-client.packetsChan
// Wait for keyframe before sending video
if !pkt.IsKeyFrame && !client.keyframeSent {
continue
} else if !client.keyframeSent {
// Assume that the packet will get sent
client.keyframeSent = true
server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
}
client.nc.SetWriteDeadline(time.Now().Add(1 * time.Second))
if err := client.c.WritePacket(pkt); err != nil {
netErr, ok := err.(*net.OpError)
if ok && netErr.Temporary() {
// Temporary error, so we should retry?
errCount++
time.Sleep(1 * time.Second)
continue
}
if errCount > 9 {
server.Logger.Warnw("Too many errors from viewer. Disconnecting.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
return
}
// TODO: Handle client disconnections properly
server.Logger.Debugw("Error while writing packet to viewer.", "client_addr", client.nc.RemoteAddr().String(), "error_message", err)
return
}
client.bytesSent.Add(uint64(len(pkt.Data)))
}
}
type RTMPServer struct {
ListenAddr string
Hostname string
Logger *zap.SugaredLogger
streamsLock sync.Mutex
streams map[string]*Stream
ctx context.Context
ctxCancel context.CancelFunc
}
func NewRTMPServer(ctx context.Context, addr string) *RTMPServer {
serverCtx, cancel := context.WithCancel(ctx)
rs := &RTMPServer{
Hostname: "localhost",
ListenAddr: addr,
Logger: zap.NewNop().Sugar(),
streams: make(map[string]*Stream),
ctx: serverCtx,
ctxCancel: cancel,
}
return rs
}
func (rs *RTMPServer) AddStream(stream *Stream) {
stream.server = rs
rs.streamsLock.Lock()
defer rs.streamsLock.Unlock()
rs.streams[stream.Name] = stream
rs.Logger.Infow("New stream added to server.", "stream_name", stream.Name)
}
func (rs *RTMPServer) RemoveStream(stream *Stream) {
rs.streamsLock.Lock()
defer rs.streamsLock.Unlock()
delete(rs.streams, stream.Name)
}
func (rs *RTMPServer) Listen() error {
listener, err := net.Listen("tcp", rs.ListenAddr)
if err != nil {
return err
}
defer listener.Close()
s := rtmp.NewServer()
s.HandleConn = rs.handleConn
rs.Logger.Info("RTMP server listening for connections.")
go func() {
<-rs.ctx.Done()
listener.Close()
}()
for {
nc, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
// Wait for streams to disconnect
for len(rs.streams) > 0 {
time.Sleep(200 * time.Millisecond)
}
rs.Logger.Infow("RTMP server stopped.")
return nil
}
// Connection not closed, wait and try again
rs.Logger.Warnw("Error while accepting connection", "error", err)
time.Sleep(time.Second)
continue
}
go s.HandleNetConn(nc)
}
}
func (rs *RTMPServer) handleConn(c *rtmp.Conn, nc net.Conn) {
rs.Logger.Debugw("Incoming connection.", "remote_addr", nc.RemoteAddr(), "url", c.URL.String(), "publishing", c.Publishing)
viewerPattern := `^/view/(.+)$`
viewerRegexp := regexp.MustCompile(viewerPattern)
viewerMatch := viewerRegexp.FindStringSubmatch(c.URL.Path)
if viewerMatch != nil {
name := viewerMatch[1]
stream, ok := rs.streams[name]
if ok {
client := NewRTMPClient(rs, stream, c, nc)
stream.AddClient(client)
client.handleClient()
return
}
// No such stream, disconnect viewer
nc.Close()
rs.Logger.Infow("Closed connection to non-existing stream.", "requested_stream", name)
}
// Get stream name
streamPattern := `^/stream/(.+)$`
streamRegexp := regexp.MustCompile(streamPattern)
match := streamRegexp.FindStringSubmatch(c.URL.Path)
if match != nil {
name := match[1]
_, ok := rs.streams[name]
if ok {
// Stream already exists, disconnect this
nc.Close()
rs.Logger.Warnw("Connection attempted to stream to stream-key which is already broadcasting.", "stream_key", name)
return
}
streamCtx, cancel := context.WithCancel(rs.ctx)
stream := &Stream{
Name: name,
ctx: streamCtx,
ctxCancel: cancel,
}
rs.AddStream(stream)
stream.handleBroadcaster(c, nc)
}
// Stream URL is invalid, disconnect
nc.Close()
}
type StreamInfo struct {
Name string
Path string
}
func (rs *RTMPServer) List() []*StreamInfo {
var results []*StreamInfo
_, port, _ := net.SplitHostPort(rs.ListenAddr)
for _, stream := range rs.streams {
results = append(results, &StreamInfo{Name: stream.Name, Path: fmt.Sprintf("rtmp://%s:%s/view/%s", rs.Hostname, port, stream.Name)})
}
return results
}
func (rs *RTMPServer) GetInfo(name string) (*StreamInfo, error) {
stream, ok := rs.streams[name]
if !ok {
return nil, ErrNoSuchItem
}
_, port, _ := net.SplitHostPort(rs.ListenAddr)
return &StreamInfo{
Name: stream.Name,
Path: fmt.Sprintf("rtmp://%s:%s/view/%s", rs.Hostname, port, stream.Name),
}, nil
}