feat: add session replay with terminal playback via xterm.js
Persist byte-level I/O events from SSH sessions to SQLite and add a web
UI to replay them with original timing. Events are buffered in memory
and flushed every 2s to avoid blocking SSH I/O on database writes.
- Add session_events table (migration 002)
- Add SessionEvent type and storage methods (SQLite + MemoryStore)
- Change RecordingChannel to support multiple callbacks
- Add EventRecorder for buffered event persistence
- Add session detail page with xterm.js terminal replay
- Add /api/sessions/{id}/events JSON endpoint
- Linkify session IDs in dashboard and active sessions
- Vendor xterm.js v5.3.0
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
92
internal/shell/eventrecorder.go
Normal file
92
internal/shell/eventrecorder.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package shell
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.t-juice.club/torjus/oubliette/internal/storage"
|
||||
)
|
||||
|
||||
// EventRecorder buffers I/O events in memory and periodically flushes them to
|
||||
// a storage.Store. It is designed to be registered as a RecordingChannel
|
||||
// callback so that SSH I/O is never blocked by database writes.
|
||||
type EventRecorder struct {
|
||||
sessionID string
|
||||
store storage.Store
|
||||
logger *slog.Logger
|
||||
|
||||
mu sync.Mutex
|
||||
buf []storage.SessionEvent
|
||||
cancel context.CancelFunc
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewEventRecorder creates a recorder that will persist events for the given session.
|
||||
func NewEventRecorder(sessionID string, store storage.Store, logger *slog.Logger) *EventRecorder {
|
||||
return &EventRecorder{
|
||||
sessionID: sessionID,
|
||||
store: store,
|
||||
logger: logger,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// RecordEvent implements the EventCallback signature and appends an event to
|
||||
// the in-memory buffer. It is safe to call concurrently.
|
||||
func (er *EventRecorder) RecordEvent(ts time.Time, direction int, data []byte) {
|
||||
er.mu.Lock()
|
||||
defer er.mu.Unlock()
|
||||
er.buf = append(er.buf, storage.SessionEvent{
|
||||
SessionID: er.sessionID,
|
||||
Timestamp: ts,
|
||||
Direction: direction,
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
|
||||
// Start begins the background flush goroutine that drains the buffer every 2 seconds.
|
||||
func (er *EventRecorder) Start(ctx context.Context) {
|
||||
ctx, er.cancel = context.WithCancel(ctx)
|
||||
go er.run(ctx)
|
||||
}
|
||||
|
||||
// Close cancels the background goroutine and performs a final flush.
|
||||
func (er *EventRecorder) Close() {
|
||||
if er.cancel != nil {
|
||||
er.cancel()
|
||||
}
|
||||
<-er.done
|
||||
}
|
||||
|
||||
func (er *EventRecorder) run(ctx context.Context) {
|
||||
defer close(er.done)
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
er.flush()
|
||||
return
|
||||
case <-ticker.C:
|
||||
er.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (er *EventRecorder) flush() {
|
||||
er.mu.Lock()
|
||||
if len(er.buf) == 0 {
|
||||
er.mu.Unlock()
|
||||
return
|
||||
}
|
||||
events := er.buf
|
||||
er.buf = nil
|
||||
er.mu.Unlock()
|
||||
|
||||
if err := er.store.AppendSessionEvents(context.Background(), events); err != nil {
|
||||
er.logger.Error("failed to flush session events", "err", err, "session_id", er.sessionID)
|
||||
}
|
||||
}
|
||||
80
internal/shell/eventrecorder_test.go
Normal file
80
internal/shell/eventrecorder_test.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package shell
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.t-juice.club/torjus/oubliette/internal/storage"
|
||||
)
|
||||
|
||||
func TestEventRecorderFlush(t *testing.T) {
|
||||
store := storage.NewMemoryStore()
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a session so events have a valid session ID.
|
||||
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
|
||||
if err != nil {
|
||||
t.Fatalf("CreateSession: %v", err)
|
||||
}
|
||||
|
||||
rec := NewEventRecorder(id, store, slog.Default())
|
||||
rec.Start(ctx)
|
||||
|
||||
// Record some events.
|
||||
now := time.Now()
|
||||
rec.RecordEvent(now, 0, []byte("hello"))
|
||||
rec.RecordEvent(now.Add(100*time.Millisecond), 1, []byte("world"))
|
||||
|
||||
// Close should trigger final flush.
|
||||
rec.Close()
|
||||
|
||||
events, err := store.GetSessionEvents(ctx, id)
|
||||
if err != nil {
|
||||
t.Fatalf("GetSessionEvents: %v", err)
|
||||
}
|
||||
if len(events) != 2 {
|
||||
t.Fatalf("len = %d, want 2", len(events))
|
||||
}
|
||||
if string(events[0].Data) != "hello" {
|
||||
t.Errorf("events[0].Data = %q, want %q", events[0].Data, "hello")
|
||||
}
|
||||
if events[0].Direction != 0 {
|
||||
t.Errorf("events[0].Direction = %d, want 0", events[0].Direction)
|
||||
}
|
||||
if string(events[1].Data) != "world" {
|
||||
t.Errorf("events[1].Data = %q, want %q", events[1].Data, "world")
|
||||
}
|
||||
if events[1].Direction != 1 {
|
||||
t.Errorf("events[1].Direction = %d, want 1", events[1].Direction)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventRecorderPeriodicFlush(t *testing.T) {
|
||||
store := storage.NewMemoryStore()
|
||||
ctx := context.Background()
|
||||
|
||||
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
|
||||
if err != nil {
|
||||
t.Fatalf("CreateSession: %v", err)
|
||||
}
|
||||
|
||||
rec := NewEventRecorder(id, store, slog.Default())
|
||||
rec.Start(ctx)
|
||||
|
||||
// Record an event and wait for the periodic flush (2s + some margin).
|
||||
rec.RecordEvent(time.Now(), 1, []byte("periodic"))
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
events, err := store.GetSessionEvents(ctx, id)
|
||||
if err != nil {
|
||||
t.Fatalf("GetSessionEvents: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Errorf("expected periodic flush, got %d events", len(events))
|
||||
}
|
||||
|
||||
rec.Close()
|
||||
}
|
||||
@@ -9,12 +9,11 @@ import (
|
||||
// direction is 0 for input (client→server) and 1 for output (server→client).
|
||||
type EventCallback func(ts time.Time, direction int, data []byte)
|
||||
|
||||
// RecordingChannel wraps an io.ReadWriteCloser and optionally invokes a callback
|
||||
// on every Read (input) and Write (output). Phase 2.3 will add byte-level
|
||||
// keystroke recording here without changing any shell code.
|
||||
// RecordingChannel wraps an io.ReadWriteCloser and optionally invokes callbacks
|
||||
// on every Read (input) and Write (output).
|
||||
type RecordingChannel struct {
|
||||
inner io.ReadWriteCloser
|
||||
callback EventCallback
|
||||
inner io.ReadWriteCloser
|
||||
callbacks []EventCallback
|
||||
}
|
||||
|
||||
// NewRecordingChannel returns a RecordingChannel wrapping rw.
|
||||
@@ -22,28 +21,40 @@ func NewRecordingChannel(rw io.ReadWriteCloser) *RecordingChannel {
|
||||
return &RecordingChannel{inner: rw}
|
||||
}
|
||||
|
||||
// WithCallback sets the event callback and returns the RecordingChannel for chaining.
|
||||
// WithCallback clears existing callbacks, sets the given one, and returns the
|
||||
// RecordingChannel for chaining. Kept for backward compatibility.
|
||||
func (r *RecordingChannel) WithCallback(cb EventCallback) *RecordingChannel {
|
||||
r.callback = cb
|
||||
r.callbacks = []EventCallback{cb}
|
||||
return r
|
||||
}
|
||||
|
||||
// AddCallback appends an additional event callback.
|
||||
func (r *RecordingChannel) AddCallback(cb EventCallback) {
|
||||
r.callbacks = append(r.callbacks, cb)
|
||||
}
|
||||
|
||||
func (r *RecordingChannel) Read(p []byte) (int, error) {
|
||||
n, err := r.inner.Read(p)
|
||||
if n > 0 && r.callback != nil {
|
||||
if n > 0 && len(r.callbacks) > 0 {
|
||||
ts := time.Now()
|
||||
cp := make([]byte, n)
|
||||
copy(cp, p[:n])
|
||||
r.callback(time.Now(), 0, cp)
|
||||
for _, cb := range r.callbacks {
|
||||
cb(ts, 0, cp)
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *RecordingChannel) Write(p []byte) (int, error) {
|
||||
n, err := r.inner.Write(p)
|
||||
if n > 0 && r.callback != nil {
|
||||
if n > 0 && len(r.callbacks) > 0 {
|
||||
ts := time.Now()
|
||||
cp := make([]byte, n)
|
||||
copy(cp, p[:n])
|
||||
r.callback(time.Now(), 1, cp)
|
||||
for _, cb := range r.callbacks {
|
||||
cb(ts, 1, cp)
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
@@ -3,7 +3,9 @@ package shell
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// nopCloser wraps a ReadWriter with a no-op Close.
|
||||
@@ -41,3 +43,80 @@ func TestRecordingChannelPassthrough(t *testing.T) {
|
||||
t.Fatalf("Close: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecordingChannelMultiCallback(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
rc := NewRecordingChannel(nopCloser{&buf})
|
||||
|
||||
type event struct {
|
||||
ts time.Time
|
||||
direction int
|
||||
data string
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
var events1, events2 []event
|
||||
|
||||
rc.AddCallback(func(ts time.Time, direction int, data []byte) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
events1 = append(events1, event{ts, direction, string(data)})
|
||||
})
|
||||
rc.AddCallback(func(ts time.Time, direction int, data []byte) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
events2 = append(events2, event{ts, direction, string(data)})
|
||||
})
|
||||
|
||||
// Write triggers both callbacks with direction=1.
|
||||
rc.Write([]byte("hello"))
|
||||
|
||||
// Read triggers both callbacks with direction=0.
|
||||
out := make([]byte, 16)
|
||||
rc.Read(out)
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if len(events1) != 2 {
|
||||
t.Fatalf("callback1 got %d events, want 2", len(events1))
|
||||
}
|
||||
if len(events2) != 2 {
|
||||
t.Fatalf("callback2 got %d events, want 2", len(events2))
|
||||
}
|
||||
|
||||
// Write event should be direction=1.
|
||||
if events1[0].direction != 1 {
|
||||
t.Errorf("write direction = %d, want 1", events1[0].direction)
|
||||
}
|
||||
// Read event should be direction=0.
|
||||
if events1[1].direction != 0 {
|
||||
t.Errorf("read direction = %d, want 0", events1[1].direction)
|
||||
}
|
||||
|
||||
// Both callbacks should get the same timestamp for a single operation.
|
||||
if events1[0].ts != events2[0].ts {
|
||||
t.Error("callbacks should receive the same timestamp")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecordingChannelWithCallbackClearsExisting(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
rc := NewRecordingChannel(nopCloser{&buf})
|
||||
|
||||
called1 := false
|
||||
called2 := false
|
||||
|
||||
rc.AddCallback(func(_ time.Time, _ int, _ []byte) { called1 = true })
|
||||
// WithCallback should clear existing and set new.
|
||||
rc.WithCallback(func(_ time.Time, _ int, _ []byte) { called2 = true })
|
||||
|
||||
rc.Write([]byte("x"))
|
||||
|
||||
if called1 {
|
||||
t.Error("first callback should not be called after WithCallback")
|
||||
}
|
||||
if !called2 {
|
||||
t.Error("second callback should be called")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user