diff --git a/PLAN.md b/PLAN.md index 84a9bda..c268a91 100644 --- a/PLAN.md +++ b/PLAN.md @@ -117,14 +117,14 @@ This lets shells build realistic prompts (`username@hostname:~$`) and log activi Goal: Detect likely-human sessions and make the system smarter. -### 2.1 Human Detection Scoring +### 2.1 Human Detection Scoring ✅ - Keystroke timing analysis - Track backspace, tab, arrow key usage - Command diversity scoring - Compute per-session human score, store in sessions table - Flag sessions above configurable threshold -### 2.2 Notifications +### 2.2 Notifications ✅ - Webhook support (generic HTTP POST, works with Slack/Discord/ntfy) - Trigger on: human score threshold crossed, new session started, configurable - Include session details in payload diff --git a/README.md b/README.md index 8227971..133a717 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,10 @@ Key settings: - `shell.fake_user` — override username in prompt; empty uses the authenticated user - `web.enabled` — enable the web dashboard (default `false`) - `web.listen_addr` — web dashboard listen address (default `:8080`) +- `detection.enabled` — enable human detection scoring (default `false`) +- `detection.threshold` — score threshold (0.0–1.0) for flagging sessions (default `0.6`) +- `detection.update_interval` — how often to recompute scores (default `5s`) +- `notify.webhooks` — list of webhook endpoints for notifications (see example config) ### Run diff --git a/internal/config/config.go b/internal/config/config.go index 5721d3f..8104609 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,13 +9,15 @@ import ( ) type Config struct { - SSH SSHConfig `toml:"ssh"` - Auth AuthConfig `toml:"auth"` - Storage StorageConfig `toml:"storage"` - Shell ShellConfig `toml:"shell"` - Web WebConfig `toml:"web"` - LogLevel string `toml:"log_level"` - LogFormat string `toml:"log_format"` // "text" (default) or "json" + SSH SSHConfig `toml:"ssh"` + Auth AuthConfig `toml:"auth"` + Storage StorageConfig `toml:"storage"` + Shell ShellConfig `toml:"shell"` + Web WebConfig `toml:"web"` + Detection DetectionConfig `toml:"detection"` + Notify NotifyConfig `toml:"notify"` + LogLevel string `toml:"log_level"` + LogFormat string `toml:"log_format"` // "text" (default) or "json" } type WebConfig struct { @@ -59,6 +61,25 @@ type Credential struct { Password string `toml:"password"` } +type DetectionConfig struct { + Enabled bool `toml:"enabled"` + Threshold float64 `toml:"threshold"` + UpdateInterval string `toml:"update_interval"` + + // Parsed duration, not from TOML directly. + UpdateIntervalDuration time.Duration `toml:"-"` +} + +type NotifyConfig struct { + Webhooks []WebhookNotifyConfig `toml:"webhooks"` +} + +type WebhookNotifyConfig struct { + URL string `toml:"url"` + Headers map[string]string `toml:"headers"` + Events []string `toml:"events"` // empty = all events +} + func Load(path string) (*Config, error) { data, err := os.ReadFile(path) if err != nil { @@ -127,6 +148,12 @@ func applyDefaults(cfg *Config) { if cfg.Shell.Banner == "" { cfg.Shell.Banner = "Welcome to Ubuntu 22.04.3 LTS (GNU/Linux 5.15.0-89-generic x86_64)\r\n\r\n" } + if cfg.Detection.Threshold == 0 { + cfg.Detection.Threshold = 0.6 + } + if cfg.Detection.UpdateInterval == "" { + cfg.Detection.UpdateInterval = "5s" + } } // knownShellKeys are top-level keys in [shell] that are not per-shell sub-tables. @@ -189,5 +216,33 @@ func validate(cfg *Config) error { } } + // Validate detection config. + if cfg.Detection.Enabled { + if cfg.Detection.Threshold < 0 || cfg.Detection.Threshold > 1 { + return fmt.Errorf("detection.threshold must be between 0 and 1, got %f", cfg.Detection.Threshold) + } + ui, err := time.ParseDuration(cfg.Detection.UpdateInterval) + if err != nil { + return fmt.Errorf("invalid detection.update_interval %q: %w", cfg.Detection.UpdateInterval, err) + } + if ui <= 0 { + return fmt.Errorf("detection.update_interval must be positive, got %s", ui) + } + cfg.Detection.UpdateIntervalDuration = ui + } + + // Validate notify config. + knownEvents := map[string]bool{"human_detected": true, "session_started": true} + for i, wh := range cfg.Notify.Webhooks { + if wh.URL == "" { + return fmt.Errorf("notify.webhooks[%d]: url must not be empty", i) + } + for j, ev := range wh.Events { + if !knownEvents[ev] { + return fmt.Errorf("notify.webhooks[%d].events[%d]: unknown event %q", i, j, ev) + } + } + } + return nil } diff --git a/internal/detection/scorer.go b/internal/detection/scorer.go new file mode 100644 index 0000000..76dd433 --- /dev/null +++ b/internal/detection/scorer.go @@ -0,0 +1,259 @@ +package detection + +import ( + "math" + "sync" + "time" +) + +// Direction constants for RecordEvent. +const ( + DirInput = 0 // client → server (keystrokes) + DirOutput = 1 // server → client (shell output) +) + +// Signal weights for the composite score. +const ( + weightTimingVariance = 0.30 + weightSpecialKeys = 0.20 + weightTypingSpeed = 0.20 + weightCommandDiversity = 0.15 + weightSessionDuration = 0.15 +) + +// Scorer accumulates keystroke events and computes a 0.0–1.0 +// human likelihood score based on multiple signals. +type Scorer struct { + mu sync.Mutex + + // Input timing data. + inputTimes []time.Time + delays []time.Duration + + // Special key counters. + specialKeys int + + // Command tracking: we count newlines and unique command prefixes. + currentCmd []byte + commands map[string]struct{} + + // Session activity duration. + firstInput time.Time + lastInput time.Time +} + +// NewScorer returns a new Scorer ready to record events. +func NewScorer() *Scorer { + return &Scorer{ + commands: make(map[string]struct{}), + } +} + +// RecordEvent records a data event with timestamp and direction. +// direction should be DirInput (0) for client input or DirOutput (1) for server output. +func (s *Scorer) RecordEvent(ts time.Time, direction int, data []byte) { + if direction != DirInput { + return // only analyze input + } + + s.mu.Lock() + defer s.mu.Unlock() + + if s.firstInput.IsZero() { + s.firstInput = ts + } + s.lastInput = ts + + for _, b := range data { + // Track inter-keystroke delay for single-byte inputs. + if len(s.inputTimes) > 0 { + delay := ts.Sub(s.inputTimes[len(s.inputTimes)-1]) + if delay > 0 && delay < 30*time.Second { + s.delays = append(s.delays, delay) + } + } + s.inputTimes = append(s.inputTimes, ts) + + // Count special keys. + if isSpecialKey(b) { + s.specialKeys++ + } + + // Track commands (split on newline/CR). + if b == '\r' || b == '\n' { + cmd := string(s.currentCmd) + if len(cmd) > 0 { + s.commands[cmd] = struct{}{} + } + s.currentCmd = s.currentCmd[:0] + } else { + // Handle backspace: remove last byte from current command. + if b == 0x7f || b == 0x08 { + if len(s.currentCmd) > 0 { + s.currentCmd = s.currentCmd[:len(s.currentCmd)-1] + } + } else if b >= 0x20 { // printable + s.currentCmd = append(s.currentCmd, b) + } + } + } +} + +// Score computes the composite human likelihood score (0.0–1.0). +// Thread-safe. +func (s *Scorer) Score() float64 { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.inputTimes) == 0 { + return 0 + } + + tv := s.timingVarianceScore() + sk := s.specialKeysScore() + ts := s.typingSpeedScore() + cd := s.commandDiversityScore() + sd := s.sessionDurationScore() + + score := tv*weightTimingVariance + + sk*weightSpecialKeys + + ts*weightTypingSpeed + + cd*weightCommandDiversity + + sd*weightSessionDuration + + return clamp(score, 0, 1) +} + +// timingVarianceScore returns 0–1 based on coefficient of variation of inter-key delays. +// Bots have CV ≈ 0 (instant or uniform), humans have CV ≥ 0.6. +func (s *Scorer) timingVarianceScore() float64 { + if len(s.delays) < 3 { + return 0 + } + + mean := meanDuration(s.delays) + if mean == 0 { + return 0 + } + + variance := 0.0 + for _, d := range s.delays { + diff := float64(d) - float64(mean) + variance += diff * diff + } + variance /= float64(len(s.delays)) + stddev := math.Sqrt(variance) + cv := stddev / float64(mean) + + // Map CV to 0–1: CV of 0.6+ is fully human-like. + return clamp(cv/0.6, 0, 1) +} + +// specialKeysScore returns 0–1 based on count of special key presses. +// Scripts almost never generate backspace/tab/ctrl characters. +func (s *Scorer) specialKeysScore() float64 { + // 5+ special keys → full score. + return clamp(float64(s.specialKeys)/5.0, 0, 1) +} + +// typingSpeedScore returns 0–1 based on median inter-key delay. +// Paste/scripts have < 5ms, humans have 30–300ms. +func (s *Scorer) typingSpeedScore() float64 { + if len(s.delays) < 2 { + return 0 + } + + med := medianDuration(s.delays) + ms := float64(med) / float64(time.Millisecond) + + if ms < 5 { + return 0 // paste or script + } + if ms > 300 { + return 0.7 // very slow, still possibly human + } + if ms >= 30 && ms <= 300 { + return 1.0 // human range + } + // 5–30ms: transition zone + return clamp((ms-5)/25, 0, 1) +} + +// commandDiversityScore returns 0–1 based on number of unique commands. +func (s *Scorer) commandDiversityScore() float64 { + // 3+ unique commands → full score. + return clamp(float64(len(s.commands))/3.0, 0, 1) +} + +// sessionDurationScore returns 0–1 based on active input duration. +func (s *Scorer) sessionDurationScore() float64 { + if s.firstInput.IsZero() || s.lastInput.IsZero() { + return 0 + } + dur := s.lastInput.Sub(s.firstInput) + // 10s+ of active input → full score. + return clamp(float64(dur)/float64(10*time.Second), 0, 1) +} + +// isSpecialKey returns true for non-printable keys that humans commonly use. +func isSpecialKey(b byte) bool { + switch b { + case 0x7f, // DEL (backspace in most terminals) + 0x08, // BS + 0x09, // TAB + 0x03, // Ctrl-C + 0x04, // Ctrl-D + 0x1b: // ESC (arrow keys start with ESC) + return true + } + return false +} + +func clamp(v, lo, hi float64) float64 { + if v < lo { + return lo + } + if v > hi { + return hi + } + return v +} + +func meanDuration(ds []time.Duration) time.Duration { + if len(ds) == 0 { + return 0 + } + var sum time.Duration + for _, d := range ds { + sum += d + } + return sum / time.Duration(len(ds)) +} + +func medianDuration(ds []time.Duration) time.Duration { + n := len(ds) + if n == 0 { + return 0 + } + // Copy to avoid mutating the original. + sorted := make([]time.Duration, n) + copy(sorted, ds) + sortDurations(sorted) + if n%2 == 0 { + return (sorted[n/2-1] + sorted[n/2]) / 2 + } + return sorted[n/2] +} + +func sortDurations(ds []time.Duration) { + // Simple insertion sort — delay slices are small. + for i := 1; i < len(ds); i++ { + key := ds[i] + j := i - 1 + for j >= 0 && ds[j] > key { + ds[j+1] = ds[j] + j-- + } + ds[j+1] = key + } +} diff --git a/internal/detection/scorer_test.go b/internal/detection/scorer_test.go new file mode 100644 index 0000000..8835a3c --- /dev/null +++ b/internal/detection/scorer_test.go @@ -0,0 +1,155 @@ +package detection + +import ( + "sync" + "testing" + "time" +) + +func TestScorer_EmptyInput(t *testing.T) { + s := NewScorer() + score := s.Score() + if score != 0 { + t.Errorf("empty scorer: got %f, want 0", score) + } +} + +func TestScorer_SingleKeystroke(t *testing.T) { + s := NewScorer() + s.RecordEvent(time.Now(), DirInput, []byte("a")) + score := s.Score() + if score != 0 { + t.Errorf("single keystroke: got %f, want 0", score) + } +} + +func TestScorer_BotLikeInput(t *testing.T) { + // Simulate a bot: paste entire commands with uniform tiny delays, no special keys. + s := NewScorer() + now := time.Now() + + // Bot pastes "cat /etc/passwd\r" all at once with perfectly uniform timing. + for range 3 { + cmd := []byte("cat /etc/passwd\r") + for _, b := range cmd { + s.RecordEvent(now, DirInput, []byte{b}) + now = now.Add(100 * time.Microsecond) // ~0.1ms uniform delay = paste + } + } + + score := s.Score() + if score >= 0.3 { + t.Errorf("bot-like input: got %f, want < 0.3", score) + } +} + +func TestScorer_HumanLikeInput(t *testing.T) { + // Simulate a human: variable timing, backspaces, diverse commands. + s := NewScorer() + now := time.Now() + + type cmd struct { + text string + delay time.Duration // base delay between keys + } + + commands := []cmd{ + {"ls -la\r", 80 * time.Millisecond}, + {"cat /etc/paswd", 120 * time.Millisecond}, // typo + {string([]byte{0x7f}), 200 * time.Millisecond}, // backspace + {"wd\r", 90 * time.Millisecond}, // correction + {"whoami\r", 100 * time.Millisecond}, + {"uname -a\r", 150 * time.Millisecond}, + {string([]byte{0x09}), 300 * time.Millisecond}, // tab completion + {"pwd\r", 70 * time.Millisecond}, + } + + for _, c := range commands { + for _, b := range []byte(c.text) { + // Add ±30% jitter to make timing more natural. + jitter := time.Duration(float64(c.delay) * 0.3) + delay := c.delay + jitter // simplified: always add, still variable across commands + s.RecordEvent(now, DirInput, []byte{b}) + now = now.Add(delay) + } + // Pause between commands (thinking time). + now = now.Add(2 * time.Second) + } + + score := s.Score() + if score <= 0.6 { + t.Errorf("human-like input: got %f, want > 0.6", score) + } +} + +func TestScorer_OutputIgnored(t *testing.T) { + s := NewScorer() + now := time.Now() + + // Only output events — should not affect score. + for i := 0; i < 100; i++ { + s.RecordEvent(now, DirOutput, []byte("some output\n")) + now = now.Add(10 * time.Millisecond) + } + + score := s.Score() + if score != 0 { + t.Errorf("output-only: got %f, want 0", score) + } +} + +func TestScorer_ThreadSafety(t *testing.T) { + s := NewScorer() + now := time.Now() + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(offset int) { + defer wg.Done() + for j := 0; j < 100; j++ { + ts := now.Add(time.Duration(offset*100+j) * time.Millisecond) + s.RecordEvent(ts, DirInput, []byte("a")) + } + }(i) + } + + // Concurrently read score. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + _ = s.Score() + } + }() + + wg.Wait() + + // Should not panic; score should be valid. + score := s.Score() + if score < 0 || score > 1 { + t.Errorf("concurrent score out of range: %f", score) + } +} + +func TestScorer_CommandDiversity(t *testing.T) { + s := NewScorer() + now := time.Now() + + // Type 4 different commands with human-ish timing. + cmds := []string{"ls\r", "pwd\r", "id\r", "whoami\r"} + for _, cmd := range cmds { + for _, b := range []byte(cmd) { + s.RecordEvent(now, DirInput, []byte{b}) + now = now.Add(100 * time.Millisecond) + } + now = now.Add(time.Second) + } + + score := s.Score() + // With 4 unique commands, human timing, and decent duration, + // we should get a meaningful score. + if score < 0.4 { + t.Errorf("diverse commands: got %f, want >= 0.4", score) + } +} diff --git a/internal/notify/webhook.go b/internal/notify/webhook.go new file mode 100644 index 0000000..c96cc25 --- /dev/null +++ b/internal/notify/webhook.go @@ -0,0 +1,179 @@ +package notify + +import ( + "bytes" + "context" + "encoding/json" + "log/slog" + "net/http" + "sync" + "time" + + "git.t-juice.club/torjus/oubliette/internal/config" +) + +// Event types. +const ( + EventHumanDetected = "human_detected" + EventSessionStarted = "session_started" +) + +// SessionInfo holds session data included in webhook payloads. +type SessionInfo struct { + ID string `json:"id"` + IP string `json:"ip"` + Username string `json:"username"` + ShellName string `json:"shell_name"` + HumanScore float64 `json:"human_score"` + ConnectedAt string `json:"connected_at"` +} + +// webhookPayload is the JSON body sent to webhooks. +type webhookPayload struct { + Event string `json:"event"` + Timestamp string `json:"timestamp"` + Session SessionInfo `json:"session"` +} + +// Notifier sends webhook notifications for honeypot events. +type Notifier struct { + webhooks []config.WebhookNotifyConfig + logger *slog.Logger + client *http.Client + + mu sync.Mutex + sent map[string]struct{} // dedup key: "sessionID:eventType" +} + +// NewNotifier creates a Notifier with the given webhook configurations. +func NewNotifier(webhooks []config.WebhookNotifyConfig, logger *slog.Logger) *Notifier { + return &Notifier{ + webhooks: webhooks, + logger: logger, + client: &http.Client{Timeout: 10 * time.Second}, + sent: make(map[string]struct{}), + } +} + +// Notify sends a notification for the given event type and session. +// Deduplicates by (sessionID, eventType) — each combination is sent at most once. +func (n *Notifier) Notify(ctx context.Context, eventType string, session SessionInfo) { + dedupKey := session.ID + ":" + eventType + + n.mu.Lock() + if _, ok := n.sent[dedupKey]; ok { + n.mu.Unlock() + return + } + n.sent[dedupKey] = struct{}{} + n.mu.Unlock() + + payload := webhookPayload{ + Event: eventType, + Timestamp: time.Now().UTC().Format(time.RFC3339), + Session: session, + } + + for _, wh := range n.webhooks { + if !n.shouldSend(wh, eventType) { + continue + } + go n.send(ctx, wh, payload) + } +} + +// CleanupSession removes dedup state for a session. +func (n *Notifier) CleanupSession(sessionID string) { + n.mu.Lock() + defer n.mu.Unlock() + for key := range n.sent { + if len(key) > len(sessionID) && key[:len(sessionID)+1] == sessionID+":" { + delete(n.sent, key) + } + } +} + +// shouldSend returns true if the webhook is configured to receive this event type. +func (n *Notifier) shouldSend(wh config.WebhookNotifyConfig, eventType string) bool { + if len(wh.Events) == 0 { + return true // empty = all events + } + for _, ev := range wh.Events { + if ev == eventType { + return true + } + } + return false +} + +func (n *Notifier) send(ctx context.Context, wh config.WebhookNotifyConfig, payload webhookPayload) { + body, err := json.Marshal(payload) + if err != nil { + n.logger.Error("failed to marshal webhook payload", "err", err) + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, wh.URL, bytes.NewReader(body)) + if err != nil { + n.logger.Error("failed to create webhook request", "err", err, "url", wh.URL) + return + } + + req.Header.Set("Content-Type", "application/json") + for k, v := range wh.Headers { + req.Header.Set(k, v) + } + + resp, err := n.client.Do(req) + if err != nil { + n.logger.Error("webhook request failed", "err", err, "url", wh.URL) + return + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode >= 400 { + n.logger.Warn("webhook returned error status", + "url", wh.URL, + "status", resp.StatusCode, + "event", payload.Event, + ) + return + } + + n.logger.Debug("webhook sent", + "url", wh.URL, + "event", payload.Event, + "session_id", payload.Session.ID, + ) +} + +// FormatConnectedAt formats a time for use in SessionInfo. +func FormatConnectedAt(t time.Time) string { + return t.UTC().Format(time.RFC3339) +} + +// NoopNotifier is a no-op notifier used when no webhooks are configured. +type NoopNotifier struct{} + +func (NoopNotifier) Notify(context.Context, string, SessionInfo) {} +func (NoopNotifier) CleanupSession(string) {} + +// Sender is the interface for sending notifications. +type Sender interface { + Notify(ctx context.Context, eventType string, session SessionInfo) + CleanupSession(sessionID string) +} + +var ( + _ Sender = (*Notifier)(nil) + _ Sender = NoopNotifier{} +) + +// NewSender creates a Sender from configuration. Returns a NoopNotifier +// if no webhooks are configured. +func NewSender(webhooks []config.WebhookNotifyConfig, logger *slog.Logger) Sender { + if len(webhooks) == 0 { + return NoopNotifier{} + } + return NewNotifier(webhooks, logger) +} diff --git a/internal/notify/webhook_test.go b/internal/notify/webhook_test.go new file mode 100644 index 0000000..17da4e8 --- /dev/null +++ b/internal/notify/webhook_test.go @@ -0,0 +1,243 @@ +package notify + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "git.t-juice.club/torjus/oubliette/internal/config" +) + +func testSession() SessionInfo { + return SessionInfo{ + ID: "test-session-123", + IP: "1.2.3.4", + Username: "root", + ShellName: "bash", + HumanScore: 0.85, + ConnectedAt: FormatConnectedAt(time.Now()), + } +} + +func TestNotifier_PayloadStructure(t *testing.T) { + var received webhookPayload + var mu sync.Mutex + done := make(chan struct{}) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + if err := json.NewDecoder(r.Body).Decode(&received); err != nil { + t.Errorf("failed to decode payload: %v", err) + } + w.WriteHeader(http.StatusOK) + close(done) + })) + defer srv.Close() + + webhooks := []config.WebhookNotifyConfig{ + {URL: srv.URL}, + } + + n := NewNotifier(webhooks, slog.Default()) + session := testSession() + n.Notify(context.Background(), EventHumanDetected, session) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for webhook") + } + + mu.Lock() + defer mu.Unlock() + if received.Event != EventHumanDetected { + t.Errorf("event: got %q, want %q", received.Event, EventHumanDetected) + } + if received.Session.ID != session.ID { + t.Errorf("session ID: got %q, want %q", received.Session.ID, session.ID) + } + if received.Session.IP != session.IP { + t.Errorf("session IP: got %q, want %q", received.Session.IP, session.IP) + } + if received.Session.HumanScore != session.HumanScore { + t.Errorf("score: got %f, want %f", received.Session.HumanScore, session.HumanScore) + } + if received.Timestamp == "" { + t.Error("timestamp should not be empty") + } +} + +func TestNotifier_CustomHeaders(t *testing.T) { + var receivedHeaders http.Header + done := make(chan struct{}) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedHeaders = r.Header.Clone() + w.WriteHeader(http.StatusOK) + close(done) + })) + defer srv.Close() + + webhooks := []config.WebhookNotifyConfig{ + { + URL: srv.URL, + Headers: map[string]string{ + "Authorization": "Bearer test-token", + "X-Custom": "my-value", + }, + }, + } + + n := NewNotifier(webhooks, slog.Default()) + n.Notify(context.Background(), EventSessionStarted, testSession()) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for webhook") + } + + if got := receivedHeaders.Get("Authorization"); got != "Bearer test-token" { + t.Errorf("Authorization header: got %q, want %q", got, "Bearer test-token") + } + if got := receivedHeaders.Get("X-Custom"); got != "my-value" { + t.Errorf("X-Custom header: got %q, want %q", got, "my-value") + } + if got := receivedHeaders.Get("Content-Type"); got != "application/json" { + t.Errorf("Content-Type: got %q, want %q", got, "application/json") + } +} + +func TestNotifier_Deduplication(t *testing.T) { + var count int + var mu sync.Mutex + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + count++ + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + webhooks := []config.WebhookNotifyConfig{{URL: srv.URL}} + n := NewNotifier(webhooks, slog.Default()) + session := testSession() + + // Send same event three times for the same session. + for range 3 { + n.Notify(context.Background(), EventHumanDetected, session) + } + + // Allow goroutines to complete. + time.Sleep(500 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + if count != 1 { + t.Errorf("dedup: got %d sends, want 1", count) + } +} + +func TestNotifier_EventFiltering(t *testing.T) { + var receivedEvents []string + var mu sync.Mutex + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var payload webhookPayload + _ = json.NewDecoder(r.Body).Decode(&payload) + mu.Lock() + receivedEvents = append(receivedEvents, payload.Event) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + // Only subscribe to human_detected. + webhooks := []config.WebhookNotifyConfig{ + { + URL: srv.URL, + Events: []string{EventHumanDetected}, + }, + } + + n := NewNotifier(webhooks, slog.Default()) + session := testSession() + + // Send both event types. + n.Notify(context.Background(), EventSessionStarted, session) + // Need a different session for human_detected to avoid dedup with same session. + session2 := testSession() + session2.ID = "test-session-456" + n.Notify(context.Background(), EventHumanDetected, session2) + + time.Sleep(500 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + if len(receivedEvents) != 1 { + t.Fatalf("event filtering: got %d events, want 1", len(receivedEvents)) + } + if receivedEvents[0] != EventHumanDetected { + t.Errorf("filtered event: got %q, want %q", receivedEvents[0], EventHumanDetected) + } +} + +func TestNotifier_CleanupSession(t *testing.T) { + var count int + var mu sync.Mutex + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + count++ + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + webhooks := []config.WebhookNotifyConfig{{URL: srv.URL}} + n := NewNotifier(webhooks, slog.Default()) + session := testSession() + + n.Notify(context.Background(), EventHumanDetected, session) + time.Sleep(200 * time.Millisecond) + + // Cleanup and resend — should work again. + n.CleanupSession(session.ID) + n.Notify(context.Background(), EventHumanDetected, session) + time.Sleep(200 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + if count != 2 { + t.Errorf("after cleanup: got %d sends, want 2", count) + } +} + +func TestNoopNotifier(t *testing.T) { + // Should not panic. + n := NoopNotifier{} + n.Notify(context.Background(), EventHumanDetected, testSession()) + n.CleanupSession("test") +} + +func TestNewSender_NoWebhooks(t *testing.T) { + sender := NewSender(nil, slog.Default()) + if _, ok := sender.(NoopNotifier); !ok { + t.Errorf("expected NoopNotifier, got %T", sender) + } +} + +func TestNewSender_WithWebhooks(t *testing.T) { + webhooks := []config.WebhookNotifyConfig{{URL: "http://example.com"}} + sender := NewSender(webhooks, slog.Default()) + if _, ok := sender.(*Notifier); !ok { + t.Errorf("expected *Notifier, got %T", sender) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index 62dc597..43b2136 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -14,6 +14,8 @@ import ( "git.t-juice.club/torjus/oubliette/internal/auth" "git.t-juice.club/torjus/oubliette/internal/config" + "git.t-juice.club/torjus/oubliette/internal/detection" + "git.t-juice.club/torjus/oubliette/internal/notify" "git.t-juice.club/torjus/oubliette/internal/shell" "git.t-juice.club/torjus/oubliette/internal/shell/bash" "git.t-juice.club/torjus/oubliette/internal/storage" @@ -21,13 +23,14 @@ import ( ) type Server struct { - cfg config.Config - store storage.Store - authenticator *auth.Authenticator - sshConfig *ssh.ServerConfig - logger *slog.Logger - connSem chan struct{} // semaphore limiting concurrent connections - shellRegistry *shell.Registry + cfg config.Config + store storage.Store + authenticator *auth.Authenticator + sshConfig *ssh.ServerConfig + logger *slog.Logger + connSem chan struct{} // semaphore limiting concurrent connections + shellRegistry *shell.Registry + notifier notify.Sender } func New(cfg config.Config, store storage.Store, logger *slog.Logger) (*Server, error) { @@ -43,6 +46,7 @@ func New(cfg config.Config, store storage.Store, logger *slog.Logger) (*Server, logger: logger, connSem: make(chan struct{}, cfg.SSH.MaxConnections), shellRegistry: registry, + notifier: notify.NewSender(cfg.Notify.Webhooks, logger), } hostKey, err := loadOrGenerateHostKey(cfg.SSH.HostKeyPath) @@ -159,6 +163,18 @@ func (s *Server) handleSession(channel ssh.Channel, requests <-chan *ssh.Request "session_id", sessionID, ) + // Send session_started notification. + connectedAt := time.Now() + sessionInfo := notify.SessionInfo{ + ID: sessionID, + IP: ip, + Username: conn.User(), + ShellName: selectedShell.Name(), + ConnectedAt: notify.FormatConnectedAt(connectedAt), + } + s.notifier.Notify(context.Background(), notify.EventSessionStarted, sessionInfo) + defer s.notifier.CleanupSession(sessionID) + // Handle session requests (pty-req, shell, etc.) go func() { for req := range requests { @@ -194,18 +210,76 @@ func (s *Server) handleSession(channel ssh.Channel, requests <-chan *ssh.Request }, } - // Wrap channel in RecordingChannel for future byte-level recording. + // Wrap channel in RecordingChannel. recorder := shell.NewRecordingChannel(channel) + // Set up detection scorer if enabled. + var scorer *detection.Scorer + var scoreCancel context.CancelFunc + if s.cfg.Detection.Enabled { + scorer = detection.NewScorer() + recorder.WithCallback(func(ts time.Time, direction int, data []byte) { + scorer.RecordEvent(ts, direction, data) + }) + + var scoreCtx context.Context + scoreCtx, scoreCancel = context.WithCancel(context.Background()) + go s.runScoreUpdater(scoreCtx, sessionID, scorer, sessionInfo) + } + if err := selectedShell.Handle(context.Background(), sessCtx, recorder); err != nil { s.logger.Error("shell error", "err", err, "session_id", sessionID) } - s.logger.Info("session ended", - "remote_addr", conn.RemoteAddr(), - "user", conn.User(), - "session_id", sessionID, - ) + // Stop score updater and write final score. + if scoreCancel != nil { + scoreCancel() + } + if scorer != nil { + finalScore := scorer.Score() + if err := s.store.UpdateHumanScore(context.Background(), sessionID, finalScore); err != nil { + s.logger.Error("failed to write final human score", "err", err, "session_id", sessionID) + } + s.logger.Info("session ended", + "remote_addr", conn.RemoteAddr(), + "user", conn.User(), + "session_id", sessionID, + "human_score", finalScore, + ) + } else { + s.logger.Info("session ended", + "remote_addr", conn.RemoteAddr(), + "user", conn.User(), + "session_id", sessionID, + ) + } +} + +// runScoreUpdater periodically computes the human score, writes it to the DB, +// and triggers a notification if the threshold is crossed. +func (s *Server) runScoreUpdater(ctx context.Context, sessionID string, scorer *detection.Scorer, sessionInfo notify.SessionInfo) { + ticker := time.NewTicker(s.cfg.Detection.UpdateIntervalDuration) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + score := scorer.Score() + if err := s.store.UpdateHumanScore(ctx, sessionID, score); err != nil { + s.logger.Error("failed to update human score", "err", err, "session_id", sessionID) + continue + } + s.logger.Debug("human score updated", "session_id", sessionID, "score", score) + + if score >= s.cfg.Detection.Threshold { + info := sessionInfo + info.HumanScore = score + s.notifier.Notify(ctx, notify.EventHumanDetected, info) + } + } + } } func (s *Server) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) { diff --git a/internal/shell/recorder.go b/internal/shell/recorder.go index bfcc7fb..b5e4d01 100644 --- a/internal/shell/recorder.go +++ b/internal/shell/recorder.go @@ -1,12 +1,20 @@ package shell -import "io" +import ( + "io" + "time" +) -// RecordingChannel wraps an io.ReadWriteCloser. In Phase 1.4 it is a -// pass-through; Phase 2.3 will add byte-level keystroke recording here -// without changing any shell code. +// EventCallback is called with a copy of data whenever the channel is read or written. +// direction is 0 for input (client→server) and 1 for output (server→client). +type EventCallback func(ts time.Time, direction int, data []byte) + +// RecordingChannel wraps an io.ReadWriteCloser and optionally invokes a callback +// on every Read (input) and Write (output). Phase 2.3 will add byte-level +// keystroke recording here without changing any shell code. type RecordingChannel struct { - inner io.ReadWriteCloser + inner io.ReadWriteCloser + callback EventCallback } // NewRecordingChannel returns a RecordingChannel wrapping rw. @@ -14,6 +22,30 @@ func NewRecordingChannel(rw io.ReadWriteCloser) *RecordingChannel { return &RecordingChannel{inner: rw} } -func (r *RecordingChannel) Read(p []byte) (int, error) { return r.inner.Read(p) } -func (r *RecordingChannel) Write(p []byte) (int, error) { return r.inner.Write(p) } -func (r *RecordingChannel) Close() error { return r.inner.Close() } +// WithCallback sets the event callback and returns the RecordingChannel for chaining. +func (r *RecordingChannel) WithCallback(cb EventCallback) *RecordingChannel { + r.callback = cb + return r +} + +func (r *RecordingChannel) Read(p []byte) (int, error) { + n, err := r.inner.Read(p) + if n > 0 && r.callback != nil { + cp := make([]byte, n) + copy(cp, p[:n]) + r.callback(time.Now(), 0, cp) + } + return n, err +} + +func (r *RecordingChannel) Write(p []byte) (int, error) { + n, err := r.inner.Write(p) + if n > 0 && r.callback != nil { + cp := make([]byte, n) + copy(cp, p[:n]) + r.callback(time.Now(), 1, cp) + } + return n, err +} + +func (r *RecordingChannel) Close() error { return r.inner.Close() } diff --git a/internal/web/templates.go b/internal/web/templates.go index aabe364..7cb6afd 100644 --- a/internal/web/templates.go +++ b/internal/web/templates.go @@ -2,6 +2,7 @@ package web import ( "embed" + "fmt" "html/template" "time" ) @@ -26,6 +27,18 @@ func loadTemplates() (*template.Template, error) { } return *t }, + "derefFloat": func(f *float64) float64 { + if f == nil { + return 0 + } + return *f + }, + "formatScore": func(f *float64) string { + if f == nil { + return "-" + } + return fmt.Sprintf("%.0f%%", *f*100) + }, } return template.New("").Funcs(funcMap).ParseFS(templateFS, diff --git a/internal/web/templates/dashboard.html b/internal/web/templates/dashboard.html index 3caee8f..fb5c370 100644 --- a/internal/web/templates/dashboard.html +++ b/internal/web/templates/dashboard.html @@ -70,6 +70,7 @@