2023-11-30 18:32:38 +00:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
2023-12-04 01:07:18 +00:00
|
|
|
"encoding/json"
|
2023-11-30 18:32:38 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"log/slog"
|
2023-12-04 01:07:18 +00:00
|
|
|
"net/http"
|
2023-11-30 18:32:38 +00:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2023-12-05 23:08:14 +00:00
|
|
|
"github.com/google/uuid"
|
2023-11-30 18:32:38 +00:00
|
|
|
"github.com/pion/interceptor"
|
2023-12-04 01:07:18 +00:00
|
|
|
"github.com/pion/interceptor/pkg/nack"
|
|
|
|
"github.com/pion/interceptor/pkg/stats"
|
2023-11-30 18:32:38 +00:00
|
|
|
"github.com/pion/webrtc/v4"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ErrNoSuchStream is returned by StreamStore.Get when no stream is registered
// under the requested stream key. Compare against it with errors.Is.
var ErrNoSuchStream = errors.New("no such stream")
|
|
|
|
|
|
|
|
type StreamStore struct {
|
2023-12-05 00:25:17 +00:00
|
|
|
Streams map[string]*Stream
|
|
|
|
|
|
|
|
config *Config
|
2023-12-04 01:07:18 +00:00
|
|
|
webRTCConfig webrtc.Configuration
|
|
|
|
mu sync.Mutex
|
2023-11-30 18:32:38 +00:00
|
|
|
}
|
|
|
|
|
2023-12-05 00:25:17 +00:00
|
|
|
func NewStreamStore(config *Config) *StreamStore {
|
2023-11-30 18:32:38 +00:00
|
|
|
s := &StreamStore{
|
|
|
|
Streams: make(map[string]*Stream),
|
2023-12-05 00:25:17 +00:00
|
|
|
config: config,
|
2023-11-30 18:32:38 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return s
|
|
|
|
}
|
|
|
|
|
|
|
|
type Stream struct {
|
2023-12-05 00:25:17 +00:00
|
|
|
store *StreamStore
|
2023-12-04 01:07:18 +00:00
|
|
|
peerConnection *webrtc.PeerConnection
|
|
|
|
peerConnectionStats map[string]*stats.Stats
|
|
|
|
peerConnectionStatsMu sync.Mutex
|
|
|
|
lastUpdate time.Time
|
|
|
|
localTracks []*webrtc.TrackLocalStaticRTP
|
2023-12-05 23:08:14 +00:00
|
|
|
viewers map[string]*webrtc.PeerConnection
|
2023-12-04 01:07:18 +00:00
|
|
|
mu sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
type StreamStats struct {
|
|
|
|
StreamKey string `json:"streamKey"`
|
|
|
|
StatsInbound stats.InboundRTPStreamStats `json:"inboudStats"`
|
|
|
|
}
|
|
|
|
stats := []StreamStats{}
|
|
|
|
|
|
|
|
for streamKey, stream := range s.Streams {
|
|
|
|
stream.peerConnectionStatsMu.Lock()
|
|
|
|
for track, stat := range stream.peerConnectionStats {
|
|
|
|
stats = append(stats, StreamStats{
|
|
|
|
StreamKey: fmt.Sprintf("%s-%s", streamKey, track),
|
|
|
|
StatsInbound: stat.InboundRTPStreamStats,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
stream.peerConnectionStatsMu.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
w.Header().Add("Content-Type", "application/json")
|
|
|
|
enc := json.NewEncoder(w)
|
|
|
|
if err := enc.Encode(stats); err != nil {
|
|
|
|
slog.Warn("Error encoding stats: %s", err)
|
|
|
|
}
|
2023-11-30 18:32:38 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
|
2023-12-05 23:08:14 +00:00
|
|
|
viewerID := uuid.New().String()
|
2023-12-04 01:07:18 +00:00
|
|
|
m := &webrtc.MediaEngine{}
|
|
|
|
if err := m.RegisterDefaultCodecs(); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
i := &interceptor.Registry{}
|
|
|
|
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
|
|
|
// if err != nil {
|
|
|
|
// panic(err)
|
|
|
|
// }
|
|
|
|
// i.Add(intervalPliFactory)
|
|
|
|
|
|
|
|
// Stats interceptor
|
|
|
|
//statsInterceptorFactory, err := stats.NewInterceptor()
|
|
|
|
//if err != nil {
|
|
|
|
// panic(err)
|
|
|
|
//}
|
|
|
|
|
|
|
|
//var statsGetter stats.Getter
|
|
|
|
//statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
|
|
|
|
// statsGetter = g
|
|
|
|
//})
|
|
|
|
//i.Add(statsInterceptorFactory)
|
|
|
|
|
|
|
|
se := webrtc.SettingEngine{}
|
2023-12-05 00:25:17 +00:00
|
|
|
_ = se.SetEphemeralUDPPortRange(
|
|
|
|
uint16(s.store.config.WebRTC.UDPMin),
|
|
|
|
uint16(s.store.config.WebRTC.UDPMax),
|
|
|
|
)
|
2023-12-04 01:07:18 +00:00
|
|
|
|
|
|
|
webRTCConfig := webrtc.Configuration{
|
2023-11-30 18:32:38 +00:00
|
|
|
ICEServers: []webrtc.ICEServer{
|
|
|
|
{
|
|
|
|
URLs: []string{"stun:stun.l.google.com:19302"},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2023-12-04 01:07:18 +00:00
|
|
|
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
|
|
|
|
|
|
|
|
peerConnection, err := api.NewPeerConnection(webRTCConfig)
|
2023-11-30 18:32:38 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, ltrack := range s.localTracks {
|
|
|
|
rtpSender, err := peerConnection.AddTrack(ltrack)
|
|
|
|
if err != nil {
|
|
|
|
// TODO, stop peerconn
|
2023-12-05 23:08:14 +00:00
|
|
|
peerConnection.Close()
|
|
|
|
s.mu.Lock()
|
|
|
|
delete(s.viewers, viewerID)
|
|
|
|
s.mu.Unlock()
|
2023-11-30 18:32:38 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
go func() {
|
|
|
|
rtcpBuf := make([]byte, 1500)
|
|
|
|
for {
|
|
|
|
if _, _, err := rtpSender.Read(rtcpBuf); err != nil {
|
|
|
|
peerConnection.Close()
|
2023-12-05 23:08:14 +00:00
|
|
|
s.mu.Lock()
|
|
|
|
delete(s.viewers, viewerID)
|
|
|
|
s.mu.Unlock()
|
2023-11-30 18:32:38 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
err = peerConnection.SetRemoteDescription(*sd)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
answer, err := peerConnection.CreateAnswer(nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-11-30 22:56:33 +00:00
|
|
|
|
|
|
|
gatherComplete := make(chan struct{})
|
|
|
|
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
|
|
|
|
if i == nil {
|
|
|
|
gatherComplete <- struct{}{}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
slog.Info("Got ICE Candidate for listener.",
|
|
|
|
"addr", i.Address,
|
|
|
|
"port", i.Port,
|
|
|
|
"related_addr", i.RelatedAddress,
|
|
|
|
"related_port", i.RelatedPort)
|
|
|
|
if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil {
|
|
|
|
slog.Info("Error adding ICE Candidate.", "error", err)
|
|
|
|
}
|
|
|
|
})
|
2023-11-30 18:32:38 +00:00
|
|
|
|
|
|
|
// Sets the LocalDescription, and starts our UDP listeners
|
|
|
|
err = peerConnection.SetLocalDescription(answer)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Block until ICE Gathering is complete, disabling trickle ICE
|
|
|
|
// we do this because we only can exchange one signaling message
|
|
|
|
// in a production application you should exchange ICE Candidates via OnICECandidate
|
|
|
|
<-gatherComplete
|
|
|
|
s.mu.Lock()
|
2023-12-05 23:08:14 +00:00
|
|
|
s.viewers[viewerID] = peerConnection
|
2023-11-30 18:32:38 +00:00
|
|
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
// Get the LocalDescription and take it to base64 so we can paste in browser
|
|
|
|
return peerConnection.LocalDescription(), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
|
|
|
|
answerChan := make(chan *webrtc.SessionDescription)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
stream := &Stream{
|
2023-12-05 00:25:17 +00:00
|
|
|
store: s,
|
2023-12-04 01:07:18 +00:00
|
|
|
lastUpdate: time.Now(),
|
|
|
|
peerConnectionStats: make(map[string]*stats.Stats),
|
2023-12-05 23:08:14 +00:00
|
|
|
viewers: make(map[string]*webrtc.PeerConnection),
|
2023-11-30 18:32:38 +00:00
|
|
|
}
|
|
|
|
m := &webrtc.MediaEngine{}
|
|
|
|
if err := m.RegisterDefaultCodecs(); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
i := &interceptor.Registry{}
|
|
|
|
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2023-12-04 01:07:18 +00:00
|
|
|
|
|
|
|
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
|
|
|
// if err != nil {
|
|
|
|
// panic(err)
|
|
|
|
// }
|
|
|
|
// i.Add(intervalPliFactory)
|
|
|
|
|
|
|
|
// Stats interceptor
|
|
|
|
statsInterceptorFactory, err := stats.NewInterceptor()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
var statsGetter stats.Getter
|
|
|
|
statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
|
|
|
|
statsGetter = g
|
|
|
|
})
|
|
|
|
i.Add(statsInterceptorFactory)
|
|
|
|
|
|
|
|
// NACK interceptor
|
|
|
|
maxPerPacket := nack.GeneratorMaxNacksPerPacket(4)
|
|
|
|
nackSize := nack.GeneratorSize(4096)
|
|
|
|
nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize)
|
2023-11-30 18:32:38 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2023-12-04 01:07:18 +00:00
|
|
|
i.Add(nackinterceptorFactory)
|
2023-11-30 18:32:38 +00:00
|
|
|
|
2023-12-04 01:07:18 +00:00
|
|
|
s.webRTCConfig = webrtc.Configuration{
|
2023-11-30 18:32:38 +00:00
|
|
|
ICEServers: []webrtc.ICEServer{
|
|
|
|
{
|
|
|
|
URLs: []string{"stun:stun.l.google.com:19302"},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
2023-11-30 22:56:33 +00:00
|
|
|
se := webrtc.SettingEngine{}
|
2023-12-05 00:25:17 +00:00
|
|
|
_ = se.SetEphemeralUDPPortRange(uint16(s.config.WebRTC.UDPMin), uint16(s.config.WebRTC.UDPMax))
|
2023-12-04 01:07:18 +00:00
|
|
|
|
|
|
|
// se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
|
|
|
|
// buf := packetio.NewBuffer()
|
|
|
|
// buf.SetLimitSize(32 * 1000 * 1000)
|
|
|
|
|
|
|
|
// return buf
|
|
|
|
// }
|
|
|
|
|
|
|
|
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
|
|
|
|
|
2023-11-30 18:32:38 +00:00
|
|
|
// Create a new RTCPeerConnection
|
2023-12-04 01:07:18 +00:00
|
|
|
peerConnection, err := api.NewPeerConnection(s.webRTCConfig)
|
2023-11-30 18:32:38 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
stream.peerConnection = peerConnection
|
|
|
|
|
|
|
|
// Allow us to receive 1 video track
|
|
|
|
if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Set a handler for when a new remote track starts, this just distributes all our packets
|
|
|
|
// to connected peers
|
2023-12-04 01:07:18 +00:00
|
|
|
peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
|
|
|
|
slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label())
|
|
|
|
})
|
2023-11-30 18:32:38 +00:00
|
|
|
peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
|
|
|
// Create a local track, all our SFU clients will be fed via this track
|
2023-11-30 22:56:33 +00:00
|
|
|
slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
|
2023-11-30 18:32:38 +00:00
|
|
|
localTrack, newTrackErr := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, "video", "pion")
|
|
|
|
if newTrackErr != nil {
|
|
|
|
panic(newTrackErr)
|
|
|
|
}
|
2023-12-04 01:07:18 +00:00
|
|
|
var stop bool
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
if stop {
|
|
|
|
slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
stats := statsGetter.Get(uint32(remoteTrack.SSRC()))
|
|
|
|
stream.peerConnectionStatsMu.Lock()
|
|
|
|
stream.peerConnectionStats[remoteTrack.Codec().MimeType] = stats
|
|
|
|
stream.peerConnectionStatsMu.Unlock()
|
|
|
|
|
|
|
|
time.Sleep(time.Second * 1)
|
|
|
|
}
|
|
|
|
}()
|
2023-11-30 18:32:38 +00:00
|
|
|
|
|
|
|
stream.mu.Lock()
|
|
|
|
stream.localTracks = append(stream.localTracks, localTrack)
|
|
|
|
stream.mu.Unlock()
|
|
|
|
|
2023-12-04 01:07:18 +00:00
|
|
|
rtpBuf := make([]byte, 1500)
|
2023-11-30 18:32:38 +00:00
|
|
|
for {
|
|
|
|
i, _, readErr := remoteTrack.Read(rtpBuf)
|
|
|
|
if readErr != nil {
|
|
|
|
if errors.Is(readErr, io.EOF) {
|
|
|
|
slog.Warn("EOF from track.", "id", remoteTrack.ID())
|
2023-12-04 01:07:18 +00:00
|
|
|
stop = true
|
2023-11-30 18:32:38 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
panic(readErr)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet
|
|
|
|
if _, err = localTrack.Write(rtpBuf[:i]); err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
// Set the remote SessionDescription
|
|
|
|
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sd.SDP}
|
|
|
|
|
|
|
|
err = peerConnection.SetRemoteDescription(offer)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
// Create answer
|
|
|
|
answer, err := peerConnection.CreateAnswer(nil)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create channel that is blocked until ICE Gathering is complete
|
2023-11-30 22:56:33 +00:00
|
|
|
// gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
|
2023-11-30 18:32:38 +00:00
|
|
|
|
|
|
|
// Sets the LocalDescription, and starts our UDP listeners
|
|
|
|
err = peerConnection.SetLocalDescription(answer)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Block until ICE Gathering is complete, disabling trickle ICE
|
|
|
|
// we do this because we only can exchange one signaling message
|
|
|
|
// in a production application you should exchange ICE Candidates via OnICECandidate
|
2023-11-30 22:56:33 +00:00
|
|
|
// answerChan <- &answer
|
|
|
|
|
|
|
|
peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
|
|
|
|
slog.Info("ICE state changed.", "stream_key", streamKey, "ice_state", is)
|
|
|
|
})
|
|
|
|
|
|
|
|
gatherComplete := make(chan struct{})
|
|
|
|
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
|
|
|
|
if i == nil {
|
|
|
|
gatherComplete <- struct{}{}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
slog.Info("Got ICE Candidate",
|
|
|
|
"addr", i.Address,
|
|
|
|
"port", i.Port,
|
|
|
|
"related_addr", i.RelatedAddress,
|
|
|
|
"related_port", i.RelatedPort)
|
|
|
|
if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil {
|
|
|
|
slog.Info("Error adding ICE Candidate.", "error", err)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
2023-11-30 18:32:38 +00:00
|
|
|
<-gatherComplete
|
2023-11-30 22:56:33 +00:00
|
|
|
slog.Info("ICE Gathering complete.", "stream_key", streamKey, "ice_state", peerConnection.ICEConnectionState())
|
2023-11-30 18:32:38 +00:00
|
|
|
|
2023-11-30 22:56:33 +00:00
|
|
|
answerChan <- peerConnection.CurrentLocalDescription()
|
2023-11-30 18:32:38 +00:00
|
|
|
s.Streams[streamKey] = stream
|
2023-11-30 22:59:51 +00:00
|
|
|
slog.Info("Added stream.", "stream_key", streamKey)
|
2023-11-30 18:32:38 +00:00
|
|
|
}()
|
|
|
|
answer := <-answerChan
|
2023-11-30 22:56:33 +00:00
|
|
|
|
2023-11-30 18:32:38 +00:00
|
|
|
return answer, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *StreamStore) Get(streamKey string) (*Stream, error) {
|
|
|
|
stream, ok := s.Streams[streamKey]
|
|
|
|
if !ok {
|
|
|
|
return nil, ErrNoSuchStream
|
|
|
|
}
|
|
|
|
|
|
|
|
return stream, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *StreamStore) Delete(streamKey string) error {
|
|
|
|
stream, ok := s.Streams[streamKey]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
delete(s.Streams, streamKey)
|
|
|
|
|
2023-12-05 23:08:14 +00:00
|
|
|
for _, peer := range stream.viewers {
|
2023-11-30 18:32:38 +00:00
|
|
|
if err := peer.Close(); err != nil {
|
|
|
|
slog.Warn("Error closing peer.", "error", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return stream.peerConnection.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *StreamStore) List() []string {
|
|
|
|
streams := []string{}
|
|
|
|
for key := range s.Streams {
|
|
|
|
streams = append(streams, key)
|
|
|
|
}
|
|
|
|
|
|
|
|
return streams
|
|
|
|
}
|