Compare commits
No commits in common. "18c515c8873442a1d04c9dde2661588e882a25a2" and "da4f319f61de6803b48be79fe84e010b4aa3d103" have entirely different histories.
18c515c887
...
da4f319f61
@ -1 +1 @@
|
|||||||
Subproject commit 2bc2fb14f6e38ec470faeefcfbd9815ae6b9e1e9
|
Subproject commit e26f5553007c43d87dd641149e1368e0360827c9
|
@ -40,7 +40,6 @@ func NewServer(store *UserStore) *Server {
|
|||||||
r.Delete("/whip/{streamKey}", srv.DeleteHandler)
|
r.Delete("/whip/{streamKey}", srv.DeleteHandler)
|
||||||
r.Patch("/whip/{streamKey}", srv.PatchHandler)
|
r.Patch("/whip/{streamKey}", srv.PatchHandler)
|
||||||
r.Post("/whip/{streamKey}", srv.PostOfferHandler)
|
r.Post("/whip/{streamKey}", srv.PostOfferHandler)
|
||||||
r.Get("/stats", srv.streams.StatsHandler)
|
|
||||||
|
|
||||||
srv.Handler = r
|
srv.Handler = r
|
||||||
|
|
||||||
|
134
server/stream.go
134
server/stream.go
@ -1,18 +1,15 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pion/interceptor"
|
"github.com/pion/interceptor"
|
||||||
"github.com/pion/interceptor/pkg/nack"
|
"github.com/pion/interceptor/pkg/intervalpli"
|
||||||
"github.com/pion/interceptor/pkg/stats"
|
|
||||||
"github.com/pion/webrtc/v4"
|
"github.com/pion/webrtc/v4"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -20,7 +17,6 @@ var ErrNoSuchStream error = fmt.Errorf("no such stream")
|
|||||||
|
|
||||||
type StreamStore struct {
|
type StreamStore struct {
|
||||||
Streams map[string]*Stream
|
Streams map[string]*Stream
|
||||||
webRTCConfig webrtc.Configuration
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,71 +30,14 @@ func NewStreamStore() *StreamStore {
|
|||||||
|
|
||||||
type Stream struct {
|
type Stream struct {
|
||||||
peerConnection *webrtc.PeerConnection
|
peerConnection *webrtc.PeerConnection
|
||||||
peerConnectionStats map[string]*stats.Stats
|
|
||||||
peerConnectionStatsMu sync.Mutex
|
|
||||||
lastUpdate time.Time
|
lastUpdate time.Time
|
||||||
localTracks []*webrtc.TrackLocalStaticRTP
|
localTracks []*webrtc.TrackLocalStaticRTP
|
||||||
peers []*webrtc.PeerConnection
|
peers []*webrtc.PeerConnection
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
type StreamStats struct {
|
|
||||||
StreamKey string `json:"streamKey"`
|
|
||||||
StatsInbound stats.InboundRTPStreamStats `json:"inboudStats"`
|
|
||||||
}
|
|
||||||
stats := []StreamStats{}
|
|
||||||
|
|
||||||
for streamKey, stream := range s.Streams {
|
|
||||||
stream.peerConnectionStatsMu.Lock()
|
|
||||||
for track, stat := range stream.peerConnectionStats {
|
|
||||||
stats = append(stats, StreamStats{
|
|
||||||
StreamKey: fmt.Sprintf("%s-%s", streamKey, track),
|
|
||||||
StatsInbound: stat.InboundRTPStreamStats,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
stream.peerConnectionStatsMu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Add("Content-Type", "application/json")
|
|
||||||
enc := json.NewEncoder(w)
|
|
||||||
if err := enc.Encode(stats); err != nil {
|
|
||||||
slog.Warn("Error encoding stats: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
|
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
|
||||||
m := &webrtc.MediaEngine{}
|
peerConnectionConfig := webrtc.Configuration{
|
||||||
if err := m.RegisterDefaultCodecs(); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
i := &interceptor.Registry{}
|
|
||||||
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
|
||||||
// if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
// }
|
|
||||||
// i.Add(intervalPliFactory)
|
|
||||||
|
|
||||||
// Stats interceptor
|
|
||||||
//statsInterceptorFactory, err := stats.NewInterceptor()
|
|
||||||
//if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
//}
|
|
||||||
|
|
||||||
//var statsGetter stats.Getter
|
|
||||||
//statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
|
|
||||||
// statsGetter = g
|
|
||||||
//})
|
|
||||||
//i.Add(statsInterceptorFactory)
|
|
||||||
|
|
||||||
se := webrtc.SettingEngine{}
|
|
||||||
_ = se.SetEphemeralUDPPortRange(50000, 50050)
|
|
||||||
|
|
||||||
webRTCConfig := webrtc.Configuration{
|
|
||||||
ICEServers: []webrtc.ICEServer{
|
ICEServers: []webrtc.ICEServer{
|
||||||
{
|
{
|
||||||
URLs: []string{"stun:stun.l.google.com:19302"},
|
URLs: []string{"stun:stun.l.google.com:19302"},
|
||||||
@ -106,9 +45,7 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
|
peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig)
|
||||||
|
|
||||||
peerConnection, err := api.NewPeerConnection(webRTCConfig)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -180,65 +117,34 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
|
|||||||
go func() {
|
go func() {
|
||||||
stream := &Stream{
|
stream := &Stream{
|
||||||
lastUpdate: time.Now(),
|
lastUpdate: time.Now(),
|
||||||
peerConnectionStats: make(map[string]*stats.Stats),
|
|
||||||
}
|
}
|
||||||
m := &webrtc.MediaEngine{}
|
m := &webrtc.MediaEngine{}
|
||||||
if err := m.RegisterDefaultCodecs(); err != nil {
|
if err := m.RegisterDefaultCodecs(); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
i := &interceptor.Registry{}
|
i := &interceptor.Registry{}
|
||||||
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
||||||
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
|
||||||
// if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
// }
|
|
||||||
// i.Add(intervalPliFactory)
|
|
||||||
|
|
||||||
// Stats interceptor
|
|
||||||
statsInterceptorFactory, err := stats.NewInterceptor()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
i.Add(intervalPliFactory)
|
||||||
|
|
||||||
var statsGetter stats.Getter
|
peerConnectionConfig := webrtc.Configuration{
|
||||||
statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
|
|
||||||
statsGetter = g
|
|
||||||
})
|
|
||||||
i.Add(statsInterceptorFactory)
|
|
||||||
|
|
||||||
// NACK interceptor
|
|
||||||
maxPerPacket := nack.GeneratorMaxNacksPerPacket(4)
|
|
||||||
nackSize := nack.GeneratorSize(4096)
|
|
||||||
nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
i.Add(nackinterceptorFactory)
|
|
||||||
|
|
||||||
s.webRTCConfig = webrtc.Configuration{
|
|
||||||
ICEServers: []webrtc.ICEServer{
|
ICEServers: []webrtc.ICEServer{
|
||||||
{
|
{
|
||||||
URLs: []string{"stun:stun.l.google.com:19302"},
|
URLs: []string{"stun:stun.l.google.com:19302"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
se := webrtc.SettingEngine{}
|
se := webrtc.SettingEngine{}
|
||||||
_ = se.SetEphemeralUDPPortRange(50000, 50050)
|
_ = se.SetEphemeralUDPPortRange(50000, 50050)
|
||||||
|
|
||||||
// se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
|
|
||||||
// buf := packetio.NewBuffer()
|
|
||||||
// buf.SetLimitSize(32 * 1000 * 1000)
|
|
||||||
|
|
||||||
// return buf
|
|
||||||
// }
|
|
||||||
|
|
||||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
|
|
||||||
|
|
||||||
// Create a new RTCPeerConnection
|
// Create a new RTCPeerConnection
|
||||||
peerConnection, err := api.NewPeerConnection(s.webRTCConfig)
|
peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)).NewPeerConnection(peerConnectionConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -255,9 +161,6 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
|
|||||||
|
|
||||||
// Set a handler for when a new remote track starts, this just distributes all our packets
|
// Set a handler for when a new remote track starts, this just distributes all our packets
|
||||||
// to connected peers
|
// to connected peers
|
||||||
peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
|
|
||||||
slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label())
|
|
||||||
})
|
|
||||||
peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||||
// Create a local track, all our SFU clients will be fed via this track
|
// Create a local track, all our SFU clients will be fed via this track
|
||||||
slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
|
slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
|
||||||
@ -265,34 +168,17 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
|
|||||||
if newTrackErr != nil {
|
if newTrackErr != nil {
|
||||||
panic(newTrackErr)
|
panic(newTrackErr)
|
||||||
}
|
}
|
||||||
var stop bool
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
if stop {
|
|
||||||
slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
stats := statsGetter.Get(uint32(remoteTrack.SSRC()))
|
|
||||||
stream.peerConnectionStatsMu.Lock()
|
|
||||||
stream.peerConnectionStats[remoteTrack.Codec().MimeType] = stats
|
|
||||||
stream.peerConnectionStatsMu.Unlock()
|
|
||||||
|
|
||||||
time.Sleep(time.Second * 1)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
stream.mu.Lock()
|
stream.mu.Lock()
|
||||||
stream.localTracks = append(stream.localTracks, localTrack)
|
stream.localTracks = append(stream.localTracks, localTrack)
|
||||||
stream.mu.Unlock()
|
stream.mu.Unlock()
|
||||||
|
|
||||||
rtpBuf := make([]byte, 1500)
|
rtpBuf := make([]byte, 1400)
|
||||||
for {
|
for {
|
||||||
i, _, readErr := remoteTrack.Read(rtpBuf)
|
i, _, readErr := remoteTrack.Read(rtpBuf)
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
if errors.Is(readErr, io.EOF) {
|
if errors.Is(readErr, io.EOF) {
|
||||||
slog.Warn("EOF from track.", "id", remoteTrack.ID())
|
slog.Warn("EOF from track.", "id", remoteTrack.ID())
|
||||||
stop = true
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
panic(readErr)
|
panic(readErr)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user