feat: add server-side session filtering with input bytes and human score

Replace client-side session table filtering with server-side filtering
via a new /fragments/recent-sessions htmx endpoint. Add InputBytes column
to session tables, Human score > 0 checkbox filter, and Sort by Input
Bytes option to help identify sessions with actual shell interaction.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-18 09:09:59 +01:00
parent 0908b43724
commit cb7be28f42
11 changed files with 409 additions and 61 deletions

View File

@@ -336,10 +336,26 @@ func (m *MemoryStore) GetRecentSessions(_ context.Context, limit int, activeOnly
m.mu.Lock()
defer m.mu.Unlock()
// Count events per session.
return m.collectSessions(limit, activeOnly, DashboardFilter{}), nil
}
func (m *MemoryStore) GetFilteredSessions(_ context.Context, limit int, activeOnly bool, f DashboardFilter) ([]Session, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.collectSessions(limit, activeOnly, f), nil
}
// collectSessions gathers sessions matching filter criteria. Must be called with m.mu held.
func (m *MemoryStore) collectSessions(limit int, activeOnly bool, f DashboardFilter) []Session {
// Compute event counts and input bytes per session.
eventCounts := make(map[string]int)
inputBytes := make(map[string]int64)
for _, e := range m.SessionEvents {
eventCounts[e.SessionID]++
if e.Direction == 0 {
inputBytes[e.SessionID] += int64(len(e.Data))
}
}
var sessions []Session
@@ -347,17 +363,54 @@ func (m *MemoryStore) GetRecentSessions(_ context.Context, limit int, activeOnly
if activeOnly && s.DisconnectedAt != nil {
continue
}
if !matchesSessionFilter(s, f) {
continue
}
sess := *s
sess.EventCount = eventCounts[s.ID]
sess.InputBytes = inputBytes[s.ID]
sessions = append(sessions, sess)
}
sort.Slice(sessions, func(i, j int) bool {
return sessions[i].ConnectedAt.After(sessions[j].ConnectedAt)
})
if f.SortBy == "input_bytes" {
sort.Slice(sessions, func(i, j int) bool {
return sessions[i].InputBytes > sessions[j].InputBytes
})
} else {
sort.Slice(sessions, func(i, j int) bool {
return sessions[i].ConnectedAt.After(sessions[j].ConnectedAt)
})
}
if limit > 0 && len(sessions) > limit {
sessions = sessions[:limit]
}
return sessions, nil
return sessions
}
// matchesSessionFilter returns true if the session matches the given filter.
func matchesSessionFilter(s *Session, f DashboardFilter) bool {
if f.Since != nil && s.ConnectedAt.Before(*f.Since) {
return false
}
if f.Until != nil && s.ConnectedAt.After(*f.Until) {
return false
}
if f.IP != "" && s.IP != f.IP {
return false
}
if f.Country != "" && s.Country != f.Country {
return false
}
if f.Username != "" && s.Username != f.Username {
return false
}
if f.HumanScoreAboveZero {
if s.HumanScore == nil || *s.HumanScore <= 0 {
return false
}
}
return true
}
func (m *MemoryStore) GetTopExecCommands(_ context.Context, limit int) ([]TopEntry, error) {

View File

@@ -383,40 +383,104 @@ func (s *SQLiteStore) queryTopN(ctx context.Context, column string, limit int) (
}
func (s *SQLiteStore) GetRecentSessions(ctx context.Context, limit int, activeOnly bool) ([]Session, error) {
query := `SELECT s.id, s.ip, s.country, s.username, s.shell_name, s.connected_at, s.disconnected_at, s.human_score, s.exec_command, COUNT(e.id) as event_count FROM sessions s LEFT JOIN session_events e ON s.id = e.session_id`
query := `SELECT s.id, s.ip, s.country, s.username, s.shell_name, s.connected_at, s.disconnected_at, s.human_score, s.exec_command, COUNT(e.id) as event_count, COALESCE(SUM(CASE WHEN e.direction = 0 THEN LENGTH(e.data) ELSE 0 END), 0) as input_bytes FROM sessions s LEFT JOIN session_events e ON s.id = e.session_id`
if activeOnly {
query += ` WHERE s.disconnected_at IS NULL`
}
query += ` GROUP BY s.id ORDER BY s.connected_at DESC LIMIT ?`
rows, err := s.db.QueryContext(ctx, query, limit)
return s.scanSessions(ctx, query, limit)
}
// buildSessionWhereClause builds a dynamic WHERE clause for session filtering.
func buildSessionWhereClause(f DashboardFilter, activeOnly bool) (string, []any) {
var clauses []string
var args []any
if activeOnly {
clauses = append(clauses, "s.disconnected_at IS NULL")
}
if f.Since != nil {
clauses = append(clauses, "s.connected_at >= ?")
args = append(args, f.Since.UTC().Format(time.RFC3339))
}
if f.Until != nil {
clauses = append(clauses, "s.connected_at <= ?")
args = append(args, f.Until.UTC().Format(time.RFC3339))
}
if f.IP != "" {
clauses = append(clauses, "s.ip = ?")
args = append(args, f.IP)
}
if f.Country != "" {
clauses = append(clauses, "s.country = ?")
args = append(args, f.Country)
}
if f.Username != "" {
clauses = append(clauses, "s.username = ?")
args = append(args, f.Username)
}
if f.HumanScoreAboveZero {
clauses = append(clauses, "s.human_score > 0")
}
if len(clauses) == 0 {
return "", nil
}
return " WHERE " + strings.Join(clauses, " AND "), args
}
// validSessionSorts maps allowed SortBy values to SQL ORDER BY clauses.
var validSessionSorts = map[string]string{
"connected_at": "s.connected_at DESC",
"input_bytes": "input_bytes DESC",
}
func (s *SQLiteStore) GetFilteredSessions(ctx context.Context, limit int, activeOnly bool, f DashboardFilter) ([]Session, error) {
where, args := buildSessionWhereClause(f, activeOnly)
args = append(args, limit)
orderBy := validSessionSorts["connected_at"]
if mapped, ok := validSessionSorts[f.SortBy]; ok {
orderBy = mapped
}
//nolint:gosec // where/order clauses built from allowlisted constants, not raw user input
query := `SELECT s.id, s.ip, s.country, s.username, s.shell_name, s.connected_at, s.disconnected_at, s.human_score, s.exec_command, COUNT(e.id) as event_count, COALESCE(SUM(CASE WHEN e.direction = 0 THEN LENGTH(e.data) ELSE 0 END), 0) as input_bytes FROM sessions s LEFT JOIN session_events e ON s.id = e.session_id` + where + ` GROUP BY s.id ORDER BY ` + orderBy + ` LIMIT ?`
return s.scanSessions(ctx, query, args...)
}
// scanSessions executes a session query and scans the results.
func (s *SQLiteStore) scanSessions(ctx context.Context, query string, args ...any) ([]Session, error) {
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("querying recent sessions: %w", err)
return nil, fmt.Errorf("querying sessions: %w", err)
}
defer func() { _ = rows.Close() }()
var sessions []Session
for rows.Next() {
var s Session
var sess Session
var connectedAt string
var disconnectedAt sql.NullString
var humanScore sql.NullFloat64
var execCommand sql.NullString
if err := rows.Scan(&s.ID, &s.IP, &s.Country, &s.Username, &s.ShellName, &connectedAt, &disconnectedAt, &humanScore, &execCommand, &s.EventCount); err != nil {
if err := rows.Scan(&sess.ID, &sess.IP, &sess.Country, &sess.Username, &sess.ShellName, &connectedAt, &disconnectedAt, &humanScore, &execCommand, &sess.EventCount, &sess.InputBytes); err != nil {
return nil, fmt.Errorf("scanning session: %w", err)
}
s.ConnectedAt, _ = time.Parse(time.RFC3339, connectedAt)
sess.ConnectedAt, _ = time.Parse(time.RFC3339, connectedAt)
if disconnectedAt.Valid {
t, _ := time.Parse(time.RFC3339, disconnectedAt.String)
s.DisconnectedAt = &t
sess.DisconnectedAt = &t
}
if humanScore.Valid {
s.HumanScore = &humanScore.Float64
sess.HumanScore = &humanScore.Float64
}
if execCommand.Valid {
s.ExecCommand = &execCommand.String
sess.ExecCommand = &execCommand.String
}
sessions = append(sessions, s)
sessions = append(sessions, sess)
}
return sessions, rows.Err()
}

View File

@@ -29,6 +29,7 @@ type Session struct {
HumanScore *float64
ExecCommand *string
EventCount int
InputBytes int64
}
// SessionLog represents a single log entry for a session.
@@ -76,11 +77,13 @@ type CountryCount struct {
// DashboardFilter contains optional filters for dashboard queries.
type DashboardFilter struct {
Since *time.Time
Until *time.Time
IP string
Country string
Username string
Since *time.Time
Until *time.Time
IP string
Country string
Username string
HumanScoreAboveZero bool
SortBy string
}
// TopEntry represents a value and its count for top-N queries.
@@ -137,6 +140,10 @@ type Store interface {
// If activeOnly is true, only sessions with no disconnected_at are returned.
GetRecentSessions(ctx context.Context, limit int, activeOnly bool) ([]Session, error)
// GetFilteredSessions returns sessions matching the given filter, ordered
// by the filter's SortBy field (default: connected_at DESC).
GetFilteredSessions(ctx context.Context, limit int, activeOnly bool, f DashboardFilter) ([]Session, error)
// GetSession returns a single session by ID.
GetSession(ctx context.Context, sessionID string) (*Session, error)

View File

@@ -700,3 +700,192 @@ func TestGetRecentSessions(t *testing.T) {
})
})
}
func TestInputBytes(t *testing.T) {
testStores(t, func(t *testing.T, newStore storeFactory) {
t.Run("counts only input direction", func(t *testing.T) {
store := newStore(t)
ctx := context.Background()
id, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash", "")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
now := time.Now().UTC()
events := []SessionEvent{
{SessionID: id, Timestamp: now, Direction: 0, Data: []byte("ls\n")}, // 3 bytes input
{SessionID: id, Timestamp: now.Add(100 * time.Millisecond), Direction: 1, Data: []byte("file1\nfile2\n")}, // 11 bytes output
{SessionID: id, Timestamp: now.Add(200 * time.Millisecond), Direction: 0, Data: []byte("pwd\n")}, // 4 bytes input
}
if err := store.AppendSessionEvents(ctx, events); err != nil {
t.Fatalf("AppendSessionEvents: %v", err)
}
sessions, err := store.GetRecentSessions(ctx, 10, false)
if err != nil {
t.Fatalf("GetRecentSessions: %v", err)
}
if len(sessions) != 1 {
t.Fatalf("len = %d, want 1", len(sessions))
}
// Only direction=0 data: "ls\n" (3) + "pwd\n" (4) = 7
if sessions[0].InputBytes != 7 {
t.Errorf("InputBytes = %d, want 7", sessions[0].InputBytes)
}
})
t.Run("zero when no events", func(t *testing.T) {
store := newStore(t)
ctx := context.Background()
_, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash", "")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
sessions, err := store.GetRecentSessions(ctx, 10, false)
if err != nil {
t.Fatalf("GetRecentSessions: %v", err)
}
if len(sessions) != 1 {
t.Fatalf("len = %d, want 1", len(sessions))
}
if sessions[0].InputBytes != 0 {
t.Errorf("InputBytes = %d, want 0", sessions[0].InputBytes)
}
})
})
}
func TestGetFilteredSessions(t *testing.T) {
testStores(t, func(t *testing.T, newStore storeFactory) {
t.Run("filter by human score", func(t *testing.T) {
store := newStore(t)
ctx := context.Background()
// Create two sessions, one with human score > 0.
id1, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash", "CN")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
if err := store.UpdateHumanScore(ctx, id1, 0.75); err != nil {
t.Fatalf("UpdateHumanScore: %v", err)
}
_, err = store.CreateSession(ctx, "10.0.0.2", "admin", "bash", "US")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
sessions, err := store.GetFilteredSessions(ctx, 50, false, DashboardFilter{HumanScoreAboveZero: true})
if err != nil {
t.Fatalf("GetFilteredSessions: %v", err)
}
if len(sessions) != 1 {
t.Fatalf("len = %d, want 1", len(sessions))
}
if sessions[0].ID != id1 {
t.Errorf("expected session %s, got %s", id1, sessions[0].ID)
}
})
t.Run("sort by input bytes", func(t *testing.T) {
store := newStore(t)
ctx := context.Background()
// Session with more input (created first).
id1, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash", "")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
now := time.Now().UTC()
if err := store.AppendSessionEvents(ctx, []SessionEvent{
{SessionID: id1, Timestamp: now, Direction: 0, Data: []byte("ls -la /tmp\n")},
{SessionID: id1, Timestamp: now.Add(time.Millisecond), Direction: 0, Data: []byte("cat /etc/passwd\n")},
}); err != nil {
t.Fatalf("AppendSessionEvents: %v", err)
}
// Session with less input (created after id1, so would be first by connected_at).
// Sleep >1s to ensure different RFC3339 timestamps in SQLite.
time.Sleep(1100 * time.Millisecond)
id2, err := store.CreateSession(ctx, "10.0.0.2", "admin", "bash", "")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
if err := store.AppendSessionEvents(ctx, []SessionEvent{
{SessionID: id2, Timestamp: now.Add(2 * time.Second), Direction: 0, Data: []byte("x\n")},
}); err != nil {
t.Fatalf("AppendSessionEvents: %v", err)
}
// Default sort (connected_at DESC) should show id2 first.
sessions, err := store.GetFilteredSessions(ctx, 50, false, DashboardFilter{})
if err != nil {
t.Fatalf("GetFilteredSessions: %v", err)
}
if len(sessions) != 2 {
t.Fatalf("len = %d, want 2", len(sessions))
}
if sessions[0].ID != id2 {
t.Errorf("default sort: expected %s first, got %s", id2, sessions[0].ID)
}
// Sort by input_bytes should show id1 first (more input).
sessions, err = store.GetFilteredSessions(ctx, 50, false, DashboardFilter{SortBy: "input_bytes"})
if err != nil {
t.Fatalf("GetFilteredSessions: %v", err)
}
if len(sessions) != 2 {
t.Fatalf("len = %d, want 2", len(sessions))
}
if sessions[0].ID != id1 {
t.Errorf("input_bytes sort: expected %s first, got %s", id1, sessions[0].ID)
}
})
t.Run("combined filters", func(t *testing.T) {
store := newStore(t)
ctx := context.Background()
id1, err := store.CreateSession(ctx, "10.0.0.1", "root", "bash", "CN")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
if err := store.UpdateHumanScore(ctx, id1, 0.5); err != nil {
t.Fatalf("UpdateHumanScore: %v", err)
}
// Different country, also has score.
id2, err := store.CreateSession(ctx, "10.0.0.2", "admin", "bash", "US")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
if err := store.UpdateHumanScore(ctx, id2, 0.8); err != nil {
t.Fatalf("UpdateHumanScore: %v", err)
}
// Same country CN but no score.
_, err = store.CreateSession(ctx, "10.0.0.3", "test", "bash", "CN")
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
// Filter: CN + human score > 0 -> only id1.
sessions, err := store.GetFilteredSessions(ctx, 50, false, DashboardFilter{
Country: "CN",
HumanScoreAboveZero: true,
})
if err != nil {
t.Fatalf("GetFilteredSessions: %v", err)
}
if len(sessions) != 1 {
t.Fatalf("len = %d, want 1", len(sessions))
}
if sessions[0].ID != id1 {
t.Errorf("expected session %s, got %s", id1, sessions[0].ID)
}
})
})
}