Collect stats and send NACKs

This commit is contained in:
Torjus Håkestad 2023-12-04 02:07:18 +01:00
parent da4f319f61
commit c141e725b3
2 changed files with 133 additions and 18 deletions

View File

@ -40,6 +40,7 @@ func NewServer(store *UserStore) *Server {
r.Delete("/whip/{streamKey}", srv.DeleteHandler) r.Delete("/whip/{streamKey}", srv.DeleteHandler)
r.Patch("/whip/{streamKey}", srv.PatchHandler) r.Patch("/whip/{streamKey}", srv.PatchHandler)
r.Post("/whip/{streamKey}", srv.PostOfferHandler) r.Post("/whip/{streamKey}", srv.PostOfferHandler)
r.Get("/stats", srv.streams.StatsHandler)
srv.Handler = r srv.Handler = r

View File

@ -1,15 +1,18 @@
package server package server
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"net/http"
"sync" "sync"
"time" "time"
"github.com/pion/interceptor" "github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/intervalpli" "github.com/pion/interceptor/pkg/nack"
"github.com/pion/interceptor/pkg/stats"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
) )
@ -17,6 +20,7 @@ var ErrNoSuchStream error = fmt.Errorf("no such stream")
type StreamStore struct { type StreamStore struct {
Streams map[string]*Stream Streams map[string]*Stream
webRTCConfig webrtc.Configuration
mu sync.Mutex mu sync.Mutex
} }
@ -30,14 +34,71 @@ func NewStreamStore() *StreamStore {
type Stream struct { type Stream struct {
peerConnection *webrtc.PeerConnection peerConnection *webrtc.PeerConnection
peerConnectionStats map[string]*stats.Stats
peerConnectionStatsMu sync.Mutex
lastUpdate time.Time lastUpdate time.Time
localTracks []*webrtc.TrackLocalStaticRTP localTracks []*webrtc.TrackLocalStaticRTP
peers []*webrtc.PeerConnection peers []*webrtc.PeerConnection
mu sync.Mutex mu sync.Mutex
} }
func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) {
type StreamStats struct {
StreamKey string `json:"streamKey"`
StatsInbound stats.InboundRTPStreamStats `json:"inboudStats"`
}
stats := []StreamStats{}
for streamKey, stream := range s.Streams {
stream.peerConnectionStatsMu.Lock()
for track, stat := range stream.peerConnectionStats {
stats = append(stats, StreamStats{
StreamKey: fmt.Sprintf("%s-%s", streamKey, track),
StatsInbound: stat.InboundRTPStreamStats,
})
}
stream.peerConnectionStatsMu.Unlock()
}
w.Header().Add("Content-Type", "application/json")
enc := json.NewEncoder(w)
if err := enc.Encode(stats); err != nil {
slog.Warn("Error encoding stats: %s", err)
}
}
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
peerConnectionConfig := webrtc.Configuration{ m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
}
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
panic(err)
}
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
// if err != nil {
// panic(err)
// }
// i.Add(intervalPliFactory)
// Stats interceptor
//statsInterceptorFactory, err := stats.NewInterceptor()
//if err != nil {
// panic(err)
//}
//var statsGetter stats.Getter
//statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
// statsGetter = g
//})
//i.Add(statsInterceptorFactory)
se := webrtc.SettingEngine{}
_ = se.SetEphemeralUDPPortRange(50000, 50050)
webRTCConfig := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{ ICEServers: []webrtc.ICEServer{
{ {
URLs: []string{"stun:stun.l.google.com:19302"}, URLs: []string{"stun:stun.l.google.com:19302"},
@ -45,7 +106,9 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
}, },
} }
peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig) api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
peerConnection, err := api.NewPeerConnection(webRTCConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -117,34 +180,65 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
go func() { go func() {
stream := &Stream{ stream := &Stream{
lastUpdate: time.Now(), lastUpdate: time.Now(),
peerConnectionStats: make(map[string]*stats.Stats),
} }
m := &webrtc.MediaEngine{} m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil { if err := m.RegisterDefaultCodecs(); err != nil {
panic(err) panic(err)
} }
i := &interceptor.Registry{} i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
panic(err) panic(err)
} }
intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
// if err != nil {
// panic(err)
// }
// i.Add(intervalPliFactory)
// Stats interceptor
statsInterceptorFactory, err := stats.NewInterceptor()
if err != nil { if err != nil {
panic(err) panic(err)
} }
i.Add(intervalPliFactory)
peerConnectionConfig := webrtc.Configuration{ var statsGetter stats.Getter
statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
statsGetter = g
})
i.Add(statsInterceptorFactory)
// NACK interceptor
maxPerPacket := nack.GeneratorMaxNacksPerPacket(4)
nackSize := nack.GeneratorSize(4096)
nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize)
if err != nil {
panic(err)
}
i.Add(nackinterceptorFactory)
s.webRTCConfig = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{ ICEServers: []webrtc.ICEServer{
{ {
URLs: []string{"stun:stun.l.google.com:19302"}, URLs: []string{"stun:stun.l.google.com:19302"},
}, },
}, },
} }
se := webrtc.SettingEngine{} se := webrtc.SettingEngine{}
_ = se.SetEphemeralUDPPortRange(50000, 50050) _ = se.SetEphemeralUDPPortRange(50000, 50050)
// se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
// buf := packetio.NewBuffer()
// buf.SetLimitSize(32 * 1000 * 1000)
// return buf
// }
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
// Create a new RTCPeerConnection // Create a new RTCPeerConnection
peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)).NewPeerConnection(peerConnectionConfig) peerConnection, err := api.NewPeerConnection(s.webRTCConfig)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -161,6 +255,9 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
// Set a handler for when a new remote track starts, this just distributes all our packets // Set a handler for when a new remote track starts, this just distributes all our packets
// to connected peers // to connected peers
peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label())
})
peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
// Create a local track, all our SFU clients will be fed via this track // Create a local track, all our SFU clients will be fed via this track
slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID()) slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
@ -168,17 +265,34 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
if newTrackErr != nil { if newTrackErr != nil {
panic(newTrackErr) panic(newTrackErr)
} }
var stop bool
go func() {
for {
if stop {
slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
return
}
stats := statsGetter.Get(uint32(remoteTrack.SSRC()))
stream.peerConnectionStatsMu.Lock()
stream.peerConnectionStats[remoteTrack.Codec().MimeType] = stats
stream.peerConnectionStatsMu.Unlock()
time.Sleep(time.Second * 1)
}
}()
stream.mu.Lock() stream.mu.Lock()
stream.localTracks = append(stream.localTracks, localTrack) stream.localTracks = append(stream.localTracks, localTrack)
stream.mu.Unlock() stream.mu.Unlock()
rtpBuf := make([]byte, 1400) rtpBuf := make([]byte, 1500)
for { for {
i, _, readErr := remoteTrack.Read(rtpBuf) i, _, readErr := remoteTrack.Read(rtpBuf)
if readErr != nil { if readErr != nil {
if errors.Is(readErr, io.EOF) { if errors.Is(readErr, io.EOF) {
slog.Warn("EOF from track.", "id", remoteTrack.ID()) slog.Warn("EOF from track.", "id", remoteTrack.ID())
stop = true
return return
} }
panic(readErr) panic(readErr)