package server import ( "encoding/json" "errors" "fmt" "io" "log/slog" "net/http" "sync" "time" "github.com/pion/interceptor" "github.com/pion/interceptor/pkg/nack" "github.com/pion/interceptor/pkg/stats" "github.com/pion/webrtc/v4" ) var ErrNoSuchStream error = fmt.Errorf("no such stream") type StreamStore struct { Streams map[string]*Stream webRTCConfig webrtc.Configuration mu sync.Mutex } func NewStreamStore() *StreamStore { s := &StreamStore{ Streams: make(map[string]*Stream), } return s } type Stream struct { peerConnection *webrtc.PeerConnection peerConnectionStats map[string]*stats.Stats peerConnectionStatsMu sync.Mutex lastUpdate time.Time localTracks []*webrtc.TrackLocalStaticRTP peers []*webrtc.PeerConnection mu sync.Mutex } func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) { type StreamStats struct { StreamKey string `json:"streamKey"` StatsInbound stats.InboundRTPStreamStats `json:"inboudStats"` } stats := []StreamStats{} for streamKey, stream := range s.Streams { stream.peerConnectionStatsMu.Lock() for track, stat := range stream.peerConnectionStats { stats = append(stats, StreamStats{ StreamKey: fmt.Sprintf("%s-%s", streamKey, track), StatsInbound: stat.InboundRTPStreamStats, }) } stream.peerConnectionStatsMu.Unlock() } w.Header().Add("Content-Type", "application/json") enc := json.NewEncoder(w) if err := enc.Encode(stats); err != nil { slog.Warn("Error encoding stats: %s", err) } } func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { m := &webrtc.MediaEngine{} if err := m.RegisterDefaultCodecs(); err != nil { panic(err) } i := &interceptor.Registry{} if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { panic(err) } // intervalPliFactory, err := intervalpli.NewReceiverInterceptor() // if err != nil { // panic(err) // } // i.Add(intervalPliFactory) // Stats interceptor //statsInterceptorFactory, err := stats.NewInterceptor() //if err != nil { // panic(err) //} //var statsGetter stats.Getter //statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) { // statsGetter = g //}) //i.Add(statsInterceptorFactory) se := webrtc.SettingEngine{} _ = se.SetEphemeralUDPPortRange(50000, 50050) webRTCConfig := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)) peerConnection, err := api.NewPeerConnection(webRTCConfig) if err != nil { return nil, err } for _, ltrack := range s.localTracks { rtpSender, err := peerConnection.AddTrack(ltrack) if err != nil { // TODO, stop peerconn return nil, err } go func() { rtcpBuf := make([]byte, 1500) for { if _, _, err := rtpSender.Read(rtcpBuf); err != nil { peerConnection.Close() return } } }() } err = peerConnection.SetRemoteDescription(*sd) if err != nil { return nil, err } answer, err := peerConnection.CreateAnswer(nil) if err != nil { return nil, err } gatherComplete := make(chan struct{}) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { gatherComplete <- struct{}{} return } slog.Info("Got ICE Candidate for listener.", "addr", i.Address, "port", i.Port, "related_addr", i.RelatedAddress, "related_port", i.RelatedPort) if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil { slog.Info("Error adding ICE Candidate.", "error", err) } }) // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) if err != nil { panic(err) } // Block until ICE Gathering is complete, disabling trickle ICE // we do this because we only can exchange one signaling message // in a production application you should exchange ICE Candidates via OnICECandidate <-gatherComplete s.mu.Lock() s.peers = append(s.peers, peerConnection) defer s.mu.Unlock() // Get the LocalDescription and take it to base64 so we can paste in browser return peerConnection.LocalDescription(), nil } func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { answerChan := make(chan *webrtc.SessionDescription) go func() { stream := &Stream{ lastUpdate: time.Now(), peerConnectionStats: make(map[string]*stats.Stats), } m := &webrtc.MediaEngine{} if err := m.RegisterDefaultCodecs(); err != nil { panic(err) } i := &interceptor.Registry{} if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { panic(err) } // intervalPliFactory, err := intervalpli.NewReceiverInterceptor() // if err != nil { // panic(err) // } // i.Add(intervalPliFactory) // Stats interceptor statsInterceptorFactory, err := stats.NewInterceptor() if err != nil { panic(err) } var statsGetter stats.Getter statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) { statsGetter = g }) i.Add(statsInterceptorFactory) // NACK interceptor maxPerPacket := nack.GeneratorMaxNacksPerPacket(4) nackSize := nack.GeneratorSize(4096) nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize) if err != nil { panic(err) } i.Add(nackinterceptorFactory) s.webRTCConfig = webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } se := webrtc.SettingEngine{} _ = se.SetEphemeralUDPPortRange(50000, 50050) // se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser { // buf := packetio.NewBuffer() // buf.SetLimitSize(32 * 1000 * 1000) // return buf // } api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)) // Create a new RTCPeerConnection peerConnection, err := api.NewPeerConnection(s.webRTCConfig) if err != nil { panic(err) } stream.peerConnection = peerConnection // Allow us to receive 1 video track if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { panic(err) } if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { panic(err) } // Set a handler for when a new remote track starts, this just distributes all our packets // to connected peers peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) { slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label()) }) peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { // Create a local track, all our SFU clients will be fed via this track slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID()) localTrack, newTrackErr := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, "video", "pion") if newTrackErr != nil { panic(newTrackErr) } var stop bool go func() { for { if stop { slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID()) return } stats := statsGetter.Get(uint32(remoteTrack.SSRC())) stream.peerConnectionStatsMu.Lock() stream.peerConnectionStats[remoteTrack.Codec().MimeType] = stats stream.peerConnectionStatsMu.Unlock() time.Sleep(time.Second * 1) } }() stream.mu.Lock() stream.localTracks = append(stream.localTracks, localTrack) stream.mu.Unlock() rtpBuf := make([]byte, 1500) for { i, _, readErr := remoteTrack.Read(rtpBuf) if readErr != nil { if errors.Is(readErr, io.EOF) { slog.Warn("EOF from track.", "id", remoteTrack.ID()) stop = true return } panic(readErr) } // ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet if _, err = localTrack.Write(rtpBuf[:i]); err != nil && !errors.Is(err, io.ErrClosedPipe) { panic(err) } } }) // Set the remote SessionDescription offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sd.SDP} err = peerConnection.SetRemoteDescription(offer) if err != nil { panic(err) } // Create answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { panic(err) } // Create channel that is blocked until ICE Gathering is complete // gatherComplete := webrtc.GatheringCompletePromise(peerConnection) // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) if err != nil { panic(err) } // Block until ICE Gathering is complete, disabling trickle ICE // we do this because we only can exchange one signaling message // in a production application you should exchange ICE Candidates via OnICECandidate // answerChan <- &answer peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { slog.Info("ICE state changed.", "stream_key", streamKey, "ice_state", is) }) gatherComplete := make(chan struct{}) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { gatherComplete <- struct{}{} return } slog.Info("Got ICE Candidate", "addr", i.Address, "port", i.Port, "related_addr", i.RelatedAddress, "related_port", i.RelatedPort) if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil { slog.Info("Error adding ICE Candidate.", "error", err) } }) <-gatherComplete slog.Info("ICE Gathering complete.", "stream_key", streamKey, "ice_state", peerConnection.ICEConnectionState()) answerChan <- peerConnection.CurrentLocalDescription() s.Streams[streamKey] = stream slog.Info("Added stream.", "stream_key", streamKey) }() answer := <-answerChan return answer, nil } func (s *StreamStore) Get(streamKey string) (*Stream, error) { stream, ok := s.Streams[streamKey] if !ok { return nil, ErrNoSuchStream } return stream, nil } func (s *StreamStore) Delete(streamKey string) error { stream, ok := s.Streams[streamKey] if !ok { return nil } s.mu.Lock() defer s.mu.Unlock() delete(s.Streams, streamKey) for _, peer := range stream.peers { if err := peer.Close(); err != nil { slog.Warn("Error closing peer.", "error", err) } } return stream.peerConnection.Close() } func (s *StreamStore) List() []string { streams := []string{} for key := range s.Streams { streams = append(streams, key) } return streams }