This repository has been archived on 2026-03-09. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
oubliette/internal/detection/scorer_test.go
Torjus Håkestad 0ad6f4cb6a feat: add human detection scoring and webhook notifications
Implement phase 2.1 (human detection) and 2.2 (notifications):

- Detection scorer computes 0.0-1.0 human likelihood from keystroke
  timing variance, special key usage, typing speed, command diversity,
  and session duration
- Webhook notifier sends JSON POST to configured endpoints with
  deduplication, custom headers, and event filtering
- RecordingChannel gains an event callback for feeding keystrokes
  to the scorer without coupling shell and detection packages
- Server wires scorer into session lifecycle with periodic updates
  and threshold-based notification triggers
- Web UI shows human score in session tables with highlighting
- New config sections: [detection] and [[notify.webhooks]]

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 21:28:11 +01:00

156 lines
3.6 KiB
Go

package detection
import (
"sync"
"testing"
"time"
)
func TestScorer_EmptyInput(t *testing.T) {
s := NewScorer()
score := s.Score()
if score != 0 {
t.Errorf("empty scorer: got %f, want 0", score)
}
}
func TestScorer_SingleKeystroke(t *testing.T) {
s := NewScorer()
s.RecordEvent(time.Now(), DirInput, []byte("a"))
score := s.Score()
if score != 0 {
t.Errorf("single keystroke: got %f, want 0", score)
}
}
func TestScorer_BotLikeInput(t *testing.T) {
// Simulate a bot: paste entire commands with uniform tiny delays, no special keys.
s := NewScorer()
now := time.Now()
// Bot pastes "cat /etc/passwd\r" all at once with perfectly uniform timing.
for range 3 {
cmd := []byte("cat /etc/passwd\r")
for _, b := range cmd {
s.RecordEvent(now, DirInput, []byte{b})
now = now.Add(100 * time.Microsecond) // ~0.1ms uniform delay = paste
}
}
score := s.Score()
if score >= 0.3 {
t.Errorf("bot-like input: got %f, want < 0.3", score)
}
}
func TestScorer_HumanLikeInput(t *testing.T) {
// Simulate a human: variable timing, backspaces, diverse commands.
s := NewScorer()
now := time.Now()
type cmd struct {
text string
delay time.Duration // base delay between keys
}
commands := []cmd{
{"ls -la\r", 80 * time.Millisecond},
{"cat /etc/paswd", 120 * time.Millisecond}, // typo
{string([]byte{0x7f}), 200 * time.Millisecond}, // backspace
{"wd\r", 90 * time.Millisecond}, // correction
{"whoami\r", 100 * time.Millisecond},
{"uname -a\r", 150 * time.Millisecond},
{string([]byte{0x09}), 300 * time.Millisecond}, // tab completion
{"pwd\r", 70 * time.Millisecond},
}
for _, c := range commands {
for _, b := range []byte(c.text) {
// Add ±30% jitter to make timing more natural.
jitter := time.Duration(float64(c.delay) * 0.3)
delay := c.delay + jitter // simplified: always add, still variable across commands
s.RecordEvent(now, DirInput, []byte{b})
now = now.Add(delay)
}
// Pause between commands (thinking time).
now = now.Add(2 * time.Second)
}
score := s.Score()
if score <= 0.6 {
t.Errorf("human-like input: got %f, want > 0.6", score)
}
}
func TestScorer_OutputIgnored(t *testing.T) {
s := NewScorer()
now := time.Now()
// Only output events — should not affect score.
for i := 0; i < 100; i++ {
s.RecordEvent(now, DirOutput, []byte("some output\n"))
now = now.Add(10 * time.Millisecond)
}
score := s.Score()
if score != 0 {
t.Errorf("output-only: got %f, want 0", score)
}
}
func TestScorer_ThreadSafety(t *testing.T) {
s := NewScorer()
now := time.Now()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(offset int) {
defer wg.Done()
for j := 0; j < 100; j++ {
ts := now.Add(time.Duration(offset*100+j) * time.Millisecond)
s.RecordEvent(ts, DirInput, []byte("a"))
}
}(i)
}
// Concurrently read score.
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
_ = s.Score()
}
}()
wg.Wait()
// Should not panic; score should be valid.
score := s.Score()
if score < 0 || score > 1 {
t.Errorf("concurrent score out of range: %f", score)
}
}
func TestScorer_CommandDiversity(t *testing.T) {
s := NewScorer()
now := time.Now()
// Type 4 different commands with human-ish timing.
cmds := []string{"ls\r", "pwd\r", "id\r", "whoami\r"}
for _, cmd := range cmds {
for _, b := range []byte(cmd) {
s.RecordEvent(now, DirInput, []byte{b})
now = now.Add(100 * time.Millisecond)
}
now = now.Add(time.Second)
}
score := s.Score()
// With 4 unique commands, human timing, and decent duration,
// we should get a meaningful score.
if score < 0.4 {
t.Errorf("diverse commands: got %f, want >= 0.4", score)
}
}