From 86e9024a07cc8dbf662c3aff8b23b2e61b62ab4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torjus=20H=C3=A5kestad?= Date: Thu, 30 Nov 2023 23:56:33 +0100 Subject: [PATCH] Create working prototype --- Dockerfile | 2 ++ Taskfile.yaml | 17 ++++++++++++ main.go | 2 +- server/server.go | 13 +++++++++- server/static/script.js | 51 ++++++++++++++++++++---------------- server/stream.go | 57 ++++++++++++++++++++++++++++++++++------- 6 files changed, 109 insertions(+), 33 deletions(-) create mode 100644 Taskfile.yaml diff --git a/Dockerfile b/Dockerfile index 28aba1c..78098a3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,4 +8,6 @@ RUN go build -o ministream main.go FROM alpine:latest COPY --from=builder /app/ministream /usr/bin/ministream +EXPOSE 8080 +EXPOSE 50000-50050/udp CMD ["/usr/bin/ministream", "serve"] diff --git a/Taskfile.yaml b/Taskfile.yaml new file mode 100644 index 0000000..3931ef4 --- /dev/null +++ b/Taskfile.yaml @@ -0,0 +1,17 @@ +version: '3' + +tasks: + + default: task -l + + build: + desc: "Build image using podman" + cmd: podman build -t git.t-juice.club/torjus/ministream:latest . + + push: + desc: "Push image to git.t-juice.club/torjus/ministream" + deps: + - build + cmds: + - cmd: podman push git.t-juice.club/torjus/ministream:latest + \ No newline at end of file diff --git a/main.go b/main.go index d52d92e..6eaa582 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( "github.com/urfave/cli/v2" ) -const Version = "v0.1.0" +const Version = "v0.1.1" func main() { app := cli.App{ diff --git a/server/server.go b/server/server.go index 9226ff0..29b174e 100644 --- a/server/server.go +++ b/server/server.go @@ -35,7 +35,9 @@ func NewServer(store *UserStore) *Server { r.Get("/{name}", srv.StaticHandler) r.Post("/whip", http.HandlerFunc(srv.WhipHandler)) r.Get("/whip", srv.ListHandler) + r.Options("/whip", srv.OptionsHandler) r.Delete("/whip/{streamKey}", srv.DeleteHandler) + r.Patch("/whip/{streamKey}", srv.PatchHandler) r.Post("/whip/{streamKey}", srv.PostOfferHandler) srv.Handler = r @@ -43,6 +45,14 @@ func NewServer(store *UserStore) *Server { return srv } +func (s *Server) OptionsHandler(w http.ResponseWriter, r *http.Request) { + slog.Info("Got OPTIONS") +} + +func (s *Server) PatchHandler(w http.ResponseWriter, r *http.Request) { + slog.Info("Got PATCH!") +} + func (s *Server) StaticHandler(w http.ResponseWriter, r *http.Request) { name := chi.URLParam(r, "name") if name == "" { @@ -93,7 +103,7 @@ func (s *Server) PostOfferHandler(w http.ResponseWriter, r *http.Request) { io.WriteString(w, answer.SDP) - slog.Info("Got offer.", "stream", stream) + slog.Info("Got listener for stream.", "stream_key", streamKey) } func (s *Server) DeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -140,6 +150,7 @@ func (s *Server) WhipHandler(w http.ResponseWriter, r *http.Request) { } w.Header().Add("Location", fmt.Sprintf("/whip/%s", streamKey)) + w.Header().Add("Link", "stun:stun.l.google.com:19302; rel=\"ice-server\";") w.WriteHeader(http.StatusCreated) if _, err := io.WriteString(w, answer.SDP); err != nil { slog.Error("Error writing response.", "error", err) diff --git a/server/static/script.js b/server/static/script.js index cdf4b1b..84087d0 100644 --- a/server/static/script.js +++ b/server/static/script.js @@ -4,28 +4,35 @@ const startStream = function (streamKey) { }) pc.oniceconnectionstatechange = e => console.log(e) + pc.onicecandidate = e => { + console.log("Adding ice candidate: " + e.candidate); + if (!e.candidate) { + console.log("Done adding candidates. Creating offer."); + fetch("/whip/tjuice", { + method: "POST", + body: pc.localDescription.sdp + }).then(resp => { + resp.text().then(text => { + var answer = { + type: "answer", + sdp: text + } + try { + console.log("Setting remote description."); + pc.setRemoteDescription(answer) + } catch (e) { + console.log("Error setting remote description: " + e) + } + }) + }); + } + } + pc.createOffer().then(offer => { + console.log("Setting local description."); + pc.setLocalDescription(offer) + }) pc.addTransceiver('video') pc.addTransceiver('audio') - pc.createOffer().then(offer => { - fetch("/whip/tjuice", { - method: "POST", - body: offer.sdp - }).then(resp => { - resp.text().then(text => { - var answer = { - type: "answer", - sdp: text - } - try { - pc.setLocalDescription(offer).then( - pc.setRemoteDescription(answer) - ) - } catch (e) { - console.log("Error setting remote description: " + e) - } - }) - }); - }) pc.ontrack = function (event) { console.log(event) @@ -33,7 +40,7 @@ const startStream = function (streamKey) { var ms = new MediaStream() event.streams.forEach(s => { const tracks = s.getTracks() - tracks.forEach( t => { + tracks.forEach(t => { ms.addTrack(t) }) }) @@ -60,7 +67,7 @@ displayStreams = function () { }) } -window.onload = function ( ev ) { +window.onload = function (ev) { console.log(ev) displayStreams() } \ No newline at end of file diff --git a/server/stream.go b/server/stream.go index 7bbb64e..48cb072 100644 --- a/server/stream.go +++ b/server/stream.go @@ -75,7 +75,22 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc if err != nil { return nil, err } - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + + gatherComplete := make(chan struct{}) + peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { + if i == nil { + gatherComplete <- struct{}{} + return + } + slog.Info("Got ICE Candidate for listener.", + "addr", i.Address, + "port", i.Port, + "related_addr", i.RelatedAddress, + "related_port", i.RelatedPort) + if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil { + slog.Info("Error adding ICE Candidate.", "error", err) + } + }) // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) @@ -126,8 +141,10 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web }, } + se := webrtc.SettingEngine{} + _ = se.SetEphemeralUDPPortRange(50000, 50050) // Create a new RTCPeerConnection - peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)).NewPeerConnection(peerConnectionConfig) + peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)).NewPeerConnection(peerConnectionConfig) if err != nil { panic(err) } @@ -146,7 +163,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web // to connected peers peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { // Create a local track, all our SFU clients will be fed via this track - slog.Info("Got track!", "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID()) + slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID()) localTrack, newTrackErr := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, "video", "pion") if newTrackErr != nil { panic(newTrackErr) @@ -181,7 +198,6 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web if err != nil { panic(err) } - // Create answer answer, err := peerConnection.CreateAnswer(nil) if err != nil { @@ -189,7 +205,7 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web } // Create channel that is blocked until ICE Gathering is complete - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + // gatherComplete := webrtc.GatheringCompletePromise(peerConnection) // Sets the LocalDescription, and starts our UDP listeners err = peerConnection.SetLocalDescription(answer) @@ -200,14 +216,37 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web // Block until ICE Gathering is complete, disabling trickle ICE // we do this because we only can exchange one signaling message // in a production application you should exchange ICE Candidates via OnICECandidate - <-gatherComplete - slog.Info("ICE Gathering complete.", "answer", answer) - answerChan <- &answer + // answerChan <- &answer + peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { + slog.Info("ICE state changed.", "stream_key", streamKey, "ice_state", is) + }) + + gatherComplete := make(chan struct{}) + peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { + if i == nil { + gatherComplete <- struct{}{} + return + } + slog.Info("Got ICE Candidate", + "addr", i.Address, + "port", i.Port, + "related_addr", i.RelatedAddress, + "related_port", i.RelatedPort) + if err := peerConnection.AddICECandidate(i.ToJSON()); err != nil { + slog.Info("Error adding ICE Candidate.", "error", err) + } + }) + + <-gatherComplete + slog.Info("ICE Gathering complete.", "stream_key", streamKey, "ice_state", peerConnection.ICEConnectionState()) + + answerChan <- peerConnection.CurrentLocalDescription() s.Streams[streamKey] = stream - slog.Info("Added stream.", "stream_key", streamKey) + slog.Info("Added stream.", "stream_key", streamKey, "answer", answer) }() answer := <-answerChan + return answer, nil }