From c141e725b314d397b4e057ddd0085e7b969a37d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Mon, 4 Dec 2023 02:07:18 +0100 Subject: [PATCH] Collect stats and send NACKs --- server/server.go | 1 + server/stream.go | 150 +++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 133 insertions(+), 18 deletions(-) diff --git a/server/server.go b/server/server.go index 7ce2f9f..bb8e588 100644 --- a/server/server.go +++ b/server/server.go @@ -40,6 +40,7 @@ func NewServer(store *UserStore) *Server { r.Delete("/whip/{streamKey}", srv.DeleteHandler) r.Patch("/whip/{streamKey}", srv.PatchHandler) r.Post("/whip/{streamKey}", srv.PostOfferHandler) + r.Get("/stats", srv.streams.StatsHandler) srv.Handler = r diff --git a/server/stream.go b/server/stream.go index bed1ada..5a2957a 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,23 +1,27 @@ package server import ( + "encoding/json" "errors" "fmt" "io" "log/slog" + "net/http" "sync" "time" "github.com/pion/interceptor" - "github.com/pion/interceptor/pkg/intervalpli" + "github.com/pion/interceptor/pkg/nack" + "github.com/pion/interceptor/pkg/stats" "github.com/pion/webrtc/v4" ) var ErrNoSuchStream error = fmt.Errorf("no such stream") type StreamStore struct { - Streams map[string]*Stream - mu sync.Mutex + Streams map[string]*Stream + webRTCConfig webrtc.Configuration + mu sync.Mutex } func NewStreamStore() *StreamStore { @@ -29,15 +33,72 @@ func NewStreamStore() *StreamStore { } type Stream struct { - peerConnection *webrtc.PeerConnection - lastUpdate time.Time - localTracks []*webrtc.TrackLocalStaticRTP - peers []*webrtc.PeerConnection - mu sync.Mutex + peerConnection *webrtc.PeerConnection + peerConnectionStats map[string]*stats.Stats + peerConnectionStatsMu sync.Mutex + lastUpdate time.Time + localTracks []*webrtc.TrackLocalStaticRTP + peers []*webrtc.PeerConnection + mu sync.Mutex +} + +func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) { + type StreamStats struct { + StreamKey string `json:"streamKey"` + StatsInbound stats.InboundRTPStreamStats `json:"inboudStats"` + } + stats := []StreamStats{} + + for streamKey, stream := range s.Streams { + stream.peerConnectionStatsMu.Lock() + for track, stat := range stream.peerConnectionStats { + stats = append(stats, StreamStats{ + StreamKey: fmt.Sprintf("%s-%s", streamKey, track), + StatsInbound: stat.InboundRTPStreamStats, + }) + } + stream.peerConnectionStatsMu.Unlock() + } + + w.Header().Add("Content-Type", "application/json") + enc := json.NewEncoder(w) + if err := enc.Encode(stats); err != nil { + slog.Warn("Error encoding stats: %s", err) + } } func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { - peerConnectionConfig := webrtc.Configuration{ + m := &webrtc.MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + panic(err) + } + i := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { + panic(err) + } + + // intervalPliFactory, err := intervalpli.NewReceiverInterceptor() + // if err != nil { + // panic(err) + // } + // i.Add(intervalPliFactory) + + // Stats interceptor + //statsInterceptorFactory, err := stats.NewInterceptor() + //if err != nil { + // panic(err) + //} + + //var statsGetter stats.Getter + //statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) { + // statsGetter = g + //}) + //i.Add(statsInterceptorFactory) + + se := webrtc.SettingEngine{} + _ = se.SetEphemeralUDPPortRange(50000, 50050) + + webRTCConfig := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, @@ -45,7 +106,9 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc }, } - peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig) + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)) + + peerConnection, err := api.NewPeerConnection(webRTCConfig) if err != nil { return nil, err } @@ -116,35 +179,66 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web go func() { stream := &Stream{ - lastUpdate: time.Now(), + lastUpdate: time.Now(), + peerConnectionStats: make(map[string]*stats.Stats), } m := &webrtc.MediaEngine{} if err := m.RegisterDefaultCodecs(); err != nil { panic(err) } - i := &interceptor.Registry{} if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { panic(err) } - intervalPliFactory, err := intervalpli.NewReceiverInterceptor() + + // intervalPliFactory, err := intervalpli.NewReceiverInterceptor() + // if err != nil { + // panic(err) + // } + // i.Add(intervalPliFactory) + + // Stats interceptor + statsInterceptorFactory, err := stats.NewInterceptor() if err != nil { panic(err) } - i.Add(intervalPliFactory) - peerConnectionConfig := webrtc.Configuration{ + var statsGetter stats.Getter + statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) { + statsGetter = g + }) + i.Add(statsInterceptorFactory) + + // NACK interceptor + maxPerPacket := nack.GeneratorMaxNacksPerPacket(4) + nackSize := nack.GeneratorSize(4096) + nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize) + if err != nil { + panic(err) + } + i.Add(nackinterceptorFactory) + + s.webRTCConfig = webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } - se := webrtc.SettingEngine{} _ = se.SetEphemeralUDPPortRange(50000, 50050) + + // se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser { + // buf := packetio.NewBuffer() + // buf.SetLimitSize(32 * 1000 * 1000) + + // return buf + // } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)) + // Create a new RTCPeerConnection - peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)).NewPeerConnection(peerConnectionConfig) + peerConnection, err := api.NewPeerConnection(s.webRTCConfig) if err != nil { panic(err) } @@ -161,6 +255,9 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web // Set a handler for when a new remote track starts, this just distributes all our packets // to connected peers + peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) { + slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label()) + }) peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { // Create a local track, all our SFU clients will be fed via this track slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID()) @@ -168,17 +265,34 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web if newTrackErr != nil { panic(newTrackErr) } + var stop bool + + go func() { + for { + if stop { + slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID()) + return + } + stats := statsGetter.Get(uint32(remoteTrack.SSRC())) + stream.peerConnectionStatsMu.Lock() + stream.peerConnectionStats[remoteTrack.Codec().MimeType] = stats + stream.peerConnectionStatsMu.Unlock() + + time.Sleep(time.Second * 1) + } + }() stream.mu.Lock() stream.localTracks = append(stream.localTracks, localTrack) stream.mu.Unlock() - rtpBuf := make([]byte, 1400) + rtpBuf := make([]byte, 1500) for { i, _, readErr := remoteTrack.Read(rtpBuf) if readErr != nil { if errors.Is(readErr, io.EOF) { slog.Warn("EOF from track.", "id", remoteTrack.ID()) + stop = true return } panic(readErr)