Add metrics
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/nareix/joy5/av"
|
||||
"github.com/nareix/joy5/format/rtmp"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@@ -266,7 +267,7 @@ type RTMPServer struct {
|
||||
ListenAddr string
|
||||
Hostname string
|
||||
Logger *zap.SugaredLogger
|
||||
streamsLock sync.Mutex
|
||||
streamsLock sync.RWMutex
|
||||
streams map[string]*Stream
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
@@ -336,6 +337,75 @@ func (rs *RTMPServer) Listen() error {
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RTMPServer) EnableMetrics() {
|
||||
rs.Logger.Debug("Enabling metrics-collection for RTMP.")
|
||||
currentStreams := prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "dogtamer_current_streams",
|
||||
Help: "Currently running streams.",
|
||||
},
|
||||
)
|
||||
prometheus.MustRegister(currentStreams)
|
||||
currentClients := prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "dogtamer_current_clients",
|
||||
Help: "Currently connected clients.",
|
||||
},
|
||||
[]string{"stream_name"},
|
||||
)
|
||||
prometheus.MustRegister(currentClients)
|
||||
bytesReceived := prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "dogtamer_bytes_received",
|
||||
Help: "Bytes received from streams.",
|
||||
},
|
||||
[]string{"stream_name"},
|
||||
)
|
||||
prometheus.MustRegister(bytesReceived)
|
||||
bytesSent := prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "dogtamer_bytes_sent",
|
||||
Help: "Bytes received sent to clients.",
|
||||
},
|
||||
[]string{"stream_name"},
|
||||
)
|
||||
prometheus.MustRegister(bytesSent)
|
||||
droppedPackets := prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "dogtamer_dropped_packets",
|
||||
Help: "Packets which got dropped instead of sent to clients.",
|
||||
},
|
||||
[]string{"stream_name"},
|
||||
)
|
||||
prometheus.MustRegister(droppedPackets)
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
rs.Logger.Debug("RTMP metric collection started.")
|
||||
for {
|
||||
select {
|
||||
case <-rs.ctx.Done():
|
||||
rs.Logger.Debug("RTMP metric collection stopped.")
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
currentStreams.Set(float64(len(rs.streams)))
|
||||
var clientCount int
|
||||
rs.streamsLock.RLock()
|
||||
for _, stream := range rs.streams {
|
||||
clientCount += len(stream.Clients)
|
||||
bytesReceived.WithLabelValues(stream.Name).Set(float64(stream.bytesReceived.Load()))
|
||||
for _, client := range stream.Clients {
|
||||
bytesSent.WithLabelValues(stream.Name).Set(float64(client.bytesSent.Load()))
|
||||
droppedPackets.WithLabelValues(stream.Name).Set(float64(client.droppedPackets.Load()))
|
||||
}
|
||||
}
|
||||
rs.streamsLock.RUnlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (rs *RTMPServer) handleConn(c *rtmp.Conn, nc net.Conn) {
|
||||
rs.Logger.Debugw("Incoming connection.", "remote_addr", nc.RemoteAddr(), "url", c.URL.String(), "publishing", c.Publishing)
|
||||
viewerPattern := `^/view/(.+)$`
|
||||
|
@@ -8,16 +8,18 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.uio.no/torjus/dogtamer/m3u"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type WebServer struct {
|
||||
Logger *zap.SugaredLogger
|
||||
ListenAddr string
|
||||
ctx context.Context
|
||||
rtmpServer *RTMPServer
|
||||
httpServer *http.Server
|
||||
Logger *zap.SugaredLogger
|
||||
EnableMetrics bool
|
||||
ListenAddr string
|
||||
ctx context.Context
|
||||
rtmpServer *RTMPServer
|
||||
httpServer *http.Server
|
||||
}
|
||||
|
||||
func NewWebServer(ctx context.Context, rs *RTMPServer) *WebServer {
|
||||
@@ -32,6 +34,9 @@ func NewWebServer(ctx context.Context, rs *RTMPServer) *WebServer {
|
||||
func (ws *WebServer) Serve() error {
|
||||
r := chi.NewRouter()
|
||||
r.Get("/", ws.IndexHandler)
|
||||
if ws.EnableMetrics {
|
||||
r.Handle("/metrics", promhttp.Handler())
|
||||
}
|
||||
r.Get("/playlist/{name}", ws.PlaylistHandler)
|
||||
ws.httpServer = &http.Server{
|
||||
Addr: ws.ListenAddr,
|
||||
|
Reference in New Issue
Block a user