Compare commits

...

4 Commits

Author SHA1 Message Date
541b0df007 chore: bump version to 0.3.0
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 22:10:26 +01:00
24c166b86b feat: add session replay with terminal playback via xterm.js
Persist byte-level I/O events from SSH sessions to SQLite and add a web
UI to replay them with original timing. Events are buffered in memory
and flushed every 2s to avoid blocking SSH I/O on database writes.

- Add session_events table (migration 002)
- Add SessionEvent type and storage methods (SQLite + MemoryStore)
- Change RecordingChannel to support multiple callbacks
- Add EventRecorder for buffered event persistence
- Add session detail page with xterm.js terminal replay
- Add /api/sessions/{id}/events JSON endpoint
- Linkify session IDs in dashboard and active sessions
- Vendor xterm.js v5.3.0

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 22:09:24 +01:00
d4380c0aea chore: add golangci-lint config and fix all lint issues
Enable 15 additional linters (gosec, errorlint, gocritic, modernize,
misspell, bodyclose, sqlclosecheck, nilerr, unconvert, durationcheck,
sloglint, wastedassign, usestdlibvars) with sensible exclusion rules.

Fix all findings: errors.Is for error comparisons, run() pattern in
main to avoid exitAfterDefer, ReadHeaderTimeout for Slowloris
protection, bounds check in escape sequence reader, WaitGroup.Go,
slices.Contains, range-over-int loops, and http.MethodGet constants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 21:43:49 +01:00
0ad6f4cb6a feat: add human detection scoring and webhook notifications
Implement phase 2.1 (human detection) and 2.2 (notifications):

- Detection scorer computes 0.0-1.0 human likelihood from keystroke
  timing variance, special key usage, typing speed, command diversity,
  and session duration
- Webhook notifier sends JSON POST to configured endpoints with
  deduplication, custom headers, and event filtering
- RecordingChannel gains an event callback for feeding keystrokes
  to the scorer without coupling shell and detection packages
- Server wires scorer into session lifecycle with periodic updates
  and threshold-based notification triggers
- Web UI shows human score in session tables with highlighting
- New config sections: [detection] and [[notify.webhooks]]

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 21:28:11 +01:00
35 changed files with 2388 additions and 92 deletions

79
.golangci.yml Normal file
View File

@@ -0,0 +1,79 @@
version: "2"
linters:
enable:
# Bug detectors.
- bodyclose
- durationcheck
- errorlint
- gocritic
- nilerr
- sqlclosecheck
# Security.
- gosec
# Style and modernization.
- misspell
- modernize
- unconvert
- usestdlibvars
# Logging.
- sloglint
# Dead code.
- wastedassign
settings:
errcheck:
exclude-functions:
# Terminal I/O writes (honeypot shell output).
- fmt.Fprint
- fmt.Fprintf
# Low-level byte I/O in shell readLine (escape sequences, echo).
- (io.ReadWriter).Read
- (io.ReadWriter).Write
- (io.ReadWriteCloser).Read
- (io.ReadWriteCloser).Write
- (io.Reader).Read
- (io.Writer).Write
gosec:
excludes:
# File reads from config paths — expected in a CLI tool.
- G304
# Weak RNG for shell selection — crypto/rand not needed.
- G404
exclusions:
rules:
# Ignore unchecked Close() — standard resource cleanup.
- linters: [errcheck]
text: "Error return value of .+\\.Close.+ is not checked"
# Ignore unchecked Rollback() — called in error paths before returning.
- linters: [errcheck]
text: "Error return value of .+\\.Rollback.+ is not checked"
# Ignore unchecked Reply/Reject — SSH protocol; nothing useful on failure.
- linters: [errcheck]
text: "Error return value of .+\\.(Reply|Reject).+ is not checked"
# Test files: allow unchecked errors.
- linters: [errcheck]
path: "_test\\.go"
# Test files: InsecureIgnoreHostKey, file permissions, unhandled errors are expected.
- linters: [gosec]
path: "_test\\.go"
# Unhandled errors for cleanup/protocol ops — mirrors errcheck exclusions.
- linters: [gosec]
text: "G104"
source: "\\.(Close|Rollback|Reject|Reply|Read|Write)\\("
# SQL with safe column interpolation from a fixed switch — not user input.
- linters: [gosec]
text: "G201"
path: "internal/storage/"

View File

@@ -117,14 +117,14 @@ This lets shells build realistic prompts (`username@hostname:~$`) and log activi
Goal: Detect likely-human sessions and make the system smarter.
### 2.1 Human Detection Scoring
### 2.1 Human Detection Scoring
- Keystroke timing analysis
- Track backspace, tab, arrow key usage
- Command diversity scoring
- Compute per-session human score, store in sessions table
- Flag sessions above configurable threshold
### 2.2 Notifications
### 2.2 Notifications
- Webhook support (generic HTTP POST, works with Slack/Discord/ntfy)
- Trigger on: human score threshold crossed, new session started, configurable
- Include session details in payload

View File

@@ -42,6 +42,11 @@ Key settings:
- `shell.fake_user` — override username in prompt; empty uses the authenticated user
- `web.enabled` — enable the web dashboard (default `false`)
- `web.listen_addr` — web dashboard listen address (default `:8080`)
- Session detail pages at `/sessions/{id}` include terminal replay via xterm.js
- `detection.enabled` — enable human detection scoring (default `false`)
- `detection.threshold` — score threshold (0.01.0) for flagging sessions (default `0.6`)
- `detection.update_interval` — how often to recompute scores (default `5s`)
- `notify.webhooks` — list of webhook endpoints for notifications (see example config)
### Run

View File

@@ -4,12 +4,14 @@ import (
"context"
"errors"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"git.t-juice.club/torjus/oubliette/internal/config"
"git.t-juice.club/torjus/oubliette/internal/server"
@@ -17,16 +19,22 @@ import (
"git.t-juice.club/torjus/oubliette/internal/web"
)
const Version = "0.2.0"
const Version = "0.3.0"
func main() {
if err := run(); err != nil {
slog.Error("fatal error", "err", err)
os.Exit(1)
}
}
func run() error {
configPath := flag.String("config", "oubliette.toml", "path to config file")
flag.Parse()
cfg, err := config.Load(*configPath)
if err != nil {
slog.Error("failed to load config", "err", err)
os.Exit(1)
return fmt.Errorf("load config: %w", err)
}
level := new(slog.LevelVar)
@@ -53,8 +61,7 @@ func main() {
store, err := storage.NewSQLiteStore(cfg.Storage.DBPath)
if err != nil {
logger.Error("failed to open database", "err", err)
os.Exit(1)
return fmt.Errorf("open database: %w", err)
}
defer store.Close()
@@ -65,8 +72,7 @@ func main() {
srv, err := server.New(*cfg, store, logger)
if err != nil {
logger.Error("failed to create server", "err", err)
os.Exit(1)
return fmt.Errorf("create server: %w", err)
}
var wg sync.WaitGroup
@@ -75,23 +81,21 @@ func main() {
if cfg.Web.Enabled {
webHandler, err := web.NewServer(store, logger.With("component", "web"))
if err != nil {
logger.Error("failed to create web server", "err", err)
os.Exit(1)
return fmt.Errorf("create web server: %w", err)
}
httpServer := &http.Server{
Addr: cfg.Web.ListenAddr,
Handler: webHandler,
Addr: cfg.Web.ListenAddr,
Handler: webHandler,
ReadHeaderTimeout: 10 * time.Second,
}
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
logger.Info("web server listening", "addr", cfg.Web.ListenAddr)
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("web server error", "err", err)
}
}()
})
// Graceful shutdown on context cancellation.
go func() {
@@ -103,10 +107,10 @@ func main() {
}
if err := srv.ListenAndServe(ctx); err != nil {
logger.Error("server error", "err", err)
os.Exit(1)
return fmt.Errorf("server: %w", err)
}
wg.Wait()
logger.Info("server stopped")
return nil
}

View File

@@ -36,7 +36,7 @@ func TestStaticCredentialsWrongPassword(t *testing.T) {
func TestRejectionBeforeThreshold(t *testing.T) {
a := newTestAuth(3, time.Hour)
for i := 0; i < 2; i++ {
for i := range 2 {
d := a.Authenticate("1.2.3.4", "user", "pass")
if d.Accepted {
t.Fatalf("attempt %d should be rejected", i+1)
@@ -49,7 +49,7 @@ func TestRejectionBeforeThreshold(t *testing.T) {
func TestThresholdAcceptance(t *testing.T) {
a := newTestAuth(3, time.Hour)
for i := 0; i < 2; i++ {
for i := range 2 {
d := a.Authenticate("1.2.3.4", "user", "pass")
if d.Accepted {
t.Fatalf("attempt %d should be rejected", i+1)
@@ -65,7 +65,7 @@ func TestPerIPIsolation(t *testing.T) {
a := newTestAuth(3, time.Hour)
// IP1 gets 2 failures.
for i := 0; i < 2; i++ {
for range 2 {
a.Authenticate("1.1.1.1", "user", "pass")
}
@@ -157,12 +157,10 @@ func TestConcurrentAccess(t *testing.T) {
a := newTestAuth(5, time.Hour)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for range 100 {
wg.Go(func() {
a.Authenticate("1.2.3.4", "user", "pass")
}()
})
}
wg.Wait()
}

View File

@@ -9,13 +9,15 @@ import (
)
type Config struct {
SSH SSHConfig `toml:"ssh"`
Auth AuthConfig `toml:"auth"`
Storage StorageConfig `toml:"storage"`
Shell ShellConfig `toml:"shell"`
Web WebConfig `toml:"web"`
LogLevel string `toml:"log_level"`
LogFormat string `toml:"log_format"` // "text" (default) or "json"
SSH SSHConfig `toml:"ssh"`
Auth AuthConfig `toml:"auth"`
Storage StorageConfig `toml:"storage"`
Shell ShellConfig `toml:"shell"`
Web WebConfig `toml:"web"`
Detection DetectionConfig `toml:"detection"`
Notify NotifyConfig `toml:"notify"`
LogLevel string `toml:"log_level"`
LogFormat string `toml:"log_format"` // "text" (default) or "json"
}
type WebConfig struct {
@@ -59,6 +61,25 @@ type Credential struct {
Password string `toml:"password"`
}
type DetectionConfig struct {
Enabled bool `toml:"enabled"`
Threshold float64 `toml:"threshold"`
UpdateInterval string `toml:"update_interval"`
// Parsed duration, not from TOML directly.
UpdateIntervalDuration time.Duration `toml:"-"`
}
type NotifyConfig struct {
Webhooks []WebhookNotifyConfig `toml:"webhooks"`
}
type WebhookNotifyConfig struct {
URL string `toml:"url"`
Headers map[string]string `toml:"headers"`
Events []string `toml:"events"` // empty = all events
}
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
@@ -127,6 +148,12 @@ func applyDefaults(cfg *Config) {
if cfg.Shell.Banner == "" {
cfg.Shell.Banner = "Welcome to Ubuntu 22.04.3 LTS (GNU/Linux 5.15.0-89-generic x86_64)\r\n\r\n"
}
if cfg.Detection.Threshold == 0 {
cfg.Detection.Threshold = 0.6
}
if cfg.Detection.UpdateInterval == "" {
cfg.Detection.UpdateInterval = "5s"
}
}
// knownShellKeys are top-level keys in [shell] that are not per-shell sub-tables.
@@ -189,5 +216,33 @@ func validate(cfg *Config) error {
}
}
// Validate detection config.
if cfg.Detection.Enabled {
if cfg.Detection.Threshold < 0 || cfg.Detection.Threshold > 1 {
return fmt.Errorf("detection.threshold must be between 0 and 1, got %f", cfg.Detection.Threshold)
}
ui, err := time.ParseDuration(cfg.Detection.UpdateInterval)
if err != nil {
return fmt.Errorf("invalid detection.update_interval %q: %w", cfg.Detection.UpdateInterval, err)
}
if ui <= 0 {
return fmt.Errorf("detection.update_interval must be positive, got %s", ui)
}
cfg.Detection.UpdateIntervalDuration = ui
}
// Validate notify config.
knownEvents := map[string]bool{"human_detected": true, "session_started": true}
for i, wh := range cfg.Notify.Webhooks {
if wh.URL == "" {
return fmt.Errorf("notify.webhooks[%d]: url must not be empty", i)
}
for j, ev := range wh.Events {
if !knownEvents[ev] {
return fmt.Errorf("notify.webhooks[%d].events[%d]: unknown event %q", i, j, ev)
}
}
}
return nil
}

View File

@@ -0,0 +1,259 @@
package detection
import (
"math"
"sync"
"time"
)
// Direction constants for RecordEvent.
const (
DirInput = 0 // client → server (keystrokes)
DirOutput = 1 // server → client (shell output)
)
// Signal weights for the composite score.
const (
weightTimingVariance = 0.30
weightSpecialKeys = 0.20
weightTypingSpeed = 0.20
weightCommandDiversity = 0.15
weightSessionDuration = 0.15
)
// Scorer accumulates keystroke events and computes a 0.01.0
// human likelihood score based on multiple signals.
type Scorer struct {
mu sync.Mutex
// Input timing data.
inputTimes []time.Time
delays []time.Duration
// Special key counters.
specialKeys int
// Command tracking: we count newlines and unique command prefixes.
currentCmd []byte
commands map[string]struct{}
// Session activity duration.
firstInput time.Time
lastInput time.Time
}
// NewScorer returns a new Scorer ready to record events.
func NewScorer() *Scorer {
return &Scorer{
commands: make(map[string]struct{}),
}
}
// RecordEvent records a data event with timestamp and direction.
// direction should be DirInput (0) for client input or DirOutput (1) for server output.
func (s *Scorer) RecordEvent(ts time.Time, direction int, data []byte) {
if direction != DirInput {
return // only analyze input
}
s.mu.Lock()
defer s.mu.Unlock()
if s.firstInput.IsZero() {
s.firstInput = ts
}
s.lastInput = ts
for _, b := range data {
// Track inter-keystroke delay for single-byte inputs.
if len(s.inputTimes) > 0 {
delay := ts.Sub(s.inputTimes[len(s.inputTimes)-1])
if delay > 0 && delay < 30*time.Second {
s.delays = append(s.delays, delay)
}
}
s.inputTimes = append(s.inputTimes, ts)
// Count special keys.
if isSpecialKey(b) {
s.specialKeys++
}
// Track commands (split on newline/CR).
if b == '\r' || b == '\n' {
cmd := string(s.currentCmd)
if len(cmd) > 0 {
s.commands[cmd] = struct{}{}
}
s.currentCmd = s.currentCmd[:0]
} else {
// Handle backspace: remove last byte from current command.
if b == 0x7f || b == 0x08 {
if len(s.currentCmd) > 0 {
s.currentCmd = s.currentCmd[:len(s.currentCmd)-1]
}
} else if b >= 0x20 { // printable
s.currentCmd = append(s.currentCmd, b)
}
}
}
}
// Score computes the composite human likelihood score (0.01.0).
// Thread-safe.
func (s *Scorer) Score() float64 {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.inputTimes) == 0 {
return 0
}
tv := s.timingVarianceScore()
sk := s.specialKeysScore()
ts := s.typingSpeedScore()
cd := s.commandDiversityScore()
sd := s.sessionDurationScore()
score := tv*weightTimingVariance +
sk*weightSpecialKeys +
ts*weightTypingSpeed +
cd*weightCommandDiversity +
sd*weightSessionDuration
return clamp(score, 0, 1)
}
// timingVarianceScore returns 01 based on coefficient of variation of inter-key delays.
// Bots have CV ≈ 0 (instant or uniform), humans have CV ≥ 0.6.
func (s *Scorer) timingVarianceScore() float64 {
if len(s.delays) < 3 {
return 0
}
mean := meanDuration(s.delays)
if mean == 0 {
return 0
}
variance := 0.0
for _, d := range s.delays {
diff := float64(d) - float64(mean)
variance += diff * diff
}
variance /= float64(len(s.delays))
stddev := math.Sqrt(variance)
cv := stddev / float64(mean)
// Map CV to 01: CV of 0.6+ is fully human-like.
return clamp(cv/0.6, 0, 1)
}
// specialKeysScore returns 01 based on count of special key presses.
// Scripts almost never generate backspace/tab/ctrl characters.
func (s *Scorer) specialKeysScore() float64 {
// 5+ special keys → full score.
return clamp(float64(s.specialKeys)/5.0, 0, 1)
}
// typingSpeedScore returns 01 based on median inter-key delay.
// Paste/scripts have < 5ms, humans have 30300ms.
func (s *Scorer) typingSpeedScore() float64 {
if len(s.delays) < 2 {
return 0
}
med := medianDuration(s.delays)
ms := float64(med) / float64(time.Millisecond)
if ms < 5 {
return 0 // paste or script
}
if ms > 300 {
return 0.7 // very slow, still possibly human
}
if ms >= 30 && ms <= 300 {
return 1.0 // human range
}
// 530ms: transition zone
return clamp((ms-5)/25, 0, 1)
}
// commandDiversityScore returns 01 based on number of unique commands.
func (s *Scorer) commandDiversityScore() float64 {
// 3+ unique commands → full score.
return clamp(float64(len(s.commands))/3.0, 0, 1)
}
// sessionDurationScore returns 01 based on active input duration.
func (s *Scorer) sessionDurationScore() float64 {
if s.firstInput.IsZero() || s.lastInput.IsZero() {
return 0
}
dur := s.lastInput.Sub(s.firstInput)
// 10s+ of active input → full score.
return clamp(float64(dur)/float64(10*time.Second), 0, 1)
}
// isSpecialKey returns true for non-printable keys that humans commonly use.
func isSpecialKey(b byte) bool {
switch b {
case 0x7f, // DEL (backspace in most terminals)
0x08, // BS
0x09, // TAB
0x03, // Ctrl-C
0x04, // Ctrl-D
0x1b: // ESC (arrow keys start with ESC)
return true
}
return false
}
func clamp(v, lo, hi float64) float64 {
if v < lo {
return lo
}
if v > hi {
return hi
}
return v
}
func meanDuration(ds []time.Duration) time.Duration {
if len(ds) == 0 {
return 0
}
var sum time.Duration
for _, d := range ds {
sum += d
}
return sum / time.Duration(len(ds))
}
func medianDuration(ds []time.Duration) time.Duration {
n := len(ds)
if n == 0 {
return 0
}
// Copy to avoid mutating the original.
sorted := make([]time.Duration, n)
copy(sorted, ds)
sortDurations(sorted)
if n%2 == 0 {
return (sorted[n/2-1] + sorted[n/2]) / 2
}
return sorted[n/2]
}
func sortDurations(ds []time.Duration) {
// Simple insertion sort — delay slices are small.
for i := 1; i < len(ds); i++ {
key := ds[i]
j := i - 1
for j >= 0 && ds[j] > key {
ds[j+1] = ds[j]
j--
}
ds[j+1] = key
}
}

View File

@@ -0,0 +1,151 @@
package detection
import (
"sync"
"testing"
"time"
)
func TestScorer_EmptyInput(t *testing.T) {
s := NewScorer()
score := s.Score()
if score != 0 {
t.Errorf("empty scorer: got %f, want 0", score)
}
}
func TestScorer_SingleKeystroke(t *testing.T) {
s := NewScorer()
s.RecordEvent(time.Now(), DirInput, []byte("a"))
score := s.Score()
if score != 0 {
t.Errorf("single keystroke: got %f, want 0", score)
}
}
func TestScorer_BotLikeInput(t *testing.T) {
// Simulate a bot: paste entire commands with uniform tiny delays, no special keys.
s := NewScorer()
now := time.Now()
// Bot pastes "cat /etc/passwd\r" all at once with perfectly uniform timing.
for range 3 {
cmd := []byte("cat /etc/passwd\r")
for _, b := range cmd {
s.RecordEvent(now, DirInput, []byte{b})
now = now.Add(100 * time.Microsecond) // ~0.1ms uniform delay = paste
}
}
score := s.Score()
if score >= 0.3 {
t.Errorf("bot-like input: got %f, want < 0.3", score)
}
}
func TestScorer_HumanLikeInput(t *testing.T) {
// Simulate a human: variable timing, backspaces, diverse commands.
s := NewScorer()
now := time.Now()
type cmd struct {
text string
delay time.Duration // base delay between keys
}
commands := []cmd{
{"ls -la\r", 80 * time.Millisecond},
{"cat /etc/paswd", 120 * time.Millisecond}, // typo
{string([]byte{0x7f}), 200 * time.Millisecond}, // backspace
{"wd\r", 90 * time.Millisecond}, // correction
{"whoami\r", 100 * time.Millisecond},
{"uname -a\r", 150 * time.Millisecond},
{string([]byte{0x09}), 300 * time.Millisecond}, // tab completion
{"pwd\r", 70 * time.Millisecond},
}
for _, c := range commands {
for _, b := range []byte(c.text) {
// Add ±30% jitter to make timing more natural.
jitter := time.Duration(float64(c.delay) * 0.3)
delay := c.delay + jitter // simplified: always add, still variable across commands
s.RecordEvent(now, DirInput, []byte{b})
now = now.Add(delay)
}
// Pause between commands (thinking time).
now = now.Add(2 * time.Second)
}
score := s.Score()
if score <= 0.6 {
t.Errorf("human-like input: got %f, want > 0.6", score)
}
}
func TestScorer_OutputIgnored(t *testing.T) {
s := NewScorer()
now := time.Now()
// Only output events — should not affect score.
for range 100 {
s.RecordEvent(now, DirOutput, []byte("some output\n"))
now = now.Add(10 * time.Millisecond)
}
score := s.Score()
if score != 0 {
t.Errorf("output-only: got %f, want 0", score)
}
}
func TestScorer_ThreadSafety(t *testing.T) {
s := NewScorer()
now := time.Now()
var wg sync.WaitGroup
for i := range 10 {
wg.Go(func() {
for j := range 100 {
ts := now.Add(time.Duration(i*100+j) * time.Millisecond)
s.RecordEvent(ts, DirInput, []byte("a"))
}
})
}
// Concurrently read score.
wg.Go(func() {
for range 50 {
_ = s.Score()
}
})
wg.Wait()
// Should not panic; score should be valid.
score := s.Score()
if score < 0 || score > 1 {
t.Errorf("concurrent score out of range: %f", score)
}
}
func TestScorer_CommandDiversity(t *testing.T) {
s := NewScorer()
now := time.Now()
// Type 4 different commands with human-ish timing.
cmds := []string{"ls\r", "pwd\r", "id\r", "whoami\r"}
for _, cmd := range cmds {
for _, b := range []byte(cmd) {
s.RecordEvent(now, DirInput, []byte{b})
now = now.Add(100 * time.Millisecond)
}
now = now.Add(time.Second)
}
score := s.Score()
// With 4 unique commands, human timing, and decent duration,
// we should get a meaningful score.
if score < 0.4 {
t.Errorf("diverse commands: got %f, want >= 0.4", score)
}
}

175
internal/notify/webhook.go Normal file
View File

@@ -0,0 +1,175 @@
package notify
import (
"bytes"
"context"
"encoding/json"
"log/slog"
"net/http"
"slices"
"sync"
"time"
"git.t-juice.club/torjus/oubliette/internal/config"
)
// Event types.
const (
EventHumanDetected = "human_detected"
EventSessionStarted = "session_started"
)
// SessionInfo holds session data included in webhook payloads.
type SessionInfo struct {
ID string `json:"id"`
IP string `json:"ip"`
Username string `json:"username"`
ShellName string `json:"shell_name"`
HumanScore float64 `json:"human_score"`
ConnectedAt string `json:"connected_at"`
}
// webhookPayload is the JSON body sent to webhooks.
type webhookPayload struct {
Event string `json:"event"`
Timestamp string `json:"timestamp"`
Session SessionInfo `json:"session"`
}
// Notifier sends webhook notifications for honeypot events.
type Notifier struct {
webhooks []config.WebhookNotifyConfig
logger *slog.Logger
client *http.Client
mu sync.Mutex
sent map[string]struct{} // dedup key: "sessionID:eventType"
}
// NewNotifier creates a Notifier with the given webhook configurations.
func NewNotifier(webhooks []config.WebhookNotifyConfig, logger *slog.Logger) *Notifier {
return &Notifier{
webhooks: webhooks,
logger: logger,
client: &http.Client{Timeout: 10 * time.Second},
sent: make(map[string]struct{}),
}
}
// Notify sends a notification for the given event type and session.
// Deduplicates by (sessionID, eventType) — each combination is sent at most once.
func (n *Notifier) Notify(ctx context.Context, eventType string, session SessionInfo) {
dedupKey := session.ID + ":" + eventType
n.mu.Lock()
if _, ok := n.sent[dedupKey]; ok {
n.mu.Unlock()
return
}
n.sent[dedupKey] = struct{}{}
n.mu.Unlock()
payload := webhookPayload{
Event: eventType,
Timestamp: time.Now().UTC().Format(time.RFC3339),
Session: session,
}
for _, wh := range n.webhooks {
if !n.shouldSend(wh, eventType) {
continue
}
go n.send(ctx, wh, payload)
}
}
// CleanupSession removes dedup state for a session.
func (n *Notifier) CleanupSession(sessionID string) {
n.mu.Lock()
defer n.mu.Unlock()
for key := range n.sent {
if len(key) > len(sessionID) && key[:len(sessionID)+1] == sessionID+":" {
delete(n.sent, key)
}
}
}
// shouldSend returns true if the webhook is configured to receive this event type.
func (n *Notifier) shouldSend(wh config.WebhookNotifyConfig, eventType string) bool {
if len(wh.Events) == 0 {
return true // empty = all events
}
return slices.Contains(wh.Events, eventType)
}
func (n *Notifier) send(ctx context.Context, wh config.WebhookNotifyConfig, payload webhookPayload) {
body, err := json.Marshal(payload)
if err != nil {
n.logger.Error("failed to marshal webhook payload", "err", err)
return
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, wh.URL, bytes.NewReader(body))
if err != nil {
n.logger.Error("failed to create webhook request", "err", err, "url", wh.URL)
return
}
req.Header.Set("Content-Type", "application/json")
for k, v := range wh.Headers {
req.Header.Set(k, v)
}
resp, err := n.client.Do(req)
if err != nil {
n.logger.Error("webhook request failed", "err", err, "url", wh.URL)
return
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 400 {
n.logger.Warn("webhook returned error status",
"url", wh.URL,
"status", resp.StatusCode,
"event", payload.Event,
)
return
}
n.logger.Debug("webhook sent",
"url", wh.URL,
"event", payload.Event,
"session_id", payload.Session.ID,
)
}
// FormatConnectedAt formats a time for use in SessionInfo.
func FormatConnectedAt(t time.Time) string {
return t.UTC().Format(time.RFC3339)
}
// NoopNotifier is a no-op notifier used when no webhooks are configured.
type NoopNotifier struct{}
func (NoopNotifier) Notify(context.Context, string, SessionInfo) {}
func (NoopNotifier) CleanupSession(string) {}
// Sender is the interface for sending notifications.
type Sender interface {
Notify(ctx context.Context, eventType string, session SessionInfo)
CleanupSession(sessionID string)
}
var (
_ Sender = (*Notifier)(nil)
_ Sender = NoopNotifier{}
)
// NewSender creates a Sender from configuration. Returns a NoopNotifier
// if no webhooks are configured.
func NewSender(webhooks []config.WebhookNotifyConfig, logger *slog.Logger) Sender {
if len(webhooks) == 0 {
return NoopNotifier{}
}
return NewNotifier(webhooks, logger)
}

View File

@@ -0,0 +1,243 @@
package notify
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"git.t-juice.club/torjus/oubliette/internal/config"
)
func testSession() SessionInfo {
return SessionInfo{
ID: "test-session-123",
IP: "1.2.3.4",
Username: "root",
ShellName: "bash",
HumanScore: 0.85,
ConnectedAt: FormatConnectedAt(time.Now()),
}
}
func TestNotifier_PayloadStructure(t *testing.T) {
var received webhookPayload
var mu sync.Mutex
done := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
if err := json.NewDecoder(r.Body).Decode(&received); err != nil {
t.Errorf("failed to decode payload: %v", err)
}
w.WriteHeader(http.StatusOK)
close(done)
}))
defer srv.Close()
webhooks := []config.WebhookNotifyConfig{
{URL: srv.URL},
}
n := NewNotifier(webhooks, slog.Default())
session := testSession()
n.Notify(context.Background(), EventHumanDetected, session)
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for webhook")
}
mu.Lock()
defer mu.Unlock()
if received.Event != EventHumanDetected {
t.Errorf("event: got %q, want %q", received.Event, EventHumanDetected)
}
if received.Session.ID != session.ID {
t.Errorf("session ID: got %q, want %q", received.Session.ID, session.ID)
}
if received.Session.IP != session.IP {
t.Errorf("session IP: got %q, want %q", received.Session.IP, session.IP)
}
if received.Session.HumanScore != session.HumanScore {
t.Errorf("score: got %f, want %f", received.Session.HumanScore, session.HumanScore)
}
if received.Timestamp == "" {
t.Error("timestamp should not be empty")
}
}
func TestNotifier_CustomHeaders(t *testing.T) {
var receivedHeaders http.Header
done := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
receivedHeaders = r.Header.Clone()
w.WriteHeader(http.StatusOK)
close(done)
}))
defer srv.Close()
webhooks := []config.WebhookNotifyConfig{
{
URL: srv.URL,
Headers: map[string]string{
"Authorization": "Bearer test-token",
"X-Custom": "my-value",
},
},
}
n := NewNotifier(webhooks, slog.Default())
n.Notify(context.Background(), EventSessionStarted, testSession())
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for webhook")
}
if got := receivedHeaders.Get("Authorization"); got != "Bearer test-token" {
t.Errorf("Authorization header: got %q, want %q", got, "Bearer test-token")
}
if got := receivedHeaders.Get("X-Custom"); got != "my-value" {
t.Errorf("X-Custom header: got %q, want %q", got, "my-value")
}
if got := receivedHeaders.Get("Content-Type"); got != "application/json" {
t.Errorf("Content-Type: got %q, want %q", got, "application/json")
}
}
func TestNotifier_Deduplication(t *testing.T) {
var count int
var mu sync.Mutex
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
count++
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
webhooks := []config.WebhookNotifyConfig{{URL: srv.URL}}
n := NewNotifier(webhooks, slog.Default())
session := testSession()
// Send same event three times for the same session.
for range 3 {
n.Notify(context.Background(), EventHumanDetected, session)
}
// Allow goroutines to complete.
time.Sleep(500 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
if count != 1 {
t.Errorf("dedup: got %d sends, want 1", count)
}
}
func TestNotifier_EventFiltering(t *testing.T) {
var receivedEvents []string
var mu sync.Mutex
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var payload webhookPayload
_ = json.NewDecoder(r.Body).Decode(&payload)
mu.Lock()
receivedEvents = append(receivedEvents, payload.Event)
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
// Only subscribe to human_detected.
webhooks := []config.WebhookNotifyConfig{
{
URL: srv.URL,
Events: []string{EventHumanDetected},
},
}
n := NewNotifier(webhooks, slog.Default())
session := testSession()
// Send both event types.
n.Notify(context.Background(), EventSessionStarted, session)
// Need a different session for human_detected to avoid dedup with same session.
session2 := testSession()
session2.ID = "test-session-456"
n.Notify(context.Background(), EventHumanDetected, session2)
time.Sleep(500 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
if len(receivedEvents) != 1 {
t.Fatalf("event filtering: got %d events, want 1", len(receivedEvents))
}
if receivedEvents[0] != EventHumanDetected {
t.Errorf("filtered event: got %q, want %q", receivedEvents[0], EventHumanDetected)
}
}
func TestNotifier_CleanupSession(t *testing.T) {
var count int
var mu sync.Mutex
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
count++
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
webhooks := []config.WebhookNotifyConfig{{URL: srv.URL}}
n := NewNotifier(webhooks, slog.Default())
session := testSession()
n.Notify(context.Background(), EventHumanDetected, session)
time.Sleep(200 * time.Millisecond)
// Cleanup and resend — should work again.
n.CleanupSession(session.ID)
n.Notify(context.Background(), EventHumanDetected, session)
time.Sleep(200 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
if count != 2 {
t.Errorf("after cleanup: got %d sends, want 2", count)
}
}
func TestNoopNotifier(t *testing.T) {
// Should not panic.
n := NoopNotifier{}
n.Notify(context.Background(), EventHumanDetected, testSession())
n.CleanupSession("test")
}
func TestNewSender_NoWebhooks(t *testing.T) {
sender := NewSender(nil, slog.Default())
if _, ok := sender.(NoopNotifier); !ok {
t.Errorf("expected NoopNotifier, got %T", sender)
}
}
func TestNewSender_WithWebhooks(t *testing.T) {
webhooks := []config.WebhookNotifyConfig{{URL: "http://example.com"}}
sender := NewSender(webhooks, slog.Default())
if _, ok := sender.(*Notifier); !ok {
t.Errorf("expected *Notifier, got %T", sender)
}
}

View File

@@ -14,6 +14,8 @@ import (
"git.t-juice.club/torjus/oubliette/internal/auth"
"git.t-juice.club/torjus/oubliette/internal/config"
"git.t-juice.club/torjus/oubliette/internal/detection"
"git.t-juice.club/torjus/oubliette/internal/notify"
"git.t-juice.club/torjus/oubliette/internal/shell"
"git.t-juice.club/torjus/oubliette/internal/shell/bash"
"git.t-juice.club/torjus/oubliette/internal/storage"
@@ -21,13 +23,14 @@ import (
)
type Server struct {
cfg config.Config
store storage.Store
authenticator *auth.Authenticator
sshConfig *ssh.ServerConfig
logger *slog.Logger
connSem chan struct{} // semaphore limiting concurrent connections
shellRegistry *shell.Registry
cfg config.Config
store storage.Store
authenticator *auth.Authenticator
sshConfig *ssh.ServerConfig
logger *slog.Logger
connSem chan struct{} // semaphore limiting concurrent connections
shellRegistry *shell.Registry
notifier notify.Sender
}
func New(cfg config.Config, store storage.Store, logger *slog.Logger) (*Server, error) {
@@ -43,6 +46,7 @@ func New(cfg config.Config, store storage.Store, logger *slog.Logger) (*Server,
logger: logger,
connSem: make(chan struct{}, cfg.SSH.MaxConnections),
shellRegistry: registry,
notifier: notify.NewSender(cfg.Notify.Webhooks, logger),
}
hostKey, err := loadOrGenerateHostKey(cfg.SSH.HostKeyPath)
@@ -159,6 +163,18 @@ func (s *Server) handleSession(channel ssh.Channel, requests <-chan *ssh.Request
"session_id", sessionID,
)
// Send session_started notification.
connectedAt := time.Now()
sessionInfo := notify.SessionInfo{
ID: sessionID,
IP: ip,
Username: conn.User(),
ShellName: selectedShell.Name(),
ConnectedAt: notify.FormatConnectedAt(connectedAt),
}
s.notifier.Notify(context.Background(), notify.EventSessionStarted, sessionInfo)
defer s.notifier.CleanupSession(sessionID)
// Handle session requests (pty-req, shell, etc.)
go func() {
for req := range requests {
@@ -194,18 +210,82 @@ func (s *Server) handleSession(channel ssh.Channel, requests <-chan *ssh.Request
},
}
// Wrap channel in RecordingChannel for future byte-level recording.
// Wrap channel in RecordingChannel.
recorder := shell.NewRecordingChannel(channel)
// Always record session events for replay.
eventRec := shell.NewEventRecorder(sessionID, s.store, s.logger)
eventRec.Start(context.Background())
defer eventRec.Close()
recorder.AddCallback(eventRec.RecordEvent)
// Set up detection scorer if enabled.
var scorer *detection.Scorer
var scoreCancel context.CancelFunc
if s.cfg.Detection.Enabled {
scorer = detection.NewScorer()
recorder.AddCallback(func(ts time.Time, direction int, data []byte) {
scorer.RecordEvent(ts, direction, data)
})
var scoreCtx context.Context
scoreCtx, scoreCancel = context.WithCancel(context.Background())
go s.runScoreUpdater(scoreCtx, sessionID, scorer, sessionInfo)
}
if err := selectedShell.Handle(context.Background(), sessCtx, recorder); err != nil {
s.logger.Error("shell error", "err", err, "session_id", sessionID)
}
s.logger.Info("session ended",
"remote_addr", conn.RemoteAddr(),
"user", conn.User(),
"session_id", sessionID,
)
// Stop score updater and write final score.
if scoreCancel != nil {
scoreCancel()
}
if scorer != nil {
finalScore := scorer.Score()
if err := s.store.UpdateHumanScore(context.Background(), sessionID, finalScore); err != nil {
s.logger.Error("failed to write final human score", "err", err, "session_id", sessionID)
}
s.logger.Info("session ended",
"remote_addr", conn.RemoteAddr(),
"user", conn.User(),
"session_id", sessionID,
"human_score", finalScore,
)
} else {
s.logger.Info("session ended",
"remote_addr", conn.RemoteAddr(),
"user", conn.User(),
"session_id", sessionID,
)
}
}
// runScoreUpdater periodically computes the human score, writes it to the DB,
// and triggers a notification if the threshold is crossed.
func (s *Server) runScoreUpdater(ctx context.Context, sessionID string, scorer *detection.Scorer, sessionInfo notify.SessionInfo) {
ticker := time.NewTicker(s.cfg.Detection.UpdateIntervalDuration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
score := scorer.Score()
if err := s.store.UpdateHumanScore(ctx, sessionID, score); err != nil {
s.logger.Error("failed to update human score", "err", err, "session_id", sessionID)
continue
}
s.logger.Debug("human score updated", "session_id", sessionID, "score", score)
if score >= s.cfg.Detection.Threshold {
info := sessionInfo
info.HumanScore = score
s.notifier.Notify(ctx, notify.EventHumanDetected, info)
}
}
}
}
func (s *Server) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {

View File

@@ -2,6 +2,7 @@ package bash
import (
"context"
"errors"
"fmt"
"io"
"strings"
@@ -55,7 +56,7 @@ func (b *BashShell) Handle(ctx context.Context, sess *shell.SessionContext, rw i
}
line, err := readLine(ctx, rw)
if err == io.EOF {
if errors.Is(err, io.EOF) {
fmt.Fprint(rw, "logout\r\n")
return nil
}
@@ -81,7 +82,9 @@ func (b *BashShell) Handle(ctx context.Context, sess *shell.SessionContext, rw i
// Log command and output to store.
if sess.Store != nil {
sess.Store.AppendSessionLog(ctx, sess.SessionID, trimmed, output)
if err := sess.Store.AppendSessionLog(ctx, sess.SessionID, trimmed, output); err != nil {
return fmt.Errorf("append session log: %w", err)
}
}
if result.exit {
@@ -145,8 +148,7 @@ func readLine(ctx context.Context, rw io.ReadWriter) (string, error) {
// Read and discard the rest of the escape sequence.
// Most are 3 bytes: ESC [ X (arrow keys, etc.)
next := make([]byte, 1)
rw.Read(next)
if next[0] == '[' {
if n, _ := rw.Read(next); n > 0 && next[0] == '[' {
rw.Read(next) // read the final byte
}

View File

@@ -3,6 +3,7 @@ package bash
import (
"bytes"
"context"
"errors"
"io"
"strings"
"testing"
@@ -108,7 +109,7 @@ func TestReadLineCtrlD(t *testing.T) {
ctx := context.Background()
_, err := readLine(ctx, rw)
if err != io.EOF {
if !errors.Is(err, io.EOF) {
t.Fatalf("expected io.EOF, got %v", err)
}
}

View File

@@ -0,0 +1,92 @@
package shell
import (
"context"
"log/slog"
"sync"
"time"
"git.t-juice.club/torjus/oubliette/internal/storage"
)
// EventRecorder buffers I/O events in memory and periodically flushes them to
// a storage.Store. It is designed to be registered as a RecordingChannel
// callback so that SSH I/O is never blocked by database writes.
type EventRecorder struct {
sessionID string
store storage.Store
logger *slog.Logger
mu sync.Mutex
buf []storage.SessionEvent
cancel context.CancelFunc
done chan struct{}
}
// NewEventRecorder creates a recorder that will persist events for the given session.
func NewEventRecorder(sessionID string, store storage.Store, logger *slog.Logger) *EventRecorder {
return &EventRecorder{
sessionID: sessionID,
store: store,
logger: logger,
done: make(chan struct{}),
}
}
// RecordEvent implements the EventCallback signature and appends an event to
// the in-memory buffer. It is safe to call concurrently.
func (er *EventRecorder) RecordEvent(ts time.Time, direction int, data []byte) {
er.mu.Lock()
defer er.mu.Unlock()
er.buf = append(er.buf, storage.SessionEvent{
SessionID: er.sessionID,
Timestamp: ts,
Direction: direction,
Data: data,
})
}
// Start begins the background flush goroutine that drains the buffer every 2 seconds.
func (er *EventRecorder) Start(ctx context.Context) {
ctx, er.cancel = context.WithCancel(ctx)
go er.run(ctx)
}
// Close cancels the background goroutine and performs a final flush.
func (er *EventRecorder) Close() {
if er.cancel != nil {
er.cancel()
}
<-er.done
}
func (er *EventRecorder) run(ctx context.Context) {
defer close(er.done)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
er.flush()
return
case <-ticker.C:
er.flush()
}
}
}
func (er *EventRecorder) flush() {
er.mu.Lock()
if len(er.buf) == 0 {
er.mu.Unlock()
return
}
events := er.buf
er.buf = nil
er.mu.Unlock()
if err := er.store.AppendSessionEvents(context.Background(), events); err != nil {
er.logger.Error("failed to flush session events", "err", err, "session_id", er.sessionID)
}
}

View File

@@ -0,0 +1,80 @@
package shell
import (
"context"
"log/slog"
"testing"
"time"
"git.t-juice.club/torjus/oubliette/internal/storage"
)
func TestEventRecorderFlush(t *testing.T) {
store := storage.NewMemoryStore()
ctx := context.Background()
// Create a session so events have a valid session ID.
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
rec := NewEventRecorder(id, store, slog.Default())
rec.Start(ctx)
// Record some events.
now := time.Now()
rec.RecordEvent(now, 0, []byte("hello"))
rec.RecordEvent(now.Add(100*time.Millisecond), 1, []byte("world"))
// Close should trigger final flush.
rec.Close()
events, err := store.GetSessionEvents(ctx, id)
if err != nil {
t.Fatalf("GetSessionEvents: %v", err)
}
if len(events) != 2 {
t.Fatalf("len = %d, want 2", len(events))
}
if string(events[0].Data) != "hello" {
t.Errorf("events[0].Data = %q, want %q", events[0].Data, "hello")
}
if events[0].Direction != 0 {
t.Errorf("events[0].Direction = %d, want 0", events[0].Direction)
}
if string(events[1].Data) != "world" {
t.Errorf("events[1].Data = %q, want %q", events[1].Data, "world")
}
if events[1].Direction != 1 {
t.Errorf("events[1].Direction = %d, want 1", events[1].Direction)
}
}
func TestEventRecorderPeriodicFlush(t *testing.T) {
store := storage.NewMemoryStore()
ctx := context.Background()
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
rec := NewEventRecorder(id, store, slog.Default())
rec.Start(ctx)
// Record an event and wait for the periodic flush (2s + some margin).
rec.RecordEvent(time.Now(), 1, []byte("periodic"))
time.Sleep(3 * time.Second)
events, err := store.GetSessionEvents(ctx, id)
if err != nil {
t.Fatalf("GetSessionEvents: %v", err)
}
if len(events) != 1 {
t.Errorf("expected periodic flush, got %d events", len(events))
}
rec.Close()
}

View File

@@ -1,12 +1,19 @@
package shell
import "io"
import (
"io"
"time"
)
// RecordingChannel wraps an io.ReadWriteCloser. In Phase 1.4 it is a
// pass-through; Phase 2.3 will add byte-level keystroke recording here
// without changing any shell code.
// EventCallback is called with a copy of data whenever the channel is read or written.
// direction is 0 for input (client→server) and 1 for output (server→client).
type EventCallback func(ts time.Time, direction int, data []byte)
// RecordingChannel wraps an io.ReadWriteCloser and optionally invokes callbacks
// on every Read (input) and Write (output).
type RecordingChannel struct {
inner io.ReadWriteCloser
inner io.ReadWriteCloser
callbacks []EventCallback
}
// NewRecordingChannel returns a RecordingChannel wrapping rw.
@@ -14,6 +21,42 @@ func NewRecordingChannel(rw io.ReadWriteCloser) *RecordingChannel {
return &RecordingChannel{inner: rw}
}
func (r *RecordingChannel) Read(p []byte) (int, error) { return r.inner.Read(p) }
func (r *RecordingChannel) Write(p []byte) (int, error) { return r.inner.Write(p) }
func (r *RecordingChannel) Close() error { return r.inner.Close() }
// WithCallback clears existing callbacks, sets the given one, and returns the
// RecordingChannel for chaining. Kept for backward compatibility.
func (r *RecordingChannel) WithCallback(cb EventCallback) *RecordingChannel {
r.callbacks = []EventCallback{cb}
return r
}
// AddCallback appends an additional event callback.
func (r *RecordingChannel) AddCallback(cb EventCallback) {
r.callbacks = append(r.callbacks, cb)
}
func (r *RecordingChannel) Read(p []byte) (int, error) {
n, err := r.inner.Read(p)
if n > 0 && len(r.callbacks) > 0 {
ts := time.Now()
cp := make([]byte, n)
copy(cp, p[:n])
for _, cb := range r.callbacks {
cb(ts, 0, cp)
}
}
return n, err
}
func (r *RecordingChannel) Write(p []byte) (int, error) {
n, err := r.inner.Write(p)
if n > 0 && len(r.callbacks) > 0 {
ts := time.Now()
cp := make([]byte, n)
copy(cp, p[:n])
for _, cb := range r.callbacks {
cb(ts, 1, cp)
}
}
return n, err
}
func (r *RecordingChannel) Close() error { return r.inner.Close() }

View File

@@ -3,7 +3,9 @@ package shell
import (
"bytes"
"io"
"sync"
"testing"
"time"
)
// nopCloser wraps a ReadWriter with a no-op Close.
@@ -41,3 +43,80 @@ func TestRecordingChannelPassthrough(t *testing.T) {
t.Fatalf("Close: %v", err)
}
}
func TestRecordingChannelMultiCallback(t *testing.T) {
var buf bytes.Buffer
rc := NewRecordingChannel(nopCloser{&buf})
type event struct {
ts time.Time
direction int
data string
}
var mu sync.Mutex
var events1, events2 []event
rc.AddCallback(func(ts time.Time, direction int, data []byte) {
mu.Lock()
defer mu.Unlock()
events1 = append(events1, event{ts, direction, string(data)})
})
rc.AddCallback(func(ts time.Time, direction int, data []byte) {
mu.Lock()
defer mu.Unlock()
events2 = append(events2, event{ts, direction, string(data)})
})
// Write triggers both callbacks with direction=1.
rc.Write([]byte("hello"))
// Read triggers both callbacks with direction=0.
out := make([]byte, 16)
rc.Read(out)
mu.Lock()
defer mu.Unlock()
if len(events1) != 2 {
t.Fatalf("callback1 got %d events, want 2", len(events1))
}
if len(events2) != 2 {
t.Fatalf("callback2 got %d events, want 2", len(events2))
}
// Write event should be direction=1.
if events1[0].direction != 1 {
t.Errorf("write direction = %d, want 1", events1[0].direction)
}
// Read event should be direction=0.
if events1[1].direction != 0 {
t.Errorf("read direction = %d, want 0", events1[1].direction)
}
// Both callbacks should get the same timestamp for a single operation.
if events1[0].ts != events2[0].ts {
t.Error("callbacks should receive the same timestamp")
}
}
func TestRecordingChannelWithCallbackClearsExisting(t *testing.T) {
var buf bytes.Buffer
rc := NewRecordingChannel(nopCloser{&buf})
called1 := false
called2 := false
rc.AddCallback(func(_ time.Time, _ int, _ []byte) { called1 = true })
// WithCallback should clear existing and set new.
rc.WithCallback(func(_ time.Time, _ int, _ []byte) { called2 = true })
rc.Write([]byte("x"))
if called1 {
t.Error("first callback should not be called after WithCallback")
}
if !called2 {
t.Error("second callback should be called")
}
}

View File

@@ -15,6 +15,7 @@ type MemoryStore struct {
LoginAttempts []LoginAttempt
Sessions map[string]*Session
SessionLogs []SessionLog
SessionEvents []SessionEvent
}
// NewMemoryStore returns a new empty MemoryStore.
@@ -101,6 +102,55 @@ func (m *MemoryStore) AppendSessionLog(_ context.Context, sessionID, input, outp
return nil
}
func (m *MemoryStore) GetSession(_ context.Context, sessionID string) (*Session, error) {
m.mu.Lock()
defer m.mu.Unlock()
s, ok := m.Sessions[sessionID]
if !ok {
return nil, nil
}
copy := *s
return &copy, nil
}
func (m *MemoryStore) GetSessionLogs(_ context.Context, sessionID string) ([]SessionLog, error) {
m.mu.Lock()
defer m.mu.Unlock()
var logs []SessionLog
for _, l := range m.SessionLogs {
if l.SessionID == sessionID {
logs = append(logs, l)
}
}
sort.Slice(logs, func(i, j int) bool {
return logs[i].Timestamp.Before(logs[j].Timestamp)
})
return logs, nil
}
func (m *MemoryStore) AppendSessionEvents(_ context.Context, events []SessionEvent) error {
m.mu.Lock()
defer m.mu.Unlock()
m.SessionEvents = append(m.SessionEvents, events...)
return nil
}
func (m *MemoryStore) GetSessionEvents(_ context.Context, sessionID string) ([]SessionEvent, error) {
m.mu.Lock()
defer m.mu.Unlock()
var events []SessionEvent
for _, e := range m.SessionEvents {
if e.SessionID == sessionID {
events = append(events, e)
}
}
return events, nil
}
func (m *MemoryStore) DeleteRecordsBefore(_ context.Context, cutoff time.Time) (int64, error) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -136,6 +186,16 @@ func (m *MemoryStore) DeleteRecordsBefore(_ context.Context, cutoff time.Time) (
}
m.SessionLogs = keptLogs
keptEvents := m.SessionEvents[:0]
for _, e := range m.SessionEvents {
if _, ok := m.Sessions[e.SessionID]; ok {
keptEvents = append(keptEvents, e)
} else {
total++
}
}
m.SessionEvents = keptEvents
return total, nil
}

View File

@@ -0,0 +1,9 @@
CREATE TABLE session_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
timestamp TEXT NOT NULL,
direction INTEGER NOT NULL,
data BLOB NOT NULL
);
CREATE INDEX idx_session_events_session_id ON session_events(session_id);

View File

@@ -25,8 +25,8 @@ func TestMigrateCreatesTablesAndVersion(t *testing.T) {
if err := db.QueryRow(`SELECT version FROM schema_version`).Scan(&version); err != nil {
t.Fatalf("query version: %v", err)
}
if version != 1 {
t.Errorf("version = %d, want 1", version)
if version != 2 {
t.Errorf("version = %d, want 2", version)
}
// Verify tables exist by inserting into them.
@@ -64,8 +64,8 @@ func TestMigrateIdempotent(t *testing.T) {
if err := db.QueryRow(`SELECT version FROM schema_version`).Scan(&version); err != nil {
t.Fatalf("query version: %v", err)
}
if version != 1 {
t.Errorf("version = %d after double migrate, want 1", version)
if version != 2 {
t.Errorf("version = %d after double migrate, want 2", version)
}
}

View File

@@ -94,6 +94,111 @@ func (s *SQLiteStore) AppendSessionLog(ctx context.Context, sessionID, input, ou
return nil
}
func (s *SQLiteStore) GetSession(ctx context.Context, sessionID string) (*Session, error) {
var sess Session
var connectedAt string
var disconnectedAt sql.NullString
var humanScore sql.NullFloat64
err := s.db.QueryRowContext(ctx, `
SELECT id, ip, username, shell_name, connected_at, disconnected_at, human_score
FROM sessions WHERE id = ?`, sessionID).Scan(
&sess.ID, &sess.IP, &sess.Username, &sess.ShellName,
&connectedAt, &disconnectedAt, &humanScore,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("querying session: %w", err)
}
sess.ConnectedAt, _ = time.Parse(time.RFC3339, connectedAt)
if disconnectedAt.Valid {
t, _ := time.Parse(time.RFC3339, disconnectedAt.String)
sess.DisconnectedAt = &t
}
if humanScore.Valid {
sess.HumanScore = &humanScore.Float64
}
return &sess, nil
}
func (s *SQLiteStore) GetSessionLogs(ctx context.Context, sessionID string) ([]SessionLog, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, session_id, timestamp, input, output
FROM session_logs WHERE session_id = ?
ORDER BY timestamp`, sessionID)
if err != nil {
return nil, fmt.Errorf("querying session logs: %w", err)
}
defer func() { _ = rows.Close() }()
var logs []SessionLog
for rows.Next() {
var l SessionLog
var ts string
if err := rows.Scan(&l.ID, &l.SessionID, &ts, &l.Input, &l.Output); err != nil {
return nil, fmt.Errorf("scanning session log: %w", err)
}
l.Timestamp, _ = time.Parse(time.RFC3339, ts)
logs = append(logs, l)
}
return logs, rows.Err()
}
func (s *SQLiteStore) AppendSessionEvents(ctx context.Context, events []SessionEvent) error {
if len(events) == 0 {
return nil
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO session_events (session_id, timestamp, direction, data)
VALUES (?, ?, ?, ?)`)
if err != nil {
return fmt.Errorf("preparing statement: %w", err)
}
defer stmt.Close()
for _, e := range events {
_, err := stmt.ExecContext(ctx, e.SessionID, e.Timestamp.UTC().Format(time.RFC3339Nano), e.Direction, e.Data)
if err != nil {
return fmt.Errorf("inserting session event: %w", err)
}
}
return tx.Commit()
}
func (s *SQLiteStore) GetSessionEvents(ctx context.Context, sessionID string) ([]SessionEvent, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT session_id, timestamp, direction, data
FROM session_events WHERE session_id = ?
ORDER BY id`, sessionID)
if err != nil {
return nil, fmt.Errorf("querying session events: %w", err)
}
defer func() { _ = rows.Close() }()
var events []SessionEvent
for rows.Next() {
var e SessionEvent
var ts string
if err := rows.Scan(&e.SessionID, &ts, &e.Direction, &e.Data); err != nil {
return nil, fmt.Errorf("scanning session event: %w", err)
}
e.Timestamp, _ = time.Parse(time.RFC3339Nano, ts)
events = append(events, e)
}
return events, rows.Err()
}
func (s *SQLiteStore) DeleteRecordsBefore(ctx context.Context, cutoff time.Time) (int64, error) {
cutoffStr := cutoff.UTC().Format(time.RFC3339)
@@ -105,15 +210,26 @@ func (s *SQLiteStore) DeleteRecordsBefore(ctx context.Context, cutoff time.Time)
var total int64
// Delete session logs for old sessions.
// Delete session events for old sessions.
res, err := tx.ExecContext(ctx, `
DELETE FROM session_events WHERE session_id IN (
SELECT id FROM sessions WHERE connected_at < ?
)`, cutoffStr)
if err != nil {
return 0, fmt.Errorf("deleting session events: %w", err)
}
n, _ := res.RowsAffected()
total += n
// Delete session logs for old sessions.
res, err = tx.ExecContext(ctx, `
DELETE FROM session_logs WHERE session_id IN (
SELECT id FROM sessions WHERE connected_at < ?
)`, cutoffStr)
if err != nil {
return 0, fmt.Errorf("deleting session logs: %w", err)
}
n, _ := res.RowsAffected()
n, _ = res.RowsAffected()
total += n
// Delete old sessions.

View File

@@ -205,11 +205,7 @@ func TestDeleteRecordsBefore(t *testing.T) {
}
func TestNewSQLiteStoreCreatesFile(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "subdir", "test.db")
// Parent directory doesn't exist yet; SQLite should create it.
// Actually, SQLite doesn't create parent dirs, but the file itself.
// Use a path in the temp dir directly.
dbPath = filepath.Join(t.TempDir(), "test.db")
dbPath := filepath.Join(t.TempDir(), "test.db")
store, err := NewSQLiteStore(dbPath)
if err != nil {
t.Fatalf("creating store: %v", err)

View File

@@ -36,6 +36,14 @@ type SessionLog struct {
Output string
}
// SessionEvent represents a single I/O event recorded during a session.
type SessionEvent struct {
SessionID string
Timestamp time.Time
Direction int // 0=input (client→server), 1=output (server→client)
Data []byte
}
// DashboardStats holds aggregate counts for the web dashboard.
type DashboardStats struct {
TotalAttempts int64
@@ -88,6 +96,18 @@ type Store interface {
// If activeOnly is true, only sessions with no disconnected_at are returned.
GetRecentSessions(ctx context.Context, limit int, activeOnly bool) ([]Session, error)
// GetSession returns a single session by ID.
GetSession(ctx context.Context, sessionID string) (*Session, error)
// GetSessionLogs returns all log entries for a session ordered by timestamp.
GetSessionLogs(ctx context.Context, sessionID string) ([]SessionLog, error)
// AppendSessionEvents batch-inserts session events.
AppendSessionEvents(ctx context.Context, events []SessionEvent) error
// GetSessionEvents returns all events for a session ordered by id.
GetSessionEvents(ctx context.Context, sessionID string) ([]SessionEvent, error)
// Close releases any resources held by the store.
Close() error
}

View File

@@ -37,17 +37,17 @@ func seedData(t *testing.T, store Store) {
ctx := context.Background()
// Login attempts: root/toor from two IPs, admin/admin from one IP.
for i := 0; i < 5; i++ {
for range 5 {
if err := store.RecordLoginAttempt(ctx, "root", "toor", "10.0.0.1"); err != nil {
t.Fatalf("seeding attempt: %v", err)
}
}
for i := 0; i < 3; i++ {
for range 3 {
if err := store.RecordLoginAttempt(ctx, "root", "toor", "10.0.0.2"); err != nil {
t.Fatalf("seeding attempt: %v", err)
}
}
for i := 0; i < 2; i++ {
for range 2 {
if err := store.RecordLoginAttempt(ctx, "admin", "admin", "10.0.0.1"); err != nil {
t.Fatalf("seeding attempt: %v", err)
}
@@ -194,6 +194,128 @@ func TestGetTopIPs(t *testing.T) {
})
}
func TestGetSession(t *testing.T) {
testStores(t, func(t *testing.T, newStore storeFactory) {
t.Run("not found", func(t *testing.T) {
store := newStore(t)
s, err := store.GetSession(context.Background(), "nonexistent")
if err != nil {
t.Fatalf("GetSession: %v", err)
}
if s != nil {
t.Errorf("expected nil, got %+v", s)
}
})
t.Run("found", func(t *testing.T) {
store := newStore(t)
ctx := context.Background()
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
s, err := store.GetSession(ctx, id)
if err != nil {
t.Fatalf("GetSession: %v", err)
}
if s == nil {
t.Fatal("expected session, got nil")
}
if s.ID != id || s.IP != "10.0.0.1" || s.Username != "root" || s.ShellName != "bash" {
t.Errorf("unexpected session: %+v", s)
}
})
})
}
func TestGetSessionLogs(t *testing.T) {
testStores(t, func(t *testing.T, newStore storeFactory) {
store := newStore(t)
ctx := context.Background()
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
if err := store.AppendSessionLog(ctx, id, "ls", "file1\nfile2"); err != nil {
t.Fatalf("AppendSessionLog: %v", err)
}
if err := store.AppendSessionLog(ctx, id, "pwd", "/home/root"); err != nil {
t.Fatalf("AppendSessionLog: %v", err)
}
logs, err := store.GetSessionLogs(ctx, id)
if err != nil {
t.Fatalf("GetSessionLogs: %v", err)
}
if len(logs) != 2 {
t.Fatalf("len = %d, want 2", len(logs))
}
if logs[0].Input != "ls" {
t.Errorf("logs[0].Input = %q, want %q", logs[0].Input, "ls")
}
if logs[1].Input != "pwd" {
t.Errorf("logs[1].Input = %q, want %q", logs[1].Input, "pwd")
}
})
}
func TestSessionEvents(t *testing.T) {
testStores(t, func(t *testing.T, newStore storeFactory) {
t.Run("empty", func(t *testing.T) {
store := newStore(t)
events, err := store.GetSessionEvents(context.Background(), "nonexistent")
if err != nil {
t.Fatalf("GetSessionEvents: %v", err)
}
if len(events) != 0 {
t.Errorf("expected empty, got %d", len(events))
}
})
t.Run("append and retrieve", func(t *testing.T) {
store := newStore(t)
ctx := context.Background()
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
now := time.Now().UTC()
events := []SessionEvent{
{SessionID: id, Timestamp: now, Direction: 0, Data: []byte("ls\n")},
{SessionID: id, Timestamp: now.Add(100 * time.Millisecond), Direction: 1, Data: []byte("file1\nfile2\n")},
{SessionID: id, Timestamp: now.Add(200 * time.Millisecond), Direction: 0, Data: []byte("pwd\n")},
}
if err := store.AppendSessionEvents(ctx, events); err != nil {
t.Fatalf("AppendSessionEvents: %v", err)
}
got, err := store.GetSessionEvents(ctx, id)
if err != nil {
t.Fatalf("GetSessionEvents: %v", err)
}
if len(got) != 3 {
t.Fatalf("len = %d, want 3", len(got))
}
if got[0].Direction != 0 || string(got[0].Data) != "ls\n" {
t.Errorf("got[0] = %+v", got[0])
}
if got[1].Direction != 1 || string(got[1].Data) != "file1\nfile2\n" {
t.Errorf("got[1] = %+v", got[1])
}
})
t.Run("append empty", func(t *testing.T) {
store := newStore(t)
if err := store.AppendSessionEvents(context.Background(), nil); err != nil {
t.Fatalf("AppendSessionEvents(nil): %v", err)
}
})
})
}
func TestGetRecentSessions(t *testing.T) {
testStores(t, func(t *testing.T, newStore storeFactory) {
t.Run("empty", func(t *testing.T) {

View File

@@ -1,6 +1,8 @@
package web
import (
"encoding/base64"
"encoding/json"
"net/http"
"git.t-juice.club/torjus/oubliette/internal/storage"
@@ -70,7 +72,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := s.tmpl.ExecuteTemplate(w, "layout.html", data); err != nil {
if err := s.tmpl.dashboard.ExecuteTemplate(w, "layout.html", data); err != nil {
s.logger.Error("failed to render dashboard", "err", err)
}
}
@@ -84,7 +86,7 @@ func (s *Server) handleFragmentStats(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := s.tmpl.ExecuteTemplate(w, "stats", stats); err != nil {
if err := s.tmpl.dashboard.ExecuteTemplate(w, "stats", stats); err != nil {
s.logger.Error("failed to render stats fragment", "err", err)
}
}
@@ -98,7 +100,95 @@ func (s *Server) handleFragmentActiveSessions(w http.ResponseWriter, r *http.Req
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := s.tmpl.ExecuteTemplate(w, "active_sessions", sessions); err != nil {
if err := s.tmpl.dashboard.ExecuteTemplate(w, "active_sessions", sessions); err != nil {
s.logger.Error("failed to render active sessions fragment", "err", err)
}
}
type sessionDetailData struct {
Session *storage.Session
Logs []storage.SessionLog
EventCount int
}
func (s *Server) handleSessionDetail(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
sessionID := r.PathValue("id")
session, err := s.store.GetSession(ctx, sessionID)
if err != nil {
s.logger.Error("failed to get session", "err", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
if session == nil {
http.NotFound(w, r)
return
}
logs, err := s.store.GetSessionLogs(ctx, sessionID)
if err != nil {
s.logger.Error("failed to get session logs", "err", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
events, err := s.store.GetSessionEvents(ctx, sessionID)
if err != nil {
s.logger.Error("failed to get session events", "err", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
data := sessionDetailData{
Session: session,
Logs: logs,
EventCount: len(events),
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := s.tmpl.sessionDetail.ExecuteTemplate(w, "layout.html", data); err != nil {
s.logger.Error("failed to render session detail", "err", err)
}
}
type apiEvent struct {
T int64 `json:"t"`
D int `json:"d"`
Data string `json:"data"`
}
type apiEventsResponse struct {
Events []apiEvent `json:"events"`
}
func (s *Server) handleAPISessionEvents(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
sessionID := r.PathValue("id")
events, err := s.store.GetSessionEvents(ctx, sessionID)
if err != nil {
s.logger.Error("failed to get session events", "err", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
resp := apiEventsResponse{Events: make([]apiEvent, len(events))}
var baseTime int64
for i, e := range events {
ms := e.Timestamp.UnixMilli()
if i == 0 {
baseTime = ms
}
resp.Events[i] = apiEvent{
T: ms - baseTime,
D: e.Direction,
Data: base64.StdEncoding.EncodeToString(e.Data),
}
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
s.logger.Error("failed to encode session events", "err", err)
}
}

View File

@@ -0,0 +1,83 @@
// ReplayPlayer drives xterm.js playback of recorded session events.
function ReplayPlayer(containerId, sessionId) {
this.terminal = new Terminal({
cols: 80,
rows: 24,
convertEol: true,
disableStdin: true,
theme: {
background: '#000000',
foreground: '#ffffff'
}
});
this.terminal.open(document.getElementById(containerId));
this.sessionId = sessionId;
this.events = [];
this.index = 0;
this.speed = 1;
this.timers = [];
this.playing = false;
// Fetch events immediately.
var self = this;
fetch('/api/sessions/' + sessionId + '/events')
.then(function(r) { return r.json(); })
.then(function(data) {
self.events = data.events || [];
});
}
ReplayPlayer.prototype.play = function() {
if (this.playing) return;
if (this.events.length === 0) return;
this.playing = true;
this._schedule();
};
ReplayPlayer.prototype.pause = function() {
this.playing = false;
for (var i = 0; i < this.timers.length; i++) {
clearTimeout(this.timers[i]);
}
this.timers = [];
};
ReplayPlayer.prototype.reset = function() {
this.pause();
this.index = 0;
this.terminal.reset();
};
ReplayPlayer.prototype.setSpeed = function(speed) {
this.speed = speed;
if (this.playing) {
this.pause();
this.play();
}
};
ReplayPlayer.prototype._schedule = function() {
var self = this;
var baseT = this.index < this.events.length ? this.events[this.index].t : 0;
for (var i = this.index; i < this.events.length; i++) {
(function(idx) {
var evt = self.events[idx];
var delay = (evt.t - baseT) / self.speed;
var timer = setTimeout(function() {
if (!self.playing) return;
// Only write output events (d=1) to terminal; input is echoed in output.
if (evt.d === 1) {
var raw = atob(evt.data);
self.terminal.write(raw);
}
self.index = idx + 1;
if (self.index >= self.events.length) {
self.playing = false;
}
}, delay);
self.timers.push(timer);
})(i);
}
};

View File

@@ -0,0 +1,209 @@
/**
* Copyright (c) 2014 The xterm.js authors. All rights reserved.
* Copyright (c) 2012-2013, Christopher Jeffrey (MIT License)
* https://github.com/chjj/term.js
* @license MIT
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* Originally forked from (with the author's permission):
* Fabrice Bellard's javascript vt100 for jslinux:
* http://bellard.org/jslinux/
* Copyright (c) 2011 Fabrice Bellard
* The original design remains. The terminal itself
* has been extended to include xterm CSI codes, among
* other features.
*/
/**
* Default styles for xterm.js
*/
.xterm {
cursor: text;
position: relative;
user-select: none;
-ms-user-select: none;
-webkit-user-select: none;
}
.xterm.focus,
.xterm:focus {
outline: none;
}
.xterm .xterm-helpers {
position: absolute;
top: 0;
/**
* The z-index of the helpers must be higher than the canvases in order for
* IMEs to appear on top.
*/
z-index: 5;
}
.xterm .xterm-helper-textarea {
padding: 0;
border: 0;
margin: 0;
/* Move textarea out of the screen to the far left, so that the cursor is not visible */
position: absolute;
opacity: 0;
left: -9999em;
top: 0;
width: 0;
height: 0;
z-index: -5;
/** Prevent wrapping so the IME appears against the textarea at the correct position */
white-space: nowrap;
overflow: hidden;
resize: none;
}
.xterm .composition-view {
/* TODO: Composition position got messed up somewhere */
background: #000;
color: #FFF;
display: none;
position: absolute;
white-space: nowrap;
z-index: 1;
}
.xterm .composition-view.active {
display: block;
}
.xterm .xterm-viewport {
/* On OS X this is required in order for the scroll bar to appear fully opaque */
background-color: #000;
overflow-y: scroll;
cursor: default;
position: absolute;
right: 0;
left: 0;
top: 0;
bottom: 0;
}
.xterm .xterm-screen {
position: relative;
}
.xterm .xterm-screen canvas {
position: absolute;
left: 0;
top: 0;
}
.xterm .xterm-scroll-area {
visibility: hidden;
}
.xterm-char-measure-element {
display: inline-block;
visibility: hidden;
position: absolute;
top: 0;
left: -9999em;
line-height: normal;
}
.xterm.enable-mouse-events {
/* When mouse events are enabled (eg. tmux), revert to the standard pointer cursor */
cursor: default;
}
.xterm.xterm-cursor-pointer,
.xterm .xterm-cursor-pointer {
cursor: pointer;
}
.xterm.column-select.focus {
/* Column selection mode */
cursor: crosshair;
}
.xterm .xterm-accessibility,
.xterm .xterm-message {
position: absolute;
left: 0;
top: 0;
bottom: 0;
right: 0;
z-index: 10;
color: transparent;
pointer-events: none;
}
.xterm .live-region {
position: absolute;
left: -9999px;
width: 1px;
height: 1px;
overflow: hidden;
}
.xterm-dim {
/* Dim should not apply to background, so the opacity of the foreground color is applied
* explicitly in the generated class and reset to 1 here */
opacity: 1 !important;
}
.xterm-underline-1 { text-decoration: underline; }
.xterm-underline-2 { text-decoration: double underline; }
.xterm-underline-3 { text-decoration: wavy underline; }
.xterm-underline-4 { text-decoration: dotted underline; }
.xterm-underline-5 { text-decoration: dashed underline; }
.xterm-overline {
text-decoration: overline;
}
.xterm-overline.xterm-underline-1 { text-decoration: overline underline; }
.xterm-overline.xterm-underline-2 { text-decoration: overline double underline; }
.xterm-overline.xterm-underline-3 { text-decoration: overline wavy underline; }
.xterm-overline.xterm-underline-4 { text-decoration: overline dotted underline; }
.xterm-overline.xterm-underline-5 { text-decoration: overline dashed underline; }
.xterm-strikethrough {
text-decoration: line-through;
}
.xterm-screen .xterm-decoration-container .xterm-decoration {
z-index: 6;
position: absolute;
}
.xterm-screen .xterm-decoration-container .xterm-decoration.xterm-decoration-top-layer {
z-index: 7;
}
.xterm-decoration-overview-ruler {
z-index: 8;
position: absolute;
top: 0;
right: 0;
pointer-events: none;
}
.xterm-decoration-top {
z-index: 2;
position: relative;
}

8
internal/web/static/xterm.min.js vendored Normal file

File diff suppressed because one or more lines are too long

View File

@@ -2,6 +2,7 @@ package web
import (
"embed"
"fmt"
"html/template"
"time"
)
@@ -9,8 +10,13 @@ import (
//go:embed templates/*.html templates/fragments/*.html
var templateFS embed.FS
func loadTemplates() (*template.Template, error) {
funcMap := template.FuncMap{
type templateSet struct {
dashboard *template.Template
sessionDetail *template.Template
}
func templateFuncMap() template.FuncMap {
return template.FuncMap{
"formatTime": func(t time.Time) string {
return t.Format("2006-01-02 15:04:05 UTC")
},
@@ -26,12 +32,44 @@ func loadTemplates() (*template.Template, error) {
}
return *t
},
"derefFloat": func(f *float64) float64 {
if f == nil {
return 0
}
return *f
},
"formatScore": func(f *float64) string {
if f == nil {
return "-"
}
return fmt.Sprintf("%.0f%%", *f*100)
},
}
}
return template.New("").Funcs(funcMap).ParseFS(templateFS,
func loadTemplates() (*templateSet, error) {
funcMap := templateFuncMap()
dashboard, err := template.New("").Funcs(funcMap).ParseFS(templateFS,
"templates/layout.html",
"templates/dashboard.html",
"templates/fragments/stats.html",
"templates/fragments/active_sessions.html",
)
if err != nil {
return nil, fmt.Errorf("parsing dashboard templates: %w", err)
}
sessionDetail, err := template.New("").Funcs(funcMap).ParseFS(templateFS,
"templates/layout.html",
"templates/session_detail.html",
)
if err != nil {
return nil, fmt.Errorf("parsing session detail templates: %w", err)
}
return &templateSet{
dashboard: dashboard,
sessionDetail: sessionDetail,
}, nil
}

View File

@@ -70,6 +70,7 @@
<th>IP</th>
<th>Username</th>
<th>Shell</th>
<th>Score</th>
<th>Connected</th>
<th>Disconnected</th>
</tr>
@@ -77,15 +78,16 @@
<tbody>
{{range .RecentSessions}}
<tr>
<td><code>{{truncateID .ID}}</code></td>
<td><a href="/sessions/{{.ID}}"><code>{{truncateID .ID}}</code></a></td>
<td>{{.IP}}</td>
<td>{{.Username}}</td>
<td>{{.ShellName}}</td>
<td>{{if .HumanScore}}{{if gt (derefFloat .HumanScore) 0.6}}<mark>{{formatScore .HumanScore}}</mark>{{else}}{{formatScore .HumanScore}}{{end}}{{else}}-{{end}}</td>
<td>{{formatTime .ConnectedAt}}</td>
<td>{{if .DisconnectedAt}}{{formatTime (derefTime .DisconnectedAt)}}{{else}}<mark>active</mark>{{end}}</td>
</tr>
{{else}}
<tr><td colspan="6">No sessions</td></tr>
<tr><td colspan="7">No sessions</td></tr>
{{end}}
</tbody>
</table>

View File

@@ -6,20 +6,22 @@
<th>IP</th>
<th>Username</th>
<th>Shell</th>
<th>Score</th>
<th>Connected</th>
</tr>
</thead>
<tbody>
{{range .}}
<tr>
<td><code>{{truncateID .ID}}</code></td>
<td><a href="/sessions/{{.ID}}"><code>{{truncateID .ID}}</code></a></td>
<td>{{.IP}}</td>
<td>{{.Username}}</td>
<td>{{.ShellName}}</td>
<td>{{if .HumanScore}}{{if gt (derefFloat .HumanScore) 0.6}}<mark>{{formatScore .HumanScore}}</mark>{{else}}{{formatScore .HumanScore}}{{end}}{{else}}-{{end}}</td>
<td>{{formatTime .ConnectedAt}}</td>
</tr>
{{else}}
<tr><td colspan="5">No active sessions</td></tr>
<tr><td colspan="6">No active sessions</td></tr>
{{end}}
</tbody>
</table>

View File

@@ -0,0 +1,79 @@
{{define "content"}}
<section>
<h3>Session {{.Session.ID}}</h3>
<div class="top-grid">
<article>
<header>Session Info</header>
<table>
<tbody>
<tr><td><strong>IP</strong></td><td>{{.Session.IP}}</td></tr>
<tr><td><strong>Username</strong></td><td>{{.Session.Username}}</td></tr>
<tr><td><strong>Shell</strong></td><td>{{.Session.ShellName}}</td></tr>
<tr><td><strong>Score</strong></td><td>{{formatScore .Session.HumanScore}}</td></tr>
<tr><td><strong>Connected</strong></td><td>{{formatTime .Session.ConnectedAt}}</td></tr>
<tr>
<td><strong>Disconnected</strong></td>
<td>{{if .Session.DisconnectedAt}}{{formatTime (derefTime .Session.DisconnectedAt)}}{{else}}<mark>active</mark>{{end}}</td>
</tr>
</tbody>
</table>
</article>
</div>
</section>
{{if gt .EventCount 0}}
<section>
<h3>Session Replay</h3>
<div style="margin-bottom: 1rem;">
<button id="btn-play" onclick="replayPlayer.play()">Play</button>
<button id="btn-pause" onclick="replayPlayer.pause()">Pause</button>
<button id="btn-reset" onclick="replayPlayer.reset()">Reset</button>
<label for="speed-select" style="margin-left: 1rem;">Speed:</label>
<select id="speed-select" onchange="replayPlayer.setSpeed(parseFloat(this.value))">
<option value="0.5">0.5x</option>
<option value="1" selected>1x</option>
<option value="2">2x</option>
<option value="5">5x</option>
<option value="10">10x</option>
</select>
</div>
<div id="terminal" style="background: #000; padding: 4px; border-radius: 4px;"></div>
</section>
<link rel="stylesheet" href="/static/xterm.css">
<script src="/static/xterm.min.js"></script>
<script src="/static/replay.js"></script>
<script>
var replayPlayer = new ReplayPlayer("terminal", "{{.Session.ID}}");
</script>
{{else}}
<section>
<p>No recorded events for this session.</p>
</section>
{{end}}
{{if .Logs}}
<section>
<h3>Command Log</h3>
<table>
<thead>
<tr>
<th>Time</th>
<th>Input</th>
<th>Output</th>
</tr>
</thead>
<tbody>
{{range .Logs}}
<tr>
<td>{{formatTime .Timestamp}}</td>
<td><code>{{.Input}}</code></td>
<td><pre style="margin:0; white-space:pre-wrap;">{{.Output}}</pre></td>
</tr>
{{end}}
</tbody>
</table>
</section>
{{end}}
<p><a href="/">&larr; Back to dashboard</a></p>
{{end}}

View File

@@ -2,7 +2,6 @@ package web
import (
"embed"
"html/template"
"log/slog"
"net/http"
@@ -17,7 +16,7 @@ type Server struct {
store storage.Store
logger *slog.Logger
mux *http.ServeMux
tmpl *template.Template
tmpl *templateSet
}
// NewServer creates a new web Server with routes registered.
@@ -35,6 +34,8 @@ func NewServer(store storage.Store, logger *slog.Logger) (*Server, error) {
}
s.mux.Handle("GET /static/", http.FileServerFS(staticFS))
s.mux.HandleFunc("GET /sessions/{id}", s.handleSessionDetail)
s.mux.HandleFunc("GET /api/sessions/{id}/events", s.handleAPISessionEvents)
s.mux.HandleFunc("GET /", s.handleDashboard)
s.mux.HandleFunc("GET /fragments/stats", s.handleFragmentStats)
s.mux.HandleFunc("GET /fragments/active-sessions", s.handleFragmentActiveSessions)

View File

@@ -2,11 +2,13 @@ package web
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"git.t-juice.club/torjus/oubliette/internal/storage"
)
@@ -27,7 +29,7 @@ func newSeededTestServer(t *testing.T) *Server {
store := storage.NewMemoryStore()
ctx := context.Background()
for i := 0; i < 5; i++ {
for range 5 {
if err := store.RecordLoginAttempt(ctx, "root", "toor", "10.0.0.1"); err != nil {
t.Fatalf("seeding attempt: %v", err)
}
@@ -54,7 +56,7 @@ func newSeededTestServer(t *testing.T) *Server {
func TestDashboardHandler(t *testing.T) {
t.Run("empty store", func(t *testing.T) {
srv := newTestServer(t)
req := httptest.NewRequest("GET", "/", nil)
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)
@@ -73,7 +75,7 @@ func TestDashboardHandler(t *testing.T) {
t.Run("with data", func(t *testing.T) {
srv := newSeededTestServer(t)
req := httptest.NewRequest("GET", "/", nil)
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)
@@ -93,7 +95,7 @@ func TestDashboardHandler(t *testing.T) {
func TestFragmentStats(t *testing.T) {
srv := newSeededTestServer(t)
req := httptest.NewRequest("GET", "/fragments/stats", nil)
req := httptest.NewRequest(http.MethodGet, "/fragments/stats", nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)
@@ -113,7 +115,7 @@ func TestFragmentStats(t *testing.T) {
func TestFragmentActiveSessions(t *testing.T) {
srv := newSeededTestServer(t)
req := httptest.NewRequest("GET", "/fragments/active-sessions", nil)
req := httptest.NewRequest(http.MethodGet, "/fragments/active-sessions", nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)
@@ -131,6 +133,109 @@ func TestFragmentActiveSessions(t *testing.T) {
}
}
func TestSessionDetailHandler(t *testing.T) {
t.Run("not found", func(t *testing.T) {
srv := newTestServer(t)
req := httptest.NewRequest(http.MethodGet, "/sessions/nonexistent", nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Errorf("status = %d, want 404", w.Code)
}
})
t.Run("found", func(t *testing.T) {
store := storage.NewMemoryStore()
ctx := context.Background()
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
srv, err := NewServer(store, slog.Default())
if err != nil {
t.Fatalf("NewServer: %v", err)
}
req := httptest.NewRequest(http.MethodGet, "/sessions/"+id, nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("status = %d, want 200", w.Code)
}
body := w.Body.String()
if !strings.Contains(body, "10.0.0.1") {
t.Error("response should contain IP")
}
if !strings.Contains(body, "root") {
t.Error("response should contain username")
}
})
}
func TestAPISessionEvents(t *testing.T) {
store := storage.NewMemoryStore()
ctx := context.Background()
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
now := time.Now().UTC()
events := []storage.SessionEvent{
{SessionID: id, Timestamp: now, Direction: 0, Data: []byte("ls\n")},
{SessionID: id, Timestamp: now.Add(500 * time.Millisecond), Direction: 1, Data: []byte("file1\n")},
}
if err := store.AppendSessionEvents(ctx, events); err != nil {
t.Fatalf("AppendSessionEvents: %v", err)
}
srv, err := NewServer(store, slog.Default())
if err != nil {
t.Fatalf("NewServer: %v", err)
}
req := httptest.NewRequest(http.MethodGet, "/api/sessions/"+id+"/events", nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("status = %d, want 200", w.Code)
}
ct := w.Header().Get("Content-Type")
if !strings.Contains(ct, "application/json") {
t.Errorf("Content-Type = %q, want application/json", ct)
}
var resp apiEventsResponse
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("decoding response: %v", err)
}
if len(resp.Events) != 2 {
t.Fatalf("len = %d, want 2", len(resp.Events))
}
// First event should have t=0 (relative).
if resp.Events[0].T != 0 {
t.Errorf("events[0].T = %d, want 0", resp.Events[0].T)
}
// Second event should have t=500 (500ms later).
if resp.Events[1].T != 500 {
t.Errorf("events[1].T = %d, want 500", resp.Events[1].T)
}
if resp.Events[0].D != 0 {
t.Errorf("events[0].D = %d, want 0", resp.Events[0].D)
}
if resp.Events[1].D != 1 {
t.Errorf("events[1].D = %d, want 1", resp.Events[1].D)
}
}
func TestStaticAssets(t *testing.T) {
srv := newTestServer(t)
@@ -144,7 +249,7 @@ func TestStaticAssets(t *testing.T) {
for _, tt := range tests {
t.Run(tt.path, func(t *testing.T) {
req := httptest.NewRequest("GET", tt.path, nil)
req := httptest.NewRequest(http.MethodGet, tt.path, nil)
w := httptest.NewRecorder()
srv.ServeHTTP(w, req)

View File

@@ -31,3 +31,13 @@ retention_interval = "1h"
hostname = "ubuntu-server"
# banner = "Welcome to Ubuntu 22.04.3 LTS (GNU/Linux 5.15.0-89-generic x86_64)\r\n\r\n"
# fake_user = "" # override username in prompt; empty = use authenticated user
# [detection]
# enabled = true
# threshold = 0.6 # 0.01.0, sessions above this trigger notifications
# update_interval = "5s" # how often to recompute the score during a session
# [[notify.webhooks]]
# url = "https://ntfy.example.com/honeypot"
# headers = { Authorization = "Bearer your-token" }
# events = ["human_detected", "session_started"] # empty = all events