This repository has been archived on 2026-03-09. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
oubliette/internal/shell/eventrecorder.go
Torjus Håkestad 1b28f10ca8 refactor: migrate module path from git.t-juice.club to code.t-juice.club
Update Go module path and all import references to reflect the migration
from Gitea (git.t-juice.club) to Forgejo (code.t-juice.club).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 18:51:23 +01:00

93 lines
2.1 KiB
Go

package shell
import (
"context"
"log/slog"
"sync"
"time"
"code.t-juice.club/torjus/oubliette/internal/storage"
)
// EventRecorder buffers I/O events in memory and periodically flushes them to
// a storage.Store. It is designed to be registered as a RecordingChannel
// callback so that SSH I/O is never blocked by database writes.
//
// RecordEvent only appends to an in-memory slice under a mutex; the store is
// written to from the background goroutine launched by Start, and one last
// time when that goroutine is cancelled via Close.
type EventRecorder struct {
// sessionID is copied into every buffered event and included in the log
// entry written when a flush fails.
sessionID string
// store receives each flushed batch through AppendSessionEvents.
store storage.Store
// logger receives an error entry when a flush fails.
logger *slog.Logger
// mu guards buf.
mu sync.Mutex
// buf holds the events recorded since the last flush; flush takes the
// slice and resets this field to nil.
buf []storage.SessionEvent
// cancel stops the background goroutine. It is nil until Start is called.
cancel context.CancelFunc
// done is closed by the background goroutine when it returns; Close
// waits on it.
done chan struct{}
}
// NewEventRecorder returns a recorder that tags events with sessionID, writes
// batches to store and reports flush failures on logger.
//
// The returned recorder is idle: no background goroutine is running until
// Start is called.
func NewEventRecorder(sessionID string, store storage.Store, logger *slog.Logger) *EventRecorder {
	rec := new(EventRecorder)
	rec.sessionID, rec.store, rec.logger = sessionID, store, logger
	// Closed by run when the flush goroutine returns; Close waits on it.
	rec.done = make(chan struct{})
	return rec
}
// RecordEvent implements the EventCallback signature and appends an event to
// the in-memory buffer. It is safe to call concurrently.
//
// ts is the time the I/O happened and direction identifies which way the bytes
// flowed; both are stored as given. data is copied before it is buffered: the
// event stays in memory until the next flush, so retaining the caller's slice
// would let a caller that reuses its I/O buffer overwrite the bytes of events
// that have not been persisted yet. A nil data stays nil in the stored event.
func (er *EventRecorder) RecordEvent(ts time.Time, direction int, data []byte) {
	// Copy outside the lock to keep the critical section down to the append.
	var payload []byte
	if data != nil {
		payload = append([]byte{}, data...)
	}
	ev := storage.SessionEvent{
		SessionID: er.sessionID,
		Timestamp: ts,
		Direction: direction,
		Data:      payload,
	}

	er.mu.Lock()
	er.buf = append(er.buf, ev)
	er.mu.Unlock()
}
// Start begins the background flush goroutine that drains the buffer every 2 seconds.
//
// The goroutine stops when ctx is cancelled or when Close is called, whichever
// comes first. Start is meant to be called once; a repeated call is a no-op.
// Without that guard a second call would replace the stored cancel function
// and launch a second goroutine, and because each goroutine closes the done
// channel on exit the process would panic on shutdown.
//
// Start is not synchronised with Close; call both from the goroutine that owns
// the recorder.
func (er *EventRecorder) Start(ctx context.Context) {
	if er.cancel != nil {
		// Already started.
		return
	}
	ctx, er.cancel = context.WithCancel(ctx)
	go er.run(ctx)
}
// Close cancels the background goroutine and performs a final flush.
//
// When the recorder has been started, Close cancels the goroutine's context
// and blocks until the goroutine has finished its last flush and exited.
// Calling Close again afterwards returns immediately.
//
// When Start was never called there is no goroutine that will ever close the
// done channel, so waiting on it would block forever. In that case Close
// flushes whatever has been buffered itself and returns.
func (er *EventRecorder) Close() {
	if er.cancel == nil {
		// Never started: flush synchronously instead of waiting on done.
		er.flush()
		return
	}
	er.cancel()
	<-er.done
}
// run is the body of the background goroutine. It flushes the buffer on every
// tick of a 2-second ticker and, once ctx is cancelled, flushes one last time
// and returns.
func (er *EventRecorder) run(ctx context.Context) {
	// Registered first so it runs last: done is closed only after the final
	// flush has returned and the ticker has been stopped.
	defer close(er.done)

	const flushInterval = 2 * time.Second
	tick := time.NewTicker(flushInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			er.flush()
		case <-ctx.Done():
			// Persist whatever arrived since the last tick before exiting.
			er.flush()
			return
		}
	}
}
// flush hands every buffered event to the store as a single batch.
//
// The batch is detached from the recorder while the mutex is held and the
// store is called after the mutex has been released, so RecordEvent is not
// held up by the write. If the store returns an error it is logged and the
// batch is dropped; the events are not put back into the buffer.
func (er *EventRecorder) flush() {
	er.mu.Lock()
	batch := er.buf
	if len(batch) > 0 {
		er.buf = nil
	}
	er.mu.Unlock()

	if len(batch) == 0 {
		return
	}

	err := er.store.AppendSessionEvents(context.Background(), batch)
	if err == nil {
		return
	}
	er.logger.Error("failed to flush session events", "err", err, "session_id", er.sessionID)
}