From a75393d1c5df52ea1d88ce96c1f2a44302dfbb43 Mon Sep 17 00:00:00 2001 From: rcourtman Date: Wed, 28 Jan 2026 21:21:28 +0000 Subject: [PATCH] refactor(ai): consolidate tool implementations into domain-specific files - Merge tools_infrastructure.go, tools_intelligence.go, tools_patrol.go, tools_profiles.go into their respective domain tools - Expand tools_control.go with command execution logic - Expand tools_discovery.go with resource discovery handlers - Expand tools_storage.go with storage-related operations - Expand tools_metrics.go with metrics functionality - Update tests to match new structure This consolidation reduces file count and groups related functionality together. --- internal/ai/tools/control_test.go | 6 +- internal/ai/tools/executor.go | 11 +- .../tools/infrastructure_low_coverage_test.go | 68 - internal/ai/tools/infrastructure_test.go | 4 +- internal/ai/tools/strict_resolution_test.go | 2 + internal/ai/tools/tools_alerts.go | 293 ++++- internal/ai/tools/tools_control.go | 992 ++++++++++++++- internal/ai/tools/tools_discovery.go | 421 ++++++- internal/ai/tools/tools_docker.go | 348 +++++- internal/ai/tools/tools_knowledge.go | 265 +++- internal/ai/tools/tools_kubernetes.go | 2 +- internal/ai/tools/tools_metrics.go | 536 +++++++- internal/ai/tools/tools_pmg.go | 232 +++- internal/ai/tools/tools_query.go | 60 +- internal/ai/tools/tools_storage.go | 1102 ++++++++++++++++- 15 files changed, 4236 insertions(+), 106 deletions(-) diff --git a/internal/ai/tools/control_test.go b/internal/ai/tools/control_test.go index 12d86528d..48f389b6e 100644 --- a/internal/ai/tools/control_test.go +++ b/internal/ai/tools/control_test.go @@ -34,7 +34,7 @@ func TestPulseToolExecutor_ExecuteListBackups(t *testing.T) { backupProv.On("GetBackups").Return(expectedBackups) backupProv.On("GetPBSInstances").Return([]models.PBSInstance{}) - // Use consolidated pulse_storage tool with type: "backups" + // Use pulse_storage tool with type: "backups" result, err := exec.ExecuteTool(context.Background(), "pulse_storage", map[string]interface{}{ "type": "backups", }) @@ -69,7 +69,7 @@ func TestPulseToolExecutor_ExecuteControlGuest(t *testing.T) { ExitCode: 0, }, nil) - // Use consolidated pulse_control tool with type: "guest" + // Use pulse_control tool with type: "guest" result, err := exec.ExecuteTool(context.Background(), "pulse_control", map[string]interface{}{ "type": "guest", "guest_id": "100", @@ -112,7 +112,7 @@ func TestPulseToolExecutor_ExecuteControlDocker(t *testing.T) { ExitCode: 0, }, nil) - // Use consolidated pulse_docker tool with action: "control" + // Use pulse_docker tool with action: "control" result, err := exec.ExecuteTool(context.Background(), "pulse_docker", map[string]interface{}{ "action": "control", "container": "nginx", diff --git a/internal/ai/tools/executor.go b/internal/ai/tools/executor.go index e5596c742..54a96d467 100644 --- a/internal/ai/tools/executor.go +++ b/internal/ai/tools/executor.go @@ -565,7 +565,7 @@ func (e *PulseToolExecutor) ListTools() []Tool { func (e *PulseToolExecutor) isToolAvailable(name string) bool { switch name { - // Consolidated tools - check based on primary requirements + // Check tool availability based on primary requirements case "pulse_query": return e.stateProvider != nil case "pulse_metrics": @@ -607,8 +607,7 @@ func (e *PulseToolExecutor) ExecuteTool(ctx context.Context, name string, args m // registerTools registers all available tools func (e *PulseToolExecutor) registerTools() { - // Consolidated tools (49 tools -> 10 tools) - // See 
plan at /Users/rcourtman/.claude/plans/atomic-wobbling-rose.md + // All tools registered below (12 tools total) // pulse_query - search, get, config, topology, list, health e.registerQueryTools() @@ -634,17 +633,17 @@ func (e *PulseToolExecutor) registerTools() { // pulse_control - guest control, run commands (requires control permission) // NOTE: For read-only command execution, use pulse_read instead - e.registerControlToolsConsolidated() + e.registerControlTools() // pulse_file_edit - read, append, write files (requires control permission) e.registerFileTools() // pulse_discovery - get, list discoveries - e.registerDiscoveryToolsConsolidated() + e.registerDiscoveryTools() // pulse_knowledge - remember, recall, incidents, correlate, relationships e.registerKnowledgeTools() // pulse_pmg - status, mail_stats, queues, spam - e.registerPMGToolsConsolidated() + e.registerPMGTools() } diff --git a/internal/ai/tools/infrastructure_low_coverage_test.go b/internal/ai/tools/infrastructure_low_coverage_test.go index a84bbad27..7d9340e41 100644 --- a/internal/ai/tools/infrastructure_low_coverage_test.go +++ b/internal/ai/tools/infrastructure_low_coverage_test.go @@ -150,44 +150,6 @@ func TestExecuteListPBSJobs(t *testing.T) { assert.Equal(t, "job1", resp.Jobs[0].ID) } -func TestExecuteGetClusterStatus(t *testing.T) { - ctx := context.Background() - exec := NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: models.StateSnapshot{}}}) - - result, err := exec.executeGetClusterStatus(ctx, map[string]interface{}{}) - require.NoError(t, err) - assert.Equal(t, "No Proxmox nodes found.", result.Content[0].Text) - - state := models.StateSnapshot{ - Nodes: []models.Node{ - { - Name: "node1", - Status: "online", - IsClusterMember: true, - ClusterName: "cluster1", - Instance: "pve1", - }, - { - Name: "node2", - Status: "offline", - IsClusterMember: true, - ClusterName: "cluster1", - Instance: "pve1", - }, - }, - } - exec = NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: state}}) - - result, err = exec.executeGetClusterStatus(ctx, map[string]interface{}{}) - require.NoError(t, err) - - var resp ClusterStatusResponse - require.NoError(t, json.Unmarshal([]byte(result.Content[0].Text), &resp)) - require.Len(t, resp.Clusters, 1) - assert.Equal(t, "cluster1", resp.Clusters[0].ClusterName) - assert.False(t, resp.Clusters[0].QuorumOK) -} - func TestExecuteGetConnectionHealth(t *testing.T) { ctx := context.Background() exec := NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: models.StateSnapshot{}}}) @@ -349,36 +311,6 @@ func TestExecuteGetResourceDisks(t *testing.T) { assert.Equal(t, "vm1", resp.Resources[0].ID) } -func TestExecuteListRecentTasks(t *testing.T) { - ctx := context.Background() - now := time.Now() - state := models.StateSnapshot{ - PVEBackups: models.PVEBackups{ - BackupTasks: []models.BackupTask{ - { - ID: "task1", - Node: "node1", - Instance: "pve1", - Type: "backup", - Status: "ok", - StartTime: now, - }, - }, - }, - } - - exec := NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: state}}) - result, err := exec.executeListRecentTasks(ctx, map[string]interface{}{ - "type": "backup", - }) - require.NoError(t, err) - - var resp RecentTasksResponse - require.NoError(t, json.Unmarshal([]byte(result.Content[0].Text), &resp)) - require.Len(t, resp.Tasks, 1) - assert.Equal(t, "task1", resp.Tasks[0].ID) -} - func TestExecuteListBackupTasks(t *testing.T) { ctx := context.Background() exec := 
NewPulseToolExecutor(ExecutorConfig{}) diff --git a/internal/ai/tools/infrastructure_test.go b/internal/ai/tools/infrastructure_test.go index f2fd4c922..5385ce8b0 100644 --- a/internal/ai/tools/infrastructure_test.go +++ b/internal/ai/tools/infrastructure_test.go @@ -20,7 +20,7 @@ func TestExecuteGetDiskHealth(t *testing.T) { } diskHealthProv.On("GetHosts").Return(expectedHosts) - // Use consolidated pulse_storage tool with type: "disk_health" + // Use pulse_storage tool with type: "disk_health" result, err := exec.ExecuteTool(context.Background(), "pulse_storage", map[string]interface{}{ "type": "disk_health", }) @@ -44,7 +44,7 @@ func TestExecuteGetTemperatures(t *testing.T) { } stateProv.On("GetState").Return(state) - // Use consolidated pulse_metrics tool with type: "temperatures" + // Use pulse_metrics tool with type: "temperatures" result, err := exec.ExecuteTool(context.Background(), "pulse_metrics", map[string]interface{}{ "type": "temperatures", }) diff --git a/internal/ai/tools/strict_resolution_test.go b/internal/ai/tools/strict_resolution_test.go index 799ebf462..b19f39baa 100644 --- a/internal/ai/tools/strict_resolution_test.go +++ b/internal/ai/tools/strict_resolution_test.go @@ -1489,6 +1489,8 @@ func TestExecutionIntent_ReadOnlyPipeChains(t *testing.T) { "journalctl -u nginx | grep failed", "ls -la /opt | grep -v total", "find /var/log -name '*.log' 2>/dev/null | head -20", + "ss -tuln | grep -E '3000|9090|9100|22|25'", + "netstat -tuln | grep -E '3000|9090|9100|22|25'", } for _, cmd := range allowedPipeChains { diff --git a/internal/ai/tools/tools_alerts.go b/internal/ai/tools/tools_alerts.go index 0be52ec2c..d6e138816 100644 --- a/internal/ai/tools/tools_alerts.go +++ b/internal/ai/tools/tools_alerts.go @@ -3,9 +3,10 @@ package tools import ( "context" "fmt" + "strings" ) -// registerAlertsTools registers the consolidated pulse_alerts tool +// registerAlertsTools registers the pulse_alerts tool func (e *PulseToolExecutor) registerAlertsTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ @@ -94,7 +95,6 @@ Examples: } // executeAlerts routes to the appropriate alerts handler based on action -// All handler functions are implemented in tools_patrol.go func (e *PulseToolExecutor) executeAlerts(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { action, _ := args["action"].(string) switch action { @@ -112,3 +112,292 @@ func (e *PulseToolExecutor) executeAlerts(ctx context.Context, args map[string]i return NewErrorResult(fmt.Errorf("unknown action: %s. 
Use: list, findings, resolved, resolve, dismiss", action)), nil } } + +func (e *PulseToolExecutor) executeListAlerts(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.alertProvider == nil { + return NewTextResult("Alert data not available."), nil + } + + severityFilter, _ := args["severity"].(string) + limit := intArg(args, "limit", 100) + offset := intArg(args, "offset", 0) + + allAlerts := e.alertProvider.GetActiveAlerts() + + var filtered []ActiveAlert + for i, a := range allAlerts { + if i < offset { + continue + } + if len(filtered) >= limit { + break + } + if severityFilter != "" && a.Severity != severityFilter { + continue + } + filtered = append(filtered, a) + } + + if filtered == nil { + filtered = []ActiveAlert{} + } + + response := AlertsResponse{ + Alerts: filtered, + Count: len(filtered), + } + + if offset > 0 || len(allAlerts) > limit { + response.Pagination = &PaginationInfo{ + Total: len(allAlerts), + Limit: limit, + Offset: offset, + } + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeListFindings(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + includeDismissed, _ := args["include_dismissed"].(bool) + severityFilter, _ := args["severity"].(string) + resourceType, _ := args["resource_type"].(string) + resourceID, _ := args["resource_id"].(string) + limit := intArg(args, "limit", 100) + offset := intArg(args, "offset", 0) + if limit <= 0 { + limit = 100 + } + if offset < 0 { + offset = 0 + } + resourceType = strings.ToLower(strings.TrimSpace(resourceType)) + resourceID = strings.TrimSpace(resourceID) + if resourceType != "" { + validTypes := map[string]bool{"vm": true, "container": true, "node": true, "docker": true} + if !validTypes[resourceType] { + return NewErrorResult(fmt.Errorf("invalid resource_type: %s. Use vm, container, node, or docker", resourceType)), nil + } + } + + if e.findingsProvider == nil { + return NewTextResult("Patrol findings not available. 
Pulse Patrol may not be running."), nil + } + + allActive := e.findingsProvider.GetActiveFindings() + var allDismissed []Finding + if includeDismissed { + allDismissed = e.findingsProvider.GetDismissedFindings() + } + + normalizeType := func(value string) string { + normalized := strings.ToLower(strings.TrimSpace(value)) + switch normalized { + case "docker container", "docker-container", "docker_container": + return "docker" + case "lxc", "lxc container", "lxc-container", "lxc_container": + return "container" + default: + return normalized + } + } + + matches := func(f Finding) bool { + if severityFilter != "" && f.Severity != severityFilter { + return false + } + if resourceID != "" && f.ResourceID != resourceID { + return false + } + if resourceType != "" && normalizeType(f.ResourceType) != resourceType { + return false + } + return true + } + + // Filter active + var active []Finding + totalActive := 0 + for _, f := range allActive { + if !matches(f) { + continue + } + if totalActive < offset { + totalActive++ + continue + } + if len(active) >= limit { + totalActive++ + continue + } + active = append(active, f) + totalActive++ + } + + // Filter dismissed + var dismissed []Finding + totalDismissed := 0 + if includeDismissed { + for _, f := range allDismissed { + if !matches(f) { + continue + } + if totalDismissed < offset { + totalDismissed++ + continue + } + if len(dismissed) >= limit { + totalDismissed++ + continue + } + dismissed = append(dismissed, f) + totalDismissed++ + } + } + + if active == nil { + active = []Finding{} + } + if dismissed == nil { + dismissed = []Finding{} + } + + response := FindingsResponse{ + Active: active, + Dismissed: dismissed, + Counts: FindingCounts{ + Active: totalActive, + Dismissed: totalDismissed, + }, + } + + total := totalActive + totalDismissed + if offset > 0 || total > limit { + response.Pagination = &PaginationInfo{ + Total: total, + Limit: limit, + Offset: offset, + } + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeResolveFinding(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + findingID, _ := args["finding_id"].(string) + resolutionNote, _ := args["resolution_note"].(string) + + if findingID == "" { + return NewErrorResult(fmt.Errorf("finding_id is required")), nil + } + if resolutionNote == "" { + return NewErrorResult(fmt.Errorf("resolution_note is required")), nil + } + + if e.findingsManager == nil { + return NewTextResult("Findings manager not available."), nil + } + + if err := e.findingsManager.ResolveFinding(findingID, resolutionNote); err != nil { + return NewErrorResult(err), nil + } + + return NewJSONResult(map[string]interface{}{ + "success": true, + "finding_id": findingID, + "action": "resolved", + "resolution_note": resolutionNote, + }), nil +} + +func (e *PulseToolExecutor) executeDismissFinding(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + findingID, _ := args["finding_id"].(string) + reason, _ := args["reason"].(string) + note, _ := args["note"].(string) + + if findingID == "" { + return NewErrorResult(fmt.Errorf("finding_id is required")), nil + } + if reason == "" { + return NewErrorResult(fmt.Errorf("reason is required")), nil + } + if note == "" { + return NewErrorResult(fmt.Errorf("note is required")), nil + } + + if e.findingsManager == nil { + return NewTextResult("Findings manager not available."), nil + } + + if err := e.findingsManager.DismissFinding(findingID, reason, note); err != nil { + return NewErrorResult(err), nil 
+ } + + return NewJSONResult(map[string]interface{}{ + "success": true, + "finding_id": findingID, + "action": "dismissed", + "reason": reason, + "note": note, + }), nil +} + +// ========== Resolved Alerts Tool Implementation ========== + +func (e *PulseToolExecutor) executeListResolvedAlerts(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + typeFilter, _ := args["type"].(string) + levelFilter, _ := args["level"].(string) + limit := intArg(args, "limit", 50) + + state := e.stateProvider.GetState() + + if len(state.RecentlyResolved) == 0 { + return NewTextResult("No recently resolved alerts."), nil + } + + var alerts []ResolvedAlertSummary + + for _, alert := range state.RecentlyResolved { + // Apply filters + if typeFilter != "" && !strings.EqualFold(alert.Type, typeFilter) { + continue + } + if levelFilter != "" && !strings.EqualFold(alert.Level, levelFilter) { + continue + } + + if len(alerts) >= limit { + break + } + + alerts = append(alerts, ResolvedAlertSummary{ + ID: alert.ID, + Type: alert.Type, + Level: alert.Level, + ResourceID: alert.ResourceID, + ResourceName: alert.ResourceName, + Node: alert.Node, + Instance: alert.Instance, + Message: alert.Message, + Value: alert.Value, + Threshold: alert.Threshold, + StartTime: alert.StartTime, + ResolvedTime: alert.ResolvedTime, + }) + } + + if alerts == nil { + alerts = []ResolvedAlertSummary{} + } + + response := ResolvedAlertsResponse{ + Alerts: alerts, + Total: len(state.RecentlyResolved), + } + + return NewJSONResult(response), nil +} diff --git a/internal/ai/tools/tools_control.go b/internal/ai/tools/tools_control.go index 9d2b9d618..83858abc3 100644 --- a/internal/ai/tools/tools_control.go +++ b/internal/ai/tools/tools_control.go @@ -2,11 +2,20 @@ package tools import ( "context" + "encoding/json" "fmt" + "strconv" + "strings" + + "github.com/rcourtman/pulse-go-rewrite/internal/agentexec" + "github.com/rcourtman/pulse-go-rewrite/internal/ai/approval" + "github.com/rcourtman/pulse-go-rewrite/internal/ai/safety" + "github.com/rcourtman/pulse-go-rewrite/internal/models" + "github.com/rs/zerolog/log" ) -// registerControlToolsConsolidated registers the consolidated pulse_control tool -func (e *PulseToolExecutor) registerControlToolsConsolidated() { +// registerControlTools registers the pulse_control tool +func (e *PulseToolExecutor) registerControlTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ Name: "pulse_control", @@ -82,7 +91,6 @@ Note: Delete requires the guest to be stopped first.`, } // executeControl routes to the appropriate control handler based on type -// Handler functions are implemented in tools_control.go func (e *PulseToolExecutor) executeControl(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { controlType, _ := args["type"].(string) switch controlType { @@ -94,3 +102,981 @@ func (e *PulseToolExecutor) executeControl(ctx context.Context, args map[string] return NewErrorResult(fmt.Errorf("unknown type: %s. 
Use: guest, command", controlType)), nil } } + +func (e *PulseToolExecutor) executeRunCommand(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { + command, _ := args["command"].(string) + runOnHost, _ := args["run_on_host"].(bool) + targetHost, _ := args["target_host"].(string) + + if command == "" { + return NewErrorResult(fmt.Errorf("command is required")), nil + } + + // Validate resource is in resolved context + // Uses command risk classification: read-only commands bypass strict mode + // With PULSE_STRICT_RESOLUTION=true, write commands are blocked on undiscovered resources + if targetHost != "" { + validation := e.validateResolvedResourceForExec(targetHost, command, true) + if validation.IsBlocked() { + // Hard validation failure - return consistent error envelope + return NewToolResponseResult(validation.StrictError.ToToolResponse()), nil + } + if validation.ErrorMsg != "" { + // Soft validation - log warning but allow operation + log.Warn(). + Str("target", targetHost). + Str("command", command). + Str("validation_error", validation.ErrorMsg). + Msg("[Control] Target resource not in resolved context - may indicate model hallucination") + } + + // Validate routing context - block if targeting a Proxmox host when child resources exist + // This prevents accidentally executing commands on the host when user meant to target an LXC/VM + routingResult := e.validateRoutingContext(targetHost) + if routingResult.IsBlocked() { + return NewToolResponseResult(routingResult.RoutingError.ToToolResponse()), nil + } + } + + // Note: Control level read_only check is now centralized in registry.Execute() + + // Check if this is a pre-approved execution (agentic loop re-executing after user approval) + preApproved := isPreApproved(args) + + // Check security policy (skip block check - blocks cannot be pre-approved) + decision := agentexec.PolicyAllow + if e.policy != nil { + decision = e.policy.Evaluate(command) + if decision == agentexec.PolicyBlock { + return NewTextResult(formatPolicyBlocked(command, "This command is blocked by security policy")), nil + } + } + + if targetHost == "" && e.agentServer != nil { + agents := e.agentServer.GetConnectedAgents() + if len(agents) > 1 { + return NewTextResult(formatTargetHostRequired(agents)), nil + } + } + + // Skip approval checks if pre-approved or in autonomous mode + if !preApproved && !e.isAutonomous && e.controlLevel == ControlLevelControlled { + targetType := "container" + if runOnHost { + targetType = "host" + } + approvalID := createApprovalRecord(command, targetType, e.targetID, targetHost, "Control level requires approval") + return NewTextResult(formatApprovalNeeded(command, "Control level requires approval", approvalID)), nil + } + if e.isAutonomous { + log.Debug(). + Str("command", command). + Bool("read_only", safety.IsReadOnlyCommand(command)). 
+ Msg("Auto-approving command for autonomous investigation") + } + if !preApproved && decision == agentexec.PolicyRequireApproval && !e.isAutonomous { + targetType := "container" + if runOnHost { + targetType = "host" + } + approvalID := createApprovalRecord(command, targetType, e.targetID, targetHost, "Security policy requires approval") + return NewTextResult(formatApprovalNeeded(command, "Security policy requires approval", approvalID)), nil + } + + // Execute via agent server + if e.agentServer == nil { + return NewErrorResult(fmt.Errorf("no agent server available")), nil + } + + // Resolve target to the correct agent and routing info (with full provenance) + // If targetHost is an LXC/VM name, this routes to the Proxmox host agent + // with the correct TargetType and TargetID for pct exec / qm guest exec + routing := e.resolveTargetForCommandFull(targetHost) + if routing.AgentID == "" { + if targetHost != "" { + if routing.TargetType == "container" || routing.TargetType == "vm" { + return NewErrorResult(fmt.Errorf("'%s' is a %s but no agent is available on its Proxmox host. Install Pulse Unified Agent on the Proxmox node.", targetHost, routing.TargetType)), nil + } + return NewErrorResult(fmt.Errorf("no agent available for target '%s'. Specify a valid hostname with a connected agent.", targetHost)), nil + } + return NewErrorResult(fmt.Errorf("no agent available for target")), nil + } + + log.Debug(). + Str("target_host", targetHost). + Str("agent_id", routing.AgentID). + Str("agent_host", routing.AgentHostname). + Str("resolved_kind", routing.ResolvedKind). + Str("resolved_node", routing.ResolvedNode). + Str("transport", routing.Transport). + Str("target_type", routing.TargetType). + Str("target_id", routing.TargetID). + Msg("[pulse_control] Routing command execution") + + result, err := e.agentServer.ExecuteCommand(ctx, routing.AgentID, agentexec.ExecuteCommandPayload{ + Command: command, + TargetType: routing.TargetType, + TargetID: routing.TargetID, + }) + if err != nil { + return NewErrorResult(err), nil + } + + output := result.Stdout + if result.Stderr != "" { + output += "\n" + result.Stderr + } + if result.ExitCode != 0 { + return NewTextResult(fmt.Sprintf("Command failed (exit code %d):\n%s", result.ExitCode, output)), nil + } + + // Success - always show output explicitly to prevent LLM hallucination + // When output is empty, we must be explicit about it so the LLM doesn't fabricate results + if output == "" { + return NewTextResult("Command completed successfully (exit code 0).\n\nOutput:\n(no output)"), nil + } + return NewTextResult(fmt.Sprintf("Command completed successfully (exit code 0).\n\nOutput:\n%s", output)), nil +} + +func (e *PulseToolExecutor) executeControlGuest(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { + guestID, _ := args["guest_id"].(string) + action, _ := args["action"].(string) + force, _ := args["force"].(bool) + + if guestID == "" { + return NewErrorResult(fmt.Errorf("guest_id is required")), nil + } + if action == "" { + return NewErrorResult(fmt.Errorf("action is required")), nil + } + + validActions := map[string]bool{"start": true, "stop": true, "shutdown": true, "restart": true, "delete": true} + if !validActions[action] { + return NewErrorResult(fmt.Errorf("invalid action: %s. 
Use start, stop, shutdown, restart, or delete", action)), nil + } + + // Validate resource is in resolved context + // With PULSE_STRICT_RESOLUTION=true, this blocks execution on undiscovered resources + validation := e.validateResolvedResource(guestID, action, true) + if validation.IsBlocked() { + // Hard validation failure - return consistent error envelope + return NewToolResponseResult(validation.StrictError.ToToolResponse()), nil + } + if validation.ErrorMsg != "" { + // Soft validation - log warning but allow operation + log.Warn(). + Str("guest_id", guestID). + Str("action", action). + Str("validation_error", validation.ErrorMsg). + Msg("[ControlGuest] Guest not in resolved context - may indicate model hallucination") + } + + // Note: Control level read_only check is now centralized in registry.Execute() + + guest, err := e.resolveGuest(guestID) + if err != nil { + return NewTextResult(fmt.Sprintf("Could not find guest '%s': %v", guestID, err)), nil + } + + // Check if guest is protected + vmidStr := fmt.Sprintf("%d", guest.VMID) + for _, protected := range e.protectedGuests { + if protected == vmidStr || protected == guest.Name { + return NewTextResult(fmt.Sprintf("Guest %s (VMID %d) is protected and cannot be controlled by Pulse Assistant.", guest.Name, guest.VMID)), nil + } + } + + // Build the command + cmdTool := "pct" + if guest.Type == "vm" { + cmdTool = "qm" + } + + // For delete action, verify guest is stopped first + if action == "delete" && guest.Status != "stopped" { + return NewTextResult(fmt.Sprintf("Cannot delete %s (VMID %d) - it is currently %s. Stop it first, then try deleting again.", guest.Name, guest.VMID, guest.Status)), nil + } + + var command string + switch action { + case "start": + command = fmt.Sprintf("%s start %d", cmdTool, guest.VMID) + case "stop": + command = fmt.Sprintf("%s stop %d", cmdTool, guest.VMID) + case "shutdown": + command = fmt.Sprintf("%s shutdown %d", cmdTool, guest.VMID) + case "restart": + command = fmt.Sprintf("%s reboot %d", cmdTool, guest.VMID) + case "delete": + // Delete uses 'destroy' subcommand with --purge to also remove associated storage + command = fmt.Sprintf("%s destroy %d --purge", cmdTool, guest.VMID) + } + + if force && action == "stop" { + command = fmt.Sprintf("%s stop %d --skiplock", cmdTool, guest.VMID) + } + + // Check if this is a pre-approved execution (agentic loop re-executing after user approval) + preApproved := isPreApproved(args) + + // Check security policy (skip if pre-approved) + if !preApproved && e.policy != nil { + decision := e.policy.Evaluate(command) + if decision == agentexec.PolicyBlock { + return NewTextResult(formatPolicyBlocked(command, "This command is blocked by security policy")), nil + } + if decision == agentexec.PolicyRequireApproval && !e.isAutonomous { + // Use guest.Node (the Proxmox host) as targetName so approval execution can find the correct agent + approvalID := createApprovalRecord(command, guest.Type, fmt.Sprintf("%d", guest.VMID), guest.Node, fmt.Sprintf("%s guest %s", action, guest.Name)) + return NewTextResult(formatControlApprovalNeeded(guest.Name, guest.VMID, action, command, approvalID)), nil + } + } + + // Check control level - this must be outside policy check since policy may be nil (skip if pre-approved) + if !preApproved && e.controlLevel == ControlLevelControlled { + // Use guest.Node (the Proxmox host) as targetName so approval execution can find the correct agent + approvalID := createApprovalRecord(command, guest.Type, fmt.Sprintf("%d", guest.VMID), guest.Node, 
fmt.Sprintf("%s guest %s", action, guest.Name)) + return NewTextResult(formatControlApprovalNeeded(guest.Name, guest.VMID, action, command, approvalID)), nil + } + + if e.agentServer == nil { + return NewErrorResult(fmt.Errorf("no agent server available")), nil + } + + agentID := e.findAgentForNode(guest.Node) + if agentID == "" { + return NewTextResult(fmt.Sprintf("No agent available on node '%s'. Install Pulse Unified Agent on the Proxmox host to enable control.", guest.Node)), nil + } + + result, err := e.agentServer.ExecuteCommand(ctx, agentID, agentexec.ExecuteCommandPayload{ + Command: command, + TargetType: "host", + TargetID: "", + }) + if err != nil { + return NewErrorResult(err), nil + } + + output := result.Stdout + if result.Stderr != "" { + output += "\n" + result.Stderr + } + + if result.ExitCode == 0 { + return NewTextResult(fmt.Sprintf("✓ Successfully executed '%s' on %s (VMID %d). Action complete - no verification needed (state updates in ~10s).\n%s", action, guest.Name, guest.VMID, output)), nil + } + + return NewTextResult(fmt.Sprintf("Command failed (exit code %d):\n%s", result.ExitCode, output)), nil +} + +func (e *PulseToolExecutor) executeControlDocker(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { + containerName, _ := args["container"].(string) + hostName, _ := args["host"].(string) + action, _ := args["action"].(string) + + if containerName == "" { + return NewErrorResult(fmt.Errorf("container name is required")), nil + } + if action == "" { + return NewErrorResult(fmt.Errorf("action is required")), nil + } + + validActions := map[string]bool{"start": true, "stop": true, "restart": true} + if !validActions[action] { + return NewErrorResult(fmt.Errorf("invalid action: %s. Use start, stop, or restart", action)), nil + } + + // Validate resource is in resolved context + // With PULSE_STRICT_RESOLUTION=true, this blocks execution on undiscovered resources + validation := e.validateResolvedResource(containerName, action, true) + if validation.IsBlocked() { + // Hard validation failure - return consistent error envelope + return NewToolResponseResult(validation.StrictError.ToToolResponse()), nil + } + if validation.ErrorMsg != "" { + // Soft validation - log warning but allow operation + log.Warn(). + Str("container", containerName). + Str("action", action). + Str("host", hostName). + Str("validation_error", validation.ErrorMsg). 
+ Msg("[ControlDocker] Container not in resolved context - may indicate model hallucination") + } + + // Note: Control level read_only check is now centralized in registry.Execute() + + // Check if this is a pre-approved execution (agentic loop re-executing after user approval) + preApproved := isPreApproved(args) + + container, dockerHost, err := e.resolveDockerContainer(containerName, hostName) + if err != nil { + return NewTextResult(fmt.Sprintf("Could not find Docker container '%s': %v", containerName, err)), nil + } + + command := fmt.Sprintf("docker %s %s", action, container.Name) + + // Get the agent hostname for approval records (may differ from docker host display name) + agentHostname := e.getAgentHostnameForDockerHost(dockerHost) + + // Skip approval checks if pre-approved + if !preApproved && e.policy != nil { + decision := e.policy.Evaluate(command) + if decision == agentexec.PolicyBlock { + return NewTextResult(formatPolicyBlocked(command, "This command is blocked by security policy")), nil + } + if decision == agentexec.PolicyRequireApproval && !e.isAutonomous { + approvalID := createApprovalRecord(command, "docker", container.Name, agentHostname, fmt.Sprintf("%s Docker container %s", action, container.Name)) + return NewTextResult(formatDockerApprovalNeeded(container.Name, dockerHost.Hostname, action, command, approvalID)), nil + } + } + + // Check control level - this must be outside policy check since policy may be nil (skip if pre-approved) + if !preApproved && e.controlLevel == ControlLevelControlled { + approvalID := createApprovalRecord(command, "docker", container.Name, agentHostname, fmt.Sprintf("%s Docker container %s", action, container.Name)) + return NewTextResult(formatDockerApprovalNeeded(container.Name, dockerHost.Hostname, action, command, approvalID)), nil + } + + if e.agentServer == nil { + return NewErrorResult(fmt.Errorf("no agent server available")), nil + } + + // Resolve the Docker host to the correct agent and routing info (with full provenance) + routing := e.resolveDockerHostRoutingFull(dockerHost) + if routing.AgentID == "" { + if routing.TargetType == "container" || routing.TargetType == "vm" { + return NewTextResult(fmt.Sprintf("Docker host '%s' is a %s but no agent is available on its Proxmox host. Install Pulse Unified Agent on the Proxmox node.", dockerHost.Hostname, routing.TargetType)), nil + } + return NewTextResult(fmt.Sprintf("No agent available on Docker host '%s'. Install Pulse Unified Agent on the host to enable control.", dockerHost.Hostname)), nil + } + + log.Debug(). + Str("docker_host", dockerHost.Hostname). + Str("agent_id", routing.AgentID). + Str("agent_host", routing.AgentHostname). + Str("resolved_kind", routing.ResolvedKind). + Str("resolved_node", routing.ResolvedNode). + Str("transport", routing.Transport). + Str("target_type", routing.TargetType). + Str("target_id", routing.TargetID). + Msg("[pulse_control docker] Routing docker command execution") + + result, err := e.agentServer.ExecuteCommand(ctx, routing.AgentID, agentexec.ExecuteCommandPayload{ + Command: command, + TargetType: routing.TargetType, + TargetID: routing.TargetID, + }) + if err != nil { + return NewErrorResult(err), nil + } + + output := result.Stdout + if result.Stderr != "" { + output += "\n" + result.Stderr + } + + if result.ExitCode == 0 { + return NewTextResult(fmt.Sprintf("✓ Successfully executed 'docker %s' on container '%s' (host: %s). 
Action complete - no verification needed (state updates in ~10s).\n%s", action, container.Name, dockerHost.Hostname, output)), nil + } + + return NewTextResult(fmt.Sprintf("Command failed (exit code %d):\n%s", result.ExitCode, output)), nil +} + +// Helper methods for control tools + +// CommandRoutingResult contains full routing information for command execution. +// This provides the provenance needed to verify where commands actually run. +type CommandRoutingResult struct { + // Routing info for agent + AgentID string // The agent that will execute the command + TargetType string // "host", "container", or "vm" + TargetID string // VMID for LXC/VM, empty for host + + // Provenance info + AgentHostname string // Hostname of the agent + ResolvedKind string // What kind of resource we resolved to: "node", "lxc", "vm", "docker", "host" + ResolvedNode string // Proxmox node name (if applicable) + Transport string // How command will be executed: "direct", "pct_exec", "qm_guest_exec" +} + +// resolveTargetForCommandFull resolves a target_host to full routing info including provenance. +// Use this for write operations where you need to verify execution context. +// +// CRITICAL ORDERING: Topology resolution (state.ResolveResource) happens FIRST. +// Agent hostname matching is a FALLBACK only when the state doesn't know the resource. +// This prevents the "hostname collision" bug where an agent with hostname matching an LXC name +// causes commands to execute on the node instead of inside the LXC via pct exec. +func (e *PulseToolExecutor) resolveTargetForCommandFull(targetHost string) CommandRoutingResult { + result := CommandRoutingResult{ + TargetType: "host", + Transport: "direct", + } + + if e.agentServer == nil { + return result + } + + agents := e.agentServer.GetConnectedAgents() + if len(agents) == 0 { + return result + } + + if targetHost == "" { + // No target_host specified - require exactly one agent or fail + if len(agents) > 1 { + return result + } + result.AgentID = agents[0].AgentID + result.AgentHostname = agents[0].Hostname + result.ResolvedKind = "host" + return result + } + + // STEP 1: Consult topology (state) FIRST — this is authoritative. + // If the state knows about this resource, use topology-based routing. + // This prevents hostname collisions from masquerading as host targets. 
+ if e.stateProvider != nil { + state := e.stateProvider.GetState() + loc := state.ResolveResource(targetHost) + + if loc.Found { + // Route based on resource type + switch loc.ResourceType { + case "node": + // Direct Proxmox node + nodeAgentID := e.findAgentForNode(loc.Node) + result.AgentID = nodeAgentID + result.ResolvedKind = "node" + result.ResolvedNode = loc.Node + for _, agent := range agents { + if agent.AgentID == nodeAgentID { + result.AgentHostname = agent.Hostname + break + } + } + return result + + case "lxc": + // LXC container - route through Proxmox node agent via pct exec + nodeAgentID := e.findAgentForNode(loc.Node) + result.ResolvedKind = "lxc" + result.ResolvedNode = loc.Node + result.TargetType = "container" + result.TargetID = fmt.Sprintf("%d", loc.VMID) + result.Transport = "pct_exec" + if nodeAgentID != "" { + result.AgentID = nodeAgentID + for _, agent := range agents { + if agent.AgentID == nodeAgentID { + result.AgentHostname = agent.Hostname + break + } + } + } + return result + + case "vm": + // VM - route through Proxmox node agent via qm guest exec + nodeAgentID := e.findAgentForNode(loc.Node) + result.ResolvedKind = "vm" + result.ResolvedNode = loc.Node + result.TargetType = "vm" + result.TargetID = fmt.Sprintf("%d", loc.VMID) + result.Transport = "qm_guest_exec" + if nodeAgentID != "" { + result.AgentID = nodeAgentID + for _, agent := range agents { + if agent.AgentID == nodeAgentID { + result.AgentHostname = agent.Hostname + break + } + } + } + return result + + case "docker", "dockerhost": + // Docker container or Docker host + result.ResolvedKind = loc.ResourceType + result.ResolvedNode = loc.Node + + if loc.DockerHostType == "lxc" { + nodeAgentID := e.findAgentForNode(loc.Node) + result.TargetType = "container" + result.TargetID = fmt.Sprintf("%d", loc.DockerHostVMID) + result.Transport = "pct_exec" + if nodeAgentID != "" { + result.AgentID = nodeAgentID + for _, agent := range agents { + if agent.AgentID == nodeAgentID { + result.AgentHostname = agent.Hostname + break + } + } + } + return result + } + if loc.DockerHostType == "vm" { + nodeAgentID := e.findAgentForNode(loc.Node) + result.TargetType = "vm" + result.TargetID = fmt.Sprintf("%d", loc.DockerHostVMID) + result.Transport = "qm_guest_exec" + if nodeAgentID != "" { + result.AgentID = nodeAgentID + for _, agent := range agents { + if agent.AgentID == nodeAgentID { + result.AgentHostname = agent.Hostname + break + } + } + } + return result + } + // Standalone Docker host - find agent directly + for _, agent := range agents { + if agent.Hostname == loc.TargetHost || agent.AgentID == loc.TargetHost { + result.AgentID = agent.AgentID + result.AgentHostname = agent.Hostname + return result + } + } + } + } + } + + // STEP 2: FALLBACK — agent hostname match. + // Only used when the state doesn't know about this resource at all. + // This handles standalone hosts without Proxmox topology. + for _, agent := range agents { + if agent.Hostname == targetHost || agent.AgentID == targetHost { + result.AgentID = agent.AgentID + result.AgentHostname = agent.Hostname + result.ResolvedKind = "host" + return result + } + } + + return result +} + +// resolveTargetForCommand resolves a target_host to the correct agent and routing info. +// Uses the authoritative ResolveResource function from models.StateSnapshot. +// Returns: agentID, targetType ("host", "container", or "vm"), targetID (vmid for LXC/VM) +// +// CRITICAL ORDERING: Same as resolveTargetForCommandFull — topology first, agent fallback second. 
+func (e *PulseToolExecutor) resolveTargetForCommand(targetHost string) (agentID string, targetType string, targetID string) { + // Delegate to the full resolver and extract the triple + r := e.resolveTargetForCommandFull(targetHost) + return r.AgentID, r.TargetType, r.TargetID +} + +func (e *PulseToolExecutor) findAgentForCommand(runOnHost bool, targetHost string) string { + agentID, _, _ := e.resolveTargetForCommand(targetHost) + return agentID +} + +func (e *PulseToolExecutor) resolveGuest(guestID string) (*GuestInfo, error) { + if e.stateProvider == nil { + return nil, fmt.Errorf("state provider not available") + } + + state := e.stateProvider.GetState() + vmid, err := strconv.Atoi(guestID) + + for _, vm := range state.VMs { + if (err == nil && vm.VMID == vmid) || vm.Name == guestID || vm.ID == guestID { + return &GuestInfo{ + VMID: vm.VMID, + Name: vm.Name, + Node: vm.Node, + Type: "vm", + Status: vm.Status, + Instance: vm.Instance, + }, nil + } + } + + for _, ct := range state.Containers { + if (err == nil && ct.VMID == vmid) || ct.Name == guestID || ct.ID == guestID { + return &GuestInfo{ + VMID: ct.VMID, + Name: ct.Name, + Node: ct.Node, + Type: "lxc", + Status: ct.Status, + Instance: ct.Instance, + }, nil + } + } + + return nil, fmt.Errorf("no VM or container found with ID or name '%s'", guestID) +} + +func (e *PulseToolExecutor) resolveDockerContainer(containerName, hostName string) (*models.DockerContainer, *models.DockerHost, error) { + if e.stateProvider == nil { + return nil, nil, fmt.Errorf("state provider not available") + } + + state := e.stateProvider.GetState() + type dockerMatch struct { + host *models.DockerHost + idx int + } + matches := []dockerMatch{} + + for i := range state.DockerHosts { + host := &state.DockerHosts[i] + if hostName != "" && host.Hostname != hostName && host.DisplayName != hostName { + continue + } + + for ci := range host.Containers { + container := host.Containers[ci] + if container.Name == containerName || + container.ID == containerName || + strings.HasPrefix(container.ID, containerName) { + matches = append(matches, dockerMatch{host: host, idx: ci}) + } + } + } + + if hostName != "" { + if len(matches) == 0 { + return nil, nil, fmt.Errorf("container '%s' not found on host '%s'", containerName, hostName) + } + match := matches[0] + return &match.host.Containers[match.idx], match.host, nil + } + + if len(matches) == 0 { + return nil, nil, fmt.Errorf("container '%s' not found on any Docker host", containerName) + } + if len(matches) > 1 { + hostNames := make([]string, 0, len(matches)) + seen := make(map[string]bool) + for _, match := range matches { + name := strings.TrimSpace(match.host.DisplayName) + if name == "" { + name = strings.TrimSpace(match.host.Hostname) + } + if name == "" { + name = strings.TrimSpace(match.host.ID) + } + if name == "" || seen[name] { + continue + } + hostNames = append(hostNames, name) + seen[name] = true + } + if len(hostNames) == 0 { + return nil, nil, fmt.Errorf("container '%s' exists on multiple Docker hosts; specify host", containerName) + } + return nil, nil, fmt.Errorf("container '%s' exists on multiple Docker hosts: %s. 
Specify host.", containerName, strings.Join(hostNames, ", ")) + } + + match := matches[0] + return &match.host.Containers[match.idx], match.host, nil +} + +func (e *PulseToolExecutor) findAgentForNode(nodeName string) string { + if e.agentServer == nil { + return "" + } + + agents := e.agentServer.GetConnectedAgents() + for _, agent := range agents { + if agent.Hostname == nodeName { + return agent.AgentID + } + } + + if e.stateProvider != nil { + state := e.stateProvider.GetState() + for _, host := range state.Hosts { + if host.LinkedNodeID != "" { + for _, node := range state.Nodes { + if node.ID == host.LinkedNodeID && node.Name == nodeName { + for _, agent := range agents { + if agent.Hostname == host.Hostname || agent.AgentID == host.ID { + return agent.AgentID + } + } + } + } + } + } + } + + return "" +} + +func (e *PulseToolExecutor) findAgentForDockerHost(dockerHost *models.DockerHost) string { + if e.agentServer == nil { + return "" + } + + agents := e.agentServer.GetConnectedAgents() + + // First try to match by AgentID (most reliable) + if dockerHost.AgentID != "" { + for _, agent := range agents { + if agent.AgentID == dockerHost.AgentID { + return agent.AgentID + } + } + } + + // Fall back to hostname match + for _, agent := range agents { + if agent.Hostname == dockerHost.Hostname { + return agent.AgentID + } + } + + return "" +} + +// getAgentHostnameForDockerHost finds the agent hostname for a Docker host (for approval records) +func (e *PulseToolExecutor) getAgentHostnameForDockerHost(dockerHost *models.DockerHost) string { + if e.agentServer == nil { + return dockerHost.Hostname // fallback + } + + agents := e.agentServer.GetConnectedAgents() + + // Try to match by AgentID first + if dockerHost.AgentID != "" { + for _, agent := range agents { + if agent.AgentID == dockerHost.AgentID { + return agent.Hostname + } + } + } + + // Fall back to the docker host's hostname + return dockerHost.Hostname +} + +// resolveDockerHostRoutingFull resolves a Docker host to the correct agent and routing info +// with full provenance metadata. If the Docker host is actually an LXC or VM, it routes +// through the Proxmox host agent with the correct TargetType and TargetID so commands +// are executed inside the guest. +func (e *PulseToolExecutor) resolveDockerHostRoutingFull(dockerHost *models.DockerHost) CommandRoutingResult { + result := CommandRoutingResult{ + TargetType: "host", + Transport: "direct", + } + + if e.agentServer == nil { + return result + } + + // STEP 1: Check topology — is the Docker host actually an LXC or VM? + if e.stateProvider != nil { + state := e.stateProvider.GetState() + + // Check LXCs + for _, ct := range state.Containers { + if ct.Name == dockerHost.Hostname { + result.ResolvedKind = "lxc" + result.ResolvedNode = ct.Node + result.TargetType = "container" + result.TargetID = fmt.Sprintf("%d", ct.VMID) + result.Transport = "pct_exec" + nodeAgentID := e.findAgentForNode(ct.Node) + if nodeAgentID != "" { + result.AgentID = nodeAgentID + result.AgentHostname = ct.Node + log.Debug(). + Str("docker_host", dockerHost.Hostname). + Str("node", ct.Node). + Int("vmid", ct.VMID). + Str("agent", nodeAgentID). + Str("transport", result.Transport). 
+ Msg("Resolved Docker host as LXC, routing through Proxmox agent") + } + return result + } + } + + // Check VMs + for _, vm := range state.VMs { + if vm.Name == dockerHost.Hostname { + result.ResolvedKind = "vm" + result.ResolvedNode = vm.Node + result.TargetType = "vm" + result.TargetID = fmt.Sprintf("%d", vm.VMID) + result.Transport = "qm_guest_exec" + nodeAgentID := e.findAgentForNode(vm.Node) + if nodeAgentID != "" { + result.AgentID = nodeAgentID + result.AgentHostname = vm.Node + log.Debug(). + Str("docker_host", dockerHost.Hostname). + Str("node", vm.Node). + Int("vmid", vm.VMID). + Str("agent", nodeAgentID). + Str("transport", result.Transport). + Msg("Resolved Docker host as VM, routing through Proxmox agent") + } + return result + } + } + } + + // STEP 2: Docker host is not an LXC/VM — use direct agent routing + agentID := e.findAgentForDockerHost(dockerHost) + result.AgentID = agentID + result.ResolvedKind = "dockerhost" + if agentID != "" { + // Try to get agent hostname + agents := e.agentServer.GetConnectedAgents() + for _, a := range agents { + if a.AgentID == agentID { + result.AgentHostname = a.Hostname + break + } + } + } + return result +} + +// resolveDockerHostRouting delegates to resolveDockerHostRoutingFull for backwards compatibility. +func (e *PulseToolExecutor) resolveDockerHostRouting(dockerHost *models.DockerHost) (agentID string, targetType string, targetID string) { + r := e.resolveDockerHostRoutingFull(dockerHost) + return r.AgentID, r.TargetType, r.TargetID +} + +// createApprovalRecord creates an approval record in the store and returns the approval ID. +// Returns empty string if store is not available (approvals will still work, just without persistence). +func createApprovalRecord(command, targetType, targetID, targetName, context string) string { + store := approval.GetStore() + if store == nil { + log.Debug().Msg("Approval store not available, approval will not be persisted") + return "" + } + + req := &approval.ApprovalRequest{ + Command: command, + TargetType: targetType, + TargetID: targetID, + TargetName: targetName, + Context: context, + } + + if err := store.CreateApproval(req); err != nil { + log.Warn().Err(err).Msg("Failed to create approval record") + return "" + } + + log.Debug().Str("approval_id", req.ID).Str("command", command).Msg("Created approval record") + return req.ID +} + +// isPreApproved checks if the args contain a valid, approved approval_id. +// This is used when the agentic loop re-executes a tool after user approval. 
+func isPreApproved(args map[string]interface{}) bool { + approvalID, ok := args["_approval_id"].(string) + if !ok || approvalID == "" { + return false + } + + store := approval.GetStore() + if store == nil { + return false + } + + req, found := store.GetApproval(approvalID) + if !found { + log.Debug().Str("approval_id", approvalID).Msg("Pre-approval check: approval not found") + return false + } + + if req.Status == approval.StatusApproved { + log.Debug().Str("approval_id", approvalID).Msg("Pre-approval check: approved, skipping approval flow") + return true + } + + log.Debug().Str("approval_id", approvalID).Str("status", string(req.Status)).Msg("Pre-approval check: not approved") + return false +} + +// Formatting helpers for control tools + +func formatApprovalNeeded(command, reason, approvalID string) string { + payload := map[string]interface{}{ + "type": "approval_required", + "approval_id": approvalID, + "command": command, + "reason": reason, + "how_to_approve": "Click the approval button in the chat to execute this command.", + "do_not_retry": true, + } + b, _ := json.Marshal(payload) + return "APPROVAL_REQUIRED: " + string(b) +} + +func formatPolicyBlocked(command, reason string) string { + payload := map[string]interface{}{ + "type": "policy_blocked", + "command": command, + "reason": reason, + "do_not_retry": true, + } + b, _ := json.Marshal(payload) + return "POLICY_BLOCKED: " + string(b) +} + +func formatTargetHostRequired(agents []agentexec.ConnectedAgent) string { + hostnames := make([]string, 0, len(agents)) + for _, agent := range agents { + name := strings.TrimSpace(agent.Hostname) + if name == "" { + name = strings.TrimSpace(agent.AgentID) + } + if name != "" { + hostnames = append(hostnames, name) + } + } + if len(hostnames) == 0 { + return "Multiple agents are connected. Please specify target_host." + } + maxItems := 6 + list := hostnames + if len(hostnames) > maxItems { + list = hostnames[:maxItems] + } + message := fmt.Sprintf("Multiple agents are connected. Please specify target_host. 
Available: %s", strings.Join(list, ", ")) + if len(hostnames) > maxItems { + message = fmt.Sprintf("%s (+%d more)", message, len(hostnames)-maxItems) + } + return message +} + +func formatControlApprovalNeeded(name string, vmid int, action, command, approvalID string) string { + payload := map[string]interface{}{ + "type": "approval_required", + "approval_id": approvalID, + "guest_name": name, + "guest_vmid": vmid, + "action": action, + "command": command, + "how_to_approve": "Click the approval button in the chat to execute this action.", + "do_not_retry": true, + } + b, _ := json.Marshal(payload) + return "APPROVAL_REQUIRED: " + string(b) +} + +func formatDockerApprovalNeeded(name, host, action, command, approvalID string) string { + payload := map[string]interface{}{ + "type": "approval_required", + "approval_id": approvalID, + "container_name": name, + "docker_host": host, + "action": action, + "command": command, + "how_to_approve": "Click the approval button in the chat to execute this action.", + "do_not_retry": true, + } + b, _ := json.Marshal(payload) + return "APPROVAL_REQUIRED: " + string(b) +} diff --git a/internal/ai/tools/tools_discovery.go b/internal/ai/tools/tools_discovery.go index 325738712..077211e5f 100644 --- a/internal/ai/tools/tools_discovery.go +++ b/internal/ai/tools/tools_discovery.go @@ -3,10 +3,12 @@ package tools import ( "context" "fmt" + "strconv" + "strings" ) -// registerDiscoveryToolsConsolidated registers the consolidated pulse_discovery tool -func (e *PulseToolExecutor) registerDiscoveryToolsConsolidated() { +// registerDiscoveryTools registers the pulse_discovery tool +func (e *PulseToolExecutor) registerDiscoveryTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ Name: "pulse_discovery", @@ -73,7 +75,6 @@ Examples: } // executeDiscovery routes to the appropriate discovery handler based on action -// Handler functions are implemented in tools_discovery.go func (e *PulseToolExecutor) executeDiscovery(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { action, _ := args["action"].(string) switch action { @@ -85,3 +86,417 @@ func (e *PulseToolExecutor) executeDiscovery(ctx context.Context, args map[strin return NewErrorResult(fmt.Errorf("unknown action: %s. Use: get, list", action)), nil } } + +// getCommandContext returns information about how to run commands on a resource. +// This helps the AI understand what commands to use with pulse_control. +type CommandContext struct { + // How to run commands: "direct" (agent on resource), "via_host" (agent on parent host) + Method string `json:"method"` + // The target_host value to use with pulse_control + TargetHost string `json:"target_host"` + // Example command pattern (what to pass to pulse_control) + Example string `json:"example"` + // For containers running inside this resource (e.g., Docker in LXC) + NestedExample string `json:"nested_example,omitempty"` +} + +// getCLIAccessPattern returns context about the resource type. +// Does NOT prescribe how to access - the AI should determine that based on available agents. 
+func getCLIAccessPattern(resourceType, hostID, resourceID string) string { + switch resourceType { + case "lxc": + return fmt.Sprintf("LXC container on Proxmox node '%s' (VMID %s)", hostID, resourceID) + case "vm": + return fmt.Sprintf("VM on Proxmox node '%s' (VMID %s)", hostID, resourceID) + case "docker": + return fmt.Sprintf("Docker container '%s' on host '%s'", resourceID, hostID) + case "host": + return fmt.Sprintf("Host '%s'", hostID) + default: + return "" + } +} + +// commonServicePaths contains typical log/config paths for well-known services +// These are fallbacks when discovery doesn't find specific paths +var commonServicePaths = map[string]struct { + LogPaths []string + ConfigPaths []string + DataPaths []string +}{ + "jellyfin": { + LogPaths: []string{"/var/log/jellyfin/", "/config/log/"}, + ConfigPaths: []string{"/etc/jellyfin/", "/config/"}, + DataPaths: []string{"/var/lib/jellyfin/", "/config/data/"}, + }, + "plex": { + LogPaths: []string{"/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs/"}, + ConfigPaths: []string{"/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/"}, + DataPaths: []string{"/var/lib/plexmediaserver/"}, + }, + "sonarr": { + LogPaths: []string{"/config/logs/"}, + ConfigPaths: []string{"/config/"}, + DataPaths: []string{"/config/"}, + }, + "radarr": { + LogPaths: []string{"/config/logs/"}, + ConfigPaths: []string{"/config/"}, + DataPaths: []string{"/config/"}, + }, + "prowlarr": { + LogPaths: []string{"/config/logs/"}, + ConfigPaths: []string{"/config/"}, + DataPaths: []string{"/config/"}, + }, + "lidarr": { + LogPaths: []string{"/config/logs/"}, + ConfigPaths: []string{"/config/"}, + DataPaths: []string{"/config/"}, + }, + "postgresql": { + LogPaths: []string{"/var/log/postgresql/", "/var/lib/postgresql/data/log/"}, + ConfigPaths: []string{"/etc/postgresql/", "/var/lib/postgresql/data/"}, + DataPaths: []string{"/var/lib/postgresql/data/"}, + }, + "mysql": { + LogPaths: []string{"/var/log/mysql/", "/var/lib/mysql/"}, + ConfigPaths: []string{"/etc/mysql/"}, + DataPaths: []string{"/var/lib/mysql/"}, + }, + "mariadb": { + LogPaths: []string{"/var/log/mysql/", "/var/lib/mysql/"}, + ConfigPaths: []string{"/etc/mysql/"}, + DataPaths: []string{"/var/lib/mysql/"}, + }, + "nginx": { + LogPaths: []string{"/var/log/nginx/"}, + ConfigPaths: []string{"/etc/nginx/"}, + DataPaths: []string{"/var/www/"}, + }, + "homeassistant": { + LogPaths: []string{"/config/home-assistant.log"}, + ConfigPaths: []string{"/config/"}, + DataPaths: []string{"/config/"}, + }, + "frigate": { + LogPaths: []string{"/config/logs/"}, + ConfigPaths: []string{"/config/"}, + DataPaths: []string{"/media/frigate/"}, + }, + "redis": { + LogPaths: []string{"/var/log/redis/"}, + ConfigPaths: []string{"/etc/redis/"}, + DataPaths: []string{"/var/lib/redis/"}, + }, + "mongodb": { + LogPaths: []string{"/var/log/mongodb/"}, + ConfigPaths: []string{"/etc/mongod.conf"}, + DataPaths: []string{"/var/lib/mongodb/"}, + }, + "grafana": { + LogPaths: []string{"/var/log/grafana/"}, + ConfigPaths: []string{"/etc/grafana/"}, + DataPaths: []string{"/var/lib/grafana/"}, + }, + "prometheus": { + LogPaths: []string{"/var/log/prometheus/"}, + ConfigPaths: []string{"/etc/prometheus/"}, + DataPaths: []string{"/var/lib/prometheus/"}, + }, + "influxdb": { + LogPaths: []string{"/var/log/influxdb/"}, + ConfigPaths: []string{"/etc/influxdb/"}, + DataPaths: []string{"/var/lib/influxdb/"}, + }, +} + +// getCommonServicePaths returns fallback paths for a service type +func 
getCommonServicePaths(serviceType string) (logPaths, configPaths, dataPaths []string) { + // Normalize service type (lowercase, remove version numbers) + normalized := strings.ToLower(serviceType) + // Try to match against known services + for key, paths := range commonServicePaths { + if strings.Contains(normalized, key) { + return paths.LogPaths, paths.ConfigPaths, paths.DataPaths + } + } + return nil, nil, nil +} + +func (e *PulseToolExecutor) executeGetDiscovery(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.discoveryProvider == nil { + return NewTextResult("Discovery service not available."), nil + } + + resourceType, _ := args["resource_type"].(string) + resourceID, _ := args["resource_id"].(string) + hostID, _ := args["host_id"].(string) + + if resourceType == "" { + return NewErrorResult(fmt.Errorf("resource_type is required")), nil + } + if resourceID == "" { + return NewErrorResult(fmt.Errorf("resource_id is required")), nil + } + if hostID == "" { + return NewErrorResult(fmt.Errorf("host_id is required - use the 'node' field from search or get_resource results")), nil + } + + // For LXC and VM types, resourceID should be a numeric VMID. + // If a name was passed, try to resolve it to a VMID. + if (resourceType == "lxc" || resourceType == "vm") && e.stateProvider != nil { + if _, err := strconv.Atoi(resourceID); err != nil { + // Not a number - try to resolve the name to a VMID + state := e.stateProvider.GetState() + resolved := false + + if resourceType == "lxc" { + for _, c := range state.Containers { + if strings.EqualFold(c.Name, resourceID) && c.Node == hostID { + resourceID = fmt.Sprintf("%d", c.VMID) + resolved = true + break + } + } + } else if resourceType == "vm" { + for _, vm := range state.VMs { + if strings.EqualFold(vm.Name, resourceID) && vm.Node == hostID { + resourceID = fmt.Sprintf("%d", vm.VMID) + resolved = true + break + } + } + } + + if !resolved { + return NewErrorResult(fmt.Errorf("could not resolve resource name '%s' to a VMID on host '%s'", resourceID, hostID)), nil + } + } + } + + // First try to get existing discovery + discovery, err := e.discoveryProvider.GetDiscoveryByResource(resourceType, hostID, resourceID) + if err != nil { + return NewErrorResult(fmt.Errorf("failed to get discovery: %w", err)), nil + } + + // Compute CLI access pattern (always useful, even if discovery fails) + cliAccess := getCLIAccessPattern(resourceType, hostID, resourceID) + + // If no discovery exists, trigger one + if discovery == nil { + discovery, err = e.discoveryProvider.TriggerDiscovery(ctx, resourceType, hostID, resourceID) + if err != nil { + // Even on failure, provide cli_access so AI can investigate manually + return NewJSONResult(map[string]interface{}{ + "found": false, + "resource_type": resourceType, + "resource_id": resourceID, + "host_id": hostID, + "cli_access": cliAccess, + "message": fmt.Sprintf("Discovery failed: %v", err), + "hint": "Use pulse_control with type='command' to investigate. Try checking /var/log/ for logs.", + }), nil + } + } + + if discovery == nil { + // No discovery but provide cli_access for manual investigation + return NewJSONResult(map[string]interface{}{ + "found": false, + "resource_type": resourceType, + "resource_id": resourceID, + "host_id": hostID, + "cli_access": cliAccess, + "message": "Discovery returned no data. The resource may not be accessible.", + "hint": "Use pulse_control with type='command' to investigate. 
Try listing /var/log/ or checking running processes.", + }), nil + } + + // Use fallback cli_access if discovery didn't provide one + responseCLIAccess := discovery.CLIAccess + if responseCLIAccess == "" { + responseCLIAccess = cliAccess + } + + // Use fallback paths for known services if discovery didn't find specific ones + responseConfigPaths := discovery.ConfigPaths + responseDataPaths := discovery.DataPaths + var responseLogPaths []string + + if discovery.ServiceType != "" { + fallbackLogPaths, fallbackConfigPaths, fallbackDataPaths := getCommonServicePaths(discovery.ServiceType) + if len(responseConfigPaths) == 0 && len(fallbackConfigPaths) > 0 { + responseConfigPaths = fallbackConfigPaths + } + if len(responseDataPaths) == 0 && len(fallbackDataPaths) > 0 { + responseDataPaths = fallbackDataPaths + } + if len(fallbackLogPaths) > 0 { + responseLogPaths = fallbackLogPaths + } + } + + // Return the discovery information + response := map[string]interface{}{ + "found": true, + "id": discovery.ID, + "resource_type": discovery.ResourceType, + "resource_id": discovery.ResourceID, + "host_id": discovery.HostID, + "hostname": discovery.Hostname, + "service_type": discovery.ServiceType, + "service_name": discovery.ServiceName, + "service_version": discovery.ServiceVersion, + "category": discovery.Category, + "cli_access": responseCLIAccess, + "config_paths": responseConfigPaths, + "data_paths": responseDataPaths, + "confidence": discovery.Confidence, + "discovered_at": discovery.DiscoveredAt, + "updated_at": discovery.UpdatedAt, + } + + // Add log paths if we have them (from fallback or discovery) + if len(responseLogPaths) > 0 { + response["log_paths"] = responseLogPaths + } + + // Add facts if present + if len(discovery.Facts) > 0 { + facts := make([]map[string]string, 0, len(discovery.Facts)) + for _, f := range discovery.Facts { + facts = append(facts, map[string]string{ + "category": f.Category, + "key": f.Key, + "value": f.Value, + }) + } + response["facts"] = facts + } + + // Add user notes if present + if discovery.UserNotes != "" { + response["user_notes"] = discovery.UserNotes + } + + // Add AI reasoning for context + if discovery.AIReasoning != "" { + response["ai_reasoning"] = discovery.AIReasoning + } + + // Add listening ports if present + if len(discovery.Ports) > 0 { + ports := make([]map[string]interface{}, 0, len(discovery.Ports)) + for _, p := range discovery.Ports { + port := map[string]interface{}{ + "port": p.Port, + "protocol": p.Protocol, + } + if p.Process != "" { + port["process"] = p.Process + } + if p.Address != "" { + port["address"] = p.Address + } + ports = append(ports, port) + } + response["ports"] = ports + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeListDiscoveries(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.discoveryProvider == nil { + return NewTextResult("Discovery service not available."), nil + } + + filterType, _ := args["type"].(string) + filterHost, _ := args["host"].(string) + filterServiceType, _ := args["service_type"].(string) + limit := intArg(args, "limit", 50) + + var discoveries []*ResourceDiscoveryInfo + var err error + + // Get discoveries based on filters + if filterType != "" { + discoveries, err = e.discoveryProvider.ListDiscoveriesByType(filterType) + } else if filterHost != "" { + discoveries, err = e.discoveryProvider.ListDiscoveriesByHost(filterHost) + } else { + discoveries, err = e.discoveryProvider.ListDiscoveries() + } + + if err != nil { + return 
NewErrorResult(fmt.Errorf("failed to list discoveries: %w", err)), nil + } + + // Filter by service type if specified + if filterServiceType != "" { + filtered := make([]*ResourceDiscoveryInfo, 0) + filterLower := strings.ToLower(filterServiceType) + for _, d := range discoveries { + if strings.Contains(strings.ToLower(d.ServiceType), filterLower) || + strings.Contains(strings.ToLower(d.ServiceName), filterLower) { + filtered = append(filtered, d) + } + } + discoveries = filtered + } + + // Apply limit + if len(discoveries) > limit { + discoveries = discoveries[:limit] + } + + // Build response + results := make([]map[string]interface{}, 0, len(discoveries)) + for _, d := range discoveries { + result := map[string]interface{}{ + "id": d.ID, + "resource_type": d.ResourceType, + "resource_id": d.ResourceID, + "host_id": d.HostID, + "hostname": d.Hostname, + "service_type": d.ServiceType, + "service_name": d.ServiceName, + "service_version": d.ServiceVersion, + "category": d.Category, + "cli_access": d.CLIAccess, + "confidence": d.Confidence, + "updated_at": d.UpdatedAt, + } + + // Add key facts count + if len(d.Facts) > 0 { + result["facts_count"] = len(d.Facts) + } + + // Add ports count + if len(d.Ports) > 0 { + result["ports_count"] = len(d.Ports) + } + + results = append(results, result) + } + + response := map[string]interface{}{ + "discoveries": results, + "total": len(results), + } + + if filterType != "" { + response["filter_type"] = filterType + } + if filterHost != "" { + response["filter_host"] = filterHost + } + if filterServiceType != "" { + response["filter_service_type"] = filterServiceType + } + + return NewJSONResult(response), nil +} diff --git a/internal/ai/tools/tools_docker.go b/internal/ai/tools/tools_docker.go index 6b6f6fb6b..2aa71f823 100644 --- a/internal/ai/tools/tools_docker.go +++ b/internal/ai/tools/tools_docker.go @@ -2,13 +2,14 @@ package tools import ( "context" + "encoding/json" "fmt" "github.com/rcourtman/pulse-go-rewrite/internal/agentexec" "github.com/rs/zerolog/log" ) -// registerDockerTools registers the consolidated pulse_docker tool +// registerDockerTools registers the pulse_docker tool func (e *PulseToolExecutor) registerDockerTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ @@ -78,22 +79,16 @@ func (e *PulseToolExecutor) executeDocker(ctx context.Context, args map[string]i case "control": return e.executeDockerControl(ctx, args) case "updates": - // Uses existing function from tools_infrastructure.go return e.executeListDockerUpdates(ctx, args) case "check_updates": - // Uses existing function from tools_infrastructure.go return e.executeCheckDockerUpdates(ctx, args) case "update": - // Uses existing function from tools_infrastructure.go return e.executeUpdateDockerContainer(ctx, args) case "services": - // Uses existing function from tools_infrastructure.go return e.executeListDockerServices(ctx, args) case "tasks": - // Uses existing function from tools_infrastructure.go return e.executeListDockerTasks(ctx, args) case "swarm": - // Uses existing function from tools_infrastructure.go return e.executeGetSwarmStatus(ctx, args) default: return NewErrorResult(fmt.Errorf("unknown action: %s. 
Use: control, updates, check_updates, update, services, tasks, swarm", action)), nil @@ -101,7 +96,6 @@ func (e *PulseToolExecutor) executeDocker(ctx context.Context, args map[string]i } // executeDockerControl handles start/stop/restart of Docker containers -// This is a new consolidated handler that merges pulse_control_docker functionality func (e *PulseToolExecutor) executeDockerControl(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { containerName, _ := args["container"].(string) hostName, _ := args["host"].(string) @@ -199,3 +193,341 @@ func (e *PulseToolExecutor) executeDockerControl(ctx context.Context, args map[s return NewTextResult(fmt.Sprintf("Command failed (exit code %d):\n%s", result.ExitCode, output)), nil } + +// ========== Docker Updates Handler Implementations ========== + +func (e *PulseToolExecutor) executeListDockerUpdates(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.updatesProvider == nil { + return NewTextResult("Docker update information not available. Ensure updates provider is configured."), nil + } + + hostFilter, _ := args["host"].(string) + + // Resolve host name to ID if needed + hostID := e.resolveDockerHostID(hostFilter) + + updates := e.updatesProvider.GetPendingUpdates(hostID) + + // Ensure non-nil slice + if updates == nil { + updates = []ContainerUpdateInfo{} + } + + response := DockerUpdatesResponse{ + Updates: updates, + Total: len(updates), + HostID: hostID, + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeCheckDockerUpdates(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.updatesProvider == nil { + return NewTextResult("Docker update checking not available. Ensure updates provider is configured."), nil + } + + hostArg, _ := args["host"].(string) + if hostArg == "" { + return NewErrorResult(fmt.Errorf("host is required")), nil + } + + // Resolve host name to ID + hostID := e.resolveDockerHostID(hostArg) + if hostID == "" { + return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil + } + + hostName := e.getDockerHostName(hostID) + + // Trigger the update check + cmdStatus, err := e.updatesProvider.TriggerUpdateCheck(hostID) + if err != nil { + return NewTextResult(fmt.Sprintf("Failed to trigger update check: %v", err)), nil + } + + response := DockerCheckUpdatesResponse{ + Success: true, + HostID: hostID, + HostName: hostName, + CommandID: cmdStatus.ID, + Message: "Update check command queued. Results will be available after the next agent report cycle (~30 seconds).", + Command: cmdStatus, + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeUpdateDockerContainer(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.updatesProvider == nil { + return NewTextResult("Docker update functionality not available. Ensure updates provider is configured."), nil + } + + containerArg, _ := args["container"].(string) + hostArg, _ := args["host"].(string) + + if containerArg == "" { + return NewErrorResult(fmt.Errorf("container is required")), nil + } + if hostArg == "" { + return NewErrorResult(fmt.Errorf("host is required")), nil + } + + // Check if update actions are enabled + if !e.updatesProvider.IsUpdateActionsEnabled() { + return NewTextResult("Docker container updates are disabled by server configuration. 
Set PULSE_DISABLE_DOCKER_UPDATE_ACTIONS=false or enable in Settings to allow updates."), nil + } + + // Resolve container and host + container, dockerHost, err := e.resolveDockerContainer(containerArg, hostArg) + if err != nil { + return NewTextResult(fmt.Sprintf("Could not find container '%s' on host '%s': %v", containerArg, hostArg, err)), nil + } + + containerName := trimContainerName(container.Name) + + // Controlled mode - require approval + if e.controlLevel == ControlLevelControlled { + command := fmt.Sprintf("docker update %s", containerName) + agentHostname := e.getAgentHostnameForDockerHost(dockerHost) + approvalID := createApprovalRecord(command, "docker", container.ID, agentHostname, fmt.Sprintf("Update container %s to latest image", containerName)) + return NewTextResult(formatDockerUpdateApprovalNeeded(containerName, dockerHost.Hostname, approvalID)), nil + } + + // Autonomous mode - execute directly + cmdStatus, err := e.updatesProvider.UpdateContainer(dockerHost.ID, container.ID, containerName) + if err != nil { + return NewTextResult(fmt.Sprintf("Failed to queue update command: %v", err)), nil + } + + response := DockerUpdateContainerResponse{ + Success: true, + HostID: dockerHost.ID, + ContainerID: container.ID, + ContainerName: containerName, + CommandID: cmdStatus.ID, + Message: fmt.Sprintf("Update command queued for container '%s'. The agent will pull the latest image and recreate the container.", containerName), + Command: cmdStatus, + } + + return NewJSONResult(response), nil +} + +// Helper methods for Docker updates + +func (e *PulseToolExecutor) resolveDockerHostID(hostArg string) string { + if hostArg == "" { + return "" + } + if e.stateProvider == nil { + return hostArg + } + + state := e.stateProvider.GetState() + for _, host := range state.DockerHosts { + if host.ID == hostArg || host.Hostname == hostArg || host.DisplayName == hostArg { + return host.ID + } + } + return hostArg // Return as-is if not found (provider will handle error) +} + +func (e *PulseToolExecutor) getDockerHostName(hostID string) string { + if e.stateProvider == nil { + return hostID + } + + state := e.stateProvider.GetState() + for _, host := range state.DockerHosts { + if host.ID == hostID { + if host.DisplayName != "" { + return host.DisplayName + } + return host.Hostname + } + } + return hostID +} + +func formatDockerUpdateApprovalNeeded(containerName, hostName, approvalID string) string { + payload := map[string]interface{}{ + "type": "approval_required", + "approval_id": approvalID, + "container_name": containerName, + "docker_host": hostName, + "action": "update", + "command": fmt.Sprintf("docker update %s (pull latest + recreate)", containerName), + "how_to_approve": "Click the approval button in the chat to execute this update.", + "do_not_retry": true, + } + b, _ := json.Marshal(payload) + return "APPROVAL_REQUIRED: " + string(b) +} + +func trimLeadingSlash(name string) string { + if len(name) > 0 && name[0] == '/' { + return name[1:] + } + return name +} + +// ========== Docker Swarm Handler Implementations ========== + +func (e *PulseToolExecutor) executeGetSwarmStatus(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + hostArg, _ := args["host"].(string) + if hostArg == "" { + return NewErrorResult(fmt.Errorf("host is required")), nil + } + + state := e.stateProvider.GetState() + + for _, host := range state.DockerHosts { + if host.ID == hostArg || 
host.Hostname == hostArg || host.DisplayName == hostArg || host.CustomDisplayName == hostArg { + if host.Swarm == nil { + return NewTextResult(fmt.Sprintf("Docker host '%s' is not part of a Swarm cluster.", host.Hostname)), nil + } + + response := SwarmStatusResponse{ + Host: host.Hostname, + Status: DockerSwarmSummary{ + NodeID: host.Swarm.NodeID, + NodeRole: host.Swarm.NodeRole, + LocalState: host.Swarm.LocalState, + ControlAvailable: host.Swarm.ControlAvailable, + ClusterID: host.Swarm.ClusterID, + ClusterName: host.Swarm.ClusterName, + Error: host.Swarm.Error, + }, + } + + return NewJSONResult(response), nil + } + } + + return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil +} + +func (e *PulseToolExecutor) executeListDockerServices(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + hostArg, _ := args["host"].(string) + if hostArg == "" { + return NewErrorResult(fmt.Errorf("host is required")), nil + } + + stackFilter, _ := args["stack"].(string) + + state := e.stateProvider.GetState() + + for _, host := range state.DockerHosts { + if host.ID == hostArg || host.Hostname == hostArg || host.DisplayName == hostArg || host.CustomDisplayName == hostArg { + if len(host.Services) == 0 { + return NewTextResult(fmt.Sprintf("No Docker services found on host '%s'. The host may not be a Swarm manager.", host.Hostname)), nil + } + + var services []DockerServiceSummary + filteredCount := 0 + + for _, svc := range host.Services { + if stackFilter != "" && svc.Stack != stackFilter { + continue + } + + filteredCount++ + + updateStatus := "" + if svc.UpdateStatus != nil { + updateStatus = svc.UpdateStatus.State + } + + services = append(services, DockerServiceSummary{ + ID: svc.ID, + Name: svc.Name, + Stack: svc.Stack, + Image: svc.Image, + Mode: svc.Mode, + DesiredTasks: svc.DesiredTasks, + RunningTasks: svc.RunningTasks, + UpdateStatus: updateStatus, + }) + } + + if services == nil { + services = []DockerServiceSummary{} + } + + response := DockerServicesResponse{ + Host: host.Hostname, + Services: services, + Total: len(host.Services), + Filtered: filteredCount, + } + + return NewJSONResult(response), nil + } + } + + return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil +} + +func (e *PulseToolExecutor) executeListDockerTasks(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + hostArg, _ := args["host"].(string) + if hostArg == "" { + return NewErrorResult(fmt.Errorf("host is required")), nil + } + + serviceFilter, _ := args["service"].(string) + + state := e.stateProvider.GetState() + + for _, host := range state.DockerHosts { + if host.ID == hostArg || host.Hostname == hostArg || host.DisplayName == hostArg || host.CustomDisplayName == hostArg { + if len(host.Tasks) == 0 { + return NewTextResult(fmt.Sprintf("No Docker tasks found on host '%s'. 
The host may not be a Swarm manager.", host.Hostname)), nil + } + + var tasks []DockerTaskSummary + + for _, task := range host.Tasks { + if serviceFilter != "" && task.ServiceID != serviceFilter && task.ServiceName != serviceFilter { + continue + } + + tasks = append(tasks, DockerTaskSummary{ + ID: task.ID, + ServiceName: task.ServiceName, + NodeName: task.NodeName, + DesiredState: task.DesiredState, + CurrentState: task.CurrentState, + Error: task.Error, + StartedAt: task.StartedAt, + }) + } + + if tasks == nil { + tasks = []DockerTaskSummary{} + } + + response := DockerTasksResponse{ + Host: host.Hostname, + Service: serviceFilter, + Tasks: tasks, + Total: len(tasks), + } + + return NewJSONResult(response), nil + } + } + + return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil +} diff --git a/internal/ai/tools/tools_knowledge.go b/internal/ai/tools/tools_knowledge.go index 1aac6ec7f..896884e0d 100644 --- a/internal/ai/tools/tools_knowledge.go +++ b/internal/ai/tools/tools_knowledge.go @@ -3,9 +3,92 @@ package tools import ( "context" "fmt" + "time" ) -// registerKnowledgeTools registers the consolidated pulse_knowledge tool +// IncidentRecorderProvider provides access to incident recording data +type IncidentRecorderProvider interface { + GetWindowsForResource(resourceID string, limit int) []*IncidentWindow + GetWindow(windowID string) *IncidentWindow +} + +// IncidentWindow represents a high-frequency recording window during an incident +type IncidentWindow struct { + ID string `json:"id"` + ResourceID string `json:"resource_id"` + ResourceName string `json:"resource_name,omitempty"` + ResourceType string `json:"resource_type,omitempty"` + TriggerType string `json:"trigger_type"` + TriggerID string `json:"trigger_id,omitempty"` + StartTime time.Time `json:"start_time"` + EndTime *time.Time `json:"end_time,omitempty"` + Status string `json:"status"` + DataPoints []IncidentDataPoint `json:"data_points"` + Summary *IncidentSummary `json:"summary,omitempty"` +} + +// IncidentDataPoint represents a single data point in an incident window +type IncidentDataPoint struct { + Timestamp time.Time `json:"timestamp"` + Metrics map[string]float64 `json:"metrics"` +} + +// IncidentSummary provides computed statistics about an incident window +type IncidentSummary struct { + Duration time.Duration `json:"duration_ms"` + DataPoints int `json:"data_points"` + Peaks map[string]float64 `json:"peaks"` + Lows map[string]float64 `json:"lows"` + Averages map[string]float64 `json:"averages"` + Changes map[string]float64 `json:"changes"` +} + +// EventCorrelatorProvider provides access to correlated events +type EventCorrelatorProvider interface { + GetCorrelationsForResource(resourceID string, window time.Duration) []EventCorrelation +} + +// EventCorrelation represents a correlated event +type EventCorrelation struct { + EventType string `json:"event_type"` + Timestamp time.Time `json:"timestamp"` + ResourceID string `json:"resource_id"` + ResourceName string `json:"resource_name,omitempty"` + Description string `json:"description"` + Severity string `json:"severity,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +// TopologyProvider provides access to resource relationships +type TopologyProvider interface { + GetRelatedResources(resourceID string, depth int) []RelatedResource +} + +// RelatedResource represents a resource related to another resource +type RelatedResource struct { + ResourceID string `json:"resource_id"` + ResourceName string 
`json:"resource_name"` + ResourceType string `json:"resource_type"` + Relationship string `json:"relationship"` +} + +// KnowledgeStoreProvider provides access to stored knowledge/notes +type KnowledgeStoreProvider interface { + SaveNote(resourceID, note, category string) error + GetKnowledge(resourceID string, category string) []KnowledgeEntry +} + +// KnowledgeEntry represents a stored note about a resource +type KnowledgeEntry struct { + ID string `json:"id"` + ResourceID string `json:"resource_id"` + Note string `json:"note"` + Category string `json:"category,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at,omitempty"` +} + +// registerKnowledgeTools registers the pulse_knowledge tool func (e *PulseToolExecutor) registerKnowledgeTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ @@ -76,7 +159,6 @@ Examples: } // executeKnowledge routes to the appropriate knowledge handler based on action -// Handler functions are implemented in tools_intelligence.go func (e *PulseToolExecutor) executeKnowledge(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { action, _ := args["action"].(string) switch action { @@ -94,3 +176,182 @@ func (e *PulseToolExecutor) executeKnowledge(ctx context.Context, args map[strin return NewErrorResult(fmt.Errorf("unknown action: %s. Use: remember, recall, incidents, correlate, relationships", action)), nil } } + +// Tool handler implementations + +func (e *PulseToolExecutor) executeGetIncidentWindow(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + resourceID, _ := args["resource_id"].(string) + windowID, _ := args["window_id"].(string) + limit := intArg(args, "limit", 5) + + if resourceID == "" { + return NewErrorResult(fmt.Errorf("resource_id is required")), nil + } + + if e.incidentRecorderProvider == nil { + return NewTextResult("Incident recording data not available. The incident recorder may not be enabled."), nil + } + + // If a specific window ID is requested + if windowID != "" { + window := e.incidentRecorderProvider.GetWindow(windowID) + if window == nil { + return NewTextResult(fmt.Sprintf("Incident window '%s' not found.", windowID)), nil + } + return NewJSONResult(map[string]interface{}{ + "window": window, + }), nil + } + + // Get windows for the resource + windows := e.incidentRecorderProvider.GetWindowsForResource(resourceID, limit) + if len(windows) == 0 { + return NewTextResult(fmt.Sprintf("No incident recording data found for resource '%s'. Incident data is captured when alerts fire.", resourceID)), nil + } + + return NewJSONResult(map[string]interface{}{ + "resource_id": resourceID, + "windows": windows, + "count": len(windows), + }), nil +} + +func (e *PulseToolExecutor) executeCorrelateEvents(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + resourceID, _ := args["resource_id"].(string) + timestampStr, _ := args["timestamp"].(string) + windowMinutes := intArg(args, "window_minutes", 15) + + if resourceID == "" { + return NewErrorResult(fmt.Errorf("resource_id is required")), nil + } + + if e.eventCorrelatorProvider == nil { + return NewTextResult("Event correlation not available. 
The event correlator may not be enabled."), nil + } + + // Parse timestamp or use now + var timestamp time.Time + if timestampStr != "" { + var err error + timestamp, err = time.Parse(time.RFC3339, timestampStr) + if err != nil { + return NewErrorResult(fmt.Errorf("invalid timestamp format: %w", err)), nil + } + } else { + timestamp = time.Now() + } + + window := time.Duration(windowMinutes) * time.Minute + correlations := e.eventCorrelatorProvider.GetCorrelationsForResource(resourceID, window) + + if len(correlations) == 0 { + return NewTextResult(fmt.Sprintf("No correlated events found for resource '%s' within %d minutes of %s.", + resourceID, windowMinutes, timestamp.Format(time.RFC3339))), nil + } + + return NewJSONResult(map[string]interface{}{ + "resource_id": resourceID, + "timestamp": timestamp.Format(time.RFC3339), + "window_minutes": windowMinutes, + "events": correlations, + "count": len(correlations), + }), nil +} + +func (e *PulseToolExecutor) executeGetRelationshipGraph(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + resourceID, _ := args["resource_id"].(string) + depth := intArg(args, "depth", 1) + + if resourceID == "" { + return NewErrorResult(fmt.Errorf("resource_id is required")), nil + } + + // Cap depth to prevent excessive traversal + if depth < 1 { + depth = 1 + } + if depth > 3 { + depth = 3 + } + + if e.topologyProvider == nil { + return NewTextResult("Topology information not available."), nil + } + + related := e.topologyProvider.GetRelatedResources(resourceID, depth) + if len(related) == 0 { + return NewTextResult(fmt.Sprintf("No relationships found for resource '%s'.", resourceID)), nil + } + + return NewJSONResult(map[string]interface{}{ + "resource_id": resourceID, + "depth": depth, + "related_resources": related, + "count": len(related), + }), nil +} + +func (e *PulseToolExecutor) executeRemember(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + resourceID, _ := args["resource_id"].(string) + note, _ := args["note"].(string) + category, _ := args["category"].(string) + + if resourceID == "" { + return NewErrorResult(fmt.Errorf("resource_id is required")), nil + } + if note == "" { + return NewErrorResult(fmt.Errorf("note is required")), nil + } + + if e.knowledgeStoreProvider == nil { + return NewTextResult("Knowledge storage not available."), nil + } + + if err := e.knowledgeStoreProvider.SaveNote(resourceID, note, category); err != nil { + return NewErrorResult(fmt.Errorf("failed to save note: %w", err)), nil + } + + response := map[string]interface{}{ + "success": true, + "resource_id": resourceID, + "note": note, + "message": "Note saved successfully", + } + if category != "" { + response["category"] = category + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeRecall(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + resourceID, _ := args["resource_id"].(string) + category, _ := args["category"].(string) + + if resourceID == "" { + return NewErrorResult(fmt.Errorf("resource_id is required")), nil + } + + if e.knowledgeStoreProvider == nil { + return NewTextResult("Knowledge storage not available."), nil + } + + entries := e.knowledgeStoreProvider.GetKnowledge(resourceID, category) + if len(entries) == 0 { + if category != "" { + return NewTextResult(fmt.Sprintf("No notes found for resource '%s' in category '%s'.", resourceID, category)), nil + } + return NewTextResult(fmt.Sprintf("No notes found for resource '%s'.", resourceID)), nil + } + + 
response := map[string]interface{}{ + "resource_id": resourceID, + "notes": entries, + "count": len(entries), + } + if category != "" { + response["category"] = category + } + + return NewJSONResult(response), nil +} diff --git a/internal/ai/tools/tools_kubernetes.go b/internal/ai/tools/tools_kubernetes.go index dbd037707..d6e28406b 100644 --- a/internal/ai/tools/tools_kubernetes.go +++ b/internal/ai/tools/tools_kubernetes.go @@ -10,7 +10,7 @@ import ( "github.com/rcourtman/pulse-go-rewrite/internal/models" ) -// registerKubernetesTools registers the consolidated pulse_kubernetes tool +// registerKubernetesTools registers the pulse_kubernetes tool func (e *PulseToolExecutor) registerKubernetesTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ diff --git a/internal/ai/tools/tools_metrics.go b/internal/ai/tools/tools_metrics.go index 202d35685..b016e011f 100644 --- a/internal/ai/tools/tools_metrics.go +++ b/internal/ai/tools/tools_metrics.go @@ -2,10 +2,14 @@ package tools import ( "context" + "encoding/json" "fmt" + "sort" + "strings" + "time" ) -// registerMetricsTools registers the consolidated pulse_metrics tool +// registerMetricsTools registers the pulse_metrics tool func (e *PulseToolExecutor) registerMetricsTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ @@ -86,7 +90,6 @@ Examples: } // executeMetrics routes to the appropriate metrics handler based on type -// All handler functions are implemented in tools_patrol.go and tools_infrastructure.go func (e *PulseToolExecutor) executeMetrics(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { metricType, _ := args["type"].(string) switch metricType { @@ -108,3 +111,532 @@ func (e *PulseToolExecutor) executeMetrics(ctx context.Context, args map[string] return NewErrorResult(fmt.Errorf("unknown type: %s. Use: performance, temperatures, network, diskio, disks, baselines, patterns", metricType)), nil } } + +func (e *PulseToolExecutor) executeGetMetrics(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + period, _ := args["period"].(string) + resourceID, _ := args["resource_id"].(string) + resourceType, _ := args["resource_type"].(string) + limit := intArg(args, "limit", 100) + offset := intArg(args, "offset", 0) + if limit <= 0 { + limit = 100 + } + if offset < 0 { + offset = 0 + } + resourceType = strings.ToLower(strings.TrimSpace(resourceType)) + if resourceType != "" { + validTypes := map[string]bool{"vm": true, "container": true, "node": true} + if !validTypes[resourceType] { + return NewErrorResult(fmt.Errorf("invalid resource_type: %s. Use vm, container, or node", resourceType)), nil + } + } + + if e.metricsHistory == nil { + return NewTextResult("Metrics history not available. 
The system may still be collecting data."), nil + } + + var duration time.Duration + switch period { + case "24h": + duration = 24 * time.Hour + case "7d": + duration = 7 * 24 * time.Hour + default: + duration = 24 * time.Hour + period = "24h" + } + + response := MetricsResponse{ + Period: period, + } + + if resourceID != "" { + response.ResourceID = resourceID + metrics, err := e.metricsHistory.GetResourceMetrics(resourceID, duration) + if err != nil { + return NewErrorResult(err), nil + } + response.Points = metrics + return NewJSONResult(response), nil + } + + summary, err := e.metricsHistory.GetAllMetricsSummary(duration) + if err != nil { + return NewErrorResult(err), nil + } + + keys := make([]string, 0, len(summary)) + for id, metric := range summary { + if resourceType != "" && strings.ToLower(metric.ResourceType) != resourceType { + continue + } + keys = append(keys, id) + } + sort.Strings(keys) + + filtered := make(map[string]ResourceMetricsSummary) + total := 0 + for _, id := range keys { + if total < offset { + total++ + continue + } + if len(filtered) >= limit { + total++ + continue + } + filtered[id] = summary[id] + total++ + } + + if filtered == nil { + filtered = map[string]ResourceMetricsSummary{} + } + + response.Summary = filtered + if offset > 0 || total > limit { + response.Pagination = &PaginationInfo{ + Total: total, + Limit: limit, + Offset: offset, + } + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetBaselines(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + resourceID, _ := args["resource_id"].(string) + resourceType, _ := args["resource_type"].(string) + limit := intArg(args, "limit", 100) + offset := intArg(args, "offset", 0) + if limit <= 0 { + limit = 100 + } + if offset < 0 { + offset = 0 + } + resourceType = strings.ToLower(strings.TrimSpace(resourceType)) + if resourceType != "" { + validTypes := map[string]bool{"vm": true, "container": true, "node": true} + if !validTypes[resourceType] { + return NewErrorResult(fmt.Errorf("invalid resource_type: %s. Use vm, container, or node", resourceType)), nil + } + } + + if e.baselineProvider == nil { + return NewTextResult("Baseline data not available. 
The system needs time to learn normal behavior patterns."), nil + } + + response := BaselinesResponse{ + Baselines: make(map[string]map[string]*MetricBaseline), + } + + if resourceID != "" { + response.ResourceID = resourceID + cpuBaseline := e.baselineProvider.GetBaseline(resourceID, "cpu") + memBaseline := e.baselineProvider.GetBaseline(resourceID, "memory") + + if cpuBaseline != nil || memBaseline != nil { + response.Baselines[resourceID] = make(map[string]*MetricBaseline) + if cpuBaseline != nil { + response.Baselines[resourceID]["cpu"] = cpuBaseline + } + if memBaseline != nil { + response.Baselines[resourceID]["memory"] = memBaseline + } + } + return NewJSONResult(response), nil + } + + baselines := e.baselineProvider.GetAllBaselines() + keys := make([]string, 0, len(baselines)) + var typeIndex map[string]string + if resourceType != "" { + if e.stateProvider == nil { + return NewErrorResult(fmt.Errorf("state provider not available")), nil + } + state := e.stateProvider.GetState() + typeIndex = make(map[string]string) + for _, vm := range state.VMs { + typeIndex[fmt.Sprintf("%d", vm.VMID)] = "vm" + } + for _, ct := range state.Containers { + typeIndex[fmt.Sprintf("%d", ct.VMID)] = "container" + } + for _, node := range state.Nodes { + typeIndex[node.ID] = "node" + } + } + + for id := range baselines { + if resourceType != "" { + if t, ok := typeIndex[id]; !ok || t != resourceType { + continue + } + } + keys = append(keys, id) + } + sort.Strings(keys) + + filtered := make(map[string]map[string]*MetricBaseline) + total := 0 + for _, id := range keys { + if total < offset { + total++ + continue + } + if len(filtered) >= limit { + total++ + continue + } + filtered[id] = baselines[id] + total++ + } + + if filtered == nil { + filtered = map[string]map[string]*MetricBaseline{} + } + + response.Baselines = filtered + if offset > 0 || total > limit { + response.Pagination = &PaginationInfo{ + Total: total, + Limit: limit, + Offset: offset, + } + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetPatterns(_ context.Context, _ map[string]interface{}) (CallToolResult, error) { + if e.patternProvider == nil { + return NewTextResult("Pattern detection not available. 
The system needs more historical data."), nil + } + + response := PatternsResponse{ + Patterns: e.patternProvider.GetPatterns(), + Predictions: e.patternProvider.GetPredictions(), + } + + // Ensure non-nil slices for clean JSON + if response.Patterns == nil { + response.Patterns = []Pattern{} + } + if response.Predictions == nil { + response.Predictions = []Prediction{} + } + + return NewJSONResult(response), nil +} + +// ========== Temperature, Network, DiskIO, Physical Disks ========== + +func (e *PulseToolExecutor) executeGetTemperatures(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + hostFilter, _ := args["host"].(string) + + state := e.stateProvider.GetState() + + type HostTemps struct { + Hostname string `json:"hostname"` + Platform string `json:"platform,omitempty"` + CPU map[string]float64 `json:"cpu_temps,omitempty"` + Disks map[string]float64 `json:"disk_temps,omitempty"` + Fans map[string]float64 `json:"fan_rpm,omitempty"` + Other map[string]float64 `json:"other_temps,omitempty"` + LastUpdated string `json:"last_updated,omitempty"` + } + + var results []HostTemps + + for _, host := range state.Hosts { + if hostFilter != "" && host.Hostname != hostFilter { + continue + } + + if len(host.Sensors.TemperatureCelsius) == 0 && len(host.Sensors.FanRPM) == 0 { + continue + } + + temps := HostTemps{ + Hostname: host.Hostname, + Platform: host.Platform, + CPU: make(map[string]float64), + Disks: make(map[string]float64), + Fans: make(map[string]float64), + Other: make(map[string]float64), + } + + // Categorize temperatures + for name, value := range host.Sensors.TemperatureCelsius { + switch { + case containsAny(name, "cpu", "core", "package"): + temps.CPU[name] = value + case containsAny(name, "nvme", "ssd", "hdd", "disk"): + temps.Disks[name] = value + default: + temps.Other[name] = value + } + } + + // Add fan data + for name, value := range host.Sensors.FanRPM { + temps.Fans[name] = value + } + + // Add additional sensors to Other + for name, value := range host.Sensors.Additional { + if _, exists := temps.CPU[name]; !exists { + if _, exists := temps.Disks[name]; !exists { + temps.Other[name] = value + } + } + } + + results = append(results, temps) + } + + if len(results) == 0 { + if hostFilter != "" { + return NewTextResult(fmt.Sprintf("No temperature data available for host '%s'. The host may not have a Pulse agent installed or sensors may not be available.", hostFilter)), nil + } + return NewTextResult("No temperature data available. 
Ensure Pulse unified agents are installed on hosts and lm-sensors is available."), nil + } + + output, _ := json.MarshalIndent(results, "", " ") + return NewTextResult(string(output)), nil +} + +func (e *PulseToolExecutor) executeGetNetworkStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + hostFilter, _ := args["host"].(string) + + state := e.stateProvider.GetState() + + var hosts []HostNetworkStatsSummary + + for _, host := range state.Hosts { + if hostFilter != "" && host.Hostname != hostFilter { + continue + } + + if len(host.NetworkInterfaces) == 0 { + continue + } + + var interfaces []NetworkInterfaceSummary + for _, iface := range host.NetworkInterfaces { + interfaces = append(interfaces, NetworkInterfaceSummary{ + Name: iface.Name, + MAC: iface.MAC, + Addresses: iface.Addresses, + RXBytes: iface.RXBytes, + TXBytes: iface.TXBytes, + SpeedMbps: iface.SpeedMbps, + }) + } + + hosts = append(hosts, HostNetworkStatsSummary{ + Hostname: host.Hostname, + Interfaces: interfaces, + }) + } + + // Also check Docker hosts for network stats + for _, dockerHost := range state.DockerHosts { + if hostFilter != "" && dockerHost.Hostname != hostFilter { + continue + } + + if len(dockerHost.NetworkInterfaces) == 0 { + continue + } + + // Check if we already have this host + found := false + for _, h := range hosts { + if h.Hostname == dockerHost.Hostname { + found = true + break + } + } + if found { + continue + } + + var interfaces []NetworkInterfaceSummary + for _, iface := range dockerHost.NetworkInterfaces { + interfaces = append(interfaces, NetworkInterfaceSummary{ + Name: iface.Name, + MAC: iface.MAC, + Addresses: iface.Addresses, + RXBytes: iface.RXBytes, + TXBytes: iface.TXBytes, + SpeedMbps: iface.SpeedMbps, + }) + } + + hosts = append(hosts, HostNetworkStatsSummary{ + Hostname: dockerHost.Hostname, + Interfaces: interfaces, + }) + } + + if len(hosts) == 0 { + if hostFilter != "" { + return NewTextResult(fmt.Sprintf("No network statistics available for host '%s'.", hostFilter)), nil + } + return NewTextResult("No network statistics available. Ensure Pulse agents are reporting network data."), nil + } + + response := NetworkStatsResponse{ + Hosts: hosts, + Total: len(hosts), + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetDiskIOStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + hostFilter, _ := args["host"].(string) + + state := e.stateProvider.GetState() + + var hosts []HostDiskIOStatsSummary + + for _, host := range state.Hosts { + if hostFilter != "" && host.Hostname != hostFilter { + continue + } + + if len(host.DiskIO) == 0 { + continue + } + + var devices []DiskIODeviceSummary + for _, dio := range host.DiskIO { + devices = append(devices, DiskIODeviceSummary{ + Device: dio.Device, + ReadBytes: dio.ReadBytes, + WriteBytes: dio.WriteBytes, + ReadOps: dio.ReadOps, + WriteOps: dio.WriteOps, + IOTimeMs: dio.IOTime, + }) + } + + hosts = append(hosts, HostDiskIOStatsSummary{ + Hostname: host.Hostname, + Devices: devices, + }) + } + + if len(hosts) == 0 { + if hostFilter != "" { + return NewTextResult(fmt.Sprintf("No disk I/O statistics available for host '%s'.", hostFilter)), nil + } + return NewTextResult("No disk I/O statistics available. 
Ensure Pulse agents are reporting disk I/O data."), nil + } + + response := DiskIOStatsResponse{ + Hosts: hosts, + Total: len(hosts), + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeListPhysicalDisks(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + instanceFilter, _ := args["instance"].(string) + nodeFilter, _ := args["node"].(string) + healthFilter, _ := args["health"].(string) + typeFilter, _ := args["type"].(string) + limit := intArg(args, "limit", 100) + + state := e.stateProvider.GetState() + + if len(state.PhysicalDisks) == 0 { + return NewTextResult("No physical disk data available. Physical disk information is collected from Proxmox nodes."), nil + } + + var disks []PhysicalDiskSummary + totalCount := 0 + + for _, disk := range state.PhysicalDisks { + // Apply filters + if instanceFilter != "" && disk.Instance != instanceFilter { + continue + } + if nodeFilter != "" && disk.Node != nodeFilter { + continue + } + if healthFilter != "" && !strings.EqualFold(disk.Health, healthFilter) { + continue + } + if typeFilter != "" && !strings.EqualFold(disk.Type, typeFilter) { + continue + } + + totalCount++ + + if len(disks) >= limit { + continue + } + + summary := PhysicalDiskSummary{ + ID: disk.ID, + Node: disk.Node, + Instance: disk.Instance, + DevPath: disk.DevPath, + Model: disk.Model, + Serial: disk.Serial, + WWN: disk.WWN, + Type: disk.Type, + SizeBytes: disk.Size, + Health: disk.Health, + Used: disk.Used, + LastChecked: disk.LastChecked, + } + + // Only include optional fields if they have meaningful values + if disk.Wearout >= 0 { + wearout := disk.Wearout + summary.Wearout = &wearout + } + if disk.Temperature > 0 { + temp := disk.Temperature + summary.Temperature = &temp + } + if disk.RPM > 0 { + rpm := disk.RPM + summary.RPM = &rpm + } + + disks = append(disks, summary) + } + + if disks == nil { + disks = []PhysicalDiskSummary{} + } + + response := PhysicalDisksResponse{ + Disks: disks, + Total: len(state.PhysicalDisks), + Filtered: totalCount, + } + + return NewJSONResult(response), nil +} diff --git a/internal/ai/tools/tools_pmg.go b/internal/ai/tools/tools_pmg.go index 6b9fd0d39..e3e422cb2 100644 --- a/internal/ai/tools/tools_pmg.go +++ b/internal/ai/tools/tools_pmg.go @@ -5,8 +5,8 @@ import ( "fmt" ) -// registerPMGToolsConsolidated registers the consolidated pulse_pmg tool -func (e *PulseToolExecutor) registerPMGToolsConsolidated() { +// registerPMGTools registers the pulse_pmg tool +func (e *PulseToolExecutor) registerPMGTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ Name: "pulse_pmg", @@ -62,3 +62,231 @@ func (e *PulseToolExecutor) executePMG(ctx context.Context, args map[string]inte return NewErrorResult(fmt.Errorf("unknown type: %s. Use: status, mail_stats, queues, spam", pmgType)), nil } } + +func (e *PulseToolExecutor) executeGetPMGStatus(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + instanceFilter, _ := args["instance"].(string) + + state := e.stateProvider.GetState() + + if len(state.PMGInstances) == 0 { + return NewTextResult("No Proxmox Mail Gateway instances found. 
PMG monitoring may not be configured."), nil + } + + var instances []PMGInstanceSummary + for _, pmg := range state.PMGInstances { + if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter { + continue + } + + var nodes []PMGNodeSummary + for _, node := range pmg.Nodes { + nodes = append(nodes, PMGNodeSummary{ + Name: node.Name, + Status: node.Status, + Role: node.Role, + Uptime: node.Uptime, + LoadAvg: node.LoadAvg, + }) + } + + instances = append(instances, PMGInstanceSummary{ + ID: pmg.ID, + Name: pmg.Name, + Host: pmg.Host, + Status: pmg.Status, + Version: pmg.Version, + Nodes: nodes, + }) + } + + if len(instances) == 0 && instanceFilter != "" { + return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil + } + + // Ensure non-nil slices + for i := range instances { + if instances[i].Nodes == nil { + instances[i].Nodes = []PMGNodeSummary{} + } + } + + response := PMGStatusResponse{ + Instances: instances, + Total: len(instances), + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetMailStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + instanceFilter, _ := args["instance"].(string) + + state := e.stateProvider.GetState() + + if len(state.PMGInstances) == 0 { + return NewTextResult("No Proxmox Mail Gateway instances found. PMG monitoring may not be configured."), nil + } + + // If filtering, find that specific instance + for _, pmg := range state.PMGInstances { + if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter { + continue + } + + if pmg.MailStats == nil { + if instanceFilter != "" { + return NewTextResult(fmt.Sprintf("No mail statistics available for PMG instance '%s'.", instanceFilter)), nil + } + continue + } + + response := MailStatsResponse{ + Instance: pmg.Name, + Stats: PMGMailStatsSummary{ + Timeframe: pmg.MailStats.Timeframe, + TotalIn: pmg.MailStats.CountIn, + TotalOut: pmg.MailStats.CountOut, + SpamIn: pmg.MailStats.SpamIn, + SpamOut: pmg.MailStats.SpamOut, + VirusIn: pmg.MailStats.VirusIn, + VirusOut: pmg.MailStats.VirusOut, + BouncesIn: pmg.MailStats.BouncesIn, + BouncesOut: pmg.MailStats.BouncesOut, + BytesIn: pmg.MailStats.BytesIn, + BytesOut: pmg.MailStats.BytesOut, + GreylistCount: pmg.MailStats.GreylistCount, + RBLRejects: pmg.MailStats.RBLRejects, + AverageProcessTimeMs: pmg.MailStats.AverageProcessTimeMs, + }, + } + + return NewJSONResult(response), nil + } + + if instanceFilter != "" { + return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil + } + + return NewTextResult("No mail statistics available from any PMG instance."), nil +} + +func (e *PulseToolExecutor) executeGetMailQueues(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + instanceFilter, _ := args["instance"].(string) + + state := e.stateProvider.GetState() + + if len(state.PMGInstances) == 0 { + return NewTextResult("No Proxmox Mail Gateway instances found. 
PMG monitoring may not be configured."), nil + } + + // Collect queue data from all instances (or filtered instance) + for _, pmg := range state.PMGInstances { + if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter { + continue + } + + var queues []PMGQueueSummary + for _, node := range pmg.Nodes { + if node.QueueStatus != nil { + queues = append(queues, PMGQueueSummary{ + Node: node.Name, + Active: node.QueueStatus.Active, + Deferred: node.QueueStatus.Deferred, + Hold: node.QueueStatus.Hold, + Incoming: node.QueueStatus.Incoming, + Total: node.QueueStatus.Total, + OldestAgeSeconds: node.QueueStatus.OldestAge, + }) + } + } + + if len(queues) == 0 { + if instanceFilter != "" { + return NewTextResult(fmt.Sprintf("No queue data available for PMG instance '%s'.", instanceFilter)), nil + } + continue + } + + response := MailQueuesResponse{ + Instance: pmg.Name, + Queues: queues, + } + + return NewJSONResult(response), nil + } + + if instanceFilter != "" { + return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil + } + + return NewTextResult("No mail queue data available from any PMG instance."), nil +} + +func (e *PulseToolExecutor) executeGetSpamStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + instanceFilter, _ := args["instance"].(string) + + state := e.stateProvider.GetState() + + if len(state.PMGInstances) == 0 { + return NewTextResult("No Proxmox Mail Gateway instances found. PMG monitoring may not be configured."), nil + } + + for _, pmg := range state.PMGInstances { + if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter { + continue + } + + quarantine := PMGQuarantineSummary{} + if pmg.Quarantine != nil { + quarantine = PMGQuarantineSummary{ + Spam: pmg.Quarantine.Spam, + Virus: pmg.Quarantine.Virus, + Attachment: pmg.Quarantine.Attachment, + Blacklisted: pmg.Quarantine.Blacklisted, + Total: pmg.Quarantine.Spam + pmg.Quarantine.Virus + pmg.Quarantine.Attachment + pmg.Quarantine.Blacklisted, + } + } + + var distribution []PMGSpamBucketSummary + for _, bucket := range pmg.SpamDistribution { + distribution = append(distribution, PMGSpamBucketSummary{ + Score: bucket.Score, + Count: bucket.Count, + }) + } + + response := SpamStatsResponse{ + Instance: pmg.Name, + Quarantine: quarantine, + Distribution: distribution, + } + + if response.Distribution == nil { + response.Distribution = []PMGSpamBucketSummary{} + } + + return NewJSONResult(response), nil + } + + if instanceFilter != "" { + return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil + } + + return NewTextResult("No spam statistics available from any PMG instance."), nil +} diff --git a/internal/ai/tools/tools_query.go b/internal/ai/tools/tools_query.go index 314cbd056..42fa328be 100644 --- a/internal/ai/tools/tools_query.go +++ b/internal/ai/tools/tools_query.go @@ -1017,6 +1017,8 @@ func isReadOnlyByConstruction(cmdLower string) bool { "wget -q", "wget --spider", "docker ps", "docker logs", "docker inspect", "docker stats", "docker images", "docker info", "systemctl status", "systemctl is-active", "systemctl is-enabled", "systemctl list", "systemctl show", + "pct list", "pct status", + "qm list", "qm status", "ip addr", "ip route", "ip link", // Kubectl read-only commands "kubectl get", "kubectl describe", "kubectl logs", "kubectl top", "kubectl cluster-info", @@ -1096,13 +1098,15 @@ 
func matchesWritePatterns(cmdLower string) string { "echo ": "output (may redirect)", "printf ": "output (may redirect)", "wget -O": "file download", "wget --output": "file download", "tar -x": "archive extraction", "tar x": "archive extraction", "unzip ": "archive extraction", "gunzip ": "archive extraction", - "ln ": "link creation", "link ": "link creation", } for pattern, reason := range mediumRiskPatterns { if strings.Contains(cmdLower, pattern) { return reason } } + if containsCommandToken(cmdLower, "ln") || containsCommandToken(cmdLower, "link") { + return "link creation" + } // Curl with mutation verbs if strings.Contains(cmdLower, "curl") { @@ -1117,6 +1121,21 @@ func matchesWritePatterns(cmdLower string) string { return "" } +func containsCommandToken(cmdLower, token string) bool { + for _, field := range strings.Fields(cmdLower) { + if field == token { + return true + } + if strings.HasPrefix(field, "/") || strings.HasPrefix(field, "./") || + strings.HasPrefix(field, "../") || strings.HasPrefix(field, "~/") { + if strings.HasSuffix(field, "/"+token) { + return true + } + } + } + return false +} + // hasShellChainingOutsideQuotes checks if a command contains shell chaining operators // (;, &&, ||) outside of quoted strings. This allows SQL statements like "SELECT 1;" // while still catching shell command chaining like "ls; rm -rf /". @@ -1801,7 +1820,7 @@ func logRoutingMismatchDebug(targetHost string, childKinds, childIDs []string) { Msg("[RoutingValidation] Blocked operation targeting parent node when child recently referenced") } -// registerQueryTools registers the consolidated pulse_query tool +// registerQueryTools registers the pulse_query tool func (e *PulseToolExecutor) registerQueryTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ @@ -3143,6 +3162,43 @@ func (e *PulseToolExecutor) executeSearchResources(_ context.Context, args map[s return NewJSONResult(response), nil } +func (e *PulseToolExecutor) executeGetConnectionHealth(_ context.Context, _ map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + state := e.stateProvider.GetState() + + if len(state.ConnectionHealth) == 0 { + return NewTextResult("No connection health data available."), nil + } + + var connections []ConnectionStatus + connected := 0 + disconnected := 0 + + for instanceID, isConnected := range state.ConnectionHealth { + connections = append(connections, ConnectionStatus{ + InstanceID: instanceID, + Connected: isConnected, + }) + if isConnected { + connected++ + } else { + disconnected++ + } + } + + response := ConnectionHealthResponse{ + Connections: connections, + Total: len(connections), + Connected: connected, + Disconnected: disconnected, + } + + return NewJSONResult(response), nil +} + // Helper to get int args with default func intArg(args map[string]interface{}, key string, defaultVal int) int { if v, ok := args[key]; ok { diff --git a/internal/ai/tools/tools_storage.go b/internal/ai/tools/tools_storage.go index b89935122..4603718bc 100644 --- a/internal/ai/tools/tools_storage.go +++ b/internal/ai/tools/tools_storage.go @@ -2,10 +2,12 @@ package tools import ( "context" + "encoding/json" "fmt" + "strings" ) -// registerStorageTools registers the consolidated pulse_storage tool +// registerStorageTools registers the pulse_storage tool func (e *PulseToolExecutor) registerStorageTools() { e.registry.Register(RegisteredTool{ Definition: Tool{ @@ -112,7 +114,6 @@ Examples: } // executeStorage 
routes to the appropriate storage handler based on type -// All handler functions are implemented in tools_infrastructure.go func (e *PulseToolExecutor) executeStorage(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { storageType, _ := args["type"].(string) switch storageType { @@ -144,3 +145,1100 @@ func (e *PulseToolExecutor) executeStorage(ctx context.Context, args map[string] return NewErrorResult(fmt.Errorf("unknown type: %s. Use: pools, config, backups, backup_tasks, snapshots, ceph, ceph_details, replication, pbs_jobs, raid, disk_health, resource_disks", storageType)), nil } } + +func (e *PulseToolExecutor) executeListBackups(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + resourceID, _ := args["resource_id"].(string) + limit := intArg(args, "limit", 100) + offset := intArg(args, "offset", 0) + + if e.backupProvider == nil { + return NewTextResult("Backup information not available."), nil + } + + backups := e.backupProvider.GetBackups() + pbsInstances := e.backupProvider.GetPBSInstances() + + response := BackupsResponse{} + + // PBS Backups + count := 0 + for _, b := range backups.PBS { + if resourceID != "" && b.VMID != resourceID { + continue + } + if count < offset { + count++ + continue + } + if len(response.PBS) >= limit { + break + } + response.PBS = append(response.PBS, PBSBackupSummary{ + VMID: b.VMID, + BackupType: b.BackupType, + BackupTime: b.BackupTime, + Instance: b.Instance, + Datastore: b.Datastore, + SizeGB: float64(b.Size) / (1024 * 1024 * 1024), + Verified: b.Verified, + Protected: b.Protected, + }) + count++ + } + + // PVE Backups + count = 0 + for _, b := range backups.PVE.StorageBackups { + if resourceID != "" && string(rune(b.VMID)) != resourceID { + continue + } + if count < offset { + count++ + continue + } + if len(response.PVE) >= limit { + break + } + response.PVE = append(response.PVE, PVEBackupSummary{ + VMID: b.VMID, + BackupTime: b.Time, + SizeGB: float64(b.Size) / (1024 * 1024 * 1024), + Storage: b.Storage, + }) + count++ + } + + // PBS Servers + for _, pbs := range pbsInstances { + server := PBSServerSummary{ + Name: pbs.Name, + Host: pbs.Host, + Status: pbs.Status, + } + for _, ds := range pbs.Datastores { + server.Datastores = append(server.Datastores, DatastoreSummary{ + Name: ds.Name, + UsagePercent: ds.Usage * 100, + FreeGB: float64(ds.Free) / (1024 * 1024 * 1024), + }) + } + response.PBSServers = append(response.PBSServers, server) + } + + // Recent tasks + for _, t := range backups.PVE.BackupTasks { + if len(response.RecentTasks) >= 20 { + break + } + response.RecentTasks = append(response.RecentTasks, BackupTaskSummary{ + VMID: t.VMID, + Node: t.Node, + Status: t.Status, + StartTime: t.StartTime, + }) + } + + // Ensure non-nil slices + if response.PBS == nil { + response.PBS = []PBSBackupSummary{} + } + if response.PVE == nil { + response.PVE = []PVEBackupSummary{} + } + if response.PBSServers == nil { + response.PBSServers = []PBSServerSummary{} + } + if response.RecentTasks == nil { + response.RecentTasks = []BackupTaskSummary{} + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeListStorage(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + storageID, _ := args["storage_id"].(string) + limit := intArg(args, "limit", 100) + offset := intArg(args, "offset", 0) + + if e.storageProvider == nil { + return NewTextResult("Storage information not available."), nil + } + + storage := e.storageProvider.GetStorage() + cephClusters := 
e.storageProvider.GetCephClusters() + + response := StorageResponse{} + + // Storage pools + count := 0 + for _, s := range storage { + if storageID != "" && s.ID != storageID && s.Name != storageID { + continue + } + if count < offset { + count++ + continue + } + if len(response.Pools) >= limit { + break + } + + pool := StoragePoolSummary{ + ID: s.ID, + Name: s.Name, + Node: s.Node, + Instance: s.Instance, + Nodes: s.Nodes, + Type: s.Type, + Status: s.Status, + Enabled: s.Enabled, + Active: s.Active, + Path: s.Path, + UsagePercent: s.Usage * 100, + UsedGB: float64(s.Used) / (1024 * 1024 * 1024), + TotalGB: float64(s.Total) / (1024 * 1024 * 1024), + FreeGB: float64(s.Free) / (1024 * 1024 * 1024), + Content: s.Content, + Shared: s.Shared, + } + + if s.ZFSPool != nil { + pool.ZFS = &ZFSPoolSummary{ + Name: s.ZFSPool.Name, + State: s.ZFSPool.State, + ReadErrors: s.ZFSPool.ReadErrors, + WriteErrors: s.ZFSPool.WriteErrors, + ChecksumErrors: s.ZFSPool.ChecksumErrors, + Scan: s.ZFSPool.Scan, + } + } + + response.Pools = append(response.Pools, pool) + count++ + } + + // Ceph clusters + for _, c := range cephClusters { + response.CephClusters = append(response.CephClusters, CephClusterSummary{ + Name: c.Name, + Health: c.Health, + HealthMessage: c.HealthMessage, + UsagePercent: c.UsagePercent, + UsedTB: float64(c.UsedBytes) / (1024 * 1024 * 1024 * 1024), + TotalTB: float64(c.TotalBytes) / (1024 * 1024 * 1024 * 1024), + NumOSDs: c.NumOSDs, + NumOSDsUp: c.NumOSDsUp, + NumOSDsIn: c.NumOSDsIn, + NumMons: c.NumMons, + NumMgrs: c.NumMgrs, + }) + } + + // Ensure non-nil slices + if response.Pools == nil { + response.Pools = []StoragePoolSummary{} + } + if response.CephClusters == nil { + response.CephClusters = []CephClusterSummary{} + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetStorageConfig(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + storageID, _ := args["storage_id"].(string) + instance, _ := args["instance"].(string) + node, _ := args["node"].(string) + + storageID = strings.TrimSpace(storageID) + instance = strings.TrimSpace(instance) + node = strings.TrimSpace(node) + + if e.storageConfigProvider == nil { + return NewTextResult("Storage configuration not available."), nil + } + + configs, err := e.storageConfigProvider.GetStorageConfig(instance) + if err != nil { + return NewErrorResult(err), nil + } + + response := StorageConfigResponse{} + for _, cfg := range configs { + if storageID != "" && !strings.EqualFold(cfg.ID, storageID) && !strings.EqualFold(cfg.Name, storageID) { + continue + } + if instance != "" && !strings.EqualFold(cfg.Instance, instance) { + continue + } + if node != "" && !storageConfigHasNode(cfg.Nodes, node) { + continue + } + response.Storages = append(response.Storages, cfg) + } + + if response.Storages == nil { + response.Storages = []StorageConfigSummary{} + } + + return NewJSONResult(response), nil +} + +func storageConfigHasNode(nodes []string, node string) bool { + for _, n := range nodes { + if strings.EqualFold(strings.TrimSpace(n), node) { + return true + } + } + return false +} + +func (e *PulseToolExecutor) executeGetDiskHealth(_ context.Context, _ map[string]interface{}) (CallToolResult, error) { + if e.diskHealthProvider == nil && e.storageProvider == nil { + return NewTextResult("Disk health information not available."), nil + } + + response := DiskHealthResponse{ + Hosts: []HostDiskHealth{}, + } + + // SMART and RAID data from host agents + if e.diskHealthProvider != nil { + hosts := 
e.diskHealthProvider.GetHosts() + for _, host := range hosts { + hostHealth := HostDiskHealth{ + Hostname: host.Hostname, + } + + // SMART data + for _, disk := range host.Sensors.SMART { + hostHealth.SMART = append(hostHealth.SMART, SMARTDiskSummary{ + Device: disk.Device, + Model: disk.Model, + Health: disk.Health, + Temperature: disk.Temperature, + }) + } + + // RAID arrays + for _, raid := range host.RAID { + hostHealth.RAID = append(hostHealth.RAID, RAIDArraySummary{ + Device: raid.Device, + Level: raid.Level, + State: raid.State, + ActiveDevices: raid.ActiveDevices, + WorkingDevices: raid.WorkingDevices, + FailedDevices: raid.FailedDevices, + SpareDevices: raid.SpareDevices, + RebuildPercent: raid.RebuildPercent, + }) + } + + // Ceph from agent + if host.Ceph != nil { + hostHealth.Ceph = &CephStatusSummary{ + Health: host.Ceph.Health.Status, + NumOSDs: host.Ceph.OSDMap.NumOSDs, + NumOSDsUp: host.Ceph.OSDMap.NumUp, + NumOSDsIn: host.Ceph.OSDMap.NumIn, + NumPGs: host.Ceph.PGMap.NumPGs, + UsagePercent: host.Ceph.PGMap.UsagePercent, + } + } + + // Only add if there's data + if len(hostHealth.SMART) > 0 || len(hostHealth.RAID) > 0 || hostHealth.Ceph != nil { + // Ensure non-nil slices + if hostHealth.SMART == nil { + hostHealth.SMART = []SMARTDiskSummary{} + } + if hostHealth.RAID == nil { + hostHealth.RAID = []RAIDArraySummary{} + } + response.Hosts = append(response.Hosts, hostHealth) + } + } + } + + return NewJSONResult(response), nil +} + +// executeGetCephStatus returns Ceph cluster status +func (e *PulseToolExecutor) executeGetCephStatus(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + clusterFilter, _ := args["cluster"].(string) + + state := e.stateProvider.GetState() + + if len(state.CephClusters) == 0 { + return NewTextResult("No Ceph clusters found. 
Ceph may not be configured or data is not yet available."), nil + } + + type CephSummary struct { + Name string `json:"name"` + Health string `json:"health"` + Details map[string]interface{} `json:"details,omitempty"` + } + + var results []CephSummary + + for _, cluster := range state.CephClusters { + if clusterFilter != "" && cluster.Name != clusterFilter { + continue + } + + summary := CephSummary{ + Name: cluster.Name, + Health: cluster.Health, + Details: make(map[string]interface{}), + } + + // Add relevant details + if cluster.HealthMessage != "" { + summary.Details["health_message"] = cluster.HealthMessage + } + if cluster.NumOSDs > 0 { + summary.Details["osd_count"] = cluster.NumOSDs + summary.Details["osds_up"] = cluster.NumOSDsUp + summary.Details["osds_in"] = cluster.NumOSDsIn + summary.Details["osds_down"] = cluster.NumOSDs - cluster.NumOSDsUp + } + if cluster.NumMons > 0 { + summary.Details["monitors"] = cluster.NumMons + } + if cluster.TotalBytes > 0 { + summary.Details["total_bytes"] = cluster.TotalBytes + summary.Details["used_bytes"] = cluster.UsedBytes + summary.Details["available_bytes"] = cluster.AvailableBytes + summary.Details["usage_percent"] = cluster.UsagePercent + } + if len(cluster.Pools) > 0 { + summary.Details["pools"] = cluster.Pools + } + + results = append(results, summary) + } + + if len(results) == 0 && clusterFilter != "" { + return NewTextResult(fmt.Sprintf("Ceph cluster '%s' not found.", clusterFilter)), nil + } + + output, _ := json.MarshalIndent(results, "", " ") + return NewTextResult(string(output)), nil +} + +// executeGetReplication returns replication job status +func (e *PulseToolExecutor) executeGetReplication(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + vmFilter, _ := args["vm_id"].(string) + + state := e.stateProvider.GetState() + + if len(state.ReplicationJobs) == 0 { + return NewTextResult("No replication jobs found. 
Replication may not be configured."), nil + } + + type ReplicationSummary struct { + ID string `json:"id"` + GuestID int `json:"guest_id"` + GuestName string `json:"guest_name,omitempty"` + GuestType string `json:"guest_type,omitempty"` + SourceNode string `json:"source_node,omitempty"` + TargetNode string `json:"target_node"` + Schedule string `json:"schedule,omitempty"` + Status string `json:"status"` + LastSync string `json:"last_sync,omitempty"` + NextSync string `json:"next_sync,omitempty"` + LastDuration string `json:"last_duration,omitempty"` + Error string `json:"error,omitempty"` + } + + var results []ReplicationSummary + + for _, job := range state.ReplicationJobs { + if vmFilter != "" && fmt.Sprintf("%d", job.GuestID) != vmFilter { + continue + } + + summary := ReplicationSummary{ + ID: job.ID, + GuestID: job.GuestID, + GuestName: job.GuestName, + GuestType: job.GuestType, + SourceNode: job.SourceNode, + TargetNode: job.TargetNode, + Schedule: job.Schedule, + Status: job.Status, + } + + if job.LastSyncTime != nil { + summary.LastSync = job.LastSyncTime.Format("2006-01-02 15:04:05") + } + if job.NextSyncTime != nil { + summary.NextSync = job.NextSyncTime.Format("2006-01-02 15:04:05") + } + if job.LastSyncDurationHuman != "" { + summary.LastDuration = job.LastSyncDurationHuman + } + if job.Error != "" { + summary.Error = job.Error + } + + results = append(results, summary) + } + + if len(results) == 0 && vmFilter != "" { + return NewTextResult(fmt.Sprintf("No replication jobs found for VM %s.", vmFilter)), nil + } + + output, _ := json.MarshalIndent(results, "", " ") + return NewTextResult(string(output)), nil +} + +// containsAny checks if s contains any of the substrings (case-insensitive) +func containsAny(s string, substrs ...string) bool { + lower := strings.ToLower(s) + for _, sub := range substrs { + if strings.Contains(lower, strings.ToLower(sub)) { + return true + } + } + return false +} + +func (e *PulseToolExecutor) executeListSnapshots(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + guestIDFilter, _ := args["guest_id"].(string) + instanceFilter, _ := args["instance"].(string) + limit := intArg(args, "limit", 100) + offset := intArg(args, "offset", 0) + + state := e.stateProvider.GetState() + + // Build VM name map for enrichment + vmNames := make(map[int]string) + for _, vm := range state.VMs { + vmNames[vm.VMID] = vm.Name + } + for _, ct := range state.Containers { + vmNames[ct.VMID] = ct.Name + } + + var snapshots []SnapshotSummary + filteredCount := 0 + count := 0 + + for _, snap := range state.PVEBackups.GuestSnapshots { + // Apply filters + if guestIDFilter != "" && fmt.Sprintf("%d", snap.VMID) != guestIDFilter { + continue + } + if instanceFilter != "" && snap.Instance != instanceFilter { + continue + } + + filteredCount++ + + // Apply pagination + if count < offset { + count++ + continue + } + if len(snapshots) >= limit { + count++ + continue + } + + snapshots = append(snapshots, SnapshotSummary{ + ID: snap.ID, + VMID: snap.VMID, + VMName: vmNames[snap.VMID], + Type: snap.Type, + Node: snap.Node, + Instance: snap.Instance, + SnapshotName: snap.Name, + Description: snap.Description, + Time: snap.Time, + VMState: snap.VMState, + SizeBytes: snap.SizeBytes, + }) + count++ + } + + if snapshots == nil { + snapshots = []SnapshotSummary{} + } + + response := SnapshotsResponse{ + Snapshots: snapshots, + Total: len(state.PVEBackups.GuestSnapshots), 
+ Filtered: filteredCount, + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeListPBSJobs(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.backupProvider == nil { + return NewTextResult("Backup provider not available."), nil + } + + instanceFilter, _ := args["instance"].(string) + jobTypeFilter, _ := args["job_type"].(string) + + pbsInstances := e.backupProvider.GetPBSInstances() + + if len(pbsInstances) == 0 { + return NewTextResult("No PBS instances found. PBS monitoring may not be configured."), nil + } + + var jobs []PBSJobSummary + + for _, pbs := range pbsInstances { + if instanceFilter != "" && pbs.ID != instanceFilter && pbs.Name != instanceFilter { + continue + } + + // Backup jobs + if jobTypeFilter == "" || jobTypeFilter == "backup" { + for _, job := range pbs.BackupJobs { + jobs = append(jobs, PBSJobSummary{ + ID: job.ID, + Type: "backup", + Store: job.Store, + Status: job.Status, + LastRun: job.LastBackup, + NextRun: job.NextRun, + Error: job.Error, + VMID: job.VMID, + }) + } + } + + // Sync jobs + if jobTypeFilter == "" || jobTypeFilter == "sync" { + for _, job := range pbs.SyncJobs { + jobs = append(jobs, PBSJobSummary{ + ID: job.ID, + Type: "sync", + Store: job.Store, + Status: job.Status, + LastRun: job.LastSync, + NextRun: job.NextRun, + Error: job.Error, + Remote: job.Remote, + }) + } + } + + // Verify jobs + if jobTypeFilter == "" || jobTypeFilter == "verify" { + for _, job := range pbs.VerifyJobs { + jobs = append(jobs, PBSJobSummary{ + ID: job.ID, + Type: "verify", + Store: job.Store, + Status: job.Status, + LastRun: job.LastVerify, + NextRun: job.NextRun, + Error: job.Error, + }) + } + } + + // Prune jobs + if jobTypeFilter == "" || jobTypeFilter == "prune" { + for _, job := range pbs.PruneJobs { + jobs = append(jobs, PBSJobSummary{ + ID: job.ID, + Type: "prune", + Store: job.Store, + Status: job.Status, + LastRun: job.LastPrune, + NextRun: job.NextRun, + Error: job.Error, + }) + } + } + + // Garbage jobs + if jobTypeFilter == "" || jobTypeFilter == "garbage" { + for _, job := range pbs.GarbageJobs { + jobs = append(jobs, PBSJobSummary{ + ID: job.ID, + Type: "garbage", + Store: job.Store, + Status: job.Status, + LastRun: job.LastGarbage, + NextRun: job.NextRun, + Error: job.Error, + RemovedBytes: job.RemovedBytes, + }) + } + } + } + + if jobs == nil { + jobs = []PBSJobSummary{} + } + + response := PBSJobsResponse{ + Instance: instanceFilter, + Jobs: jobs, + Total: len(jobs), + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeListBackupTasks(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + instanceFilter, _ := args["instance"].(string) + guestIDFilter, _ := args["guest_id"].(string) + statusFilter, _ := args["status"].(string) + limit := intArg(args, "limit", 50) + + state := e.stateProvider.GetState() + + // Build VM name map + vmNames := make(map[int]string) + for _, vm := range state.VMs { + vmNames[vm.VMID] = vm.Name + } + for _, ct := range state.Containers { + vmNames[ct.VMID] = ct.Name + } + + var tasks []BackupTaskDetail + filteredCount := 0 + + for _, task := range state.PVEBackups.BackupTasks { + // Apply filters + if instanceFilter != "" && task.Instance != instanceFilter { + continue + } + if guestIDFilter != "" && fmt.Sprintf("%d", task.VMID) != guestIDFilter { + continue + } + if statusFilter != "" && !strings.EqualFold(task.Status, 
statusFilter) { + continue + } + + filteredCount++ + + if len(tasks) >= limit { + continue + } + + tasks = append(tasks, BackupTaskDetail{ + ID: task.ID, + VMID: task.VMID, + VMName: vmNames[task.VMID], + Node: task.Node, + Instance: task.Instance, + Type: task.Type, + Status: task.Status, + StartTime: task.StartTime, + EndTime: task.EndTime, + SizeBytes: task.Size, + Error: task.Error, + }) + } + + if tasks == nil { + tasks = []BackupTaskDetail{} + } + + response := BackupTasksListResponse{ + Tasks: tasks, + Total: len(state.PVEBackups.BackupTasks), + Filtered: filteredCount, + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetHostRAIDStatus(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.diskHealthProvider == nil { + return NewTextResult("Disk health provider not available."), nil + } + + hostFilter, _ := args["host"].(string) + stateFilter, _ := args["state"].(string) + + hosts := e.diskHealthProvider.GetHosts() + + var hostSummaries []HostRAIDSummary + + for _, host := range hosts { + // Apply host filter + if hostFilter != "" && host.ID != hostFilter && host.Hostname != hostFilter && host.DisplayName != hostFilter { + continue + } + + // Skip hosts without RAID arrays + if len(host.RAID) == 0 { + continue + } + + var arrays []HostRAIDArraySummary + + for _, raid := range host.RAID { + // Apply state filter + if stateFilter != "" && !strings.EqualFold(raid.State, stateFilter) { + continue + } + + var devices []HostRAIDDeviceSummary + for _, dev := range raid.Devices { + devices = append(devices, HostRAIDDeviceSummary{ + Device: dev.Device, + State: dev.State, + Slot: dev.Slot, + }) + } + + if devices == nil { + devices = []HostRAIDDeviceSummary{} + } + + arrays = append(arrays, HostRAIDArraySummary{ + Device: raid.Device, + Name: raid.Name, + Level: raid.Level, + State: raid.State, + TotalDevices: raid.TotalDevices, + ActiveDevices: raid.ActiveDevices, + WorkingDevices: raid.WorkingDevices, + FailedDevices: raid.FailedDevices, + SpareDevices: raid.SpareDevices, + UUID: raid.UUID, + RebuildPercent: raid.RebuildPercent, + RebuildSpeed: raid.RebuildSpeed, + Devices: devices, + }) + } + + if len(arrays) > 0 { + if arrays == nil { + arrays = []HostRAIDArraySummary{} + } + hostSummaries = append(hostSummaries, HostRAIDSummary{ + Hostname: host.Hostname, + HostID: host.ID, + Arrays: arrays, + }) + } + } + + if hostSummaries == nil { + hostSummaries = []HostRAIDSummary{} + } + + if len(hostSummaries) == 0 { + if hostFilter != "" { + return NewTextResult(fmt.Sprintf("No RAID arrays found for host '%s'.", hostFilter)), nil + } + return NewTextResult("No RAID arrays found across any hosts. 
RAID monitoring requires host agents to be configured."), nil + } + + response := HostRAIDStatusResponse{ + Hosts: hostSummaries, + Total: len(hostSummaries), + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetHostCephDetails(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.diskHealthProvider == nil { + return NewTextResult("Disk health provider not available."), nil + } + + hostFilter, _ := args["host"].(string) + + hosts := e.diskHealthProvider.GetHosts() + + var hostSummaries []HostCephSummary + + for _, host := range hosts { + // Apply host filter + if hostFilter != "" && host.ID != hostFilter && host.Hostname != hostFilter && host.DisplayName != hostFilter { + continue + } + + // Skip hosts without Ceph data + if host.Ceph == nil { + continue + } + + ceph := host.Ceph + + // Build health messages from checks and summary + var healthMessages []HostCephHealthMessage + for checkName, check := range ceph.Health.Checks { + msg := check.Message + if msg == "" { + msg = checkName + } + healthMessages = append(healthMessages, HostCephHealthMessage{ + Severity: check.Severity, + Message: msg, + }) + } + for _, summary := range ceph.Health.Summary { + healthMessages = append(healthMessages, HostCephHealthMessage{ + Severity: summary.Severity, + Message: summary.Message, + }) + } + + // Build monitor summary + var monSummary *HostCephMonSummary + if ceph.MonMap.NumMons > 0 { + var monitors []HostCephMonitorSummary + for _, mon := range ceph.MonMap.Monitors { + monitors = append(monitors, HostCephMonitorSummary{ + Name: mon.Name, + Rank: mon.Rank, + Addr: mon.Addr, + Status: mon.Status, + }) + } + monSummary = &HostCephMonSummary{ + NumMons: ceph.MonMap.NumMons, + Monitors: monitors, + } + } + + // Build manager summary + var mgrSummary *HostCephMgrSummary + if ceph.MgrMap.NumMgrs > 0 || ceph.MgrMap.Available { + mgrSummary = &HostCephMgrSummary{ + Available: ceph.MgrMap.Available, + NumMgrs: ceph.MgrMap.NumMgrs, + ActiveMgr: ceph.MgrMap.ActiveMgr, + Standbys: ceph.MgrMap.Standbys, + } + } + + // Build pool summaries + var pools []HostCephPoolSummary + for _, pool := range ceph.Pools { + pools = append(pools, HostCephPoolSummary{ + ID: pool.ID, + Name: pool.Name, + BytesUsed: pool.BytesUsed, + BytesAvailable: pool.BytesAvailable, + Objects: pool.Objects, + PercentUsed: pool.PercentUsed, + }) + } + + if healthMessages == nil { + healthMessages = []HostCephHealthMessage{} + } + if pools == nil { + pools = []HostCephPoolSummary{} + } + + hostSummaries = append(hostSummaries, HostCephSummary{ + Hostname: host.Hostname, + HostID: host.ID, + FSID: ceph.FSID, + Health: HostCephHealthSummary{ + Status: ceph.Health.Status, + Messages: healthMessages, + }, + MonMap: monSummary, + MgrMap: mgrSummary, + OSDMap: HostCephOSDSummary{ + NumOSDs: ceph.OSDMap.NumOSDs, + NumUp: ceph.OSDMap.NumUp, + NumIn: ceph.OSDMap.NumIn, + NumDown: ceph.OSDMap.NumDown, + NumOut: ceph.OSDMap.NumOut, + }, + PGMap: HostCephPGSummary{ + NumPGs: ceph.PGMap.NumPGs, + BytesTotal: ceph.PGMap.BytesTotal, + BytesUsed: ceph.PGMap.BytesUsed, + BytesAvailable: ceph.PGMap.BytesAvailable, + UsagePercent: ceph.PGMap.UsagePercent, + DegradedRatio: ceph.PGMap.DegradedRatio, + MisplacedRatio: ceph.PGMap.MisplacedRatio, + ReadBytesPerSec: ceph.PGMap.ReadBytesPerSec, + WriteBytesPerSec: ceph.PGMap.WriteBytesPerSec, + ReadOpsPerSec: ceph.PGMap.ReadOpsPerSec, + WriteOpsPerSec: ceph.PGMap.WriteOpsPerSec, + }, + Pools: pools, + CollectedAt: ceph.CollectedAt, + }) + } + + if 
hostSummaries == nil { + hostSummaries = []HostCephSummary{} + } + + if len(hostSummaries) == 0 { + if hostFilter != "" { + return NewTextResult(fmt.Sprintf("No Ceph data found for host '%s'.", hostFilter)), nil + } + return NewTextResult("No Ceph data found from host agents. Ceph monitoring requires host agents to be configured on Ceph nodes."), nil + } + + response := HostCephDetailsResponse{ + Hosts: hostSummaries, + Total: len(hostSummaries), + } + + return NewJSONResult(response), nil +} + +func (e *PulseToolExecutor) executeGetResourceDisks(_ context.Context, args map[string]interface{}) (CallToolResult, error) { + if e.stateProvider == nil { + return NewTextResult("State provider not available."), nil + } + + resourceFilter, _ := args["resource_id"].(string) + typeFilter, _ := args["type"].(string) + instanceFilter, _ := args["instance"].(string) + minUsage, _ := args["min_usage"].(float64) + + state := e.stateProvider.GetState() + + var resources []ResourceDisksSummary + + // Process VMs + if typeFilter == "" || strings.EqualFold(typeFilter, "vm") { + for _, vm := range state.VMs { + // Apply filters + if resourceFilter != "" && vm.ID != resourceFilter && fmt.Sprintf("%d", vm.VMID) != resourceFilter { + continue + } + if instanceFilter != "" && vm.Instance != instanceFilter { + continue + } + // Skip VMs without disk data + if len(vm.Disks) == 0 { + continue + } + + var disks []ResourceDiskInfo + maxUsage := 0.0 + + for _, disk := range vm.Disks { + if disk.Usage > maxUsage { + maxUsage = disk.Usage + } + + disks = append(disks, ResourceDiskInfo{ + Device: disk.Device, + Mountpoint: disk.Mountpoint, + Type: disk.Type, + TotalBytes: disk.Total, + UsedBytes: disk.Used, + FreeBytes: disk.Free, + Usage: disk.Usage, + }) + } + + // Apply min_usage filter + if minUsage > 0 && maxUsage < minUsage { + continue + } + + if disks == nil { + disks = []ResourceDiskInfo{} + } + + resources = append(resources, ResourceDisksSummary{ + ID: vm.ID, + VMID: vm.VMID, + Name: vm.Name, + Type: "vm", + Node: vm.Node, + Instance: vm.Instance, + Disks: disks, + }) + } + } + + // Process containers + if typeFilter == "" || strings.EqualFold(typeFilter, "lxc") { + for _, ct := range state.Containers { + // Apply filters + if resourceFilter != "" && ct.ID != resourceFilter && fmt.Sprintf("%d", ct.VMID) != resourceFilter { + continue + } + if instanceFilter != "" && ct.Instance != instanceFilter { + continue + } + // Skip containers without disk data + if len(ct.Disks) == 0 { + continue + } + + var disks []ResourceDiskInfo + maxUsage := 0.0 + + for _, disk := range ct.Disks { + if disk.Usage > maxUsage { + maxUsage = disk.Usage + } + + disks = append(disks, ResourceDiskInfo{ + Device: disk.Device, + Mountpoint: disk.Mountpoint, + Type: disk.Type, + TotalBytes: disk.Total, + UsedBytes: disk.Used, + FreeBytes: disk.Free, + Usage: disk.Usage, + }) + } + + // Apply min_usage filter + if minUsage > 0 && maxUsage < minUsage { + continue + } + + if disks == nil { + disks = []ResourceDiskInfo{} + } + + resources = append(resources, ResourceDisksSummary{ + ID: ct.ID, + VMID: ct.VMID, + Name: ct.Name, + Type: "lxc", + Node: ct.Node, + Instance: ct.Instance, + Disks: disks, + }) + } + } + + if resources == nil { + resources = []ResourceDisksSummary{} + } + + if len(resources) == 0 { + if resourceFilter != "" { + return NewTextResult(fmt.Sprintf("No disk data found for resource '%s'. 
Guest agent may not be installed or disk info unavailable.", resourceFilter)), nil + } + return NewTextResult("No disk data available for any VMs or containers. Disk details require guest agents to be installed and running."), nil + } + + response := ResourceDisksResponse{ + Resources: resources, + Total: len(resources), + } + + return NewJSONResult(response), nil +}
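
The storage handlers added above share one shape: apply optional filters, count everything that passed the filters, skip `offset` matches, stop appending once `limit` is reached, and normalize nil slices before marshalling. A minimal standalone sketch of that pattern follows; the `Item` type, `filterAndPaginate` helper, and the sample data are illustrative only and are not part of the Pulse codebase.

package main

import (
	"encoding/json"
	"fmt"
)

// Item is a stand-in for a storage record such as a snapshot or backup task.
type Item struct {
	Instance string `json:"instance"`
	Name     string `json:"name"`
}

// filterAndPaginate mirrors the handler pattern: filter first, then apply
// offset/limit, and always return a non-nil slice plus the filtered count.
func filterAndPaginate(all []Item, instance string, limit, offset int) ([]Item, int) {
	var items []Item
	filtered := 0
	count := 0
	for _, it := range all {
		if instance != "" && it.Instance != instance {
			continue
		}
		filtered++
		if count < offset {
			count++
			continue
		}
		if len(items) >= limit {
			continue
		}
		items = append(items, it)
		count++
	}
	if items == nil {
		items = []Item{} // marshal as [] rather than null
	}
	return items, filtered
}

func main() {
	all := []Item{
		{Instance: "pve1", Name: "a"},
		{Instance: "pve2", Name: "b"},
		{Instance: "pve1", Name: "c"},
		{Instance: "pve1", Name: "d"},
	}
	items, filtered := filterAndPaginate(all, "pve1", 2, 1)
	out, _ := json.Marshal(items)
	// prints: [{"instance":"pve1","name":"c"},{"instance":"pve1","name":"d"}] filtered=3
	fmt.Printf("%s filtered=%d\n", out, filtered)
}

The explicit nil-to-empty normalization matches the "Ensure non-nil slices" blocks in the handlers above, presumably so tool output always contains JSON arrays and consumers never have to special-case null fields.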