package shell import ( "context" "log/slog" "sync" "time" "git.t-juice.club/torjus/oubliette/internal/storage" ) // EventRecorder buffers I/O events in memory and periodically flushes them to // a storage.Store. It is designed to be registered as a RecordingChannel // callback so that SSH I/O is never blocked by database writes. type EventRecorder struct { sessionID string store storage.Store logger *slog.Logger mu sync.Mutex buf []storage.SessionEvent cancel context.CancelFunc done chan struct{} } // NewEventRecorder creates a recorder that will persist events for the given session. func NewEventRecorder(sessionID string, store storage.Store, logger *slog.Logger) *EventRecorder { return &EventRecorder{ sessionID: sessionID, store: store, logger: logger, done: make(chan struct{}), } } // RecordEvent implements the EventCallback signature and appends an event to // the in-memory buffer. It is safe to call concurrently. func (er *EventRecorder) RecordEvent(ts time.Time, direction int, data []byte) { er.mu.Lock() defer er.mu.Unlock() er.buf = append(er.buf, storage.SessionEvent{ SessionID: er.sessionID, Timestamp: ts, Direction: direction, Data: data, }) } // Start begins the background flush goroutine that drains the buffer every 2 seconds. func (er *EventRecorder) Start(ctx context.Context) { ctx, er.cancel = context.WithCancel(ctx) go er.run(ctx) } // Close cancels the background goroutine and performs a final flush. func (er *EventRecorder) Close() { if er.cancel != nil { er.cancel() } <-er.done } func (er *EventRecorder) run(ctx context.Context) { defer close(er.done) ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): er.flush() return case <-ticker.C: er.flush() } } } func (er *EventRecorder) flush() { er.mu.Lock() if len(er.buf) == 0 { er.mu.Unlock() return } events := er.buf er.buf = nil er.mu.Unlock() if err := er.store.AppendSessionEvents(context.Background(), events); err != nil { er.logger.Error("failed to flush session events", "err", err, "session_id", er.sessionID) } }