// ministream/server/stream.go

package server
import (
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/nack"
"github.com/pion/interceptor/pkg/stats"
"github.com/pion/webrtc/v4"
)
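// ErrNoSuchStream is returned by Get when the requested stream key is not registered.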
var ErrNoSuchStream = errors.New("no such stream")
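// StreamStore holds every active stream, keyed by stream key, along with the
// WebRTC configuration used for publisher connections. The mutex guards the
// Streams map, which is read and written from multiple goroutines.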
type StreamStore struct {
Streams map[string]*Stream
webRTCConfig webrtc.Configuration
mu sync.Mutex
}
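// NewStreamStore returns an empty StreamStore.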
func NewStreamStore() *StreamStore {
s := &StreamStore{
Streams: make(map[string]*Stream),
}
return s
}
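// Stream represents a single published stream: the publisher's peer connection
// and its per-track inbound stats, the local tracks that relay the publisher's
// media, and the listener peer connections fed from those tracks. The mutex
// guards localTracks and peers.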
type Stream struct {
peerConnection *webrtc.PeerConnection
peerConnectionStats map[string]*stats.Stats
peerConnectionStatsMu sync.Mutex
lastUpdate time.Time
localTracks []*webrtc.TrackLocalStaticRTP
peers []*webrtc.PeerConnection
mu sync.Mutex
}
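// StatsHandler reports the inbound RTP stats of every track of every stream as JSON.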
func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) {
type StreamStats struct {
StreamKey string `json:"streamKey"`
StatsInbound stats.InboundRTPStreamStats `json:"inboundStats"`
}
// Guard the Streams map while iterating; streams are added and removed concurrently.
s.mu.Lock()
allStats := []StreamStats{}
for streamKey, stream := range s.Streams {
stream.peerConnectionStatsMu.Lock()
for track, stat := range stream.peerConnectionStats {
allStats = append(allStats, StreamStats{
StreamKey: fmt.Sprintf("%s-%s", streamKey, track),
StatsInbound: stat.InboundRTPStreamStats,
})
}
stream.peerConnectionStatsMu.Unlock()
}
s.mu.Unlock()
w.Header().Add("Content-Type", "application/json")
enc := json.NewEncoder(w)
if err := enc.Encode(allStats); err != nil {
slog.Warn("Error encoding stats.", "error", err)
}
}
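// AddListener connects a new viewer to the stream: it creates a peer connection,
// attaches the stream's local tracks to it, and answers the viewer's SDP offer.
// ICE gathering is completed before returning, so the returned description
// already contains all candidates.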
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
}
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
return nil, err
}
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
// if err != nil {
// panic(err)
// }
// i.Add(intervalPliFactory)
// Stats interceptor
//statsInterceptorFactory, err := stats.NewInterceptor()
//if err != nil {
// panic(err)
//}
//var statsGetter stats.Getter
//statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
// statsGetter = g
//})
//i.Add(statsInterceptorFactory)
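// Restrict ICE to a fixed range of ephemeral UDP ports.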
se := webrtc.SettingEngine{}
_ = se.SetEphemeralUDPPortRange(50000, 50050)
webRTCConfig := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
peerConnection, err := api.NewPeerConnection(webRTCConfig)
if err != nil {
return nil, err
}
// Attach each of the stream's local tracks to the listener's connection.
// Snapshot the slice under the lock, since OnTrack may append to it concurrently.
s.mu.Lock()
localTracks := append([]*webrtc.TrackLocalStaticRTP(nil), s.localTracks...)
s.mu.Unlock()
for _, ltrack := range localTracks {
rtpSender, err := peerConnection.AddTrack(ltrack)
if err != nil {
peerConnection.Close()
return nil, err
}
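// Read incoming RTCP packets so the interceptors attached to this sender
// keep working; close the peer connection once reads fail.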
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, err := rtpSender.Read(rtcpBuf); err != nil {
peerConnection.Close()
return
}
}
}()
}
err = peerConnection.SetRemoteDescription(*sd)
if err != nil {
return nil, err
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
return nil, err
}
gatherComplete := make(chan struct{})
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
gatherComplete <- struct{}{}
return
}
slog.Info("Got ICE Candidate for listener.",
"addr", i.Address,
"port", i.Port,
"related_addr", i.RelatedAddress,
"related_port", i.RelatedPort)
if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil {
slog.Info("Error adding ICE Candidate.", "error", err)
}
})
// Sets the LocalDescription, and starts our UDP listeners
err = peerConnection.SetLocalDescription(answer)
if err != nil {
return nil, err
}
// Block until ICE Gathering is complete, disabling trickle ICE
// we do this because we only can exchange one signaling message
// in a production application you should exchange ICE Candidates via OnICECandidate
<-gatherComplete
s.mu.Lock()
s.peers = append(s.peers, peerConnection)
s.mu.Unlock()
// Return the LocalDescription, which now includes the gathered ICE candidates.
return peerConnection.LocalDescription(), nil
}
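// Add registers a new stream under streamKey from the publisher's SDP offer.
// It creates the publisher peer connection, relays every incoming track onto a
// local track for listeners, and returns the SDP answer once ICE gathering
// has completed.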
func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
answerChan := make(chan *webrtc.SessionDescription)
go func() {
stream := &Stream{
lastUpdate: time.Now(),
peerConnectionStats: make(map[string]*stats.Stats),
}
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
}
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
panic(err)
}
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
// if err != nil {
// panic(err)
// }
// i.Add(intervalPliFactory)
// Stats interceptor
statsInterceptorFactory, err := stats.NewInterceptor()
if err != nil {
panic(err)
}
var statsGetter stats.Getter
statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
statsGetter = g
})
i.Add(statsInterceptorFactory)
// NACK interceptor
maxPerPacket := nack.GeneratorMaxNacksPerPacket(4)
nackSize := nack.GeneratorSize(4096)
nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize)
if err != nil {
panic(err)
}
i.Add(nackinterceptorFactory)
s.webRTCConfig = webrtc.Configuration{
2023-11-30 18:32:38 +00:00
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
se := webrtc.SettingEngine{}
_ = se.SetEphemeralUDPPortRange(50000, 50050)
// se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
// buf := packetio.NewBuffer()
// buf.SetLimitSize(32 * 1000 * 1000)
// return buf
// }
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
// Create a new RTCPeerConnection
peerConnection, err := api.NewPeerConnection(s.webRTCConfig)
if err != nil {
panic(err)
}
stream.peerConnection = peerConnection
// Allow us to receive one video track and one audio track
if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
panic(err)
}
if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
panic(err)
}
// Set a handler for when a new remote track starts, this just distributes all our packets
// to connected peers
peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label())
})
peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
// Create a local track, all our SFU clients will be fed via this track
slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
localTrack, newTrackErr := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, "video", "pion")
if newTrackErr != nil {
panic(newTrackErr)
}
// Poll inbound RTP stats for this track once per second until the remote
// track ends; stopStats is closed by the read loop below on EOF.
stopStats := make(chan struct{})
go func() {
for {
select {
case <-stopStats:
slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
return
default:
}
trackStats := statsGetter.Get(uint32(remoteTrack.SSRC()))
stream.peerConnectionStatsMu.Lock()
stream.peerConnectionStats[remoteTrack.Codec().MimeType] = trackStats
stream.peerConnectionStatsMu.Unlock()
time.Sleep(time.Second)
}
}()
stream.mu.Lock()
stream.localTracks = append(stream.localTracks, localTrack)
stream.mu.Unlock()
rtpBuf := make([]byte, 1500)
for {
i, _, readErr := remoteTrack.Read(rtpBuf)
if readErr != nil {
if errors.Is(readErr, io.EOF) {
slog.Warn("EOF from track.", "id", remoteTrack.ID())
close(stopStats)
return
}
panic(readErr)
}
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet
if _, err := localTrack.Write(rtpBuf[:i]); err != nil && !errors.Is(err, io.ErrClosedPipe) {
panic(err)
}
}
})
// Set the remote SessionDescription
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sd.SDP}
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
panic(err)
}
// Create answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
}
// Create channel that is blocked until ICE Gathering is complete
// gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
// Sets the LocalDescription, and starts our UDP listeners
err = peerConnection.SetLocalDescription(answer)
if err != nil {
panic(err)
}
// Block until ICE Gathering is complete, disabling trickle ICE
// we do this because we only can exchange one signaling message
// in a production application you should exchange ICE Candidates via OnICECandidate
// answerChan <- &answer
peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
slog.Info("ICE state changed.", "stream_key", streamKey, "ice_state", is)
})
gatherComplete := make(chan struct{})
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
gatherComplete <- struct{}{}
return
}
slog.Info("Got ICE Candidate",
"addr", i.Address,
"port", i.Port,
"related_addr", i.RelatedAddress,
"related_port", i.RelatedPort)
if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil {
slog.Info("Error adding ICE Candidate.", "error", err)
}
})
<-gatherComplete
slog.Info("ICE Gathering complete.", "stream_key", streamKey, "ice_state", peerConnection.ICEConnectionState())
answerChan <- peerConnection.CurrentLocalDescription()
s.mu.Lock()
s.Streams[streamKey] = stream
s.mu.Unlock()
slog.Info("Added stream.", "stream_key", streamKey)
}()
answer := <-answerChan
return answer, nil
}
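// Get returns the stream registered under streamKey, or ErrNoSuchStream if it
// does not exist.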
func (s *StreamStore) Get(streamKey string) (*Stream, error) {
s.mu.Lock()
defer s.mu.Unlock()
stream, ok := s.Streams[streamKey]
if !ok {
return nil, ErrNoSuchStream
}
return stream, nil
}
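// Delete removes the stream registered under streamKey and closes its publisher
// and listener peer connections. Deleting an unknown key is a no-op.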
func (s *StreamStore) Delete(streamKey string) error {
s.mu.Lock()
defer s.mu.Unlock()
stream, ok := s.Streams[streamKey]
if !ok {
return nil
}
delete(s.Streams, streamKey)
for _, peer := range stream.peers {
if err := peer.Close(); err != nil {
slog.Warn("Error closing peer.", "error", err)
}
}
return stream.peerConnection.Close()
}
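// List returns the keys of all currently registered streams.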
func (s *StreamStore) List() []string {
s.mu.Lock()
defer s.mu.Unlock()
streams := []string{}
for key := range s.Streams {
streams = append(streams, key)
}
return streams
}