diff --git a/go.mod b/go.mod index 4488ff9..013cb95 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21.3 require ( github.com/go-chi/chi/v5 v5.0.10 + github.com/google/uuid v1.4.0 github.com/pelletier/go-toml/v2 v2.1.0 github.com/pion/interceptor v0.1.25 github.com/pion/sdp/v3 v3.0.6 @@ -15,7 +16,6 @@ require ( require ( github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/uuid v1.4.0 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.8 // indirect github.com/pion/ice/v3 v3.0.2 // indirect diff --git a/ministream-frontend b/ministream-frontend index 717790c..ccbc8de 160000 --- a/ministream-frontend +++ b/ministream-frontend @@ -1 +1 @@ -Subproject commit 717790cb97390295e94d832fb8bc9cc83314812a +Subproject commit ccbc8de04d35c3b1317a1e421538e630bbab1b82 diff --git a/server/server.go b/server/server.go index 653af4e..1d54f5a 100644 --- a/server/server.go +++ b/server/server.go @@ -136,9 +136,18 @@ func (s *Server) DeleteHandler(w http.ResponseWriter, r *http.Request) { } func (s *Server) ListHandler(w http.ResponseWriter, r *http.Request) { - streams := s.streams.List() + type StreamInfo struct { + StreamKey string `json:"streamKey"` + ViewCount int `json:"viewCount"` + } + + infos := []StreamInfo{} + for key, stream := range s.streams.Streams { + infos = append(infos, StreamInfo{StreamKey: key, ViewCount: len(stream.viewers)}) + } + enc := json.NewEncoder(w) - enc.Encode(&streams) + enc.Encode(&infos) } func (s *Server) WhipHandler(w http.ResponseWriter, r *http.Request) { diff --git a/server/stream.go b/server/stream.go index 6337791..3b54083 100644 --- a/server/stream.go +++ b/server/stream.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/pion/interceptor" "github.com/pion/interceptor/pkg/nack" "github.com/pion/interceptor/pkg/stats" @@ -42,7 +43,7 @@ type Stream struct { peerConnectionStatsMu sync.Mutex lastUpdate time.Time localTracks []*webrtc.TrackLocalStaticRTP - peers []*webrtc.PeerConnection + viewers map[string]*webrtc.PeerConnection mu sync.Mutex } @@ -72,6 +73,7 @@ func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) { } func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { + viewerID := uuid.New().String() m := &webrtc.MediaEngine{} if err := m.RegisterDefaultCodecs(); err != nil { panic(err) @@ -124,6 +126,10 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc rtpSender, err := peerConnection.AddTrack(ltrack) if err != nil { // TODO, stop peerconn + peerConnection.Close() + s.mu.Lock() + delete(s.viewers, viewerID) + s.mu.Unlock() return nil, err } go func() { @@ -131,6 +137,9 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc for { if _, _, err := rtpSender.Read(rtcpBuf); err != nil { peerConnection.Close() + s.mu.Lock() + delete(s.viewers, viewerID) + s.mu.Unlock() return } } @@ -173,7 +182,7 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc // in a production application you should exchange ICE Candidates via OnICECandidate <-gatherComplete s.mu.Lock() - s.peers = append(s.peers, peerConnection) + s.viewers[viewerID] = peerConnection defer s.mu.Unlock() @@ -189,6 +198,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web store: s, lastUpdate: time.Now(), peerConnectionStats: make(map[string]*stats.Stats), + viewers: make(map[string]*webrtc.PeerConnection), } m := &webrtc.MediaEngine{} if err := m.RegisterDefaultCodecs(); err != nil { @@ -391,7 +401,7 @@ func (s *StreamStore) Delete(streamKey string) error { defer s.mu.Unlock() delete(s.Streams, streamKey) - for _, peer := range stream.peers { + for _, peer := range stream.viewers { if err := peer.Close(); err != nil { slog.Warn("Error closing peer.", "error", err) }