This repository has been archived on 2026-03-10. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
labmcp/internal/mcp/server_test.go
Torjus Håkestad fd40e73f1b feat: add package indexing to MCP index_revision tool
The options server's index_revision now also indexes packages when running
under nixpkgs-search, matching the CLI behavior. The packages server gets
its own index_revision tool for standalone package indexing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 22:12:08 +01:00

392 lines
10 KiB
Go

package mcp
import (
"bytes"
"context"
"encoding/json"
"io"
"strings"
"testing"
"git.t-juice.club/torjus/labmcp/internal/database"
"git.t-juice.club/torjus/labmcp/internal/nixos"
"git.t-juice.club/torjus/labmcp/internal/packages"
)
// TestServerInitialize sends an "initialize" request to a server created with
// DefaultNixOSConfig and checks that the result reports ProtocolVersion and a
// serverInfo name of "nixos-options".
func TestServerInitialize(t *testing.T) {
	store := setupTestStore(t)
	server := NewServer(store, nil, DefaultNixOSConfig())

	input := `{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}`
	resp := runRequest(t, server, input)
	if resp.Error != nil {
		t.Fatalf("Unexpected error: %v", resp.Error)
	}

	result, ok := resp.Result.(map[string]interface{})
	if !ok {
		t.Fatalf("Expected map result, got %T", resp.Result)
	}
	if result["protocolVersion"] != ProtocolVersion {
		t.Errorf("protocolVersion = %v, want %v", result["protocolVersion"], ProtocolVersion)
	}

	// Use the comma-ok form so a missing or malformed serverInfo fails the
	// test with a readable message instead of panicking.
	serverInfo, ok := result["serverInfo"].(map[string]interface{})
	if !ok {
		t.Fatalf("Expected serverInfo map, got %T", result["serverInfo"])
	}
	if serverInfo["name"] != "nixos-options" {
		t.Errorf("serverInfo.name = %v, want nixos-options", serverInfo["name"])
	}
}
// TestServerToolsList sends "tools/list" to a server created with
// DefaultNixOSConfig and checks that exactly six tools are returned and that
// every expected tool name appears among them.
func TestServerToolsList(t *testing.T) {
	store := setupTestStore(t)
	server := NewServer(store, nil, DefaultNixOSConfig())

	input := `{"jsonrpc":"2.0","id":1,"method":"tools/list"}`
	resp := runRequest(t, server, input)
	if resp.Error != nil {
		t.Fatalf("Unexpected error: %v", resp.Error)
	}

	result, ok := resp.Result.(map[string]interface{})
	if !ok {
		t.Fatalf("Expected map result, got %T", resp.Result)
	}
	tools, ok := result["tools"].([]interface{})
	if !ok {
		t.Fatalf("Expected tools array, got %T", result["tools"])
	}

	// Should have 6 tools
	if len(tools) != 6 {
		t.Errorf("Expected 6 tools, got %d", len(tools))
	}

	// Check tool names
	expectedTools := map[string]bool{
		"search_options":  false,
		"get_option":      false,
		"get_file":        false,
		"index_revision":  false,
		"list_revisions":  false,
		"delete_revision": false,
	}
	for i, tool := range tools {
		// Checked assertions: a malformed entry should fail the test, not
		// panic the whole test binary.
		toolMap, ok := tool.(map[string]interface{})
		if !ok {
			t.Fatalf("tools[%d]: expected map, got %T", i, tool)
		}
		name, ok := toolMap["name"].(string)
		if !ok {
			t.Fatalf("tools[%d]: expected string name, got %T", i, toolMap["name"])
		}
		if _, ok := expectedTools[name]; ok {
			expectedTools[name] = true
		}
	}
	for name, found := range expectedTools {
		if !found {
			t.Errorf("Tool %q not found in tools list", name)
		}
	}
}
func TestServerMethodNotFound(t *testing.T) {
store := setupTestStore(t)
server := NewServer(store, nil, DefaultNixOSConfig())
input := `{"jsonrpc":"2.0","id":1,"method":"unknown/method"}`
resp := runRequest(t, server, input)
if resp.Error == nil {
t.Fatal("Expected error for unknown method")
}
if resp.Error.Code != MethodNotFound {
t.Errorf("Error code = %d, want %d", resp.Error.Code, MethodNotFound)
}
}
func TestServerParseError(t *testing.T) {
store := setupTestStore(t)
server := NewServer(store, nil, DefaultNixOSConfig())
input := `not valid json`
resp := runRequest(t, server, input)
if resp.Error == nil {
t.Fatal("Expected parse error")
}
if resp.Error.Code != ParseError {
t.Errorf("Error code = %d, want %d", resp.Error.Code, ParseError)
}
}
func TestServerNotification(t *testing.T) {
store := setupTestStore(t)
server := NewServer(store, nil, DefaultNixOSConfig())
// Notification (no response expected)
input := `{"jsonrpc":"2.0","method":"notifications/initialized"}`
var output bytes.Buffer
ctx := context.Background()
err := server.Run(ctx, strings.NewReader(input+"\n"), &output)
if err != nil && err != io.EOF {
t.Fatalf("Run failed: %v", err)
}
// Should not produce any output for notifications
if output.Len() > 0 {
t.Errorf("Expected no output for notification, got: %s", output.String())
}
}
// TestPackagesServerToolsList sends "tools/list" to a server created with
// DefaultNixpkgsPackagesConfig after RegisterPackageHandlers has been called,
// and checks that exactly six tools are returned and that every expected
// package tool name appears among them.
func TestPackagesServerToolsList(t *testing.T) {
	store := setupTestStore(t)
	server := NewServer(store, nil, DefaultNixpkgsPackagesConfig())
	pkgIndexer := packages.NewIndexer(store)
	server.RegisterPackageHandlers(pkgIndexer)

	input := `{"jsonrpc":"2.0","id":1,"method":"tools/list"}`
	resp := runRequest(t, server, input)
	if resp.Error != nil {
		t.Fatalf("Unexpected error: %v", resp.Error)
	}

	result, ok := resp.Result.(map[string]interface{})
	if !ok {
		t.Fatalf("Expected map result, got %T", resp.Result)
	}
	tools, ok := result["tools"].([]interface{})
	if !ok {
		t.Fatalf("Expected tools array, got %T", result["tools"])
	}

	// Should have 6 tools (search_packages, get_package, get_file, index_revision, list_revisions, delete_revision)
	if len(tools) != 6 {
		t.Errorf("Expected 6 tools, got %d", len(tools))
	}

	expectedTools := map[string]bool{
		"search_packages": false,
		"get_package":     false,
		"get_file":        false,
		"index_revision":  false,
		"list_revisions":  false,
		"delete_revision": false,
	}
	for i, tool := range tools {
		// Checked assertions: a malformed entry should fail the test, not
		// panic the whole test binary.
		toolMap, ok := tool.(map[string]interface{})
		if !ok {
			t.Fatalf("tools[%d]: expected map, got %T", i, tool)
		}
		name, ok := toolMap["name"].(string)
		if !ok {
			t.Fatalf("tools[%d]: expected string name, got %T", i, toolMap["name"])
		}
		if _, ok := expectedTools[name]; ok {
			expectedTools[name] = true
		}
	}
	for name, found := range expectedTools {
		if !found {
			t.Errorf("Tool %q not found in tools list", name)
		}
	}
}
// TestOptionsServerWithPackagesToolsList registers handlers through
// RegisterHandlersWithPackages on a server created with DefaultNixOSConfig,
// then checks that "tools/list" still returns six tools and that the
// index_revision tool is present with a description containing "packages".
func TestOptionsServerWithPackagesToolsList(t *testing.T) {
	store := setupTestStore(t)
	server := NewServer(store, nil, DefaultNixOSConfig())
	indexer := nixos.NewIndexer(store)
	pkgIndexer := packages.NewIndexer(store)
	server.RegisterHandlersWithPackages(indexer, pkgIndexer)

	input := `{"jsonrpc":"2.0","id":1,"method":"tools/list"}`
	resp := runRequest(t, server, input)
	if resp.Error != nil {
		t.Fatalf("Unexpected error: %v", resp.Error)
	}

	result, ok := resp.Result.(map[string]interface{})
	if !ok {
		t.Fatalf("Expected map result, got %T", resp.Result)
	}
	tools, ok := result["tools"].([]interface{})
	if !ok {
		t.Fatalf("Expected tools array, got %T", result["tools"])
	}

	// Should still have 6 tools (same as options-only)
	if len(tools) != 6 {
		t.Errorf("Expected 6 tools, got %d", len(tools))
	}

	// Verify index_revision is present
	found := false
	for i, tool := range tools {
		// Checked assertions: a malformed entry should fail the test, not
		// panic the whole test binary.
		toolMap, ok := tool.(map[string]interface{})
		if !ok {
			t.Fatalf("tools[%d]: expected map, got %T", i, tool)
		}
		name, ok := toolMap["name"].(string)
		if !ok {
			t.Fatalf("tools[%d]: expected string name, got %T", i, toolMap["name"])
		}
		if name != "index_revision" {
			continue
		}
		found = true

		// For nixpkgs source, description should mention packages
		desc, ok := toolMap["description"].(string)
		if !ok {
			t.Fatalf("index_revision: expected string description, got %T", toolMap["description"])
		}
		if !strings.Contains(desc, "packages") {
			t.Errorf("index_revision description should mention packages, got: %s", desc)
		}
		break
	}
	if !found {
		t.Error("index_revision tool not found in tools list")
	}
}
// TestGetFilePathValidation calls the get_file tool with a table of paths and
// checks how each is classified. Paths marked wantError must produce a tool
// result with isError set and a message containing errorMsg. Paths not marked
// wantError may still fail (for example because the file is absent), but the
// failure message must not mention "traversal" or "absolute".
func TestGetFilePathValidation(t *testing.T) {
	store := setupTestStore(t)
	server := setupTestServer(t, store)

	// Create a test revision and file
	ctx := context.Background()
	rev := &database.Revision{
		GitHash:     "abc123",
		OptionCount: 0,
	}
	if err := store.CreateRevision(ctx, rev); err != nil {
		t.Fatalf("Failed to create revision: %v", err)
	}
	file := &database.File{
		RevisionID: rev.ID,
		FilePath:   "nixos/modules/test.nix",
		Extension:  ".nix",
		Content:    "{ }",
	}
	if err := store.CreateFile(ctx, file); err != nil {
		t.Fatalf("Failed to create file: %v", err)
	}

	// errorText returns the "text" field of the first content item of a tool
	// result, failing the test (rather than panicking) when the result does
	// not have that shape.
	errorText := func(t *testing.T, result map[string]interface{}) string {
		t.Helper()
		content, ok := result["content"].([]interface{})
		if !ok || len(content) == 0 {
			t.Fatalf("Expected non-empty content array, got %v", result["content"])
		}
		item, ok := content[0].(map[string]interface{})
		if !ok {
			t.Fatalf("Expected content[0] to be a map, got %T", content[0])
		}
		text, ok := item["text"].(string)
		if !ok {
			t.Fatalf("Expected content[0].text to be a string, got %T", item["text"])
		}
		return text
	}

	tests := []struct {
		name      string
		path      string
		wantError bool
		errorMsg  string
	}{
		// Valid paths
		{"valid relative path", "nixos/modules/test.nix", false, ""},
		{"valid simple path", "test.nix", false, ""},
		// Path traversal attempts
		{"dotdot traversal", "../etc/passwd", true, "directory traversal"},
		{"dotdot in middle", "nixos/../../../etc/passwd", true, "directory traversal"},
		{"multiple dotdot", "../../etc/passwd", true, "directory traversal"},
		// Absolute paths
		{"absolute unix path", "/etc/passwd", true, "absolute paths"},
		// Cleaned paths that become traversal
		{"dot slash dotdot", "./../../etc/passwd", true, "directory traversal"},
		// Paths that clean to valid (no error expected, but file won't exist)
		{"dotdot at end cleans to valid", "nixos/modules/..", false, ""}, // Cleans to "nixos", which is safe
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// JSON-encode the path so that quotes or backslashes in a test
			// case cannot corrupt the request document.
			quotedPath, err := json.Marshal(tt.path)
			if err != nil {
				t.Fatalf("Failed to encode path %q: %v", tt.path, err)
			}
			input := `{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_file","arguments":{"path":` + string(quotedPath) + `,"revision":"abc123"}}}`

			resp := runRequest(t, server, input)
			// Report a protocol-level error directly; otherwise it would only
			// surface as an unhelpful "got <nil>" from the assertion below.
			if resp.Error != nil {
				t.Fatalf("Unexpected error: %v", resp.Error)
			}
			result, ok := resp.Result.(map[string]interface{})
			if !ok {
				t.Fatalf("Expected map result, got %T", resp.Result)
			}
			isError, _ := result["isError"].(bool)

			if tt.wantError {
				if !isError {
					t.Errorf("Expected error for path %q, got success", tt.path)
					return
				}
				text := errorText(t, result)
				if tt.errorMsg != "" && !strings.Contains(text, tt.errorMsg) {
					t.Errorf("Error message %q doesn't contain %q", text, tt.errorMsg)
				}
				return
			}

			// For valid paths that don't exist, we expect a "not found" error, not a security error
			if isError {
				text := errorText(t, result)
				if strings.Contains(text, "traversal") || strings.Contains(text, "absolute") {
					t.Errorf("Got security error for valid path %q: %s", tt.path, text)
				}
			}
		})
	}
}
// Helper functions
// setupTestStore opens a SQLite store at ":memory:", initializes it, and
// registers a cleanup that closes it when the test ends. Any failure to
// create or initialize the store aborts the calling test.
func setupTestStore(t *testing.T) database.Store {
	t.Helper()

	db, openErr := database.NewSQLiteStore(":memory:")
	if openErr != nil {
		t.Fatalf("Failed to create store: %v", openErr)
	}

	initErr := db.Initialize(context.Background())
	if initErr != nil {
		t.Fatalf("Failed to initialize store: %v", initErr)
	}

	t.Cleanup(func() {
		// Best-effort close during test teardown; the error is deliberately
		// discarded.
		_ = db.Close()
	})
	return db
}
// setupTestServer builds a Server over store using DefaultNixOSConfig and
// registers handlers backed by a nixos indexer for the same store.
func setupTestServer(t *testing.T, store database.Store) *Server {
	t.Helper()

	srv := NewServer(store, nil, DefaultNixOSConfig())
	srv.RegisterHandlers(nixos.NewIndexer(store))
	return srv
}
// runRequest passes input, terminated by a newline, to server.Run and decodes
// everything the server wrote as a single Response. The calling test is
// aborted if Run returns an error other than io.EOF, if nothing was written,
// or if the output cannot be unmarshalled into a Response.
func runRequest(t *testing.T, server *Server, input string) *Response {
	t.Helper()

	var out bytes.Buffer
	// Run with input terminated by newline
	runErr := server.Run(context.Background(), strings.NewReader(input+"\n"), &out)
	if !(runErr == nil || runErr == io.EOF) {
		t.Fatalf("Run failed: %v", runErr)
	}

	raw := out.Bytes()
	if len(raw) == 0 {
		t.Fatal("Expected response, got empty output")
	}

	resp := new(Response)
	if err := json.Unmarshal(raw, resp); err != nil {
		t.Fatalf("Failed to parse response: %v\nOutput: %s", err, raw)
	}
	return resp
}