// Package server implements a small WebRTC selective forwarding unit: one
// publisher per stream key pushes media in, and any number of listeners
// receive a copy of that media.
package server

import (
	"errors"
	"fmt"
	"io"
	"log/slog"
	"sync"
	"time"

	"github.com/pion/interceptor"
	"github.com/pion/interceptor/pkg/intervalpli"
	"github.com/pion/webrtc/v4"
)

// ErrNoSuchStream is returned by StreamStore.Get when no stream is registered
// under the requested key.
var ErrNoSuchStream = errors.New("no such stream")

// rtpBufferSize is the size of the scratch buffers used to read RTP and RTCP
// packets.
const rtpBufferSize = 1500

// StreamStore is a registry of published streams keyed by stream key.
//
// Streams is exported for backward compatibility; all methods on StreamStore
// guard it with mu, so direct access from outside must be synchronised by the
// caller.
type StreamStore struct {
	Streams map[string]*Stream
	mu      sync.Mutex
}

// NewStreamStore returns an empty, ready-to-use StreamStore.
func NewStreamStore() *StreamStore {
	return &StreamStore{
		Streams: make(map[string]*Stream),
	}
}

// Stream is a single published stream: the publisher's peer connection, the
// local tracks its media is copied onto, and the listener peer connections
// subscribed to those tracks.
type Stream struct {
	peerConnection *webrtc.PeerConnection
	lastUpdate     time.Time
	localTracks    []*webrtc.TrackLocalStaticRTP
	peers          []*webrtc.PeerConnection
	mu             sync.Mutex // guards localTracks and peers
}

// streamPeerConnectionConfig returns the ICE configuration shared by publisher
// and listener peer connections.
func streamPeerConnectionConfig() webrtc.Configuration {
	return webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{URLs: []string{"stun:stun.l.google.com:19302"}},
		},
	}
}

// logICECandidates logs every locally gathered ICE candidate of pc using msg
// as the log message. The nil candidate that marks the end of gathering is
// ignored.
func logICECandidates(pc *webrtc.PeerConnection, msg string) {
	pc.OnICECandidate(func(c *webrtc.ICECandidate) {
		if c == nil {
			return
		}
		slog.Info(msg,
			"addr", c.Address,
			"port", c.Port,
			"related_addr", c.RelatedAddress,
			"related_port", c.RelatedPort)
	})
}

// closeAfterFailure closes pc and logs (rather than returns) any close error,
// because the caller is already reporting the error that caused the cleanup.
func closeAfterFailure(pc *webrtc.PeerConnection) {
	if err := pc.Close(); err != nil {
		slog.Warn("Error closing peer.", "error", err)
	}
}

// answerWithAllCandidates applies offer to pc, creates an answer, installs it
// as the local description and blocks until ICE gathering has completed, so
// that pc.LocalDescription() contains every candidate (trickle ICE is not
// used because only one signaling message is exchanged). It returns the
// answer as created, before candidates were added.
func answerWithAllCandidates(pc *webrtc.PeerConnection, offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {
	if err := pc.SetRemoteDescription(offer); err != nil {
		return webrtc.SessionDescription{}, fmt.Errorf("setting remote description: %w", err)
	}

	answer, err := pc.CreateAnswer(nil)
	if err != nil {
		return webrtc.SessionDescription{}, fmt.Errorf("creating answer: %w", err)
	}

	// The promise must exist before SetLocalDescription starts gathering;
	// it also handles the case where gathering has already finished.
	gatherComplete := webrtc.GatheringCompletePromise(pc)

	// Sets the LocalDescription and starts the UDP listeners.
	if err := pc.SetLocalDescription(answer); err != nil {
		return webrtc.SessionDescription{}, fmt.Errorf("setting local description: %w", err)
	}

	<-gatherComplete
	return answer, nil
}

// AddListener subscribes a new viewer to the stream.
//
// sd is the viewer's SDP offer. A new peer connection is created, every local
// track currently known to the stream is attached to it, and the returned
// description is the answer including all gathered ICE candidates.
//
// On any error the new peer connection is closed and a non-nil error is
// returned together with a nil description.
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
	if sd == nil {
		return nil, errors.New("nil session description")
	}

	peerConnection, err := webrtc.NewPeerConnection(streamPeerConnectionConfig())
	if err != nil {
		return nil, fmt.Errorf("creating listener peer connection: %w", err)
	}

	registered := false
	defer func() {
		if !registered {
			closeAfterFailure(peerConnection)
		}
	}()

	// Snapshot the tracks under the lock: the publisher's OnTrack handler
	// appends to localTracks concurrently.
	s.mu.Lock()
	tracks := make([]*webrtc.TrackLocalStaticRTP, len(s.localTracks))
	copy(tracks, s.localTracks)
	s.mu.Unlock()

	for _, track := range tracks {
		rtpSender, err := peerConnection.AddTrack(track)
		if err != nil {
			return nil, fmt.Errorf("adding track to listener: %w", err)
		}

		// Incoming RTCP must be read for the interceptors to process it.
		// A read error means the sender is finished, so the listener's
		// connection is torn down.
		go func() {
			rtcpBuf := make([]byte, rtpBufferSize)
			for {
				if _, _, err := rtpSender.Read(rtcpBuf); err != nil {
					_ = peerConnection.Close() // best effort; the connection is already failing
					return
				}
			}
		}()
	}

	logICECandidates(peerConnection, "Got ICE Candidate for listener.")

	if _, err := answerWithAllCandidates(peerConnection, *sd); err != nil {
		return nil, err
	}

	s.mu.Lock()
	s.peers = append(s.peers, peerConnection)
	s.mu.Unlock()
	registered = true

	return peerConnection.LocalDescription(), nil
}

// forwardTrack copies RTP packets from remoteTrack onto a new local track
// registered on the stream, until the remote track ends or an unrecoverable
// error occurs. Errors are logged; they never panic, so a misbehaving
// publisher cannot take the process down.
func (s *Stream) forwardTrack(streamKey string, remoteTrack *webrtc.TrackRemote) {
	codec := remoteTrack.Codec()
	slog.Info("Got track.", "stream_key", streamKey, "type", codec.MimeType, "id", remoteTrack.ID())

	// The local track ID is the media kind ("video" or "audio") so that the
	// audio and video tracks of one stream do not share an ID.
	localTrack, err := webrtc.NewTrackLocalStaticRTP(codec.RTPCodecCapability, remoteTrack.Kind().String(), "pion")
	if err != nil {
		slog.Error("Error creating local track.", "stream_key", streamKey, "id", remoteTrack.ID(), "error", err)
		return
	}

	s.mu.Lock()
	s.localTracks = append(s.localTracks, localTrack)
	s.mu.Unlock()

	rtpBuf := make([]byte, rtpBufferSize)
	for {
		n, _, err := remoteTrack.Read(rtpBuf)
		if err != nil {
			if errors.Is(err, io.EOF) {
				slog.Warn("EOF from track.", "id", remoteTrack.ID())
			} else {
				slog.Error("Error reading from track.", "stream_key", streamKey, "id", remoteTrack.ID(), "error", err)
			}
			return
		}

		// ErrClosedPipe means there are no subscribers, which is fine if no
		// peers have connected yet.
		if _, err := localTrack.Write(rtpBuf[:n]); err != nil && !errors.Is(err, io.ErrClosedPipe) {
			slog.Error("Error writing to local track.", "stream_key", streamKey, "id", remoteTrack.ID(), "error", err)
			return
		}
	}
}

// newPublisherPeerConnection builds the peer connection used to receive a
// publisher's media: default codecs and interceptors, a periodic PLI sender
// so late listeners get a keyframe, and a fixed ephemeral UDP port range.
func newPublisherPeerConnection() (*webrtc.PeerConnection, error) {
	mediaEngine := &webrtc.MediaEngine{}
	if err := mediaEngine.RegisterDefaultCodecs(); err != nil {
		return nil, fmt.Errorf("registering default codecs: %w", err)
	}

	registry := &interceptor.Registry{}
	if err := webrtc.RegisterDefaultInterceptors(mediaEngine, registry); err != nil {
		return nil, fmt.Errorf("registering default interceptors: %w", err)
	}

	pliFactory, err := intervalpli.NewReceiverInterceptor()
	if err != nil {
		return nil, fmt.Errorf("creating interval PLI interceptor: %w", err)
	}
	registry.Add(pliFactory)

	settingEngine := webrtc.SettingEngine{}
	if err := settingEngine.SetEphemeralUDPPortRange(50000, 50050); err != nil {
		return nil, fmt.Errorf("setting ephemeral UDP port range: %w", err)
	}

	api := webrtc.NewAPI(
		webrtc.WithMediaEngine(mediaEngine),
		webrtc.WithInterceptorRegistry(registry),
		webrtc.WithSettingEngine(settingEngine),
	)

	peerConnection, err := api.NewPeerConnection(streamPeerConnectionConfig())
	if err != nil {
		return nil, fmt.Errorf("creating publisher peer connection: %w", err)
	}
	return peerConnection, nil
}

// Add registers a new published stream under streamKey.
//
// sd carries the publisher's SDP offer; only its SDP field is used and it is
// always treated as an offer. Add blocks until ICE gathering has completed
// and returns the answer including all gathered candidates.
//
// On any error the publisher's peer connection is closed, nothing is stored,
// and a non-nil error is returned together with a nil description.
func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
	if sd == nil {
		return nil, errors.New("nil session description")
	}

	peerConnection, err := newPublisherPeerConnection()
	if err != nil {
		return nil, err
	}

	stored := false
	defer func() {
		if !stored {
			closeAfterFailure(peerConnection)
		}
	}()

	stream := &Stream{
		peerConnection: peerConnection,
		lastUpdate:     time.Now(),
	}

	// Allow the publisher to send one video and one audio track.
	recvOnly := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
	for _, kind := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
		if _, err := peerConnection.AddTransceiverFromKind(kind, recvOnly); err != nil {
			return nil, fmt.Errorf("adding %s transceiver: %w", kind, err)
		}
	}

	// When a remote track starts, distribute its packets to connected peers.
	peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		stream.forwardTrack(streamKey, remoteTrack)
	})

	peerConnection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
		slog.Info("ICE state changed.", "stream_key", streamKey, "ice_state", state)
	})
	logICECandidates(peerConnection, "Got ICE Candidate")

	offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sd.SDP}
	answer, err := answerWithAllCandidates(peerConnection, offer)
	if err != nil {
		return nil, err
	}
	slog.Info("ICE Gathering complete.", "stream_key", streamKey, "ice_state", peerConnection.ICEConnectionState())

	// NOTE(review): an existing stream under the same key is replaced without
	// being closed, as before — confirm whether it should be torn down here.
	s.mu.Lock()
	if s.Streams == nil {
		s.Streams = make(map[string]*Stream)
	}
	s.Streams[streamKey] = stream
	s.mu.Unlock()
	stored = true

	slog.Info("Added stream.", "stream_key", streamKey, "answer", answer)
	return peerConnection.LocalDescription(), nil
}

// Get returns the stream registered under streamKey, or ErrNoSuchStream if
// there is none.
func (s *StreamStore) Get(streamKey string) (*Stream, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream, ok := s.Streams[streamKey]
	if !ok {
		return nil, ErrNoSuchStream
	}
	return stream, nil
}

// Delete removes the stream registered under streamKey and closes its
// listener connections and its publisher connection.
//
// Deleting an unknown key is not an error. Failures closing listeners are
// logged; the returned error is the result of closing the publisher's peer
// connection.
func (s *StreamStore) Delete(streamKey string) error {
	s.mu.Lock()
	stream, ok := s.Streams[streamKey]
	if ok {
		delete(s.Streams, streamKey)
	}
	s.mu.Unlock()

	if !ok || stream == nil {
		return nil
	}

	// Detach the listener list under the stream lock, then close outside it
	// so slow Close calls do not block AddListener or forwardTrack.
	stream.mu.Lock()
	peers := stream.peers
	stream.peers = nil
	stream.mu.Unlock()

	for _, peer := range peers {
		if err := peer.Close(); err != nil {
			slog.Warn("Error closing peer.", "error", err)
		}
	}

	if stream.peerConnection == nil {
		return nil
	}
	return stream.peerConnection.Close()
}

// List returns the keys of all registered streams in unspecified order. The
// result is never nil.
func (s *StreamStore) List() []string {
	s.mu.Lock()
	defer s.mu.Unlock()

	keys := make([]string, 0, len(s.Streams))
	for key := range s.Streams {
		keys = append(keys, key)
	}
	return keys
}