diff --git a/.mcp.json b/.mcp.json index b895a2e..ea188c3 100644 --- a/.mcp.json +++ b/.mcp.json @@ -37,7 +37,8 @@ ], "env": { "PROMETHEUS_URL": "https://prometheus.home.2rjus.net", - "ALERTMANAGER_URL": "https://alertmanager.home.2rjus.net" + "ALERTMANAGER_URL": "https://alertmanager.home.2rjus.net", + "LOKI_URL": "http://monitoring01.home.2rjus.net:3100" } } } diff --git a/CLAUDE.md b/CLAUDE.md index 0dfd050..3a0fb82 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -21,9 +21,10 @@ Search and query NixOS configuration options. Uses nixpkgs as source. Search and query Home Manager configuration options. Uses home-manager repository as source. ### Lab Monitoring (`lab-monitoring`) -Query Prometheus metrics and Alertmanager alerts. Unlike other servers, this queries live HTTP APIs — no database or indexing needed. -- 8 tools: list/get alerts, search metrics, get metadata, PromQL query, list targets, list/create silences -- Configurable Prometheus and Alertmanager URLs via flags or environment variables +Query Prometheus metrics, Alertmanager alerts, and Loki logs. Unlike other servers, this queries live HTTP APIs — no database or indexing needed. +- 8 core tools: list/get alerts, search metrics, get metadata, PromQL query, list targets, list/create silences +- 3 optional Loki tools (when `LOKI_URL` is set): query_logs, list_labels, list_label_values +- Configurable Prometheus, Alertmanager, and Loki URLs via flags or environment variables The nixpkgs/options/hm servers share a database-backed architecture: - Full-text search across option/package names and descriptions @@ -43,9 +44,9 @@ The nixpkgs/options/hm servers share a database-backed architecture: ## Project Status **Complete and maintained** - All core features implemented: -- Full MCP servers (6 tools each for nixpkgs/options, 8 tools for monitoring) +- Full MCP servers (6 tools each for nixpkgs/options, 8-11 tools for monitoring) - PostgreSQL and SQLite backends with FTS (for nixpkgs/options servers) -- Live API queries for Prometheus/Alertmanager (monitoring server) +- Live API queries for Prometheus/Alertmanager/Loki (monitoring server) - NixOS modules for deployment - CLI for manual operations - Comprehensive test suite @@ -96,9 +97,10 @@ labmcp/ │ │ ├── types.go # Package types, channel aliases │ │ └── *_test.go # Parser tests │ └── monitoring/ -│ ├── types.go # Prometheus/Alertmanager API types +│ ├── types.go # Prometheus/Alertmanager/Loki API types │ ├── prometheus.go # Prometheus HTTP client │ ├── alertmanager.go # Alertmanager HTTP client +│ ├── loki.go # Loki HTTP client │ ├── handlers.go # MCP tool definitions + handlers │ ├── format.go # Markdown formatting utilities │ └── *_test.go # Tests (httptest-based) @@ -152,6 +154,9 @@ labmcp/ | `list_targets` | List scrape targets with health status | | `list_silences` | List active/pending silences | | `create_silence` | Create a silence (confirms with user first) | +| `query_logs` | Execute a LogQL range query against Loki (requires `LOKI_URL`) | +| `list_labels` | List available label names from Loki (requires `LOKI_URL`) | +| `list_label_values` | List values for a specific label from Loki (requires `LOKI_URL`) | ## Key Implementation Details @@ -250,6 +255,10 @@ lab-monitoring alerts --state active # Filter by state lab-monitoring query 'up' # Instant PromQL query lab-monitoring targets # List scrape targets lab-monitoring metrics node # Search metric names +lab-monitoring logs '{job="varlogs"}' # Query logs (requires LOKI_URL) +lab-monitoring logs '{job="nginx"} |= "error"' --start 2h --limit 50 +lab-monitoring labels # List Loki labels +lab-monitoring labels --values job # List values for a label ``` ### Channel Aliases diff --git a/README.md b/README.md index a930463..3ee02f1 100644 --- a/README.md +++ b/README.md @@ -18,13 +18,14 @@ Search and query Home Manager configuration options across multiple home-manager ### Lab Monitoring (`lab-monitoring`) -Query Prometheus metrics and Alertmanager alerts from your monitoring stack. Unlike other servers, this queries live HTTP APIs — no database or indexing needed. +Query Prometheus metrics, Alertmanager alerts, and Loki logs from your monitoring stack. Unlike other servers, this queries live HTTP APIs — no database or indexing needed. - List and inspect alerts from Alertmanager - Execute PromQL queries against Prometheus - Search metric names with metadata - View scrape target health - Manage alert silences +- Query logs via LogQL (when Loki is configured) ### NixOS Options (`nixos-options`) - Legacy @@ -97,7 +98,8 @@ Configure in your MCP client (e.g., Claude Desktop): "args": ["serve"], "env": { "PROMETHEUS_URL": "http://prometheus.example.com:9090", - "ALERTMANAGER_URL": "http://alertmanager.example.com:9093" + "ALERTMANAGER_URL": "http://alertmanager.example.com:9093", + "LOKI_URL": "http://loki.example.com:3100" } } } @@ -135,7 +137,8 @@ Alternatively, if you have Nix installed, you can use the flake directly without "args": ["run", "git+https://git.t-juice.club/torjus/labmcp#lab-monitoring", "--", "serve"], "env": { "PROMETHEUS_URL": "http://prometheus.example.com:9090", - "ALERTMANAGER_URL": "http://alertmanager.example.com:9093" + "ALERTMANAGER_URL": "http://alertmanager.example.com:9093", + "LOKI_URL": "http://loki.example.com:3100" } } } @@ -256,6 +259,15 @@ lab-monitoring targets # Search metrics lab-monitoring metrics node lab-monitoring metrics -n 20 cpu + +# Query logs from Loki (requires LOKI_URL) +lab-monitoring logs '{job="varlogs"}' +lab-monitoring logs '{job="nginx"} |= "error"' --start 2h --limit 50 +lab-monitoring logs '{job="systemd"}' --direction forward + +# List Loki labels +lab-monitoring labels +lab-monitoring labels --values job ``` **Delete an indexed revision:** @@ -276,6 +288,7 @@ hm-options delete release-23.11 | `NIXOS_OPTIONS_DATABASE` | Database connection string for nixos-options (legacy) | `sqlite://nixos-options.db` | | `PROMETHEUS_URL` | Prometheus base URL for lab-monitoring | `http://localhost:9090` | | `ALERTMANAGER_URL` | Alertmanager base URL for lab-monitoring | `http://localhost:9093` | +| `LOKI_URL` | Loki base URL for lab-monitoring (optional, enables log tools) | *(none)* | ### Database Connection Strings @@ -336,6 +349,9 @@ hm-options -d "sqlite://my.db" index hm-unstable | `list_targets` | List scrape targets with health status | | `list_silences` | List active/pending alert silences | | `create_silence` | Create a new alert silence (requires `--enable-silences` flag) | +| `query_logs` | Execute a LogQL range query against Loki (requires `LOKI_URL`) | +| `list_labels` | List available label names from Loki (requires `LOKI_URL`) | +| `list_label_values` | List values for a specific label from Loki (requires `LOKI_URL`) | ## NixOS Modules @@ -489,6 +505,7 @@ Both `options.http` and `packages.http` also support: | `package` | package | from flake | Package to use | | `prometheusUrl` | string | `"http://localhost:9090"` | Prometheus base URL | | `alertmanagerUrl` | string | `"http://localhost:9093"` | Alertmanager base URL | +| `lokiUrl` | nullOr string | `null` | Loki base URL (enables log query tools when set) | | `enableSilences` | bool | `false` | Enable the create_silence tool (write operation) | | `http.address` | string | `"127.0.0.1:8084"` | HTTP listen address | | `http.endpoint` | string | `"/mcp"` | HTTP endpoint path | diff --git a/TODO.md b/TODO.md index 347191e..920bddc 100644 --- a/TODO.md +++ b/TODO.md @@ -21,7 +21,7 @@ - [ ] Add `list_rules` tool - list Prometheus alerting and recording rules (via `/api/v1/rules`) - [ ] Add `get_rule_group` tool - get details for a specific rule group -- [ ] Add Loki log query support - query logs via LogQL (via `/loki/api/v1/query_range`), enabling log correlation when investigating alerts (e.g., fetch logs for a specific instance/job around the time an alert fired) +- [x] Add Loki log query support - query logs via LogQL (3 tools: `query_logs`, `list_labels`, `list_label_values`), opt-in via `LOKI_URL` ## Nice to Have diff --git a/cmd/lab-monitoring/main.go b/cmd/lab-monitoring/main.go index 263d14a..c1e080c 100644 --- a/cmd/lab-monitoring/main.go +++ b/cmd/lab-monitoring/main.go @@ -15,7 +15,7 @@ import ( "git.t-juice.club/torjus/labmcp/internal/monitoring" ) -const version = "0.1.0" +const version = "0.2.0" func main() { app := &cli.App{ @@ -35,6 +35,11 @@ func main() { EnvVars: []string{"ALERTMANAGER_URL"}, Value: "http://localhost:9093", }, + &cli.StringFlag{ + Name: "loki-url", + Usage: "Loki base URL (optional, enables log query tools)", + EnvVars: []string{"LOKI_URL"}, + }, }, Commands: []*cli.Command{ serveCommand(), @@ -42,6 +47,8 @@ func main() { queryCommand(), targetsCommand(), metricsCommand(), + logsCommand(), + labelsCommand(), }, } @@ -176,6 +183,11 @@ func runServe(c *cli.Context) error { prom := monitoring.NewPrometheusClient(c.String("prometheus-url")) am := monitoring.NewAlertmanagerClient(c.String("alertmanager-url")) + var loki *monitoring.LokiClient + if lokiURL := c.String("loki-url"); lokiURL != "" { + loki = monitoring.NewLokiClient(lokiURL) + } + config.InstructionsFunc = func() string { return monitoring.AlertSummary(am) } @@ -184,7 +196,7 @@ func runServe(c *cli.Context) error { opts := monitoring.HandlerOptions{ EnableSilences: c.Bool("enable-silences"), } - monitoring.RegisterHandlers(server, prom, am, opts) + monitoring.RegisterHandlers(server, prom, am, loki, opts) transport := c.String("transport") switch transport { @@ -347,6 +359,199 @@ func runMetrics(c *cli.Context, query string) error { return nil } +func logsCommand() *cli.Command { + return &cli.Command{ + Name: "logs", + Usage: "Query logs from Loki using LogQL", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "start", + Usage: "Start time: relative duration (e.g., '1h'), RFC3339, or Unix epoch", + Value: "1h", + }, + &cli.StringFlag{ + Name: "end", + Usage: "End time: relative duration, RFC3339, or Unix epoch", + Value: "now", + }, + &cli.IntFlag{ + Name: "limit", + Aliases: []string{"n"}, + Usage: "Maximum number of entries", + Value: 100, + }, + &cli.StringFlag{ + Name: "direction", + Usage: "Sort order: 'backward' (newest first) or 'forward' (oldest first)", + Value: "backward", + }, + }, + Action: func(c *cli.Context) error { + if c.NArg() < 1 { + return fmt.Errorf("LogQL expression required") + } + return runLogs(c, c.Args().First()) + }, + } +} + +func labelsCommand() *cli.Command { + return &cli.Command{ + Name: "labels", + Usage: "List labels from Loki, or values for a specific label", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "values", + Usage: "Get values for this label name instead of listing labels", + }, + }, + Action: func(c *cli.Context) error { + return runLabels(c) + }, + } +} + +func runLogs(c *cli.Context, logql string) error { + lokiURL := c.String("loki-url") + if lokiURL == "" { + return fmt.Errorf("--loki-url or LOKI_URL is required for log queries") + } + + ctx := context.Background() + loki := monitoring.NewLokiClient(lokiURL) + + now := time.Now() + start, err := parseCLITime(c.String("start"), now.Add(-time.Hour)) + if err != nil { + return fmt.Errorf("invalid start time: %w", err) + } + end, err := parseCLITime(c.String("end"), now) + if err != nil { + return fmt.Errorf("invalid end time: %w", err) + } + + data, err := loki.QueryRange(ctx, logql, start, end, c.Int("limit"), c.String("direction")) + if err != nil { + return fmt.Errorf("log query failed: %w", err) + } + + totalEntries := 0 + for _, stream := range data.Result { + totalEntries += len(stream.Values) + } + + if totalEntries == 0 { + fmt.Println("No log entries found.") + return nil + } + + for _, stream := range data.Result { + // Print stream labels + labels := "" + for k, v := range stream.Stream { + if labels != "" { + labels += ", " + } + labels += fmt.Sprintf("%s=%q", k, v) + } + fmt.Printf("--- {%s} ---\n", labels) + + for _, entry := range stream.Values { + ts := formatCLITimestamp(entry[0]) + fmt.Printf("[%s] %s\n", ts, entry[1]) + } + fmt.Println() + } + + return nil +} + +func runLabels(c *cli.Context) error { + lokiURL := c.String("loki-url") + if lokiURL == "" { + return fmt.Errorf("--loki-url or LOKI_URL is required for label queries") + } + + ctx := context.Background() + loki := monitoring.NewLokiClient(lokiURL) + + if label := c.String("values"); label != "" { + values, err := loki.LabelValues(ctx, label) + if err != nil { + return fmt.Errorf("failed to list label values: %w", err) + } + if len(values) == 0 { + fmt.Printf("No values found for label '%s'.\n", label) + return nil + } + for _, v := range values { + fmt.Println(v) + } + return nil + } + + labels, err := loki.Labels(ctx) + if err != nil { + return fmt.Errorf("failed to list labels: %w", err) + } + if len(labels) == 0 { + fmt.Println("No labels found.") + return nil + } + for _, label := range labels { + fmt.Println(label) + } + return nil +} + +// parseCLITime parses a time string for CLI use. Handles "now", relative durations, +// RFC3339, and Unix epoch seconds. +func parseCLITime(s string, defaultTime time.Time) (time.Time, error) { + if s == "now" || s == "" { + return time.Now(), nil + } + + // Try as relative duration + if d, err := time.ParseDuration(s); err == nil { + return time.Now().Add(-d), nil + } + + // Try as RFC3339 + if t, err := time.Parse(time.RFC3339, s); err == nil { + return t, nil + } + + // Try as Unix epoch seconds + var epoch int64 + validDigits := true + for _, c := range s { + if c >= '0' && c <= '9' { + epoch = epoch*10 + int64(c-'0') + } else { + validDigits = false + break + } + } + if validDigits && len(s) > 0 { + return time.Unix(epoch, 0), nil + } + + return defaultTime, fmt.Errorf("cannot parse time '%s'", s) +} + +// formatCLITimestamp converts a nanosecond Unix timestamp string to a readable format. +func formatCLITimestamp(nsStr string) string { + var ns int64 + for _, c := range nsStr { + if c >= '0' && c <= '9' { + ns = ns*10 + int64(c-'0') + } + } + t := time.Unix(0, ns) + return t.Local().Format("2006-01-02 15:04:05") +} + func containsIgnoreCase(s, substr string) bool { sLower := make([]byte, len(s)) subLower := make([]byte, len(substr)) diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 2c6c055..b49fd3b 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -83,7 +83,7 @@ This ensures package information matches the nixpkgs version the project actuall func DefaultMonitoringConfig() ServerConfig { return ServerConfig{ Name: "lab-monitoring", - Version: "0.1.0", + Version: "0.2.0", Mode: ModeCustom, Instructions: `Lab Monitoring MCP Server - Query Prometheus metrics and Alertmanager alerts. @@ -92,8 +92,9 @@ Tools for querying your monitoring stack: - List and inspect alerts from Alertmanager - View scrape target health status - Manage alert silences +- Query logs via LogQL (when Loki is configured) -All queries are executed against live Prometheus and Alertmanager HTTP APIs.`, +All queries are executed against live Prometheus, Alertmanager, and Loki HTTP APIs.`, } } diff --git a/internal/monitoring/format.go b/internal/monitoring/format.go index a5d4b66..4ccba43 100644 --- a/internal/monitoring/format.go +++ b/internal/monitoring/format.go @@ -294,6 +294,126 @@ func formatMetricSearch(names []string, metadata map[string][]PromMetadata) stri return sb.String() } +const maxLabelValues = 100 +const maxLineLength = 500 + +// formatLogStreams formats Loki log query results as grouped markdown. +func formatLogStreams(data *LokiQueryData) string { + if data == nil || len(data.Result) == 0 { + return "No log results." + } + + var sb strings.Builder + + totalEntries := 0 + for _, s := range data.Result { + totalEntries += len(s.Values) + } + sb.WriteString(fmt.Sprintf("**%d stream(s), %d total log entries**\n\n", len(data.Result), totalEntries)) + + for _, stream := range data.Result { + // Stream labels header + var labels []string + for k, v := range stream.Stream { + labels = append(labels, fmt.Sprintf("%s=%q", k, v)) + } + sort.Strings(labels) + sb.WriteString(fmt.Sprintf("## {%s}\n\n", strings.Join(labels, ", "))) + + if len(stream.Values) == 0 { + sb.WriteString("No entries.\n\n") + continue + } + + sb.WriteString("| Timestamp | Log Line |\n") + sb.WriteString("| --- | --- |\n") + + truncated := false + for i, entry := range stream.Values { + if i >= maxRows { + truncated = true + break + } + + ts := formatNanosecondTimestamp(entry[0]) + line := entry[1] + if len(line) > maxLineLength { + line = line[:maxLineLength] + "..." + } + // Escape pipe characters in log lines for markdown table + line = strings.ReplaceAll(line, "|", "\\|") + // Replace newlines with spaces for table compatibility + line = strings.ReplaceAll(line, "\n", " ") + + sb.WriteString(fmt.Sprintf("| %s | %s |\n", ts, line)) + } + + if truncated { + sb.WriteString(fmt.Sprintf("\n*Showing %d of %d entries (truncated)*\n", maxRows, len(stream.Values))) + } + sb.WriteString("\n") + } + + return sb.String() +} + +// formatLabels formats a list of label names as a bullet list. +func formatLabels(labels []string) string { + if len(labels) == 0 { + return "No labels found." + } + + sort.Strings(labels) + + var sb strings.Builder + sb.WriteString(fmt.Sprintf("**%d label(s)**\n\n", len(labels))) + + for _, label := range labels { + sb.WriteString(fmt.Sprintf("- `%s`\n", label)) + } + + return sb.String() +} + +// formatLabelValues formats label values as a bullet list. +func formatLabelValues(label string, values []string) string { + if len(values) == 0 { + return fmt.Sprintf("No values found for label '%s'.", label) + } + + sort.Strings(values) + + var sb strings.Builder + sb.WriteString(fmt.Sprintf("**%d value(s) for label `%s`**\n\n", len(values), label)) + + truncated := false + for i, v := range values { + if i >= maxLabelValues { + truncated = true + break + } + sb.WriteString(fmt.Sprintf("- `%s`\n", v)) + } + + if truncated { + sb.WriteString(fmt.Sprintf("\n*Showing %d of %d values (truncated)*\n", maxLabelValues, len(values))) + } + + return sb.String() +} + +// formatNanosecondTimestamp converts a nanosecond Unix timestamp string to RFC3339. +func formatNanosecondTimestamp(nsStr string) string { + var ns int64 + for _, c := range nsStr { + if c >= '0' && c <= '9' { + ns = ns*10 + int64(c-'0') + } + } + t := time.Unix(0, ns) + return t.UTC().Format(time.RFC3339) +} + // formatMetricMetadata formats metadata for a single metric. func formatMetricMetadata(name string, metas []PromMetadata) string { if len(metas) == 0 { diff --git a/internal/monitoring/handlers.go b/internal/monitoring/handlers.go index 419c3ac..e50e8db 100644 --- a/internal/monitoring/handlers.go +++ b/internal/monitoring/handlers.go @@ -62,7 +62,7 @@ type HandlerOptions struct { } // RegisterHandlers registers all monitoring tool handlers on the MCP server. -func RegisterHandlers(server *mcp.Server, prom *PrometheusClient, am *AlertmanagerClient, opts HandlerOptions) { +func RegisterHandlers(server *mcp.Server, prom *PrometheusClient, am *AlertmanagerClient, loki *LokiClient, opts HandlerOptions) { server.RegisterTool(listAlertsTool(), makeListAlertsHandler(am)) server.RegisterTool(getAlertTool(), makeGetAlertHandler(am)) server.RegisterTool(searchMetricsTool(), makeSearchMetricsHandler(prom)) @@ -73,6 +73,11 @@ func RegisterHandlers(server *mcp.Server, prom *PrometheusClient, am *Alertmanag if opts.EnableSilences { server.RegisterTool(createSilenceTool(), makeCreateSilenceHandler(am)) } + if loki != nil { + server.RegisterTool(queryLogsTool(), makeQueryLogsHandler(loki)) + server.RegisterTool(listLabelsTool(), makeListLabelsHandler(loki)) + server.RegisterTool(listLabelValuesTool(), makeListLabelValuesHandler(loki)) + } } // Tool definitions @@ -485,3 +490,183 @@ func makeCreateSilenceHandler(am *AlertmanagerClient) mcp.ToolHandler { func parseJSON(s string, v interface{}) error { return json.Unmarshal([]byte(s), v) } + +// Loki tool definitions + +func queryLogsTool() mcp.Tool { + return mcp.Tool{ + Name: "query_logs", + Description: "Execute a LogQL range query against Loki to search and retrieve log entries", + InputSchema: mcp.InputSchema{ + Type: "object", + Properties: map[string]mcp.Property{ + "logql": { + Type: "string", + Description: `LogQL query expression (e.g., '{job="varlogs"}', '{job="nginx"} |= "error"')`, + }, + "start": { + Type: "string", + Description: "Start time: relative duration (e.g., '1h', '30m'), RFC3339 timestamp, or Unix epoch seconds. Default: 1h ago", + }, + "end": { + Type: "string", + Description: "End time: relative duration (e.g., '5m'), RFC3339 timestamp, or Unix epoch seconds. Default: now", + }, + "limit": { + Type: "integer", + Description: "Maximum number of log entries to return (default: 100)", + Default: 100, + }, + "direction": { + Type: "string", + Description: "Sort order for log entries: 'backward' (newest first) or 'forward' (oldest first)", + Enum: []string{"backward", "forward"}, + }, + }, + Required: []string{"logql"}, + }, + } +} + +func listLabelsTool() mcp.Tool { + return mcp.Tool{ + Name: "list_labels", + Description: "List available label names from Loki", + InputSchema: mcp.InputSchema{ + Type: "object", + Properties: map[string]mcp.Property{}, + }, + } +} + +func listLabelValuesTool() mcp.Tool { + return mcp.Tool{ + Name: "list_label_values", + Description: "List values for a specific label from Loki", + InputSchema: mcp.InputSchema{ + Type: "object", + Properties: map[string]mcp.Property{ + "label": { + Type: "string", + Description: "Label name to get values for (e.g., 'job', 'instance')", + }, + }, + Required: []string{"label"}, + }, + } +} + +// Loki handler constructors + +func makeQueryLogsHandler(loki *LokiClient) mcp.ToolHandler { + return func(ctx context.Context, args map[string]interface{}) (mcp.CallToolResult, error) { + logql, _ := args["logql"].(string) + if logql == "" { + return mcp.ErrorContent(fmt.Errorf("logql is required")), nil + } + + now := time.Now() + start := now.Add(-time.Hour) + end := now + + if startStr, ok := args["start"].(string); ok && startStr != "" { + parsed, err := parseTimeArg(startStr, now.Add(-time.Hour)) + if err != nil { + return mcp.ErrorContent(fmt.Errorf("invalid start time: %w", err)), nil + } + start = parsed + } + + if endStr, ok := args["end"].(string); ok && endStr != "" { + parsed, err := parseTimeArg(endStr, now) + if err != nil { + return mcp.ErrorContent(fmt.Errorf("invalid end time: %w", err)), nil + } + end = parsed + } + + limit := 100 + if l, ok := args["limit"].(float64); ok && l > 0 { + limit = int(l) + } + + direction := "backward" + if d, ok := args["direction"].(string); ok && d != "" { + direction = d + } + + data, err := loki.QueryRange(ctx, logql, start, end, limit, direction) + if err != nil { + return mcp.ErrorContent(fmt.Errorf("log query failed: %w", err)), nil + } + + return mcp.CallToolResult{ + Content: []mcp.Content{mcp.TextContent(formatLogStreams(data))}, + }, nil + } +} + +func makeListLabelsHandler(loki *LokiClient) mcp.ToolHandler { + return func(ctx context.Context, args map[string]interface{}) (mcp.CallToolResult, error) { + labels, err := loki.Labels(ctx) + if err != nil { + return mcp.ErrorContent(fmt.Errorf("failed to list labels: %w", err)), nil + } + + return mcp.CallToolResult{ + Content: []mcp.Content{mcp.TextContent(formatLabels(labels))}, + }, nil + } +} + +func makeListLabelValuesHandler(loki *LokiClient) mcp.ToolHandler { + return func(ctx context.Context, args map[string]interface{}) (mcp.CallToolResult, error) { + label, _ := args["label"].(string) + if label == "" { + return mcp.ErrorContent(fmt.Errorf("label is required")), nil + } + + values, err := loki.LabelValues(ctx, label) + if err != nil { + return mcp.ErrorContent(fmt.Errorf("failed to list label values: %w", err)), nil + } + + return mcp.CallToolResult{ + Content: []mcp.Content{mcp.TextContent(formatLabelValues(label, values))}, + }, nil + } +} + +// parseTimeArg parses a time argument that can be: +// - A relative duration (e.g., "1h", "30m", "2h30m") — interpreted as that duration ago from now +// - An RFC3339 timestamp (e.g., "2024-01-15T10:30:00Z") +// - A Unix epoch in seconds (e.g., "1705312200") +// If parsing fails, returns the provided default time. +func parseTimeArg(s string, defaultTime time.Time) (time.Time, error) { + // Try as relative duration first + if d, err := time.ParseDuration(s); err == nil { + return time.Now().Add(-d), nil + } + + // Try as RFC3339 + if t, err := time.Parse(time.RFC3339, s); err == nil { + return t, nil + } + + // Try as Unix epoch seconds + var epoch int64 + validDigits := true + for _, c := range s { + if c >= '0' && c <= '9' { + epoch = epoch*10 + int64(c-'0') + } else { + validDigits = false + break + } + } + if validDigits && len(s) > 0 { + return time.Unix(epoch, 0), nil + } + + return defaultTime, fmt.Errorf("cannot parse time '%s': use relative duration (e.g., '1h'), RFC3339, or Unix epoch seconds", s) +} diff --git a/internal/monitoring/handlers_test.go b/internal/monitoring/handlers_test.go index 371cc12..ef6b0f5 100644 --- a/internal/monitoring/handlers_test.go +++ b/internal/monitoring/handlers_test.go @@ -14,7 +14,7 @@ import ( ) // setupTestServer creates a test MCP server with monitoring handlers backed by test HTTP servers. -func setupTestServer(t *testing.T, promHandler, amHandler http.HandlerFunc) (*mcp.Server, func()) { +func setupTestServer(t *testing.T, promHandler, amHandler http.HandlerFunc, lokiHandler ...http.HandlerFunc) (*mcp.Server, func()) { t.Helper() promSrv := httptest.NewServer(promHandler) @@ -26,11 +26,22 @@ func setupTestServer(t *testing.T, promHandler, amHandler http.HandlerFunc) (*mc prom := NewPrometheusClient(promSrv.URL) am := NewAlertmanagerClient(amSrv.URL) - RegisterHandlers(server, prom, am, HandlerOptions{EnableSilences: true}) + + var loki *LokiClient + var lokiSrv *httptest.Server + if len(lokiHandler) > 0 && lokiHandler[0] != nil { + lokiSrv = httptest.NewServer(lokiHandler[0]) + loki = NewLokiClient(lokiSrv.URL) + } + + RegisterHandlers(server, prom, am, loki, HandlerOptions{EnableSilences: true}) cleanup := func() { promSrv.Close() amSrv.Close() + if lokiSrv != nil { + lokiSrv.Close() + } } return server, cleanup @@ -305,8 +316,9 @@ func TestHandler_ToolCount(t *testing.T) { defer cleanup() tools := listTools(t, server) + // Without Loki: 7 base + 1 silence = 8 if len(tools) != 8 { - t.Errorf("expected 8 tools with silences enabled, got %d", len(tools)) + t.Errorf("expected 8 tools with silences enabled (no Loki), got %d", len(tools)) for _, tool := range tools { t.Logf(" tool: %s", tool.Name) } @@ -325,6 +337,37 @@ func TestHandler_ToolCount(t *testing.T) { } } +func TestHandler_ToolCountWithLoki(t *testing.T) { + server, cleanup := setupTestServer(t, + func(w http.ResponseWriter, r *http.Request) {}, + func(w http.ResponseWriter, r *http.Request) {}, + func(w http.ResponseWriter, r *http.Request) {}, + ) + defer cleanup() + + tools := listTools(t, server) + // With Loki: 7 base + 1 silence + 3 loki = 11 + if len(tools) != 11 { + t.Errorf("expected 11 tools with silences and Loki enabled, got %d", len(tools)) + for _, tool := range tools { + t.Logf(" tool: %s", tool.Name) + } + } + + // Verify Loki tools are present + lokiTools := map[string]bool{"query_logs": false, "list_labels": false, "list_label_values": false} + for _, tool := range tools { + if _, ok := lokiTools[tool.Name]; ok { + lokiTools[tool.Name] = true + } + } + for name, found := range lokiTools { + if !found { + t.Errorf("expected %s tool when Loki enabled", name) + } + } +} + func TestHandler_ToolCountWithoutSilences(t *testing.T) { promSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) amSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) @@ -337,7 +380,7 @@ func TestHandler_ToolCountWithoutSilences(t *testing.T) { prom := NewPrometheusClient(promSrv.URL) am := NewAlertmanagerClient(amSrv.URL) - RegisterHandlers(server, prom, am, HandlerOptions{EnableSilences: false}) + RegisterHandlers(server, prom, am, nil, HandlerOptions{EnableSilences: false}) tools := listTools(t, server) if len(tools) != 7 { @@ -384,6 +427,110 @@ func listTools(t *testing.T, server *mcp.Server) []mcp.Tool { return listResult.Tools } +func TestHandler_QueryLogs(t *testing.T) { + server, cleanup := setupTestServer(t, + nil, + nil, + func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/query_range" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [ + { + "stream": {"job": "varlogs", "filename": "/var/log/syslog"}, + "values": [ + ["1704067200000000000", "Jan 1 00:00:00 host kernel: test message"] + ] + } + ] + } + }`)) + }, + ) + defer cleanup() + + result := callTool(t, server, "query_logs", map[string]interface{}{ + "logql": `{job="varlogs"}`, + }) + if result.IsError { + t.Fatalf("unexpected error: %s", result.Content[0].Text) + } + if !strings.Contains(result.Content[0].Text, "varlogs") { + t.Errorf("expected output to contain 'varlogs', got: %s", result.Content[0].Text) + } + if !strings.Contains(result.Content[0].Text, "test message") { + t.Errorf("expected output to contain 'test message', got: %s", result.Content[0].Text) + } +} + +func TestHandler_ListLabels(t *testing.T) { + server, cleanup := setupTestServer(t, + nil, + nil, + func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/labels" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": ["job", "instance", "filename"] + }`)) + }, + ) + defer cleanup() + + result := callTool(t, server, "list_labels", map[string]interface{}{}) + if result.IsError { + t.Fatalf("unexpected error: %s", result.Content[0].Text) + } + if !strings.Contains(result.Content[0].Text, "3 label") { + t.Errorf("expected output to contain '3 label', got: %s", result.Content[0].Text) + } + if !strings.Contains(result.Content[0].Text, "job") { + t.Errorf("expected output to contain 'job', got: %s", result.Content[0].Text) + } +} + +func TestHandler_ListLabelValues(t *testing.T) { + server, cleanup := setupTestServer(t, + nil, + nil, + func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/label/job/values" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": ["varlogs", "nginx", "systemd"] + }`)) + }, + ) + defer cleanup() + + result := callTool(t, server, "list_label_values", map[string]interface{}{ + "label": "job", + }) + if result.IsError { + t.Fatalf("unexpected error: %s", result.Content[0].Text) + } + if !strings.Contains(result.Content[0].Text, "3 value") { + t.Errorf("expected output to contain '3 value', got: %s", result.Content[0].Text) + } + if !strings.Contains(result.Content[0].Text, "nginx") { + t.Errorf("expected output to contain 'nginx', got: %s", result.Content[0].Text) + } +} + // callTool is a test helper that calls a tool through the MCP server. func callTool(t *testing.T, server *mcp.Server, name string, args map[string]interface{}) mcp.CallToolResult { t.Helper() diff --git a/internal/monitoring/loki.go b/internal/monitoring/loki.go new file mode 100644 index 0000000..4ea4bbf --- /dev/null +++ b/internal/monitoring/loki.go @@ -0,0 +1,122 @@ +package monitoring + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +// LokiClient is an HTTP client for the Loki API. +type LokiClient struct { + baseURL string + httpClient *http.Client +} + +// NewLokiClient creates a new Loki API client. +func NewLokiClient(baseURL string) *LokiClient { + return &LokiClient{ + baseURL: strings.TrimRight(baseURL, "/"), + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +// QueryRange executes a LogQL range query against Loki. +func (c *LokiClient) QueryRange(ctx context.Context, logql string, start, end time.Time, limit int, direction string) (*LokiQueryData, error) { + params := url.Values{} + params.Set("query", logql) + params.Set("start", fmt.Sprintf("%d", start.UnixNano())) + params.Set("end", fmt.Sprintf("%d", end.UnixNano())) + if limit > 0 { + params.Set("limit", fmt.Sprintf("%d", limit)) + } + if direction != "" { + params.Set("direction", direction) + } + + body, err := c.get(ctx, "/loki/api/v1/query_range", params) + if err != nil { + return nil, fmt.Errorf("query range failed: %w", err) + } + + var data LokiQueryData + if err := json.Unmarshal(body, &data); err != nil { + return nil, fmt.Errorf("failed to parse query data: %w", err) + } + return &data, nil +} + +// Labels returns all available label names from Loki. +func (c *LokiClient) Labels(ctx context.Context) ([]string, error) { + body, err := c.get(ctx, "/loki/api/v1/labels", nil) + if err != nil { + return nil, fmt.Errorf("labels failed: %w", err) + } + + var labels []string + if err := json.Unmarshal(body, &labels); err != nil { + return nil, fmt.Errorf("failed to parse labels: %w", err) + } + return labels, nil +} + +// LabelValues returns all values for a given label name from Loki. +func (c *LokiClient) LabelValues(ctx context.Context, label string) ([]string, error) { + path := fmt.Sprintf("/loki/api/v1/label/%s/values", url.PathEscape(label)) + body, err := c.get(ctx, path, nil) + if err != nil { + return nil, fmt.Errorf("label values failed: %w", err) + } + + var values []string + if err := json.Unmarshal(body, &values); err != nil { + return nil, fmt.Errorf("failed to parse label values: %w", err) + } + return values, nil +} + +// get performs a GET request and returns the "data" field from the Loki response envelope. +// Loki uses the same {"status":"success","data":...} format as Prometheus. +func (c *LokiClient) get(ctx context.Context, path string, params url.Values) (json.RawMessage, error) { + u := c.baseURL + path + if len(params) > 0 { + u += "?" + params.Encode() + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() //nolint:errcheck // cleanup on exit + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body)) + } + + var promResp PromResponse + if err := json.Unmarshal(body, &promResp); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + if promResp.Status != "success" { + return nil, fmt.Errorf("loki error (%s): %s", promResp.ErrorType, promResp.Error) + } + + return promResp.Data, nil +} diff --git a/internal/monitoring/loki_test.go b/internal/monitoring/loki_test.go new file mode 100644 index 0000000..995c321 --- /dev/null +++ b/internal/monitoring/loki_test.go @@ -0,0 +1,162 @@ +package monitoring + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestLokiClient_QueryRange(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/query_range" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + if r.URL.Query().Get("query") != `{job="varlogs"}` { + t.Errorf("unexpected query param: %s", r.URL.Query().Get("query")) + } + if r.URL.Query().Get("direction") != "backward" { + t.Errorf("unexpected direction: %s", r.URL.Query().Get("direction")) + } + if r.URL.Query().Get("limit") != "10" { + t.Errorf("unexpected limit: %s", r.URL.Query().Get("limit")) + } + + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [ + { + "stream": {"job": "varlogs", "filename": "/var/log/syslog"}, + "values": [ + ["1234567890000000000", "line 1"], + ["1234567891000000000", "line 2"] + ] + } + ] + } + }`)) + })) + defer srv.Close() + + client := NewLokiClient(srv.URL) + start := time.Unix(0, 1234567890000000000) + end := time.Unix(0, 1234567899000000000) + data, err := client.QueryRange(context.Background(), `{job="varlogs"}`, start, end, 10, "backward") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if data.ResultType != "streams" { + t.Errorf("expected resultType=streams, got %s", data.ResultType) + } + if len(data.Result) != 1 { + t.Fatalf("expected 1 stream, got %d", len(data.Result)) + } + if data.Result[0].Stream["job"] != "varlogs" { + t.Errorf("expected job=varlogs, got %s", data.Result[0].Stream["job"]) + } + if len(data.Result[0].Values) != 2 { + t.Fatalf("expected 2 entries, got %d", len(data.Result[0].Values)) + } + if data.Result[0].Values[0][1] != "line 1" { + t.Errorf("expected first line='line 1', got %s", data.Result[0].Values[0][1]) + } +} + +func TestLokiClient_QueryRangeError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "error", + "errorType": "bad_data", + "error": "invalid LogQL query" + }`)) + })) + defer srv.Close() + + client := NewLokiClient(srv.URL) + _, err := client.QueryRange(context.Background(), "invalid{", time.Now().Add(-time.Hour), time.Now(), 100, "backward") + if err == nil { + t.Fatal("expected error, got nil") + } + if !contains(err.Error(), "invalid LogQL query") { + t.Errorf("expected error to contain 'invalid LogQL query', got: %s", err.Error()) + } +} + +func TestLokiClient_Labels(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/labels" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": ["job", "instance", "filename"] + }`)) + })) + defer srv.Close() + + client := NewLokiClient(srv.URL) + labels, err := client.Labels(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(labels) != 3 { + t.Fatalf("expected 3 labels, got %d", len(labels)) + } + if labels[0] != "job" { + t.Errorf("expected first label=job, got %s", labels[0]) + } +} + +func TestLokiClient_LabelValues(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/loki/api/v1/label/job/values" { + t.Errorf("unexpected path: %s, expected /loki/api/v1/label/job/values", r.URL.Path) + } + + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": ["varlogs", "nginx", "systemd"] + }`)) + })) + defer srv.Close() + + client := NewLokiClient(srv.URL) + values, err := client.LabelValues(context.Background(), "job") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(values) != 3 { + t.Fatalf("expected 3 values, got %d", len(values)) + } + if values[0] != "varlogs" { + t.Errorf("expected first value=varlogs, got %s", values[0]) + } +} + +func TestLokiClient_HTTPError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("internal error")) + })) + defer srv.Close() + + client := NewLokiClient(srv.URL) + _, err := client.QueryRange(context.Background(), `{job="test"}`, time.Now().Add(-time.Hour), time.Now(), 100, "backward") + if err == nil { + t.Fatal("expected error, got nil") + } + if !contains(err.Error(), "500") { + t.Errorf("expected error to contain status code, got: %s", err.Error()) + } +} diff --git a/internal/monitoring/types.go b/internal/monitoring/types.go index 002700f..a2a13e0 100644 --- a/internal/monitoring/types.go +++ b/internal/monitoring/types.go @@ -118,3 +118,20 @@ type Matcher struct { IsRegex bool `json:"isRegex"` IsEqual *bool `json:"isEqual,omitempty"` } + +// Loki API response types + +// LokiQueryData represents the data field for Loki query results. +type LokiQueryData struct { + ResultType string `json:"resultType"` + Result []LokiStream `json:"result"` +} + +// LokiStream represents a single log stream with its entries. +type LokiStream struct { + Stream map[string]string `json:"stream"` + Values []LokiEntry `json:"values"` +} + +// LokiEntry represents a log entry as [nanosecond_timestamp, log_line]. +type LokiEntry [2]string diff --git a/nix/lab-monitoring-module.nix b/nix/lab-monitoring-module.nix index b6f8d94..3e3e8ee 100644 --- a/nix/lab-monitoring-module.nix +++ b/nix/lab-monitoring-module.nix @@ -33,6 +33,12 @@ in description = "Alertmanager base URL."; }; + lokiUrl = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "Loki base URL. When set, enables log query tools (query_logs, list_labels, list_label_values)."; + }; + enableSilences = lib.mkOption { type = lib.types.bool; default = false; @@ -104,6 +110,8 @@ in environment = { PROMETHEUS_URL = cfg.prometheusUrl; ALERTMANAGER_URL = cfg.alertmanagerUrl; + } // lib.optionalAttrs (cfg.lokiUrl != null) { + LOKI_URL = cfg.lokiUrl; }; script = let