Initial commit
This commit is contained in:
339
server/rtmp.go
Normal file
339
server/rtmp.go
Normal file
@@ -0,0 +1,339 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nareix/joy5/av"
|
||||
"github.com/nareix/joy5/format/rtmp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type RTMPClient struct {
|
||||
c *rtmp.Conn
|
||||
nc net.Conn
|
||||
server *RTMPServer
|
||||
stream *Stream
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
|
||||
keyframeSent bool
|
||||
|
||||
packetsChan chan av.Packet
|
||||
}
|
||||
|
||||
func NewRTMPClient(rs *RTMPServer, stream *Stream, c *rtmp.Conn, nc net.Conn) *RTMPClient {
|
||||
ctx, cancel := context.WithCancel(stream.ctx)
|
||||
return &RTMPClient{
|
||||
c: c,
|
||||
nc: nc,
|
||||
server: rs,
|
||||
stream: stream,
|
||||
packetsChan: make(chan av.Packet, 10),
|
||||
ctx: ctx,
|
||||
ctxCancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
type Stream struct {
|
||||
Name string
|
||||
Clients []*RTMPClient
|
||||
clientsLock sync.Mutex
|
||||
server *RTMPServer
|
||||
nc net.Conn
|
||||
c *rtmp.Conn
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
startTime time.Time
|
||||
|
||||
metadataPacket *av.Packet
|
||||
h264DecoderConfigPacket *av.Packet
|
||||
aacDecoderConfigPacket *av.Packet
|
||||
}
|
||||
|
||||
func (s *Stream) AddClient(client *RTMPClient) {
|
||||
s.clientsLock.Lock()
|
||||
defer s.clientsLock.Unlock()
|
||||
s.Clients = append(s.Clients, client)
|
||||
s.server.Logger.Infow("Added client to stream.", "stream_name", s.Name)
|
||||
}
|
||||
|
||||
func (s *Stream) RemoveClient(client *RTMPClient) {
|
||||
for i, c := range s.Clients {
|
||||
if c == client {
|
||||
s.clientsLock.Lock()
|
||||
defer s.clientsLock.Unlock()
|
||||
copy(s.Clients[i:], s.Clients[i+1:])
|
||||
s.Clients[len(s.Clients)-1] = nil // or the zero value of T
|
||||
s.Clients = s.Clients[:len(s.Clients)-1]
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) handleBroadcaster(c *rtmp.Conn, nc net.Conn) {
|
||||
s.nc = nc
|
||||
s.c = c
|
||||
|
||||
logger := s.server.Logger.Desugar()
|
||||
s.startTime = time.Now()
|
||||
|
||||
// Ensure cleanup
|
||||
defer func() {
|
||||
// Ensure all clients have been removed
|
||||
for len(s.Clients) > 0 {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
// Once all clients have been removed, remove stream from server
|
||||
s.server.RemoveStream(s)
|
||||
s.server.Logger.Infow("Stream has been successfully stopped.", "stream_name", s.Name, "duration_seconds", time.Since(s.startTime))
|
||||
}()
|
||||
|
||||
for {
|
||||
nc.SetReadDeadline(time.Now().Add(5 * time.Second))
|
||||
|
||||
// Check if stream has been cancelled
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
// Stream is cancelled
|
||||
s.server.Logger.Debugw("Stream context has been cancelled.", "stream_name", s.Name)
|
||||
nc.Close()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
pkt, err := c.ReadPacket()
|
||||
if err != nil {
|
||||
// TODO: Moar better pls
|
||||
if err == io.EOF {
|
||||
// Stream ended
|
||||
nc.Close()
|
||||
// Cancel stream context, to ensure all clients are disconnected
|
||||
s.server.Logger.Infow("Stream disconnected cleanly.", "stream_name", s.Name)
|
||||
s.ctxCancel()
|
||||
}
|
||||
nc.Close()
|
||||
}
|
||||
switch pkt.Type {
|
||||
case av.Metadata:
|
||||
s.server.Logger.Debugw("Got metadata packet for stream.", "stream_name", s.Name)
|
||||
s.metadataPacket = &pkt
|
||||
continue
|
||||
case av.AACDecoderConfig:
|
||||
s.server.Logger.Debugw("Got AAC decoder config packet for stream.", "stream_name", s.Name)
|
||||
s.aacDecoderConfigPacket = &pkt
|
||||
continue
|
||||
case av.H264DecoderConfig:
|
||||
s.server.Logger.Debugw("Got h264 decoder config packet for stream.", "stream_name", s.Name)
|
||||
s.h264DecoderConfigPacket = &pkt
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Write packet to buffer, to be used read by client handlers
|
||||
for _, client := range s.Clients {
|
||||
select {
|
||||
case client.packetsChan <- pkt:
|
||||
continue
|
||||
default:
|
||||
logger.Info("Client channel is blocking.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (client *RTMPClient) handleClient() {
|
||||
server := client.server
|
||||
stream := client.stream
|
||||
|
||||
// ensure cleanup
|
||||
defer func() {
|
||||
client.nc.Close()
|
||||
stream.RemoveClient(client)
|
||||
server.Logger.Infow("Disconnected viewer.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
|
||||
}()
|
||||
|
||||
server.Logger.Infow("Viewer connected.", "stream", stream.Name, "remote_addr", client.nc.RemoteAddr().String())
|
||||
// First ensure that metadata, and decoder configs are set
|
||||
// TODO: Ensure stream isnt cancelled before sending metadata and decoder configs
|
||||
for stream.aacDecoderConfigPacket == nil || stream.metadataPacket == nil || stream.h264DecoderConfigPacket == nil {
|
||||
server.Logger.Debugw("Viewer is waiting for stream to receive metadata or decoder config packets.", client.nc.RemoteAddr().String())
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
if err := client.c.WritePacket(*stream.metadataPacket); err != nil {
|
||||
server.Logger.Info("Error writing metadata-packet to viewer.", "err", err)
|
||||
}
|
||||
if err := client.c.WritePacket(*stream.aacDecoderConfigPacket); err != nil {
|
||||
server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err)
|
||||
}
|
||||
if err := client.c.WritePacket(*stream.h264DecoderConfigPacket); err != nil {
|
||||
server.Logger.Info("Error writing aacdecoderconfig-packet to viewer.", "err", err)
|
||||
}
|
||||
server.Logger.Debugw("Wrote metadata and decoder configs to viewer.", "client_addr", client.nc.RemoteAddr().String())
|
||||
|
||||
for {
|
||||
var errCount int
|
||||
select {
|
||||
case <-client.ctx.Done():
|
||||
// Client has been cancelled, stop writing packets
|
||||
server.Logger.Infow("Client context cancelled.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
|
||||
return
|
||||
default:
|
||||
}
|
||||
pkt := <-client.packetsChan
|
||||
|
||||
// Wait for keyframe before sending video
|
||||
if !pkt.IsKeyFrame && !client.keyframeSent {
|
||||
continue
|
||||
} else {
|
||||
// Assume that the packet will get sent
|
||||
client.keyframeSent = true
|
||||
}
|
||||
server.Logger.Debugw("Got keyframe. Sending video.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
|
||||
|
||||
client.nc.SetWriteDeadline(time.Now().Add(1 * time.Second))
|
||||
if err := client.c.WritePacket(pkt); err != nil {
|
||||
netErr, ok := err.(*net.OpError)
|
||||
if ok && netErr.Temporary() {
|
||||
// Temporary error, so we should retry?
|
||||
errCount++
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
if errCount > 9 {
|
||||
server.Logger.Warnw("Too many errors from viewer. Disconnecting.", "client_addr", client.nc.RemoteAddr().String(), "stream_name", stream.Name)
|
||||
return
|
||||
}
|
||||
// TODO: Handle client disconnections properly
|
||||
server.Logger.Debugw("Error while writing packet to viewer.", "client_addr", client.nc.RemoteAddr().String(), "error_message", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type RTMPServer struct {
|
||||
ListenAddr string
|
||||
Logger *zap.SugaredLogger
|
||||
streamsLock sync.Mutex
|
||||
streams map[string]*Stream
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewRTMPServer(ctx context.Context, addr string) *RTMPServer {
|
||||
serverCtx, cancel := context.WithCancel(ctx)
|
||||
rs := &RTMPServer{
|
||||
ListenAddr: addr,
|
||||
Logger: zap.NewNop().Sugar(),
|
||||
streams: make(map[string]*Stream),
|
||||
ctx: serverCtx,
|
||||
ctxCancel: cancel,
|
||||
}
|
||||
return rs
|
||||
}
|
||||
|
||||
func (rs *RTMPServer) AddStream(stream *Stream) {
|
||||
stream.server = rs
|
||||
rs.streamsLock.Lock()
|
||||
defer rs.streamsLock.Unlock()
|
||||
rs.streams[stream.Name] = stream
|
||||
rs.Logger.Infow("New stream added to server.", "stream_name", stream.Name)
|
||||
}
|
||||
|
||||
func (rs *RTMPServer) RemoveStream(stream *Stream) {
|
||||
rs.streamsLock.Lock()
|
||||
defer rs.streamsLock.Unlock()
|
||||
delete(rs.streams, stream.Name)
|
||||
}
|
||||
|
||||
func (rs *RTMPServer) Listen() error {
|
||||
listener, err := net.Listen("tcp", rs.ListenAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
s := rtmp.NewServer()
|
||||
s.HandleConn = rs.handleConn
|
||||
|
||||
rs.Logger.Info("RTMP server listening for connections.")
|
||||
go func() {
|
||||
<-rs.ctx.Done()
|
||||
listener.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
nc, err := listener.Accept()
|
||||
if err != nil {
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
// Wait for streams to disconnect
|
||||
for len(rs.streams) > 0 {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
rs.Logger.Infow("Server stopped.")
|
||||
return nil
|
||||
}
|
||||
// Connection not closed, wait and try again
|
||||
rs.Logger.Warnw("Error while accepting connection", "error", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
go s.HandleNetConn(nc)
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RTMPServer) handleConn(c *rtmp.Conn, nc net.Conn) {
|
||||
rs.Logger.Debugw("Incoming connection.", "remote_addr", nc.RemoteAddr(), "url", c.URL.String(), "publishing", c.Publishing)
|
||||
viewerPattern := `^/view/(.+)$`
|
||||
viewerRegexp := regexp.MustCompile(viewerPattern)
|
||||
|
||||
viewerMatch := viewerRegexp.FindStringSubmatch(c.URL.Path)
|
||||
if viewerMatch != nil {
|
||||
name := viewerMatch[1]
|
||||
stream, ok := rs.streams[name]
|
||||
if ok {
|
||||
client := NewRTMPClient(rs, stream, c, nc)
|
||||
stream.AddClient(client)
|
||||
client.handleClient()
|
||||
return
|
||||
}
|
||||
|
||||
// No such stream, disconnect viewer
|
||||
nc.Close()
|
||||
rs.Logger.Infow("Closed connection to non-existing stream.", "requested_stream", name)
|
||||
}
|
||||
|
||||
// Get stream name
|
||||
streamPattern := `^/stream/(.+)$`
|
||||
streamRegexp := regexp.MustCompile(streamPattern)
|
||||
match := streamRegexp.FindStringSubmatch(c.URL.Path)
|
||||
if match != nil {
|
||||
name := match[1]
|
||||
|
||||
_, ok := rs.streams[name]
|
||||
if ok {
|
||||
// Stream already exists, disconnect this
|
||||
nc.Close()
|
||||
rs.Logger.Warnw("Connection attempted to stream to stream-key which is already broadcasting.", "stream_key", name)
|
||||
return
|
||||
}
|
||||
|
||||
streamCtx, cancel := context.WithCancel(rs.ctx)
|
||||
|
||||
stream := &Stream{
|
||||
Name: name,
|
||||
ctx: streamCtx,
|
||||
ctxCancel: cancel,
|
||||
}
|
||||
rs.AddStream(stream)
|
||||
stream.handleBroadcaster(c, nc)
|
||||
}
|
||||
|
||||
// Stream URL is invalid, disconnect
|
||||
nc.Close()
|
||||
}
|
Reference in New Issue
Block a user