Compare commits
30 Commits
da4f319f61
...
v0.1.2
Author | SHA1 | Date | |
---|---|---|---|
567fb71ca7 | |||
cb33bc3555 | |||
94def554c3 | |||
5d76154a25 | |||
381523a2be | |||
5ec7086843 | |||
504d7bc16c | |||
836c5a3bbf | |||
6ac0efb057 | |||
565de48636 | |||
ea0f73b516 | |||
c0a3cadda4 | |||
634bad14d9 | |||
7fe24673ed | |||
cd209a3c74 | |||
f33a81d1f4 | |||
d346944e20 | |||
555ce22479 | |||
644edc33bd | |||
5cbfc14eab | |||
9fe5e68551 | |||
912cbfe8dd | |||
4b171b849f | |||
fd18ba6527 | |||
8cb0ede570 | |||
e1e96e1e19 | |||
af0f583aff | |||
20da2e5d30 | |||
18c515c887 | |||
c141e725b3 |
74
.gitea/workflows/go-test.yml
Normal file
74
.gitea/workflows/go-test.yml
Normal file
@@ -0,0 +1,74 @@
|
||||
name: Run go tests
|
||||
on:
|
||||
push:
|
||||
paths:
|
||||
- '**.go'
|
||||
pull_request:
|
||||
paths:
|
||||
- '**.go'
|
||||
|
||||
env:
|
||||
GOPATH: /go_path
|
||||
GOCACHE: /go_cache
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 5
|
||||
env:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Go
|
||||
uses: https://github.com/actions/setup-go@v4
|
||||
with:
|
||||
go-version: 1.21
|
||||
use-latest: true
|
||||
|
||||
- name: Hash go.mod and go.sum
|
||||
uses: https://gitea.com/actions/go-hashfiles@v0.0.1
|
||||
id: hash-go
|
||||
with:
|
||||
patterns: |
|
||||
go.mod
|
||||
go.sum
|
||||
|
||||
- name: Cache go
|
||||
uses: actions/cache@v3
|
||||
id: cache-go
|
||||
with:
|
||||
path: |
|
||||
/go_path
|
||||
/go_cache
|
||||
key: cache-go-${{ steps.hash-go.outputs.hash }}
|
||||
|
||||
- name: Go mod download
|
||||
run: go mod download
|
||||
|
||||
- name: Go mod verify
|
||||
run: go mod verify
|
||||
|
||||
- name: Go mod tidy
|
||||
run: go mod tidy && git diff --exit-code
|
||||
|
||||
- name: Check gofmt
|
||||
run: gofmt -s -w . && git diff --exit-code
|
||||
|
||||
- name: Check go vet
|
||||
run: go vet ./...
|
||||
|
||||
- name: Build
|
||||
run: go build -v ./...
|
||||
|
||||
- name: Test
|
||||
run: go test -cover -coverprofile=/tmp/cov.out -v ./...
|
||||
|
||||
- name: Make coverage report
|
||||
run: go tool cover -html=/tmp/cov.out -o /tmp/cov.html
|
||||
|
||||
- name: Publish coverage report
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: coverage-report
|
||||
path: /tmp/cov.html
|
||||
|
56
.gitea/workflows/release-nightly.yml
Normal file
56
.gitea/workflows/release-nightly.yml
Normal file
@@ -0,0 +1,56 @@
|
||||
name: release-nightly
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ master ]
|
||||
|
||||
env:
|
||||
REGISTRY: git.t-juice.club
|
||||
IMAGE_NAME: ${{ gitea.repository }}
|
||||
DOCKER_TAG: nightly
|
||||
|
||||
jobs:
|
||||
release-image:
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: ghcr.io/catthehacker/ubuntu:act-latest
|
||||
env:
|
||||
DOCKER_ORG: torjus
|
||||
DOCKER_TAG: nightly
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0 # all history for all branches and tags
|
||||
submodules: true
|
||||
github-server-url: 'https://git.t-juice.club'
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Set up Docker BuildX
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Login to git.t-juice.club registry
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: git.t-juice.club
|
||||
username: ${{ gitea.actor }}
|
||||
password: ${{ secrets.DOCKER_PUSH }}
|
||||
|
||||
- name: Get Meta
|
||||
id: meta
|
||||
run: |
|
||||
echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT
|
||||
echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./Dockerfile
|
||||
platforms: |
|
||||
linux/amd64
|
||||
push: true
|
||||
tags: |
|
||||
${{ env.REGISTRY }}/${{ gitea.repository }}:${{ env.DOCKER_TAG }}
|
58
.gitea/workflows/release-tag.yml
Normal file
58
.gitea/workflows/release-tag.yml
Normal file
@@ -0,0 +1,58 @@
|
||||
name: release-tag
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
|
||||
env:
|
||||
REGISTRY: git.t-juice.club
|
||||
IMAGE_NAME: ${{ gitea.repository }}
|
||||
DOCKER_TAG: latest
|
||||
|
||||
jobs:
|
||||
release-image:
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: ghcr.io/catthehacker/ubuntu:act-latest
|
||||
env:
|
||||
DOCKER_ORG: torjus
|
||||
DOCKER_TAG: nightly
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0 # all history for all branches and tags
|
||||
submodules: true
|
||||
github-server-url: 'https://git.t-juice.club'
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Set up Docker BuildX
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Login to git.t-juice.club registry
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: git.t-juice.club
|
||||
username: ${{ gitea.actor }}
|
||||
password: ${{ secrets.DOCKER_PUSH }}
|
||||
|
||||
- name: Get Meta
|
||||
id: meta
|
||||
run: |
|
||||
echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT
|
||||
echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./Dockerfile
|
||||
platforms: |
|
||||
linux/amd64
|
||||
push: true
|
||||
tags: |
|
||||
${{ env.REGISTRY }}/${{ gitea.repository }}:${{ steps.meta.outputs.REPO_VERSION }}
|
||||
${{ env.REGISTRY }}/${{ gitea.repository }}:${{ env.DOCKER_TAG }}
|
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,3 +1,3 @@
|
||||
[submodule "ministream-frontend"]
|
||||
path = ministream-frontend
|
||||
url = ssh://git@git.t-juice.club:222/torjus/ministream-frontend.git
|
||||
url = https://git.t-juice.club/torjus/ministream-frontend.git
|
||||
|
9
go.mod
9
go.mod
@@ -4,6 +4,11 @@ go 1.21.3
|
||||
|
||||
require (
|
||||
github.com/go-chi/chi/v5 v5.0.10
|
||||
github.com/google/uuid v1.4.0
|
||||
github.com/pelletier/go-toml/v2 v2.1.0
|
||||
github.com/pion/interceptor v0.1.25
|
||||
github.com/pion/sdp/v3 v3.0.6
|
||||
github.com/pion/webrtc/v4 v4.0.0-beta.7
|
||||
github.com/urfave/cli/v2 v2.25.7
|
||||
golang.org/x/crypto v0.15.0
|
||||
)
|
||||
@@ -11,24 +16,20 @@ require (
|
||||
require (
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/google/uuid v1.4.0 // indirect
|
||||
github.com/pion/datachannel v1.5.5 // indirect
|
||||
github.com/pion/dtls/v2 v2.2.8 // indirect
|
||||
github.com/pion/ice/v3 v3.0.2 // indirect
|
||||
github.com/pion/interceptor v0.1.25 // indirect
|
||||
github.com/pion/logging v0.2.2 // indirect
|
||||
github.com/pion/mdns v0.0.9 // indirect
|
||||
github.com/pion/randutil v0.1.0 // indirect
|
||||
github.com/pion/rtcp v1.2.12 // indirect
|
||||
github.com/pion/rtp v1.8.3 // indirect
|
||||
github.com/pion/sctp v1.8.9 // indirect
|
||||
github.com/pion/sdp/v3 v3.0.6 // indirect
|
||||
github.com/pion/srtp/v3 v3.0.1 // indirect
|
||||
github.com/pion/stun/v2 v2.0.0 // indirect
|
||||
github.com/pion/transport/v2 v2.2.4 // indirect
|
||||
github.com/pion/transport/v3 v3.0.1 // indirect
|
||||
github.com/pion/turn/v3 v3.0.1 // indirect
|
||||
github.com/pion/webrtc/v4 v4.0.0-beta.7 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/stretchr/testify v1.8.4 // indirect
|
||||
|
5
go.sum
5
go.sum
@@ -25,8 +25,10 @@ github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
|
||||
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
|
||||
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||
@@ -37,6 +39,8 @@ github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042
|
||||
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
|
||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
|
||||
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
||||
github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew8=
|
||||
github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0=
|
||||
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
|
||||
@@ -194,6 +198,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
|
53
main.go
53
main.go
@@ -2,14 +2,23 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"git.t-juice.club/torjus/ministream/server"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
const Version = "v0.1.1"
|
||||
const Version = "v0.1.2"
|
||||
|
||||
type ctxKey string
|
||||
|
||||
const (
|
||||
ctxKeyConfig ctxKey = "config"
|
||||
)
|
||||
|
||||
func main() {
|
||||
app := cli.App{
|
||||
@@ -17,14 +26,18 @@ func main() {
|
||||
Usage: "Small livestreaming platform.",
|
||||
Commands: []*cli.Command{
|
||||
{
|
||||
Name: "serve",
|
||||
Usage: "Start livestreaming server.",
|
||||
Action: func(ctx *cli.Context) error {
|
||||
store := server.NewUserStore()
|
||||
Name: "serve",
|
||||
Usage: "Start livestreaming server.",
|
||||
Before: loadConfig,
|
||||
Action: func(c *cli.Context) error {
|
||||
cfg := configFromCtx(c.Context)
|
||||
|
||||
srv := server.NewServer(store)
|
||||
srv.Addr = ":8080"
|
||||
srv.ListenAndServe()
|
||||
srv := server.NewServer(cfg)
|
||||
srv.Addr = cfg.HTTP.ListenAddr
|
||||
slog.Info("Starting HTTP-server", "addr", srv.Addr, "cfg", cfg)
|
||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
@@ -37,3 +50,27 @@ func main() {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
|
||||
func loadConfig(c *cli.Context) error {
|
||||
cfg, err := server.ConfigFromDefault()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.Context = context.WithValue(c.Context, ctxKeyConfig, cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func configFromCtx(ctx context.Context) *server.Config {
|
||||
value := ctx.Value(ctxKeyConfig)
|
||||
if value == nil {
|
||||
panic("unable to load config")
|
||||
}
|
||||
|
||||
config, ok := value.(*server.Config)
|
||||
if !ok {
|
||||
panic("config type assertion failed")
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
Submodule ministream-frontend updated: e26f555300...2e96de56e6
21
ministream.toml
Normal file
21
ministream.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
# SiteName is used for displaying title of site on frontend.
|
||||
# Default: "ministream"
|
||||
# Env: MINISTREAM_SITENAME
|
||||
SiteName = "stream.example.org"
|
||||
|
||||
[HTTP]
|
||||
# HTTPListenAddr is which port the HTTP server will listen on.
|
||||
# Default: ":8080"
|
||||
# Env: MINISTREAM_HTTP_LISTENADDR
|
||||
ListenAddr = ":8080"
|
||||
|
||||
[WebRTC]
|
||||
# UDPMin is the minimum used for ephemeral ports.
|
||||
# Default: 50000
|
||||
# Env: MINISTREAM_WEBRTC_UDPMIN
|
||||
UDPMin = 50000
|
||||
|
||||
# UDPMax is the maximum used for ephemeral ports.
|
||||
# Default: 50050
|
||||
# Env: MINISTREAM_WEBRTC_UDPMAX
|
||||
UDPMAX = 50050
|
107
server/config.go
Normal file
107
server/config.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/pelletier/go-toml/v2"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
SiteName string `toml:"siteName"`
|
||||
HTTP ConfigHTTP `toml:"http"`
|
||||
WebRTC ConfigWebRTC `toml:"WebRTC"`
|
||||
}
|
||||
|
||||
type ConfigHTTP struct {
|
||||
ListenAddr string `json:"ListenAddr" toml:"ListenAddr"`
|
||||
}
|
||||
|
||||
type ConfigWebRTC struct {
|
||||
UDPMin int `toml:"UDPMin"`
|
||||
UDPMax int `toml:"UDPMax"`
|
||||
}
|
||||
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
SiteName: "ministream",
|
||||
HTTP: ConfigHTTP{
|
||||
ListenAddr: ":8080",
|
||||
},
|
||||
WebRTC: ConfigWebRTC{
|
||||
UDPMin: 50000,
|
||||
UDPMax: 50050,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) OverrideFromEnv() {
|
||||
if siteName, ok := os.LookupEnv("MINISTREAM_SITENAME"); ok {
|
||||
c.SiteName = siteName
|
||||
}
|
||||
|
||||
if httpAddr, ok := os.LookupEnv("MINISTREAM_HTTP_LISTENADDR"); ok {
|
||||
c.HTTP.ListenAddr = httpAddr
|
||||
}
|
||||
|
||||
if value, ok := os.LookupEnv("MINISTREAM_WEBRTC_UDPMIN"); ok {
|
||||
min, err := strconv.Atoi(value)
|
||||
if err != nil {
|
||||
panic("MINISTREAM_WEBRTC_UDPMIN is invalid")
|
||||
}
|
||||
c.WebRTC.UDPMin = min
|
||||
}
|
||||
if value, ok := os.LookupEnv("MINISTREAM_WEBRTC_UDPMAX"); ok {
|
||||
max, err := strconv.Atoi(value)
|
||||
if err != nil {
|
||||
panic("MINISTREAM_WEBRTC_UDPMAX is invalid")
|
||||
}
|
||||
c.WebRTC.UDPMin = max
|
||||
}
|
||||
}
|
||||
|
||||
func ConfigFromReader(r io.Reader) (*Config, error) {
|
||||
var c Config
|
||||
|
||||
err := toml.NewDecoder(r).Decode(&c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func ConfigFromFile(path string) (*Config, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return ConfigFromReader(f)
|
||||
}
|
||||
|
||||
func ConfigFromDefault() (*Config, error) {
|
||||
var config *Config
|
||||
defaultPaths := []string{
|
||||
"ministream.toml",
|
||||
}
|
||||
|
||||
for _, p := range defaultPaths {
|
||||
c, err := ConfigFromFile(p)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
config = c
|
||||
break
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
config = DefaultConfig()
|
||||
}
|
||||
|
||||
config.OverrideFromEnv()
|
||||
|
||||
return config, nil
|
||||
}
|
37
server/config_test.go
Normal file
37
server/config_test.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package server_test
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"git.t-juice.club/torjus/ministream/server"
|
||||
)
|
||||
|
||||
func TestConfig(t *testing.T) {
|
||||
t.Run("FromReader", func(t *testing.T) {
|
||||
configString := `SiteName = "ministream.example.org"`
|
||||
expectedSiteName := "ministream.example.org"
|
||||
r := strings.NewReader(configString)
|
||||
|
||||
c, err := server.ConfigFromReader(r)
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading config: %s", err)
|
||||
}
|
||||
|
||||
if c.SiteName != expectedSiteName {
|
||||
t.Errorf("SiteName incorrect. Got %s want %s", c.SiteName, expectedSiteName)
|
||||
}
|
||||
})
|
||||
t.Run("OverrideFromEnv", func(t *testing.T) {
|
||||
c := server.DefaultConfig()
|
||||
expectedSiteName := "ms.example.org"
|
||||
|
||||
t.Setenv("MINISTREAM_SITENAME", expectedSiteName)
|
||||
|
||||
c.OverrideFromEnv()
|
||||
|
||||
if c.SiteName != expectedSiteName {
|
||||
t.Errorf("SiteName incorrect. Got %s want %s", c.SiteName, expectedSiteName)
|
||||
}
|
||||
})
|
||||
}
|
@@ -19,15 +19,15 @@ import (
|
||||
var static embed.FS
|
||||
|
||||
type Server struct {
|
||||
users *UserStore
|
||||
streams *StreamStore
|
||||
config *Config
|
||||
http.Server
|
||||
}
|
||||
|
||||
func NewServer(store *UserStore) *Server {
|
||||
func NewServer(config *Config) *Server {
|
||||
srv := &Server{
|
||||
users: store,
|
||||
streams: NewStreamStore(),
|
||||
streams: NewStreamStore(config),
|
||||
config: config,
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
@@ -40,6 +40,8 @@ func NewServer(store *UserStore) *Server {
|
||||
r.Delete("/whip/{streamKey}", srv.DeleteHandler)
|
||||
r.Patch("/whip/{streamKey}", srv.PatchHandler)
|
||||
r.Post("/whip/{streamKey}", srv.PostOfferHandler)
|
||||
r.Get("/stats", srv.streams.StatsHandler)
|
||||
r.Get("/api/siteinfo", srv.InfoHandler)
|
||||
|
||||
srv.Handler = r
|
||||
|
||||
@@ -54,6 +56,17 @@ func corsMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(fn)
|
||||
}
|
||||
|
||||
func (s *Server) InfoHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var infoResponse struct {
|
||||
SiteName string `json:"siteName"`
|
||||
}
|
||||
infoResponse.SiteName = s.config.SiteName
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&infoResponse); err != nil {
|
||||
slog.Warn("Error writing info response")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) OptionsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
slog.Info("Got OPTIONS")
|
||||
}
|
||||
@@ -123,9 +136,18 @@ func (s *Server) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (s *Server) ListHandler(w http.ResponseWriter, r *http.Request) {
|
||||
streams := s.streams.List()
|
||||
type StreamInfo struct {
|
||||
StreamKey string `json:"streamKey"`
|
||||
ViewCount int `json:"viewCount"`
|
||||
}
|
||||
|
||||
infos := []StreamInfo{}
|
||||
for key, stream := range s.streams.Streams {
|
||||
infos = append(infos, StreamInfo{StreamKey: key, ViewCount: len(stream.viewers)})
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(w)
|
||||
enc.Encode(&streams)
|
||||
enc.Encode(&infos)
|
||||
}
|
||||
|
||||
func (s *Server) WhipHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
174
server/stream.go
174
server/stream.go
@@ -1,15 +1,19 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/interceptor/pkg/intervalpli"
|
||||
"github.com/pion/interceptor/pkg/nack"
|
||||
"github.com/pion/interceptor/pkg/stats"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
@@ -17,27 +21,93 @@ var ErrNoSuchStream error = fmt.Errorf("no such stream")
|
||||
|
||||
type StreamStore struct {
|
||||
Streams map[string]*Stream
|
||||
mu sync.Mutex
|
||||
|
||||
config *Config
|
||||
webRTCConfig webrtc.Configuration
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewStreamStore() *StreamStore {
|
||||
func NewStreamStore(config *Config) *StreamStore {
|
||||
s := &StreamStore{
|
||||
Streams: make(map[string]*Stream),
|
||||
config: config,
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
type Stream struct {
|
||||
peerConnection *webrtc.PeerConnection
|
||||
lastUpdate time.Time
|
||||
localTracks []*webrtc.TrackLocalStaticRTP
|
||||
peers []*webrtc.PeerConnection
|
||||
mu sync.Mutex
|
||||
store *StreamStore
|
||||
peerConnection *webrtc.PeerConnection
|
||||
peerConnectionStats map[string]*stats.Stats
|
||||
peerConnectionStatsMu sync.Mutex
|
||||
lastUpdate time.Time
|
||||
localTracks []*webrtc.TrackLocalStaticRTP
|
||||
viewers map[string]*webrtc.PeerConnection
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (s *StreamStore) StatsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
type StreamStats struct {
|
||||
StreamKey string `json:"streamKey"`
|
||||
StatsInbound stats.InboundRTPStreamStats `json:"inboudStats"`
|
||||
}
|
||||
stats := []StreamStats{}
|
||||
|
||||
for streamKey, stream := range s.Streams {
|
||||
stream.peerConnectionStatsMu.Lock()
|
||||
for track, stat := range stream.peerConnectionStats {
|
||||
stats = append(stats, StreamStats{
|
||||
StreamKey: fmt.Sprintf("%s-%s", streamKey, track),
|
||||
StatsInbound: stat.InboundRTPStreamStats,
|
||||
})
|
||||
}
|
||||
stream.peerConnectionStatsMu.Unlock()
|
||||
}
|
||||
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
enc := json.NewEncoder(w)
|
||||
if err := enc.Encode(stats); err != nil {
|
||||
slog.Warn("Error encoding stats: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
|
||||
peerConnectionConfig := webrtc.Configuration{
|
||||
viewerID := uuid.New().String()
|
||||
m := &webrtc.MediaEngine{}
|
||||
if err := m.RegisterDefaultCodecs(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
i := &interceptor.Registry{}
|
||||
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// i.Add(intervalPliFactory)
|
||||
|
||||
// Stats interceptor
|
||||
//statsInterceptorFactory, err := stats.NewInterceptor()
|
||||
//if err != nil {
|
||||
// panic(err)
|
||||
//}
|
||||
|
||||
//var statsGetter stats.Getter
|
||||
//statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
|
||||
// statsGetter = g
|
||||
//})
|
||||
//i.Add(statsInterceptorFactory)
|
||||
|
||||
se := webrtc.SettingEngine{}
|
||||
_ = se.SetEphemeralUDPPortRange(
|
||||
uint16(s.store.config.WebRTC.UDPMin),
|
||||
uint16(s.store.config.WebRTC.UDPMax),
|
||||
)
|
||||
|
||||
webRTCConfig := webrtc.Configuration{
|
||||
ICEServers: []webrtc.ICEServer{
|
||||
{
|
||||
URLs: []string{"stun:stun.l.google.com:19302"},
|
||||
@@ -45,7 +115,9 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
|
||||
},
|
||||
}
|
||||
|
||||
peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfig)
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
|
||||
|
||||
peerConnection, err := api.NewPeerConnection(webRTCConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -54,6 +126,10 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
|
||||
rtpSender, err := peerConnection.AddTrack(ltrack)
|
||||
if err != nil {
|
||||
// TODO, stop peerconn
|
||||
peerConnection.Close()
|
||||
s.mu.Lock()
|
||||
delete(s.viewers, viewerID)
|
||||
s.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
@@ -61,6 +137,9 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
|
||||
for {
|
||||
if _, _, err := rtpSender.Read(rtcpBuf); err != nil {
|
||||
peerConnection.Close()
|
||||
s.mu.Lock()
|
||||
delete(s.viewers, viewerID)
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -103,7 +182,7 @@ func (s *Stream) AddListener(sd *webrtc.SessionDescription) (*webrtc.SessionDesc
|
||||
// in a production application you should exchange ICE Candidates via OnICECandidate
|
||||
<-gatherComplete
|
||||
s.mu.Lock()
|
||||
s.peers = append(s.peers, peerConnection)
|
||||
s.viewers[viewerID] = peerConnection
|
||||
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -116,35 +195,68 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
|
||||
|
||||
go func() {
|
||||
stream := &Stream{
|
||||
lastUpdate: time.Now(),
|
||||
store: s,
|
||||
lastUpdate: time.Now(),
|
||||
peerConnectionStats: make(map[string]*stats.Stats),
|
||||
viewers: make(map[string]*webrtc.PeerConnection),
|
||||
}
|
||||
m := &webrtc.MediaEngine{}
|
||||
if err := m.RegisterDefaultCodecs(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
i := &interceptor.Registry{}
|
||||
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
||||
|
||||
// intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// i.Add(intervalPliFactory)
|
||||
|
||||
// Stats interceptor
|
||||
statsInterceptorFactory, err := stats.NewInterceptor()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
i.Add(intervalPliFactory)
|
||||
|
||||
peerConnectionConfig := webrtc.Configuration{
|
||||
var statsGetter stats.Getter
|
||||
statsInterceptorFactory.OnNewPeerConnection(func(_ string, g stats.Getter) {
|
||||
statsGetter = g
|
||||
})
|
||||
i.Add(statsInterceptorFactory)
|
||||
|
||||
// NACK interceptor
|
||||
maxPerPacket := nack.GeneratorMaxNacksPerPacket(4)
|
||||
nackSize := nack.GeneratorSize(4096)
|
||||
nackinterceptorFactory, err := nack.NewGeneratorInterceptor(maxPerPacket, nackSize)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
i.Add(nackinterceptorFactory)
|
||||
|
||||
s.webRTCConfig = webrtc.Configuration{
|
||||
ICEServers: []webrtc.ICEServer{
|
||||
{
|
||||
URLs: []string{"stun:stun.l.google.com:19302"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
se := webrtc.SettingEngine{}
|
||||
_ = se.SetEphemeralUDPPortRange(50000, 50050)
|
||||
_ = se.SetEphemeralUDPPortRange(uint16(s.config.WebRTC.UDPMin), uint16(s.config.WebRTC.UDPMax))
|
||||
|
||||
// se.BufferFactory = func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
|
||||
// buf := packetio.NewBuffer()
|
||||
// buf.SetLimitSize(32 * 1000 * 1000)
|
||||
|
||||
// return buf
|
||||
// }
|
||||
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se))
|
||||
|
||||
// Create a new RTCPeerConnection
|
||||
peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(se)).NewPeerConnection(peerConnectionConfig)
|
||||
peerConnection, err := api.NewPeerConnection(s.webRTCConfig)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -161,6 +273,9 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
|
||||
|
||||
// Set a handler for when a new remote track starts, this just distributes all our packets
|
||||
// to connected peers
|
||||
peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
|
||||
slog.Info("Got data channel.", "stream_key", streamKey, "label", dc.Label())
|
||||
})
|
||||
peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||
// Create a local track, all our SFU clients will be fed via this track
|
||||
slog.Info("Got track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
|
||||
@@ -168,17 +283,34 @@ func (s *StreamStore) Add(streamKey string, sd *webrtc.SessionDescription) (*web
|
||||
if newTrackErr != nil {
|
||||
panic(newTrackErr)
|
||||
}
|
||||
var stop bool
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if stop {
|
||||
slog.Info("Stopped collecting stats for track.", "stream_key", streamKey, "type", remoteTrack.Codec().MimeType, "id", remoteTrack.ID())
|
||||
return
|
||||
}
|
||||
stats := statsGetter.Get(uint32(remoteTrack.SSRC()))
|
||||
stream.peerConnectionStatsMu.Lock()
|
||||
stream.peerConnectionStats[remoteTrack.Codec().MimeType] = stats
|
||||
stream.peerConnectionStatsMu.Unlock()
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
}
|
||||
}()
|
||||
|
||||
stream.mu.Lock()
|
||||
stream.localTracks = append(stream.localTracks, localTrack)
|
||||
stream.mu.Unlock()
|
||||
|
||||
rtpBuf := make([]byte, 1400)
|
||||
rtpBuf := make([]byte, 1500)
|
||||
for {
|
||||
i, _, readErr := remoteTrack.Read(rtpBuf)
|
||||
if readErr != nil {
|
||||
if errors.Is(readErr, io.EOF) {
|
||||
slog.Warn("EOF from track.", "id", remoteTrack.ID())
|
||||
stop = true
|
||||
return
|
||||
}
|
||||
panic(readErr)
|
||||
@@ -269,7 +401,7 @@ func (s *StreamStore) Delete(streamKey string) error {
|
||||
defer s.mu.Unlock()
|
||||
delete(s.Streams, streamKey)
|
||||
|
||||
for _, peer := range stream.peers {
|
||||
for _, peer := range stream.viewers {
|
||||
if err := peer.Close(); err != nil {
|
||||
slog.Warn("Error closing peer.", "error", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user