Add viewcount to stream list endpoint
Some checks are pending
Run go tests / test (push) Waiting to run

This commit is contained in:
2023-12-06 00:08:14 +01:00
parent 7fe24673ed
commit 634bad14d9
4 changed files with 26 additions and 7 deletions

View File

@@ -136,9 +136,18 @@ func (s *Server) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) ListHandler(w http.ResponseWriter, r *http.Request) {
streams := s.streams.List()
type StreamInfo struct {
StreamKey string `json:"streamKey"`
ViewCount int `json:"viewCount"`
}
infos := []StreamInfo{}
for key, stream := range s.streams.Streams {
infos = append(infos, StreamInfo{StreamKey: key, ViewCount: len(stream.viewers)})
}
enc := json.NewEncoder(w)
enc.Encode(&streams)
enc.Encode(&infos)
}
func (s *Server) WhipHandler(w http.ResponseWriter, r *http.Request) {

View File

@@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/nack"
"github.com/pion/interceptor/pkg/stats"
@@ -42,7 +43,7 @@ type Stream struct {
peerConnectionStatsMu sync.Mutex
lastUpdate time.Time
localTracks []*webrtc.TrackLocalStaticRTP
peers []*webrtc.PeerConnection
viewers map[string]*webrtc.PeerConnection
mu sync.Mutex
}
@@ -72,6 +73,7 @@ func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) {
}
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
viewerID := uuid.New().String()
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
@@ -124,6 +126,10 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
rtpSender, err := peerConnection.AddTrack(ltrack)
if err != nil {
// TODO, stop peerconn
peerConnection.Close()
s.mu.Lock()
delete(s.viewers, viewerID)
s.mu.Unlock()
return nil, err
}
go func() {
@@ -131,6 +137,9 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
for {
if _, _, err := rtpSender.Read(rtcpBuf); err != nil {
peerConnection.Close()
s.mu.Lock()
delete(s.viewers, viewerID)
s.mu.Unlock()
return
}
}
@@ -173,7 +182,7 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
// in a production application you should exchange ICE Candidates via OnICECandidate
<-gatherComplete
s.mu.Lock()
s.peers = append(s.peers, peerConnection)
s.viewers[viewerID] = peerConnection
defer s.mu.Unlock()
@@ -189,6 +198,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
store: s,
lastUpdate: time.Now(),
peerConnectionStats: make(map[string]*stats.Stats),
viewers: make(map[string]*webrtc.PeerConnection),
}
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
@@ -391,7 +401,7 @@ func (s *StreamStore) Delete(streamKey string) error {
defer s.mu.Unlock()
delete(s.Streams, streamKey)
for _, peer := range stream.peers {
for _, peer := range stream.viewers {
if err := peer.Close(); err != nil {
slog.Warn("Error closing peer.", "error", err)
}