Compare commits
	
		
			2 Commits
		
	
	
		
			da4f319f61
			...
			18c515c887
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 18c515c887 | |||
| c141e725b3 | 
 Submodule ministream-frontend updated: e26f555300...2bc2fb14f6
									
								
							@@ -40,6 +40,7 @@ func NewServer(store *UserStore) *Server {
 | 
				
			|||||||
	r.Delete("/whip/{streamKey}", srv.DeleteHandler)
 | 
						r.Delete("/whip/{streamKey}", srv.DeleteHandler)
 | 
				
			||||||
	r.Patch("/whip/{streamKey}", srv.PatchHandler)
 | 
						r.Patch("/whip/{streamKey}", srv.PatchHandler)
 | 
				
			||||||
	r.Post("/whip/{streamKey}", srv.PostOfferHandler)
 | 
						r.Post("/whip/{streamKey}", srv.PostOfferHandler)
 | 
				
			||||||
 | 
						r.Get("/stats", srv.streams.StatsHandler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	srv.Handler = r
 | 
						srv.Handler = r
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										134
									
								
								server/stream.go
									
									
									
									
									
								
							
							
						
						
									
										134
									
								
								server/stream.go
									
									
									
									
									
								
							@@ -1,15 +1,18 @@
 | 
				
			|||||||
package server
 | 
					package server
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"log/slog"
 | 
						"log/slog"
 | 
				
			||||||
 | 
						"net/http"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/pion/interceptor"
 | 
						"github.com/pion/interceptor"
 | 
				
			||||||
	"github.com/pion/interceptor/pkg/intervalpli"
 | 
						"github.com/pion/interceptor/pkg/nack"
 | 
				
			||||||
 | 
						"github.com/pion/interceptor/pkg/stats"
 | 
				
			||||||
	"github.com/pion/webrtc/v4"
 | 
						"github.com/pion/webrtc/v4"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -17,6 +20,7 @@ var ErrNoSuchStream error = fmt.Errorf("no such stream")
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type StreamStore struct {
 | 
					type StreamStore struct {
 | 
				
			||||||
	Streams      map[string]*Stream
 | 
						Streams      map[string]*Stream
 | 
				
			||||||
 | 
						webRTCConfig webrtc.Configuration
 | 
				
			||||||
	mu           sync.Mutex
 | 
						mu           sync.Mutex
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -30,14 +34,71 @@ func NewStreamStore() *StreamStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type Stream struct {
 | 
					type Stream struct {
 | 
				
			||||||
	peerConnection        *webrtc.PeerConnection
 | 
						peerConnection        *webrtc.PeerConnection
 | 
				
			||||||
 | 
						peerConnectionStats   map[string]*stats.Stats
 | 
				
			||||||
 | 
						peerConnectionStatsMu sync.Mutex
 | 
				
			||||||
	lastUpdate            time.Time
 | 
						lastUpdate            time.Time
 | 
				
			||||||
	localTracks           []*webrtc.TrackLocalStaticRTP
 | 
						localTracks           []*webrtc.TrackLocalStaticRTP
 | 
				
			||||||
	peers                 []*webrtc.PeerConnection
 | 
						peers                 []*webrtc.PeerConnection
 | 
				
			||||||
	mu                    sync.Mutex
 | 
						mu                    sync.Mutex
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
 | 
						type StreamStats struct {
 | 
				
			||||||
 | 
							StreamKey    string                      `json:"streamKey"`
 | 
				
			||||||
 | 
							StatsInbound stats.InboundRTPStreamStats `json:"inboudStats"`
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						stats := []StreamStats{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for streamKey, stream := range s.Streams {
 | 
				
			||||||
 | 
							stream.peerConnectionStatsMu.Lock()
 | 
				
			||||||
 | 
							for track, stat := range stream.peerConnectionStats {
 | 
				
			||||||
 | 
								stats = append(stats, StreamStats{
 | 
				
			||||||
 | 
									StreamKey:    fmt.Sprintf("%s-%s", streamKey, track),
 | 
				
			||||||
 | 
									StatsInbound: stat.InboundRTPStreamStats,
 | 
				
			||||||
 | 
								})
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							stream.peerConnectionStatsMu.Unlock()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						w.Header().Add("Content-Type", "application/json")
 | 
				
			||||||
 | 
						enc := json.NewEncoder(w)
 | 
				
			||||||
 | 
						if err := enc.Encode(stats); err != nil {
 | 
				
			||||||
 | 
							slog.Warn("Error encoding stats: %s", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
 | 
					func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
 | 
				
			||||||
	peerConnectionConfig := webrtc.Configuration{
 | 
						m := &webrtc.MediaEngine{}
 | 
				
			||||||
 | 
						if err := m.RegisterDefaultCodecs(); err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						i := &interceptor.Registry{}
 | 
				
			||||||
 | 
						if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
 | 
				
			||||||
 | 
						// if err != nil {
 | 
				
			||||||
 | 
						// 	panic(err)
 | 
				
			||||||
 | 
						// }
 | 
				
			||||||
 | 
						// i.Add(intervalPliFactory)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Stats interceptor
 | 
				
			||||||
 | 
						//statsInterceptorFactory, err := stats.NewInterceptor()
 | 
				
			||||||
 | 
						//if err != nil {
 | 
				
			||||||
 | 
						//	panic(err)
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						//var statsGetter stats.Getter
 | 
				
			||||||
 | 
						//statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
 | 
				
			||||||
 | 
						//	statsGetter = g
 | 
				
			||||||
 | 
						//})
 | 
				
			||||||
 | 
						//i.Add(statsInterceptorFactory)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						se := webrtc.SettingEngine{}
 | 
				
			||||||
 | 
						_ = se.SetEphemeralUDPPortRange(50000, 50050)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						webRTCConfig := webrtc.Configuration{
 | 
				
			||||||
		ICEServers: []webrtc.ICEServer{
 | 
							ICEServers: []webrtc.ICEServer{
 | 
				
			||||||
			{
 | 
								{
 | 
				
			||||||
				URLs: []string{"stun:stun.l.google.com:19302"},
 | 
									URLs: []string{"stun:stun.l.google.com:19302"},
 | 
				
			||||||
@@ -45,7 +106,9 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig)
 | 
						api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						peerConnection, err := api.NewPeerConnection(webRTCConfig)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -117,34 +180,65 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
 | 
				
			|||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		stream := &Stream{
 | 
							stream := &Stream{
 | 
				
			||||||
			lastUpdate:          time.Now(),
 | 
								lastUpdate:          time.Now(),
 | 
				
			||||||
 | 
								peerConnectionStats: make(map[string]*stats.Stats),
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		m := &webrtc.MediaEngine{}
 | 
							m := &webrtc.MediaEngine{}
 | 
				
			||||||
		if err := m.RegisterDefaultCodecs(); err != nil {
 | 
							if err := m.RegisterDefaultCodecs(); err != nil {
 | 
				
			||||||
			panic(err)
 | 
								panic(err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					 | 
				
			||||||
		i := &interceptor.Registry{}
 | 
							i := &interceptor.Registry{}
 | 
				
			||||||
		if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
 | 
							if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
 | 
				
			||||||
			panic(err)
 | 
								panic(err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
 | 
					
 | 
				
			||||||
 | 
							// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
 | 
				
			||||||
 | 
							// if err != nil {
 | 
				
			||||||
 | 
							// 	panic(err)
 | 
				
			||||||
 | 
							// }
 | 
				
			||||||
 | 
							// i.Add(intervalPliFactory)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Stats interceptor
 | 
				
			||||||
 | 
							statsInterceptorFactory, err := stats.NewInterceptor()
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			panic(err)
 | 
								panic(err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		i.Add(intervalPliFactory)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		peerConnectionConfig := webrtc.Configuration{
 | 
							var statsGetter stats.Getter
 | 
				
			||||||
 | 
							statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
 | 
				
			||||||
 | 
								statsGetter = g
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
							i.Add(statsInterceptorFactory)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// NACK interceptor
 | 
				
			||||||
 | 
							maxPerPacket := nack.GeneratorMaxNacksPerPacket(4)
 | 
				
			||||||
 | 
							nackSize := nack.GeneratorSize(4096)
 | 
				
			||||||
 | 
							nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								panic(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							i.Add(nackinterceptorFactory)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							s.webRTCConfig = webrtc.Configuration{
 | 
				
			||||||
			ICEServers: []webrtc.ICEServer{
 | 
								ICEServers: []webrtc.ICEServer{
 | 
				
			||||||
				{
 | 
									{
 | 
				
			||||||
					URLs: []string{"stun:stun.l.google.com:19302"},
 | 
										URLs: []string{"stun:stun.l.google.com:19302"},
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					 | 
				
			||||||
		se := webrtc.SettingEngine{}
 | 
							se := webrtc.SettingEngine{}
 | 
				
			||||||
		_ = se.SetEphemeralUDPPortRange(50000, 50050)
 | 
							_ = se.SetEphemeralUDPPortRange(50000, 50050)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
 | 
				
			||||||
 | 
							// 	buf := packetio.NewBuffer()
 | 
				
			||||||
 | 
							// 	buf.SetLimitSize(32 * 1000 * 1000)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 	return buf
 | 
				
			||||||
 | 
							// }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Create a new RTCPeerConnection
 | 
							// Create a new RTCPeerConnection
 | 
				
			||||||
		peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)).NewPeerConnection(peerConnectionConfig)
 | 
							peerConnection, err := api.NewPeerConnection(s.webRTCConfig)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			panic(err)
 | 
								panic(err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -161,6 +255,9 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		// Set a handler for when a new remote track starts, this just distributes all our packets
 | 
							// Set a handler for when a new remote track starts, this just distributes all our packets
 | 
				
			||||||
		// to connected peers
 | 
							// to connected peers
 | 
				
			||||||
 | 
							peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
 | 
				
			||||||
 | 
								slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label())
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
		peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
 | 
							peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
 | 
				
			||||||
			// Create a local track, all our SFU clients will be fed via this track
 | 
								// Create a local track, all our SFU clients will be fed via this track
 | 
				
			||||||
			slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
 | 
								slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
 | 
				
			||||||
@@ -168,17 +265,34 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
 | 
				
			|||||||
			if newTrackErr != nil {
 | 
								if newTrackErr != nil {
 | 
				
			||||||
				panic(newTrackErr)
 | 
									panic(newTrackErr)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								var stop bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								go func() {
 | 
				
			||||||
 | 
									for {
 | 
				
			||||||
 | 
										if stop {
 | 
				
			||||||
 | 
											slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
 | 
				
			||||||
 | 
											return
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										stats := statsGetter.Get(uint32(remoteTrack.SSRC()))
 | 
				
			||||||
 | 
										stream.peerConnectionStatsMu.Lock()
 | 
				
			||||||
 | 
										stream.peerConnectionStats[remoteTrack.Codec().MimeType] = stats
 | 
				
			||||||
 | 
										stream.peerConnectionStatsMu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										time.Sleep(time.Second * 1)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			stream.mu.Lock()
 | 
								stream.mu.Lock()
 | 
				
			||||||
			stream.localTracks = append(stream.localTracks, localTrack)
 | 
								stream.localTracks = append(stream.localTracks, localTrack)
 | 
				
			||||||
			stream.mu.Unlock()
 | 
								stream.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			rtpBuf := make([]byte, 1400)
 | 
								rtpBuf := make([]byte, 1500)
 | 
				
			||||||
			for {
 | 
								for {
 | 
				
			||||||
				i, _, readErr := remoteTrack.Read(rtpBuf)
 | 
									i, _, readErr := remoteTrack.Read(rtpBuf)
 | 
				
			||||||
				if readErr != nil {
 | 
									if readErr != nil {
 | 
				
			||||||
					if errors.Is(readErr, io.EOF) {
 | 
										if errors.Is(readErr, io.EOF) {
 | 
				
			||||||
						slog.Warn("EOF from track.", "id", remoteTrack.ID())
 | 
											slog.Warn("EOF from track.", "id", remoteTrack.ID())
 | 
				
			||||||
 | 
											stop = true
 | 
				
			||||||
						return
 | 
											return
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					panic(readErr)
 | 
										panic(readErr)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user