ministream/server/stream.go
2023-11-30 19:32:38 +01:00

250 lines
6.3 KiB
Go

package server
import (
"errors"
"fmt"
"io"
"log/slog"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/intervalpli"
"github.com/pion/webrtc/v4"
)
// ErrNoSuchStream is the sentinel error reported when a stream key has no
// entry in a StreamStore. Compare against it with errors.Is.
var ErrNoSuchStream = fmt.Errorf("no such stream")
// StreamStore keeps the set of known streams, indexed by stream key.
type StreamStore struct {
// Streams maps a stream key to the Stream registered under it.
Streams map[string]*Stream
// mu is the lock associated with Streams.
// NOTE(review): confirm that every method touching Streams actually holds it.
mu sync.Mutex
}
// NewStreamStore returns an empty StreamStore whose Streams map is already
// allocated, so entries can be inserted immediately.
func NewStreamStore() *StreamStore {
	return &StreamStore{Streams: map[string]*Stream{}}
}
// Stream is one published stream: the peer connection the media arrives on,
// the local tracks that media is copied to, and the peer connections of the
// listeners that consume those tracks.
type Stream struct {
// peerConnection is the connection created in StreamStore.Add with
// receive-only video and audio transceivers.
peerConnection *webrtc.PeerConnection
// lastUpdate is set to the creation time in StreamStore.Add.
lastUpdate time.Time
// localTracks holds one local track per remote track seen by the OnTrack
// handler; AddListener adds each of them to a new listener connection.
localTracks []*webrtc.TrackLocalStaticRTP
// peers holds the listener connections created by AddListener; they are
// closed by StreamStore.Delete.
peers []*webrtc.PeerConnection
// mu is locked when appending to localTracks and peers.
mu sync.Mutex
}
// AddListener attaches a new viewer to the stream.
//
// It creates a dedicated peer connection for the viewer, adds every local
// track the stream currently has, applies the viewer's offer sd and returns
// the local description to send back. The call blocks until ICE gathering has
// finished, so the returned description already carries all candidates
// (trickle ICE is not used because only one signaling message is exchanged).
//
// On any failure the freshly created peer connection is closed and an error
// is returned; the stream itself is left untouched. A nil sd is rejected with
// an error.
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
	if sd == nil {
		return nil, errors.New("nil session description")
	}

	peerConnectionConfig := webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302"},
			},
		},
	}
	peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig)
	if err != nil {
		return nil, fmt.Errorf("creating listener peer connection: %w", err)
	}

	// Until the listener is fully set up, every early return must release the
	// peer connection, otherwise it leaks.
	established := false
	defer func() {
		if established {
			return
		}
		if closeErr := peerConnection.Close(); closeErr != nil {
			slog.Warn("Error closing peer.", "error", closeErr)
		}
	}()

	// The OnTrack handler appends to localTracks under s.mu, so take a
	// snapshot under the same lock instead of ranging over the live slice.
	s.mu.Lock()
	tracks := append([]*webrtc.TrackLocalStaticRTP(nil), s.localTracks...)
	s.mu.Unlock()

	for _, ltrack := range tracks {
		rtpSender, err := peerConnection.AddTrack(ltrack)
		if err != nil {
			return nil, fmt.Errorf("adding track to listener: %w", err)
		}
		// Keep reading incoming RTCP for this sender; when the read fails the
		// listener connection is torn down and the goroutine ends.
		go func() {
			rtcpBuf := make([]byte, 1500)
			for {
				if _, _, err := rtpSender.Read(rtcpBuf); err != nil {
					// Best effort: there is nobody to report a close error to.
					_ = peerConnection.Close()
					return
				}
			}
		}()
	}

	if err := peerConnection.SetRemoteDescription(*sd); err != nil {
		return nil, fmt.Errorf("setting remote description: %w", err)
	}
	answer, err := peerConnection.CreateAnswer(nil)
	if err != nil {
		return nil, fmt.Errorf("creating answer: %w", err)
	}

	// The promise must be obtained before SetLocalDescription starts gathering.
	gatherComplete := webrtc.GatheringCompletePromise(peerConnection)

	// Sets the LocalDescription, and starts our UDP listeners.
	if err := peerConnection.SetLocalDescription(answer); err != nil {
		return nil, fmt.Errorf("setting local description: %w", err)
	}

	// Block until ICE gathering is complete, disabling trickle ICE. We do this
	// because we can only exchange one signaling message; a production
	// application should exchange ICE candidates via OnICECandidate.
	<-gatherComplete

	s.mu.Lock()
	s.peers = append(s.peers, peerConnection)
	s.mu.Unlock()
	established = true

	// The local description (not the bare answer) contains the gathered
	// candidates.
	return peerConnection.LocalDescription(), nil
}
// Add registers a new published stream under streamKey.
//
// It builds a peer connection with the default codecs and interceptors plus a
// periodic-PLI receiver interceptor, prepares it to receive one video and one
// audio track, applies the publisher's offer sd and returns the local
// description to send back. Every remote track that arrives is copied into a
// local track stored on the Stream, from which listeners are fed.
//
// The call blocks until ICE gathering has finished, so the returned
// description already carries all candidates (trickle ICE is not used because
// only one signaling message is exchanged). The stream is stored before Add
// returns.
//
// Any setup failure closes the peer connection and is returned as an error;
// nothing is stored in that case. A nil sd is rejected with an error.
//
// NOTE(review): an existing stream under the same key is replaced without
// being closed — confirm whether callers rely on that.
func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
	if sd == nil {
		return nil, errors.New("nil session description")
	}

	mediaEngine := &webrtc.MediaEngine{}
	if err := mediaEngine.RegisterDefaultCodecs(); err != nil {
		return nil, fmt.Errorf("registering default codecs: %w", err)
	}
	registry := &interceptor.Registry{}
	if err := webrtc.RegisterDefaultInterceptors(mediaEngine, registry); err != nil {
		return nil, fmt.Errorf("registering default interceptors: %w", err)
	}
	intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
	if err != nil {
		return nil, fmt.Errorf("creating interval PLI interceptor: %w", err)
	}
	registry.Add(intervalPliFactory)

	peerConnectionConfig := webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302"},
			},
		},
	}

	// Create a new RTCPeerConnection.
	api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine), webrtc.WithInterceptorRegistry(registry))
	peerConnection, err := api.NewPeerConnection(peerConnectionConfig)
	if err != nil {
		return nil, fmt.Errorf("creating publisher peer connection: %w", err)
	}

	// Until the stream is stored, every early return must release the peer
	// connection, otherwise it leaks.
	stored := false
	defer func() {
		if stored {
			return
		}
		if closeErr := peerConnection.Close(); closeErr != nil {
			slog.Warn("Error closing peer.", "error", closeErr)
		}
	}()

	stream := &Stream{
		peerConnection: peerConnection,
		lastUpdate:     time.Now(),
	}

	// Allow us to receive 1 video track and 1 audio track.
	recvOnly := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
	if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, recvOnly); err != nil {
		return nil, fmt.Errorf("adding video transceiver: %w", err)
	}
	if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, recvOnly); err != nil {
		return nil, fmt.Errorf("adding audio transceiver: %w", err)
	}

	// Set a handler for when a new remote track starts; it copies every packet
	// of that track to a local track that listeners are attached to. Errors in
	// here end forwarding for this track only instead of crashing the process.
	peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		slog.Info("Got track!", "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())

		// NOTE(review): the same "video"/"pion" ids are used for every track,
		// including audio — confirm this is intended.
		localTrack, newTrackErr := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, "video", "pion")
		if newTrackErr != nil {
			slog.Error("Error creating local track.", "stream_key", streamKey, "error", newTrackErr)
			return
		}

		stream.mu.Lock()
		stream.localTracks = append(stream.localTracks, localTrack)
		stream.mu.Unlock()

		rtpBuf := make([]byte, 1400)
		for {
			n, _, readErr := remoteTrack.Read(rtpBuf)
			if readErr != nil {
				if errors.Is(readErr, io.EOF) {
					slog.Warn("EOF from track.", "id", remoteTrack.ID())
					return
				}
				slog.Error("Error reading from track.", "id", remoteTrack.ID(), "error", readErr)
				return
			}

			// ErrClosedPipe means we don't have any subscribers, this is ok if
			// no peers have connected yet. The error stays local to this
			// handler so it cannot race with the setup code below.
			if _, writeErr := localTrack.Write(rtpBuf[:n]); writeErr != nil && !errors.Is(writeErr, io.ErrClosedPipe) {
				slog.Error("Error writing to local track.", "id", remoteTrack.ID(), "error", writeErr)
				return
			}
		}
	})

	// Set the remote SessionDescription.
	offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sd.SDP}
	if err := peerConnection.SetRemoteDescription(offer); err != nil {
		return nil, fmt.Errorf("setting remote description: %w", err)
	}

	// Create answer.
	answer, err := peerConnection.CreateAnswer(nil)
	if err != nil {
		return nil, fmt.Errorf("creating answer: %w", err)
	}

	// Create channel that is blocked until ICE gathering is complete; it must
	// be obtained before SetLocalDescription starts gathering.
	gatherComplete := webrtc.GatheringCompletePromise(peerConnection)

	// Sets the LocalDescription, and starts our UDP listeners.
	if err := peerConnection.SetLocalDescription(answer); err != nil {
		return nil, fmt.Errorf("setting local description: %w", err)
	}

	// Block until ICE gathering is complete, disabling trickle ICE. We do this
	// because we can only exchange one signaling message; a production
	// application should exchange ICE candidates via OnICECandidate.
	<-gatherComplete

	// The local description (not the bare answer created above) contains the
	// gathered candidates, so that is what the publisher must receive.
	localDescription := peerConnection.LocalDescription()
	slog.Info("ICE Gathering complete.", "answer", localDescription)

	s.mu.Lock()
	if s.Streams == nil {
		// Tolerate a zero-value StreamStore.
		s.Streams = make(map[string]*Stream)
	}
	s.Streams[streamKey] = stream
	s.mu.Unlock()
	stored = true

	slog.Info("Added stream.", "stream_key", streamKey)
	return localDescription, nil
}
// Get returns the stream registered under streamKey, or ErrNoSuchStream when
// there is none. The lookup is done under the store's lock so it does not race
// with concurrent changes to the map.
func (s *StreamStore) Get(streamKey string) (*Stream, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream, ok := s.Streams[streamKey]
	if !ok {
		return nil, ErrNoSuchStream
	}
	return stream, nil
}
// Delete removes the stream registered under streamKey and shuts it down.
//
// All listener connections of the stream are closed (close errors are logged
// and otherwise ignored), then the publisher connection is closed and its
// close error is returned. Deleting an unknown key is a no-op that returns
// nil.
func (s *StreamStore) Delete(streamKey string) error {
	// Look up and remove in one critical section so that two concurrent
	// deletes of the same key cannot both proceed to close the stream.
	s.mu.Lock()
	stream, ok := s.Streams[streamKey]
	if ok {
		delete(s.Streams, streamKey)
	}
	s.mu.Unlock()
	if !ok {
		return nil
	}

	// AddListener appends to peers under stream.mu; detach the slice under the
	// same lock and close the connections without holding any lock.
	stream.mu.Lock()
	peers := stream.peers
	stream.peers = nil
	stream.mu.Unlock()

	for _, peer := range peers {
		if err := peer.Close(); err != nil {
			slog.Warn("Error closing peer.", "error", err)
		}
	}
	return stream.peerConnection.Close()
}
// List returns the keys of all registered streams. The result is always a
// non-nil slice (empty when there are no streams) and its order is
// unspecified because it follows map iteration order. The map is read under
// the store's lock so it does not race with concurrent changes.
func (s *StreamStore) List() []string {
	s.mu.Lock()
	defer s.mu.Unlock()

	streams := make([]string, 0, len(s.Streams))
	for key := range s.Streams {
		streams = append(streams, key)
	}
	return streams
}