refactor(ai): consolidate tool implementations into domain-specific files

- Merge tools_infrastructure.go, tools_intelligence.go, tools_patrol.go,
  tools_profiles.go into their respective domain tools
- Expand tools_control.go with command execution logic
- Expand tools_discovery.go with resource discovery handlers
- Expand tools_storage.go with storage-related operations
- Expand tools_metrics.go with metrics functionality
- Update tests to match new structure

This consolidation reduces file count and groups related functionality together.
This commit is contained in:
rcourtman
2026-01-28 21:21:28 +00:00
parent 23ff4d1337
commit a75393d1c5
15 changed files with 4236 additions and 106 deletions

View File

@@ -34,7 +34,7 @@ func TestPulseToolExecutor_ExecuteListBackups(t *testing.T) {
backupProv.On("GetBackups").Return(expectedBackups)
backupProv.On("GetPBSInstances").Return([]models.PBSInstance{})
// Use consolidated pulse_storage tool with type: "backups"
// Use pulse_storage tool with type: "backups"
result, err := exec.ExecuteTool(context.Background(), "pulse_storage", map[string]interface{}{
"type": "backups",
})
@@ -69,7 +69,7 @@ func TestPulseToolExecutor_ExecuteControlGuest(t *testing.T) {
ExitCode: 0,
}, nil)
// Use consolidated pulse_control tool with type: "guest"
// Use pulse_control tool with type: "guest"
result, err := exec.ExecuteTool(context.Background(), "pulse_control", map[string]interface{}{
"type": "guest",
"guest_id": "100",
@@ -112,7 +112,7 @@ func TestPulseToolExecutor_ExecuteControlDocker(t *testing.T) {
ExitCode: 0,
}, nil)
// Use consolidated pulse_docker tool with action: "control"
// Use pulse_docker tool with action: "control"
result, err := exec.ExecuteTool(context.Background(), "pulse_docker", map[string]interface{}{
"action": "control",
"container": "nginx",

View File

@@ -565,7 +565,7 @@ func (e *PulseToolExecutor) ListTools() []Tool {
func (e *PulseToolExecutor) isToolAvailable(name string) bool {
switch name {
// Consolidated tools - check based on primary requirements
// Check tool availability based on primary requirements
case "pulse_query":
return e.stateProvider != nil
case "pulse_metrics":
@@ -607,8 +607,7 @@ func (e *PulseToolExecutor) ExecuteTool(ctx context.Context, name string, args m
// registerTools registers all available tools
func (e *PulseToolExecutor) registerTools() {
// Consolidated tools (49 tools -> 10 tools)
// See plan at /Users/rcourtman/.claude/plans/atomic-wobbling-rose.md
// All tools registered below (12 tools total)
// pulse_query - search, get, config, topology, list, health
e.registerQueryTools()
@@ -634,17 +633,17 @@ func (e *PulseToolExecutor) registerTools() {
// pulse_control - guest control, run commands (requires control permission)
// NOTE: For read-only command execution, use pulse_read instead
e.registerControlToolsConsolidated()
e.registerControlTools()
// pulse_file_edit - read, append, write files (requires control permission)
e.registerFileTools()
// pulse_discovery - get, list discoveries
e.registerDiscoveryToolsConsolidated()
e.registerDiscoveryTools()
// pulse_knowledge - remember, recall, incidents, correlate, relationships
e.registerKnowledgeTools()
// pulse_pmg - status, mail_stats, queues, spam
e.registerPMGToolsConsolidated()
e.registerPMGTools()
}

View File

@@ -150,44 +150,6 @@ func TestExecuteListPBSJobs(t *testing.T) {
assert.Equal(t, "job1", resp.Jobs[0].ID)
}
func TestExecuteGetClusterStatus(t *testing.T) {
ctx := context.Background()
exec := NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: models.StateSnapshot{}}})
result, err := exec.executeGetClusterStatus(ctx, map[string]interface{}{})
require.NoError(t, err)
assert.Equal(t, "No Proxmox nodes found.", result.Content[0].Text)
state := models.StateSnapshot{
Nodes: []models.Node{
{
Name: "node1",
Status: "online",
IsClusterMember: true,
ClusterName: "cluster1",
Instance: "pve1",
},
{
Name: "node2",
Status: "offline",
IsClusterMember: true,
ClusterName: "cluster1",
Instance: "pve1",
},
},
}
exec = NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: state}})
result, err = exec.executeGetClusterStatus(ctx, map[string]interface{}{})
require.NoError(t, err)
var resp ClusterStatusResponse
require.NoError(t, json.Unmarshal([]byte(result.Content[0].Text), &resp))
require.Len(t, resp.Clusters, 1)
assert.Equal(t, "cluster1", resp.Clusters[0].ClusterName)
assert.False(t, resp.Clusters[0].QuorumOK)
}
func TestExecuteGetConnectionHealth(t *testing.T) {
ctx := context.Background()
exec := NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: models.StateSnapshot{}}})
@@ -349,36 +311,6 @@ func TestExecuteGetResourceDisks(t *testing.T) {
assert.Equal(t, "vm1", resp.Resources[0].ID)
}
func TestExecuteListRecentTasks(t *testing.T) {
ctx := context.Background()
now := time.Now()
state := models.StateSnapshot{
PVEBackups: models.PVEBackups{
BackupTasks: []models.BackupTask{
{
ID: "task1",
Node: "node1",
Instance: "pve1",
Type: "backup",
Status: "ok",
StartTime: now,
},
},
},
}
exec := NewPulseToolExecutor(ExecutorConfig{StateProvider: &mockStateProvider{state: state}})
result, err := exec.executeListRecentTasks(ctx, map[string]interface{}{
"type": "backup",
})
require.NoError(t, err)
var resp RecentTasksResponse
require.NoError(t, json.Unmarshal([]byte(result.Content[0].Text), &resp))
require.Len(t, resp.Tasks, 1)
assert.Equal(t, "task1", resp.Tasks[0].ID)
}
func TestExecuteListBackupTasks(t *testing.T) {
ctx := context.Background()
exec := NewPulseToolExecutor(ExecutorConfig{})

View File

@@ -20,7 +20,7 @@ func TestExecuteGetDiskHealth(t *testing.T) {
}
diskHealthProv.On("GetHosts").Return(expectedHosts)
// Use consolidated pulse_storage tool with type: "disk_health"
// Use pulse_storage tool with type: "disk_health"
result, err := exec.ExecuteTool(context.Background(), "pulse_storage", map[string]interface{}{
"type": "disk_health",
})
@@ -44,7 +44,7 @@ func TestExecuteGetTemperatures(t *testing.T) {
}
stateProv.On("GetState").Return(state)
// Use consolidated pulse_metrics tool with type: "temperatures"
// Use pulse_metrics tool with type: "temperatures"
result, err := exec.ExecuteTool(context.Background(), "pulse_metrics", map[string]interface{}{
"type": "temperatures",
})

View File

@@ -1489,6 +1489,8 @@ func TestExecutionIntent_ReadOnlyPipeChains(t *testing.T) {
"journalctl -u nginx | grep failed",
"ls -la /opt | grep -v total",
"find /var/log -name '*.log' 2>/dev/null | head -20",
"ss -tuln | grep -E '3000|9090|9100|22|25'",
"netstat -tuln | grep -E '3000|9090|9100|22|25'",
}
for _, cmd := range allowedPipeChains {

View File

@@ -3,9 +3,10 @@ package tools
import (
"context"
"fmt"
"strings"
)
// registerAlertsTools registers the consolidated pulse_alerts tool
// registerAlertsTools registers the pulse_alerts tool
func (e *PulseToolExecutor) registerAlertsTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
@@ -94,7 +95,6 @@ Examples:
}
// executeAlerts routes to the appropriate alerts handler based on action
// All handler functions are implemented in tools_patrol.go
func (e *PulseToolExecutor) executeAlerts(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
action, _ := args["action"].(string)
switch action {
@@ -112,3 +112,292 @@ func (e *PulseToolExecutor) executeAlerts(ctx context.Context, args map[string]i
return NewErrorResult(fmt.Errorf("unknown action: %s. Use: list, findings, resolved, resolve, dismiss", action)), nil
}
}
func (e *PulseToolExecutor) executeListAlerts(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.alertProvider == nil {
return NewTextResult("Alert data not available."), nil
}
severityFilter, _ := args["severity"].(string)
limit := intArg(args, "limit", 100)
offset := intArg(args, "offset", 0)
allAlerts := e.alertProvider.GetActiveAlerts()
var filtered []ActiveAlert
for i, a := range allAlerts {
if i < offset {
continue
}
if len(filtered) >= limit {
break
}
if severityFilter != "" && a.Severity != severityFilter {
continue
}
filtered = append(filtered, a)
}
if filtered == nil {
filtered = []ActiveAlert{}
}
response := AlertsResponse{
Alerts: filtered,
Count: len(filtered),
}
if offset > 0 || len(allAlerts) > limit {
response.Pagination = &PaginationInfo{
Total: len(allAlerts),
Limit: limit,
Offset: offset,
}
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeListFindings(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
includeDismissed, _ := args["include_dismissed"].(bool)
severityFilter, _ := args["severity"].(string)
resourceType, _ := args["resource_type"].(string)
resourceID, _ := args["resource_id"].(string)
limit := intArg(args, "limit", 100)
offset := intArg(args, "offset", 0)
if limit <= 0 {
limit = 100
}
if offset < 0 {
offset = 0
}
resourceType = strings.ToLower(strings.TrimSpace(resourceType))
resourceID = strings.TrimSpace(resourceID)
if resourceType != "" {
validTypes := map[string]bool{"vm": true, "container": true, "node": true, "docker": true}
if !validTypes[resourceType] {
return NewErrorResult(fmt.Errorf("invalid resource_type: %s. Use vm, container, node, or docker", resourceType)), nil
}
}
if e.findingsProvider == nil {
return NewTextResult("Patrol findings not available. Pulse Patrol may not be running."), nil
}
allActive := e.findingsProvider.GetActiveFindings()
var allDismissed []Finding
if includeDismissed {
allDismissed = e.findingsProvider.GetDismissedFindings()
}
normalizeType := func(value string) string {
normalized := strings.ToLower(strings.TrimSpace(value))
switch normalized {
case "docker container", "docker-container", "docker_container":
return "docker"
case "lxc", "lxc container", "lxc-container", "lxc_container":
return "container"
default:
return normalized
}
}
matches := func(f Finding) bool {
if severityFilter != "" && f.Severity != severityFilter {
return false
}
if resourceID != "" && f.ResourceID != resourceID {
return false
}
if resourceType != "" && normalizeType(f.ResourceType) != resourceType {
return false
}
return true
}
// Filter active
var active []Finding
totalActive := 0
for _, f := range allActive {
if !matches(f) {
continue
}
if totalActive < offset {
totalActive++
continue
}
if len(active) >= limit {
totalActive++
continue
}
active = append(active, f)
totalActive++
}
// Filter dismissed
var dismissed []Finding
totalDismissed := 0
if includeDismissed {
for _, f := range allDismissed {
if !matches(f) {
continue
}
if totalDismissed < offset {
totalDismissed++
continue
}
if len(dismissed) >= limit {
totalDismissed++
continue
}
dismissed = append(dismissed, f)
totalDismissed++
}
}
if active == nil {
active = []Finding{}
}
if dismissed == nil {
dismissed = []Finding{}
}
response := FindingsResponse{
Active: active,
Dismissed: dismissed,
Counts: FindingCounts{
Active: totalActive,
Dismissed: totalDismissed,
},
}
total := totalActive + totalDismissed
if offset > 0 || total > limit {
response.Pagination = &PaginationInfo{
Total: total,
Limit: limit,
Offset: offset,
}
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeResolveFinding(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
findingID, _ := args["finding_id"].(string)
resolutionNote, _ := args["resolution_note"].(string)
if findingID == "" {
return NewErrorResult(fmt.Errorf("finding_id is required")), nil
}
if resolutionNote == "" {
return NewErrorResult(fmt.Errorf("resolution_note is required")), nil
}
if e.findingsManager == nil {
return NewTextResult("Findings manager not available."), nil
}
if err := e.findingsManager.ResolveFinding(findingID, resolutionNote); err != nil {
return NewErrorResult(err), nil
}
return NewJSONResult(map[string]interface{}{
"success": true,
"finding_id": findingID,
"action": "resolved",
"resolution_note": resolutionNote,
}), nil
}
func (e *PulseToolExecutor) executeDismissFinding(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
findingID, _ := args["finding_id"].(string)
reason, _ := args["reason"].(string)
note, _ := args["note"].(string)
if findingID == "" {
return NewErrorResult(fmt.Errorf("finding_id is required")), nil
}
if reason == "" {
return NewErrorResult(fmt.Errorf("reason is required")), nil
}
if note == "" {
return NewErrorResult(fmt.Errorf("note is required")), nil
}
if e.findingsManager == nil {
return NewTextResult("Findings manager not available."), nil
}
if err := e.findingsManager.DismissFinding(findingID, reason, note); err != nil {
return NewErrorResult(err), nil
}
return NewJSONResult(map[string]interface{}{
"success": true,
"finding_id": findingID,
"action": "dismissed",
"reason": reason,
"note": note,
}), nil
}
// ========== Resolved Alerts Tool Implementation ==========
func (e *PulseToolExecutor) executeListResolvedAlerts(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
typeFilter, _ := args["type"].(string)
levelFilter, _ := args["level"].(string)
limit := intArg(args, "limit", 50)
state := e.stateProvider.GetState()
if len(state.RecentlyResolved) == 0 {
return NewTextResult("No recently resolved alerts."), nil
}
var alerts []ResolvedAlertSummary
for _, alert := range state.RecentlyResolved {
// Apply filters
if typeFilter != "" && !strings.EqualFold(alert.Type, typeFilter) {
continue
}
if levelFilter != "" && !strings.EqualFold(alert.Level, levelFilter) {
continue
}
if len(alerts) >= limit {
break
}
alerts = append(alerts, ResolvedAlertSummary{
ID: alert.ID,
Type: alert.Type,
Level: alert.Level,
ResourceID: alert.ResourceID,
ResourceName: alert.ResourceName,
Node: alert.Node,
Instance: alert.Instance,
Message: alert.Message,
Value: alert.Value,
Threshold: alert.Threshold,
StartTime: alert.StartTime,
ResolvedTime: alert.ResolvedTime,
})
}
if alerts == nil {
alerts = []ResolvedAlertSummary{}
}
response := ResolvedAlertsResponse{
Alerts: alerts,
Total: len(state.RecentlyResolved),
}
return NewJSONResult(response), nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -3,10 +3,12 @@ package tools
import (
"context"
"fmt"
"strconv"
"strings"
)
// registerDiscoveryToolsConsolidated registers the consolidated pulse_discovery tool
func (e *PulseToolExecutor) registerDiscoveryToolsConsolidated() {
// registerDiscoveryTools registers the pulse_discovery tool
func (e *PulseToolExecutor) registerDiscoveryTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
Name: "pulse_discovery",
@@ -73,7 +75,6 @@ Examples:
}
// executeDiscovery routes to the appropriate discovery handler based on action
// Handler functions are implemented in tools_discovery.go
func (e *PulseToolExecutor) executeDiscovery(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
action, _ := args["action"].(string)
switch action {
@@ -85,3 +86,417 @@ func (e *PulseToolExecutor) executeDiscovery(ctx context.Context, args map[strin
return NewErrorResult(fmt.Errorf("unknown action: %s. Use: get, list", action)), nil
}
}
// getCommandContext returns information about how to run commands on a resource.
// This helps the AI understand what commands to use with pulse_control.
type CommandContext struct {
// How to run commands: "direct" (agent on resource), "via_host" (agent on parent host)
Method string `json:"method"`
// The target_host value to use with pulse_control
TargetHost string `json:"target_host"`
// Example command pattern (what to pass to pulse_control)
Example string `json:"example"`
// For containers running inside this resource (e.g., Docker in LXC)
NestedExample string `json:"nested_example,omitempty"`
}
// getCLIAccessPattern returns context about the resource type.
// Does NOT prescribe how to access - the AI should determine that based on available agents.
func getCLIAccessPattern(resourceType, hostID, resourceID string) string {
switch resourceType {
case "lxc":
return fmt.Sprintf("LXC container on Proxmox node '%s' (VMID %s)", hostID, resourceID)
case "vm":
return fmt.Sprintf("VM on Proxmox node '%s' (VMID %s)", hostID, resourceID)
case "docker":
return fmt.Sprintf("Docker container '%s' on host '%s'", resourceID, hostID)
case "host":
return fmt.Sprintf("Host '%s'", hostID)
default:
return ""
}
}
// commonServicePaths contains typical log/config paths for well-known services
// These are fallbacks when discovery doesn't find specific paths
var commonServicePaths = map[string]struct {
LogPaths []string
ConfigPaths []string
DataPaths []string
}{
"jellyfin": {
LogPaths: []string{"/var/log/jellyfin/", "/config/log/"},
ConfigPaths: []string{"/etc/jellyfin/", "/config/"},
DataPaths: []string{"/var/lib/jellyfin/", "/config/data/"},
},
"plex": {
LogPaths: []string{"/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs/"},
ConfigPaths: []string{"/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/"},
DataPaths: []string{"/var/lib/plexmediaserver/"},
},
"sonarr": {
LogPaths: []string{"/config/logs/"},
ConfigPaths: []string{"/config/"},
DataPaths: []string{"/config/"},
},
"radarr": {
LogPaths: []string{"/config/logs/"},
ConfigPaths: []string{"/config/"},
DataPaths: []string{"/config/"},
},
"prowlarr": {
LogPaths: []string{"/config/logs/"},
ConfigPaths: []string{"/config/"},
DataPaths: []string{"/config/"},
},
"lidarr": {
LogPaths: []string{"/config/logs/"},
ConfigPaths: []string{"/config/"},
DataPaths: []string{"/config/"},
},
"postgresql": {
LogPaths: []string{"/var/log/postgresql/", "/var/lib/postgresql/data/log/"},
ConfigPaths: []string{"/etc/postgresql/", "/var/lib/postgresql/data/"},
DataPaths: []string{"/var/lib/postgresql/data/"},
},
"mysql": {
LogPaths: []string{"/var/log/mysql/", "/var/lib/mysql/"},
ConfigPaths: []string{"/etc/mysql/"},
DataPaths: []string{"/var/lib/mysql/"},
},
"mariadb": {
LogPaths: []string{"/var/log/mysql/", "/var/lib/mysql/"},
ConfigPaths: []string{"/etc/mysql/"},
DataPaths: []string{"/var/lib/mysql/"},
},
"nginx": {
LogPaths: []string{"/var/log/nginx/"},
ConfigPaths: []string{"/etc/nginx/"},
DataPaths: []string{"/var/www/"},
},
"homeassistant": {
LogPaths: []string{"/config/home-assistant.log"},
ConfigPaths: []string{"/config/"},
DataPaths: []string{"/config/"},
},
"frigate": {
LogPaths: []string{"/config/logs/"},
ConfigPaths: []string{"/config/"},
DataPaths: []string{"/media/frigate/"},
},
"redis": {
LogPaths: []string{"/var/log/redis/"},
ConfigPaths: []string{"/etc/redis/"},
DataPaths: []string{"/var/lib/redis/"},
},
"mongodb": {
LogPaths: []string{"/var/log/mongodb/"},
ConfigPaths: []string{"/etc/mongod.conf"},
DataPaths: []string{"/var/lib/mongodb/"},
},
"grafana": {
LogPaths: []string{"/var/log/grafana/"},
ConfigPaths: []string{"/etc/grafana/"},
DataPaths: []string{"/var/lib/grafana/"},
},
"prometheus": {
LogPaths: []string{"/var/log/prometheus/"},
ConfigPaths: []string{"/etc/prometheus/"},
DataPaths: []string{"/var/lib/prometheus/"},
},
"influxdb": {
LogPaths: []string{"/var/log/influxdb/"},
ConfigPaths: []string{"/etc/influxdb/"},
DataPaths: []string{"/var/lib/influxdb/"},
},
}
// getCommonServicePaths returns fallback paths for a service type
func getCommonServicePaths(serviceType string) (logPaths, configPaths, dataPaths []string) {
// Normalize service type (lowercase, remove version numbers)
normalized := strings.ToLower(serviceType)
// Try to match against known services
for key, paths := range commonServicePaths {
if strings.Contains(normalized, key) {
return paths.LogPaths, paths.ConfigPaths, paths.DataPaths
}
}
return nil, nil, nil
}
func (e *PulseToolExecutor) executeGetDiscovery(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.discoveryProvider == nil {
return NewTextResult("Discovery service not available."), nil
}
resourceType, _ := args["resource_type"].(string)
resourceID, _ := args["resource_id"].(string)
hostID, _ := args["host_id"].(string)
if resourceType == "" {
return NewErrorResult(fmt.Errorf("resource_type is required")), nil
}
if resourceID == "" {
return NewErrorResult(fmt.Errorf("resource_id is required")), nil
}
if hostID == "" {
return NewErrorResult(fmt.Errorf("host_id is required - use the 'node' field from search or get_resource results")), nil
}
// For LXC and VM types, resourceID should be a numeric VMID.
// If a name was passed, try to resolve it to a VMID.
if (resourceType == "lxc" || resourceType == "vm") && e.stateProvider != nil {
if _, err := strconv.Atoi(resourceID); err != nil {
// Not a number - try to resolve the name to a VMID
state := e.stateProvider.GetState()
resolved := false
if resourceType == "lxc" {
for _, c := range state.Containers {
if strings.EqualFold(c.Name, resourceID) && c.Node == hostID {
resourceID = fmt.Sprintf("%d", c.VMID)
resolved = true
break
}
}
} else if resourceType == "vm" {
for _, vm := range state.VMs {
if strings.EqualFold(vm.Name, resourceID) && vm.Node == hostID {
resourceID = fmt.Sprintf("%d", vm.VMID)
resolved = true
break
}
}
}
if !resolved {
return NewErrorResult(fmt.Errorf("could not resolve resource name '%s' to a VMID on host '%s'", resourceID, hostID)), nil
}
}
}
// First try to get existing discovery
discovery, err := e.discoveryProvider.GetDiscoveryByResource(resourceType, hostID, resourceID)
if err != nil {
return NewErrorResult(fmt.Errorf("failed to get discovery: %w", err)), nil
}
// Compute CLI access pattern (always useful, even if discovery fails)
cliAccess := getCLIAccessPattern(resourceType, hostID, resourceID)
// If no discovery exists, trigger one
if discovery == nil {
discovery, err = e.discoveryProvider.TriggerDiscovery(ctx, resourceType, hostID, resourceID)
if err != nil {
// Even on failure, provide cli_access so AI can investigate manually
return NewJSONResult(map[string]interface{}{
"found": false,
"resource_type": resourceType,
"resource_id": resourceID,
"host_id": hostID,
"cli_access": cliAccess,
"message": fmt.Sprintf("Discovery failed: %v", err),
"hint": "Use pulse_control with type='command' to investigate. Try checking /var/log/ for logs.",
}), nil
}
}
if discovery == nil {
// No discovery but provide cli_access for manual investigation
return NewJSONResult(map[string]interface{}{
"found": false,
"resource_type": resourceType,
"resource_id": resourceID,
"host_id": hostID,
"cli_access": cliAccess,
"message": "Discovery returned no data. The resource may not be accessible.",
"hint": "Use pulse_control with type='command' to investigate. Try listing /var/log/ or checking running processes.",
}), nil
}
// Use fallback cli_access if discovery didn't provide one
responseCLIAccess := discovery.CLIAccess
if responseCLIAccess == "" {
responseCLIAccess = cliAccess
}
// Use fallback paths for known services if discovery didn't find specific ones
responseConfigPaths := discovery.ConfigPaths
responseDataPaths := discovery.DataPaths
var responseLogPaths []string
if discovery.ServiceType != "" {
fallbackLogPaths, fallbackConfigPaths, fallbackDataPaths := getCommonServicePaths(discovery.ServiceType)
if len(responseConfigPaths) == 0 && len(fallbackConfigPaths) > 0 {
responseConfigPaths = fallbackConfigPaths
}
if len(responseDataPaths) == 0 && len(fallbackDataPaths) > 0 {
responseDataPaths = fallbackDataPaths
}
if len(fallbackLogPaths) > 0 {
responseLogPaths = fallbackLogPaths
}
}
// Return the discovery information
response := map[string]interface{}{
"found": true,
"id": discovery.ID,
"resource_type": discovery.ResourceType,
"resource_id": discovery.ResourceID,
"host_id": discovery.HostID,
"hostname": discovery.Hostname,
"service_type": discovery.ServiceType,
"service_name": discovery.ServiceName,
"service_version": discovery.ServiceVersion,
"category": discovery.Category,
"cli_access": responseCLIAccess,
"config_paths": responseConfigPaths,
"data_paths": responseDataPaths,
"confidence": discovery.Confidence,
"discovered_at": discovery.DiscoveredAt,
"updated_at": discovery.UpdatedAt,
}
// Add log paths if we have them (from fallback or discovery)
if len(responseLogPaths) > 0 {
response["log_paths"] = responseLogPaths
}
// Add facts if present
if len(discovery.Facts) > 0 {
facts := make([]map[string]string, 0, len(discovery.Facts))
for _, f := range discovery.Facts {
facts = append(facts, map[string]string{
"category": f.Category,
"key": f.Key,
"value": f.Value,
})
}
response["facts"] = facts
}
// Add user notes if present
if discovery.UserNotes != "" {
response["user_notes"] = discovery.UserNotes
}
// Add AI reasoning for context
if discovery.AIReasoning != "" {
response["ai_reasoning"] = discovery.AIReasoning
}
// Add listening ports if present
if len(discovery.Ports) > 0 {
ports := make([]map[string]interface{}, 0, len(discovery.Ports))
for _, p := range discovery.Ports {
port := map[string]interface{}{
"port": p.Port,
"protocol": p.Protocol,
}
if p.Process != "" {
port["process"] = p.Process
}
if p.Address != "" {
port["address"] = p.Address
}
ports = append(ports, port)
}
response["ports"] = ports
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeListDiscoveries(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.discoveryProvider == nil {
return NewTextResult("Discovery service not available."), nil
}
filterType, _ := args["type"].(string)
filterHost, _ := args["host"].(string)
filterServiceType, _ := args["service_type"].(string)
limit := intArg(args, "limit", 50)
var discoveries []*ResourceDiscoveryInfo
var err error
// Get discoveries based on filters
if filterType != "" {
discoveries, err = e.discoveryProvider.ListDiscoveriesByType(filterType)
} else if filterHost != "" {
discoveries, err = e.discoveryProvider.ListDiscoveriesByHost(filterHost)
} else {
discoveries, err = e.discoveryProvider.ListDiscoveries()
}
if err != nil {
return NewErrorResult(fmt.Errorf("failed to list discoveries: %w", err)), nil
}
// Filter by service type if specified
if filterServiceType != "" {
filtered := make([]*ResourceDiscoveryInfo, 0)
filterLower := strings.ToLower(filterServiceType)
for _, d := range discoveries {
if strings.Contains(strings.ToLower(d.ServiceType), filterLower) ||
strings.Contains(strings.ToLower(d.ServiceName), filterLower) {
filtered = append(filtered, d)
}
}
discoveries = filtered
}
// Apply limit
if len(discoveries) > limit {
discoveries = discoveries[:limit]
}
// Build response
results := make([]map[string]interface{}, 0, len(discoveries))
for _, d := range discoveries {
result := map[string]interface{}{
"id": d.ID,
"resource_type": d.ResourceType,
"resource_id": d.ResourceID,
"host_id": d.HostID,
"hostname": d.Hostname,
"service_type": d.ServiceType,
"service_name": d.ServiceName,
"service_version": d.ServiceVersion,
"category": d.Category,
"cli_access": d.CLIAccess,
"confidence": d.Confidence,
"updated_at": d.UpdatedAt,
}
// Add key facts count
if len(d.Facts) > 0 {
result["facts_count"] = len(d.Facts)
}
// Add ports count
if len(d.Ports) > 0 {
result["ports_count"] = len(d.Ports)
}
results = append(results, result)
}
response := map[string]interface{}{
"discoveries": results,
"total": len(results),
}
if filterType != "" {
response["filter_type"] = filterType
}
if filterHost != "" {
response["filter_host"] = filterHost
}
if filterServiceType != "" {
response["filter_service_type"] = filterServiceType
}
return NewJSONResult(response), nil
}

View File

@@ -2,13 +2,14 @@ package tools
import (
"context"
"encoding/json"
"fmt"
"github.com/rcourtman/pulse-go-rewrite/internal/agentexec"
"github.com/rs/zerolog/log"
)
// registerDockerTools registers the consolidated pulse_docker tool
// registerDockerTools registers the pulse_docker tool
func (e *PulseToolExecutor) registerDockerTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
@@ -78,22 +79,16 @@ func (e *PulseToolExecutor) executeDocker(ctx context.Context, args map[string]i
case "control":
return e.executeDockerControl(ctx, args)
case "updates":
// Uses existing function from tools_infrastructure.go
return e.executeListDockerUpdates(ctx, args)
case "check_updates":
// Uses existing function from tools_infrastructure.go
return e.executeCheckDockerUpdates(ctx, args)
case "update":
// Uses existing function from tools_infrastructure.go
return e.executeUpdateDockerContainer(ctx, args)
case "services":
// Uses existing function from tools_infrastructure.go
return e.executeListDockerServices(ctx, args)
case "tasks":
// Uses existing function from tools_infrastructure.go
return e.executeListDockerTasks(ctx, args)
case "swarm":
// Uses existing function from tools_infrastructure.go
return e.executeGetSwarmStatus(ctx, args)
default:
return NewErrorResult(fmt.Errorf("unknown action: %s. Use: control, updates, check_updates, update, services, tasks, swarm", action)), nil
@@ -101,7 +96,6 @@ func (e *PulseToolExecutor) executeDocker(ctx context.Context, args map[string]i
}
// executeDockerControl handles start/stop/restart of Docker containers
// This is a new consolidated handler that merges pulse_control_docker functionality
func (e *PulseToolExecutor) executeDockerControl(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
containerName, _ := args["container"].(string)
hostName, _ := args["host"].(string)
@@ -199,3 +193,341 @@ func (e *PulseToolExecutor) executeDockerControl(ctx context.Context, args map[s
return NewTextResult(fmt.Sprintf("Command failed (exit code %d):\n%s", result.ExitCode, output)), nil
}
// ========== Docker Updates Handler Implementations ==========
func (e *PulseToolExecutor) executeListDockerUpdates(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.updatesProvider == nil {
return NewTextResult("Docker update information not available. Ensure updates provider is configured."), nil
}
hostFilter, _ := args["host"].(string)
// Resolve host name to ID if needed
hostID := e.resolveDockerHostID(hostFilter)
updates := e.updatesProvider.GetPendingUpdates(hostID)
// Ensure non-nil slice
if updates == nil {
updates = []ContainerUpdateInfo{}
}
response := DockerUpdatesResponse{
Updates: updates,
Total: len(updates),
HostID: hostID,
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeCheckDockerUpdates(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.updatesProvider == nil {
return NewTextResult("Docker update checking not available. Ensure updates provider is configured."), nil
}
hostArg, _ := args["host"].(string)
if hostArg == "" {
return NewErrorResult(fmt.Errorf("host is required")), nil
}
// Resolve host name to ID
hostID := e.resolveDockerHostID(hostArg)
if hostID == "" {
return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil
}
hostName := e.getDockerHostName(hostID)
// Trigger the update check
cmdStatus, err := e.updatesProvider.TriggerUpdateCheck(hostID)
if err != nil {
return NewTextResult(fmt.Sprintf("Failed to trigger update check: %v", err)), nil
}
response := DockerCheckUpdatesResponse{
Success: true,
HostID: hostID,
HostName: hostName,
CommandID: cmdStatus.ID,
Message: "Update check command queued. Results will be available after the next agent report cycle (~30 seconds).",
Command: cmdStatus,
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeUpdateDockerContainer(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.updatesProvider == nil {
return NewTextResult("Docker update functionality not available. Ensure updates provider is configured."), nil
}
containerArg, _ := args["container"].(string)
hostArg, _ := args["host"].(string)
if containerArg == "" {
return NewErrorResult(fmt.Errorf("container is required")), nil
}
if hostArg == "" {
return NewErrorResult(fmt.Errorf("host is required")), nil
}
// Check if update actions are enabled
if !e.updatesProvider.IsUpdateActionsEnabled() {
return NewTextResult("Docker container updates are disabled by server configuration. Set PULSE_DISABLE_DOCKER_UPDATE_ACTIONS=false or enable in Settings to allow updates."), nil
}
// Resolve container and host
container, dockerHost, err := e.resolveDockerContainer(containerArg, hostArg)
if err != nil {
return NewTextResult(fmt.Sprintf("Could not find container '%s' on host '%s': %v", containerArg, hostArg, err)), nil
}
containerName := trimContainerName(container.Name)
// Controlled mode - require approval
if e.controlLevel == ControlLevelControlled {
command := fmt.Sprintf("docker update %s", containerName)
agentHostname := e.getAgentHostnameForDockerHost(dockerHost)
approvalID := createApprovalRecord(command, "docker", container.ID, agentHostname, fmt.Sprintf("Update container %s to latest image", containerName))
return NewTextResult(formatDockerUpdateApprovalNeeded(containerName, dockerHost.Hostname, approvalID)), nil
}
// Autonomous mode - execute directly
cmdStatus, err := e.updatesProvider.UpdateContainer(dockerHost.ID, container.ID, containerName)
if err != nil {
return NewTextResult(fmt.Sprintf("Failed to queue update command: %v", err)), nil
}
response := DockerUpdateContainerResponse{
Success: true,
HostID: dockerHost.ID,
ContainerID: container.ID,
ContainerName: containerName,
CommandID: cmdStatus.ID,
Message: fmt.Sprintf("Update command queued for container '%s'. The agent will pull the latest image and recreate the container.", containerName),
Command: cmdStatus,
}
return NewJSONResult(response), nil
}
// Helper methods for Docker updates
func (e *PulseToolExecutor) resolveDockerHostID(hostArg string) string {
if hostArg == "" {
return ""
}
if e.stateProvider == nil {
return hostArg
}
state := e.stateProvider.GetState()
for _, host := range state.DockerHosts {
if host.ID == hostArg || host.Hostname == hostArg || host.DisplayName == hostArg {
return host.ID
}
}
return hostArg // Return as-is if not found (provider will handle error)
}
func (e *PulseToolExecutor) getDockerHostName(hostID string) string {
if e.stateProvider == nil {
return hostID
}
state := e.stateProvider.GetState()
for _, host := range state.DockerHosts {
if host.ID == hostID {
if host.DisplayName != "" {
return host.DisplayName
}
return host.Hostname
}
}
return hostID
}
func formatDockerUpdateApprovalNeeded(containerName, hostName, approvalID string) string {
payload := map[string]interface{}{
"type": "approval_required",
"approval_id": approvalID,
"container_name": containerName,
"docker_host": hostName,
"action": "update",
"command": fmt.Sprintf("docker update %s (pull latest + recreate)", containerName),
"how_to_approve": "Click the approval button in the chat to execute this update.",
"do_not_retry": true,
}
b, _ := json.Marshal(payload)
return "APPROVAL_REQUIRED: " + string(b)
}
func trimLeadingSlash(name string) string {
if len(name) > 0 && name[0] == '/' {
return name[1:]
}
return name
}
// ========== Docker Swarm Handler Implementations ==========
func (e *PulseToolExecutor) executeGetSwarmStatus(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
hostArg, _ := args["host"].(string)
if hostArg == "" {
return NewErrorResult(fmt.Errorf("host is required")), nil
}
state := e.stateProvider.GetState()
for _, host := range state.DockerHosts {
if host.ID == hostArg || host.Hostname == hostArg || host.DisplayName == hostArg || host.CustomDisplayName == hostArg {
if host.Swarm == nil {
return NewTextResult(fmt.Sprintf("Docker host '%s' is not part of a Swarm cluster.", host.Hostname)), nil
}
response := SwarmStatusResponse{
Host: host.Hostname,
Status: DockerSwarmSummary{
NodeID: host.Swarm.NodeID,
NodeRole: host.Swarm.NodeRole,
LocalState: host.Swarm.LocalState,
ControlAvailable: host.Swarm.ControlAvailable,
ClusterID: host.Swarm.ClusterID,
ClusterName: host.Swarm.ClusterName,
Error: host.Swarm.Error,
},
}
return NewJSONResult(response), nil
}
}
return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil
}
func (e *PulseToolExecutor) executeListDockerServices(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
hostArg, _ := args["host"].(string)
if hostArg == "" {
return NewErrorResult(fmt.Errorf("host is required")), nil
}
stackFilter, _ := args["stack"].(string)
state := e.stateProvider.GetState()
for _, host := range state.DockerHosts {
if host.ID == hostArg || host.Hostname == hostArg || host.DisplayName == hostArg || host.CustomDisplayName == hostArg {
if len(host.Services) == 0 {
return NewTextResult(fmt.Sprintf("No Docker services found on host '%s'. The host may not be a Swarm manager.", host.Hostname)), nil
}
var services []DockerServiceSummary
filteredCount := 0
for _, svc := range host.Services {
if stackFilter != "" && svc.Stack != stackFilter {
continue
}
filteredCount++
updateStatus := ""
if svc.UpdateStatus != nil {
updateStatus = svc.UpdateStatus.State
}
services = append(services, DockerServiceSummary{
ID: svc.ID,
Name: svc.Name,
Stack: svc.Stack,
Image: svc.Image,
Mode: svc.Mode,
DesiredTasks: svc.DesiredTasks,
RunningTasks: svc.RunningTasks,
UpdateStatus: updateStatus,
})
}
if services == nil {
services = []DockerServiceSummary{}
}
response := DockerServicesResponse{
Host: host.Hostname,
Services: services,
Total: len(host.Services),
Filtered: filteredCount,
}
return NewJSONResult(response), nil
}
}
return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil
}
func (e *PulseToolExecutor) executeListDockerTasks(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
hostArg, _ := args["host"].(string)
if hostArg == "" {
return NewErrorResult(fmt.Errorf("host is required")), nil
}
serviceFilter, _ := args["service"].(string)
state := e.stateProvider.GetState()
for _, host := range state.DockerHosts {
if host.ID == hostArg || host.Hostname == hostArg || host.DisplayName == hostArg || host.CustomDisplayName == hostArg {
if len(host.Tasks) == 0 {
return NewTextResult(fmt.Sprintf("No Docker tasks found on host '%s'. The host may not be a Swarm manager.", host.Hostname)), nil
}
var tasks []DockerTaskSummary
for _, task := range host.Tasks {
if serviceFilter != "" && task.ServiceID != serviceFilter && task.ServiceName != serviceFilter {
continue
}
tasks = append(tasks, DockerTaskSummary{
ID: task.ID,
ServiceName: task.ServiceName,
NodeName: task.NodeName,
DesiredState: task.DesiredState,
CurrentState: task.CurrentState,
Error: task.Error,
StartedAt: task.StartedAt,
})
}
if tasks == nil {
tasks = []DockerTaskSummary{}
}
response := DockerTasksResponse{
Host: host.Hostname,
Service: serviceFilter,
Tasks: tasks,
Total: len(tasks),
}
return NewJSONResult(response), nil
}
}
return NewTextResult(fmt.Sprintf("Docker host '%s' not found.", hostArg)), nil
}

View File

@@ -3,9 +3,92 @@ package tools
import (
"context"
"fmt"
"time"
)
// registerKnowledgeTools registers the consolidated pulse_knowledge tool
// IncidentRecorderProvider provides access to incident recording data
type IncidentRecorderProvider interface {
GetWindowsForResource(resourceID string, limit int) []*IncidentWindow
GetWindow(windowID string) *IncidentWindow
}
// IncidentWindow represents a high-frequency recording window during an incident
type IncidentWindow struct {
ID string `json:"id"`
ResourceID string `json:"resource_id"`
ResourceName string `json:"resource_name,omitempty"`
ResourceType string `json:"resource_type,omitempty"`
TriggerType string `json:"trigger_type"`
TriggerID string `json:"trigger_id,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time,omitempty"`
Status string `json:"status"`
DataPoints []IncidentDataPoint `json:"data_points"`
Summary *IncidentSummary `json:"summary,omitempty"`
}
// IncidentDataPoint represents a single data point in an incident window
type IncidentDataPoint struct {
Timestamp time.Time `json:"timestamp"`
Metrics map[string]float64 `json:"metrics"`
}
// IncidentSummary provides computed statistics about an incident window
type IncidentSummary struct {
Duration time.Duration `json:"duration_ms"`
DataPoints int `json:"data_points"`
Peaks map[string]float64 `json:"peaks"`
Lows map[string]float64 `json:"lows"`
Averages map[string]float64 `json:"averages"`
Changes map[string]float64 `json:"changes"`
}
// EventCorrelatorProvider provides access to correlated events
type EventCorrelatorProvider interface {
GetCorrelationsForResource(resourceID string, window time.Duration) []EventCorrelation
}
// EventCorrelation represents a correlated event
type EventCorrelation struct {
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
ResourceID string `json:"resource_id"`
ResourceName string `json:"resource_name,omitempty"`
Description string `json:"description"`
Severity string `json:"severity,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// TopologyProvider provides access to resource relationships
type TopologyProvider interface {
GetRelatedResources(resourceID string, depth int) []RelatedResource
}
// RelatedResource represents a resource related to another resource
type RelatedResource struct {
ResourceID string `json:"resource_id"`
ResourceName string `json:"resource_name"`
ResourceType string `json:"resource_type"`
Relationship string `json:"relationship"`
}
// KnowledgeStoreProvider provides access to stored knowledge/notes
type KnowledgeStoreProvider interface {
SaveNote(resourceID, note, category string) error
GetKnowledge(resourceID string, category string) []KnowledgeEntry
}
// KnowledgeEntry represents a stored note about a resource
type KnowledgeEntry struct {
ID string `json:"id"`
ResourceID string `json:"resource_id"`
Note string `json:"note"`
Category string `json:"category,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
}
// registerKnowledgeTools registers the pulse_knowledge tool
func (e *PulseToolExecutor) registerKnowledgeTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
@@ -76,7 +159,6 @@ Examples:
}
// executeKnowledge routes to the appropriate knowledge handler based on action
// Handler functions are implemented in tools_intelligence.go
func (e *PulseToolExecutor) executeKnowledge(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
action, _ := args["action"].(string)
switch action {
@@ -94,3 +176,182 @@ func (e *PulseToolExecutor) executeKnowledge(ctx context.Context, args map[strin
return NewErrorResult(fmt.Errorf("unknown action: %s. Use: remember, recall, incidents, correlate, relationships", action)), nil
}
}
// Tool handler implementations
func (e *PulseToolExecutor) executeGetIncidentWindow(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
resourceID, _ := args["resource_id"].(string)
windowID, _ := args["window_id"].(string)
limit := intArg(args, "limit", 5)
if resourceID == "" {
return NewErrorResult(fmt.Errorf("resource_id is required")), nil
}
if e.incidentRecorderProvider == nil {
return NewTextResult("Incident recording data not available. The incident recorder may not be enabled."), nil
}
// If a specific window ID is requested
if windowID != "" {
window := e.incidentRecorderProvider.GetWindow(windowID)
if window == nil {
return NewTextResult(fmt.Sprintf("Incident window '%s' not found.", windowID)), nil
}
return NewJSONResult(map[string]interface{}{
"window": window,
}), nil
}
// Get windows for the resource
windows := e.incidentRecorderProvider.GetWindowsForResource(resourceID, limit)
if len(windows) == 0 {
return NewTextResult(fmt.Sprintf("No incident recording data found for resource '%s'. Incident data is captured when alerts fire.", resourceID)), nil
}
return NewJSONResult(map[string]interface{}{
"resource_id": resourceID,
"windows": windows,
"count": len(windows),
}), nil
}
func (e *PulseToolExecutor) executeCorrelateEvents(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
resourceID, _ := args["resource_id"].(string)
timestampStr, _ := args["timestamp"].(string)
windowMinutes := intArg(args, "window_minutes", 15)
if resourceID == "" {
return NewErrorResult(fmt.Errorf("resource_id is required")), nil
}
if e.eventCorrelatorProvider == nil {
return NewTextResult("Event correlation not available. The event correlator may not be enabled."), nil
}
// Parse timestamp or use now
var timestamp time.Time
if timestampStr != "" {
var err error
timestamp, err = time.Parse(time.RFC3339, timestampStr)
if err != nil {
return NewErrorResult(fmt.Errorf("invalid timestamp format: %w", err)), nil
}
} else {
timestamp = time.Now()
}
window := time.Duration(windowMinutes) * time.Minute
correlations := e.eventCorrelatorProvider.GetCorrelationsForResource(resourceID, window)
if len(correlations) == 0 {
return NewTextResult(fmt.Sprintf("No correlated events found for resource '%s' within %d minutes of %s.",
resourceID, windowMinutes, timestamp.Format(time.RFC3339))), nil
}
return NewJSONResult(map[string]interface{}{
"resource_id": resourceID,
"timestamp": timestamp.Format(time.RFC3339),
"window_minutes": windowMinutes,
"events": correlations,
"count": len(correlations),
}), nil
}
func (e *PulseToolExecutor) executeGetRelationshipGraph(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
resourceID, _ := args["resource_id"].(string)
depth := intArg(args, "depth", 1)
if resourceID == "" {
return NewErrorResult(fmt.Errorf("resource_id is required")), nil
}
// Cap depth to prevent excessive traversal
if depth < 1 {
depth = 1
}
if depth > 3 {
depth = 3
}
if e.topologyProvider == nil {
return NewTextResult("Topology information not available."), nil
}
related := e.topologyProvider.GetRelatedResources(resourceID, depth)
if len(related) == 0 {
return NewTextResult(fmt.Sprintf("No relationships found for resource '%s'.", resourceID)), nil
}
return NewJSONResult(map[string]interface{}{
"resource_id": resourceID,
"depth": depth,
"related_resources": related,
"count": len(related),
}), nil
}
func (e *PulseToolExecutor) executeRemember(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
resourceID, _ := args["resource_id"].(string)
note, _ := args["note"].(string)
category, _ := args["category"].(string)
if resourceID == "" {
return NewErrorResult(fmt.Errorf("resource_id is required")), nil
}
if note == "" {
return NewErrorResult(fmt.Errorf("note is required")), nil
}
if e.knowledgeStoreProvider == nil {
return NewTextResult("Knowledge storage not available."), nil
}
if err := e.knowledgeStoreProvider.SaveNote(resourceID, note, category); err != nil {
return NewErrorResult(fmt.Errorf("failed to save note: %w", err)), nil
}
response := map[string]interface{}{
"success": true,
"resource_id": resourceID,
"note": note,
"message": "Note saved successfully",
}
if category != "" {
response["category"] = category
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeRecall(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
resourceID, _ := args["resource_id"].(string)
category, _ := args["category"].(string)
if resourceID == "" {
return NewErrorResult(fmt.Errorf("resource_id is required")), nil
}
if e.knowledgeStoreProvider == nil {
return NewTextResult("Knowledge storage not available."), nil
}
entries := e.knowledgeStoreProvider.GetKnowledge(resourceID, category)
if len(entries) == 0 {
if category != "" {
return NewTextResult(fmt.Sprintf("No notes found for resource '%s' in category '%s'.", resourceID, category)), nil
}
return NewTextResult(fmt.Sprintf("No notes found for resource '%s'.", resourceID)), nil
}
response := map[string]interface{}{
"resource_id": resourceID,
"notes": entries,
"count": len(entries),
}
if category != "" {
response["category"] = category
}
return NewJSONResult(response), nil
}

View File

@@ -10,7 +10,7 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/models"
)
// registerKubernetesTools registers the consolidated pulse_kubernetes tool
// registerKubernetesTools registers the pulse_kubernetes tool
func (e *PulseToolExecutor) registerKubernetesTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{

View File

@@ -2,10 +2,14 @@ package tools
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
)
// registerMetricsTools registers the consolidated pulse_metrics tool
// registerMetricsTools registers the pulse_metrics tool
func (e *PulseToolExecutor) registerMetricsTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
@@ -86,7 +90,6 @@ Examples:
}
// executeMetrics routes to the appropriate metrics handler based on type
// All handler functions are implemented in tools_patrol.go and tools_infrastructure.go
func (e *PulseToolExecutor) executeMetrics(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
metricType, _ := args["type"].(string)
switch metricType {
@@ -108,3 +111,532 @@ func (e *PulseToolExecutor) executeMetrics(ctx context.Context, args map[string]
return NewErrorResult(fmt.Errorf("unknown type: %s. Use: performance, temperatures, network, diskio, disks, baselines, patterns", metricType)), nil
}
}
func (e *PulseToolExecutor) executeGetMetrics(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
period, _ := args["period"].(string)
resourceID, _ := args["resource_id"].(string)
resourceType, _ := args["resource_type"].(string)
limit := intArg(args, "limit", 100)
offset := intArg(args, "offset", 0)
if limit <= 0 {
limit = 100
}
if offset < 0 {
offset = 0
}
resourceType = strings.ToLower(strings.TrimSpace(resourceType))
if resourceType != "" {
validTypes := map[string]bool{"vm": true, "container": true, "node": true}
if !validTypes[resourceType] {
return NewErrorResult(fmt.Errorf("invalid resource_type: %s. Use vm, container, or node", resourceType)), nil
}
}
if e.metricsHistory == nil {
return NewTextResult("Metrics history not available. The system may still be collecting data."), nil
}
var duration time.Duration
switch period {
case "24h":
duration = 24 * time.Hour
case "7d":
duration = 7 * 24 * time.Hour
default:
duration = 24 * time.Hour
period = "24h"
}
response := MetricsResponse{
Period: period,
}
if resourceID != "" {
response.ResourceID = resourceID
metrics, err := e.metricsHistory.GetResourceMetrics(resourceID, duration)
if err != nil {
return NewErrorResult(err), nil
}
response.Points = metrics
return NewJSONResult(response), nil
}
summary, err := e.metricsHistory.GetAllMetricsSummary(duration)
if err != nil {
return NewErrorResult(err), nil
}
keys := make([]string, 0, len(summary))
for id, metric := range summary {
if resourceType != "" && strings.ToLower(metric.ResourceType) != resourceType {
continue
}
keys = append(keys, id)
}
sort.Strings(keys)
filtered := make(map[string]ResourceMetricsSummary)
total := 0
for _, id := range keys {
if total < offset {
total++
continue
}
if len(filtered) >= limit {
total++
continue
}
filtered[id] = summary[id]
total++
}
if filtered == nil {
filtered = map[string]ResourceMetricsSummary{}
}
response.Summary = filtered
if offset > 0 || total > limit {
response.Pagination = &PaginationInfo{
Total: total,
Limit: limit,
Offset: offset,
}
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeGetBaselines(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
resourceID, _ := args["resource_id"].(string)
resourceType, _ := args["resource_type"].(string)
limit := intArg(args, "limit", 100)
offset := intArg(args, "offset", 0)
if limit <= 0 {
limit = 100
}
if offset < 0 {
offset = 0
}
resourceType = strings.ToLower(strings.TrimSpace(resourceType))
if resourceType != "" {
validTypes := map[string]bool{"vm": true, "container": true, "node": true}
if !validTypes[resourceType] {
return NewErrorResult(fmt.Errorf("invalid resource_type: %s. Use vm, container, or node", resourceType)), nil
}
}
if e.baselineProvider == nil {
return NewTextResult("Baseline data not available. The system needs time to learn normal behavior patterns."), nil
}
response := BaselinesResponse{
Baselines: make(map[string]map[string]*MetricBaseline),
}
if resourceID != "" {
response.ResourceID = resourceID
cpuBaseline := e.baselineProvider.GetBaseline(resourceID, "cpu")
memBaseline := e.baselineProvider.GetBaseline(resourceID, "memory")
if cpuBaseline != nil || memBaseline != nil {
response.Baselines[resourceID] = make(map[string]*MetricBaseline)
if cpuBaseline != nil {
response.Baselines[resourceID]["cpu"] = cpuBaseline
}
if memBaseline != nil {
response.Baselines[resourceID]["memory"] = memBaseline
}
}
return NewJSONResult(response), nil
}
baselines := e.baselineProvider.GetAllBaselines()
keys := make([]string, 0, len(baselines))
var typeIndex map[string]string
if resourceType != "" {
if e.stateProvider == nil {
return NewErrorResult(fmt.Errorf("state provider not available")), nil
}
state := e.stateProvider.GetState()
typeIndex = make(map[string]string)
for _, vm := range state.VMs {
typeIndex[fmt.Sprintf("%d", vm.VMID)] = "vm"
}
for _, ct := range state.Containers {
typeIndex[fmt.Sprintf("%d", ct.VMID)] = "container"
}
for _, node := range state.Nodes {
typeIndex[node.ID] = "node"
}
}
for id := range baselines {
if resourceType != "" {
if t, ok := typeIndex[id]; !ok || t != resourceType {
continue
}
}
keys = append(keys, id)
}
sort.Strings(keys)
filtered := make(map[string]map[string]*MetricBaseline)
total := 0
for _, id := range keys {
if total < offset {
total++
continue
}
if len(filtered) >= limit {
total++
continue
}
filtered[id] = baselines[id]
total++
}
if filtered == nil {
filtered = map[string]map[string]*MetricBaseline{}
}
response.Baselines = filtered
if offset > 0 || total > limit {
response.Pagination = &PaginationInfo{
Total: total,
Limit: limit,
Offset: offset,
}
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeGetPatterns(_ context.Context, _ map[string]interface{}) (CallToolResult, error) {
if e.patternProvider == nil {
return NewTextResult("Pattern detection not available. The system needs more historical data."), nil
}
response := PatternsResponse{
Patterns: e.patternProvider.GetPatterns(),
Predictions: e.patternProvider.GetPredictions(),
}
// Ensure non-nil slices for clean JSON
if response.Patterns == nil {
response.Patterns = []Pattern{}
}
if response.Predictions == nil {
response.Predictions = []Prediction{}
}
return NewJSONResult(response), nil
}
// ========== Temperature, Network, DiskIO, Physical Disks ==========
func (e *PulseToolExecutor) executeGetTemperatures(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
hostFilter, _ := args["host"].(string)
state := e.stateProvider.GetState()
type HostTemps struct {
Hostname string `json:"hostname"`
Platform string `json:"platform,omitempty"`
CPU map[string]float64 `json:"cpu_temps,omitempty"`
Disks map[string]float64 `json:"disk_temps,omitempty"`
Fans map[string]float64 `json:"fan_rpm,omitempty"`
Other map[string]float64 `json:"other_temps,omitempty"`
LastUpdated string `json:"last_updated,omitempty"`
}
var results []HostTemps
for _, host := range state.Hosts {
if hostFilter != "" && host.Hostname != hostFilter {
continue
}
if len(host.Sensors.TemperatureCelsius) == 0 && len(host.Sensors.FanRPM) == 0 {
continue
}
temps := HostTemps{
Hostname: host.Hostname,
Platform: host.Platform,
CPU: make(map[string]float64),
Disks: make(map[string]float64),
Fans: make(map[string]float64),
Other: make(map[string]float64),
}
// Categorize temperatures
for name, value := range host.Sensors.TemperatureCelsius {
switch {
case containsAny(name, "cpu", "core", "package"):
temps.CPU[name] = value
case containsAny(name, "nvme", "ssd", "hdd", "disk"):
temps.Disks[name] = value
default:
temps.Other[name] = value
}
}
// Add fan data
for name, value := range host.Sensors.FanRPM {
temps.Fans[name] = value
}
// Add additional sensors to Other
for name, value := range host.Sensors.Additional {
if _, exists := temps.CPU[name]; !exists {
if _, exists := temps.Disks[name]; !exists {
temps.Other[name] = value
}
}
}
results = append(results, temps)
}
if len(results) == 0 {
if hostFilter != "" {
return NewTextResult(fmt.Sprintf("No temperature data available for host '%s'. The host may not have a Pulse agent installed or sensors may not be available.", hostFilter)), nil
}
return NewTextResult("No temperature data available. Ensure Pulse unified agents are installed on hosts and lm-sensors is available."), nil
}
output, _ := json.MarshalIndent(results, "", " ")
return NewTextResult(string(output)), nil
}
func (e *PulseToolExecutor) executeGetNetworkStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
hostFilter, _ := args["host"].(string)
state := e.stateProvider.GetState()
var hosts []HostNetworkStatsSummary
for _, host := range state.Hosts {
if hostFilter != "" && host.Hostname != hostFilter {
continue
}
if len(host.NetworkInterfaces) == 0 {
continue
}
var interfaces []NetworkInterfaceSummary
for _, iface := range host.NetworkInterfaces {
interfaces = append(interfaces, NetworkInterfaceSummary{
Name: iface.Name,
MAC: iface.MAC,
Addresses: iface.Addresses,
RXBytes: iface.RXBytes,
TXBytes: iface.TXBytes,
SpeedMbps: iface.SpeedMbps,
})
}
hosts = append(hosts, HostNetworkStatsSummary{
Hostname: host.Hostname,
Interfaces: interfaces,
})
}
// Also check Docker hosts for network stats
for _, dockerHost := range state.DockerHosts {
if hostFilter != "" && dockerHost.Hostname != hostFilter {
continue
}
if len(dockerHost.NetworkInterfaces) == 0 {
continue
}
// Check if we already have this host
found := false
for _, h := range hosts {
if h.Hostname == dockerHost.Hostname {
found = true
break
}
}
if found {
continue
}
var interfaces []NetworkInterfaceSummary
for _, iface := range dockerHost.NetworkInterfaces {
interfaces = append(interfaces, NetworkInterfaceSummary{
Name: iface.Name,
MAC: iface.MAC,
Addresses: iface.Addresses,
RXBytes: iface.RXBytes,
TXBytes: iface.TXBytes,
SpeedMbps: iface.SpeedMbps,
})
}
hosts = append(hosts, HostNetworkStatsSummary{
Hostname: dockerHost.Hostname,
Interfaces: interfaces,
})
}
if len(hosts) == 0 {
if hostFilter != "" {
return NewTextResult(fmt.Sprintf("No network statistics available for host '%s'.", hostFilter)), nil
}
return NewTextResult("No network statistics available. Ensure Pulse agents are reporting network data."), nil
}
response := NetworkStatsResponse{
Hosts: hosts,
Total: len(hosts),
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeGetDiskIOStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
hostFilter, _ := args["host"].(string)
state := e.stateProvider.GetState()
var hosts []HostDiskIOStatsSummary
for _, host := range state.Hosts {
if hostFilter != "" && host.Hostname != hostFilter {
continue
}
if len(host.DiskIO) == 0 {
continue
}
var devices []DiskIODeviceSummary
for _, dio := range host.DiskIO {
devices = append(devices, DiskIODeviceSummary{
Device: dio.Device,
ReadBytes: dio.ReadBytes,
WriteBytes: dio.WriteBytes,
ReadOps: dio.ReadOps,
WriteOps: dio.WriteOps,
IOTimeMs: dio.IOTime,
})
}
hosts = append(hosts, HostDiskIOStatsSummary{
Hostname: host.Hostname,
Devices: devices,
})
}
if len(hosts) == 0 {
if hostFilter != "" {
return NewTextResult(fmt.Sprintf("No disk I/O statistics available for host '%s'.", hostFilter)), nil
}
return NewTextResult("No disk I/O statistics available. Ensure Pulse agents are reporting disk I/O data."), nil
}
response := DiskIOStatsResponse{
Hosts: hosts,
Total: len(hosts),
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeListPhysicalDisks(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
instanceFilter, _ := args["instance"].(string)
nodeFilter, _ := args["node"].(string)
healthFilter, _ := args["health"].(string)
typeFilter, _ := args["type"].(string)
limit := intArg(args, "limit", 100)
state := e.stateProvider.GetState()
if len(state.PhysicalDisks) == 0 {
return NewTextResult("No physical disk data available. Physical disk information is collected from Proxmox nodes."), nil
}
var disks []PhysicalDiskSummary
totalCount := 0
for _, disk := range state.PhysicalDisks {
// Apply filters
if instanceFilter != "" && disk.Instance != instanceFilter {
continue
}
if nodeFilter != "" && disk.Node != nodeFilter {
continue
}
if healthFilter != "" && !strings.EqualFold(disk.Health, healthFilter) {
continue
}
if typeFilter != "" && !strings.EqualFold(disk.Type, typeFilter) {
continue
}
totalCount++
if len(disks) >= limit {
continue
}
summary := PhysicalDiskSummary{
ID: disk.ID,
Node: disk.Node,
Instance: disk.Instance,
DevPath: disk.DevPath,
Model: disk.Model,
Serial: disk.Serial,
WWN: disk.WWN,
Type: disk.Type,
SizeBytes: disk.Size,
Health: disk.Health,
Used: disk.Used,
LastChecked: disk.LastChecked,
}
// Only include optional fields if they have meaningful values
if disk.Wearout >= 0 {
wearout := disk.Wearout
summary.Wearout = &wearout
}
if disk.Temperature > 0 {
temp := disk.Temperature
summary.Temperature = &temp
}
if disk.RPM > 0 {
rpm := disk.RPM
summary.RPM = &rpm
}
disks = append(disks, summary)
}
if disks == nil {
disks = []PhysicalDiskSummary{}
}
response := PhysicalDisksResponse{
Disks: disks,
Total: len(state.PhysicalDisks),
Filtered: totalCount,
}
return NewJSONResult(response), nil
}

View File

@@ -5,8 +5,8 @@ import (
"fmt"
)
// registerPMGToolsConsolidated registers the consolidated pulse_pmg tool
func (e *PulseToolExecutor) registerPMGToolsConsolidated() {
// registerPMGTools registers the pulse_pmg tool
func (e *PulseToolExecutor) registerPMGTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
Name: "pulse_pmg",
@@ -62,3 +62,231 @@ func (e *PulseToolExecutor) executePMG(ctx context.Context, args map[string]inte
return NewErrorResult(fmt.Errorf("unknown type: %s. Use: status, mail_stats, queues, spam", pmgType)), nil
}
}
func (e *PulseToolExecutor) executeGetPMGStatus(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
instanceFilter, _ := args["instance"].(string)
state := e.stateProvider.GetState()
if len(state.PMGInstances) == 0 {
return NewTextResult("No Proxmox Mail Gateway instances found. PMG monitoring may not be configured."), nil
}
var instances []PMGInstanceSummary
for _, pmg := range state.PMGInstances {
if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter {
continue
}
var nodes []PMGNodeSummary
for _, node := range pmg.Nodes {
nodes = append(nodes, PMGNodeSummary{
Name: node.Name,
Status: node.Status,
Role: node.Role,
Uptime: node.Uptime,
LoadAvg: node.LoadAvg,
})
}
instances = append(instances, PMGInstanceSummary{
ID: pmg.ID,
Name: pmg.Name,
Host: pmg.Host,
Status: pmg.Status,
Version: pmg.Version,
Nodes: nodes,
})
}
if len(instances) == 0 && instanceFilter != "" {
return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil
}
// Ensure non-nil slices
for i := range instances {
if instances[i].Nodes == nil {
instances[i].Nodes = []PMGNodeSummary{}
}
}
response := PMGStatusResponse{
Instances: instances,
Total: len(instances),
}
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeGetMailStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
instanceFilter, _ := args["instance"].(string)
state := e.stateProvider.GetState()
if len(state.PMGInstances) == 0 {
return NewTextResult("No Proxmox Mail Gateway instances found. PMG monitoring may not be configured."), nil
}
// If filtering, find that specific instance
for _, pmg := range state.PMGInstances {
if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter {
continue
}
if pmg.MailStats == nil {
if instanceFilter != "" {
return NewTextResult(fmt.Sprintf("No mail statistics available for PMG instance '%s'.", instanceFilter)), nil
}
continue
}
response := MailStatsResponse{
Instance: pmg.Name,
Stats: PMGMailStatsSummary{
Timeframe: pmg.MailStats.Timeframe,
TotalIn: pmg.MailStats.CountIn,
TotalOut: pmg.MailStats.CountOut,
SpamIn: pmg.MailStats.SpamIn,
SpamOut: pmg.MailStats.SpamOut,
VirusIn: pmg.MailStats.VirusIn,
VirusOut: pmg.MailStats.VirusOut,
BouncesIn: pmg.MailStats.BouncesIn,
BouncesOut: pmg.MailStats.BouncesOut,
BytesIn: pmg.MailStats.BytesIn,
BytesOut: pmg.MailStats.BytesOut,
GreylistCount: pmg.MailStats.GreylistCount,
RBLRejects: pmg.MailStats.RBLRejects,
AverageProcessTimeMs: pmg.MailStats.AverageProcessTimeMs,
},
}
return NewJSONResult(response), nil
}
if instanceFilter != "" {
return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil
}
return NewTextResult("No mail statistics available from any PMG instance."), nil
}
func (e *PulseToolExecutor) executeGetMailQueues(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
instanceFilter, _ := args["instance"].(string)
state := e.stateProvider.GetState()
if len(state.PMGInstances) == 0 {
return NewTextResult("No Proxmox Mail Gateway instances found. PMG monitoring may not be configured."), nil
}
// Collect queue data from all instances (or filtered instance)
for _, pmg := range state.PMGInstances {
if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter {
continue
}
var queues []PMGQueueSummary
for _, node := range pmg.Nodes {
if node.QueueStatus != nil {
queues = append(queues, PMGQueueSummary{
Node: node.Name,
Active: node.QueueStatus.Active,
Deferred: node.QueueStatus.Deferred,
Hold: node.QueueStatus.Hold,
Incoming: node.QueueStatus.Incoming,
Total: node.QueueStatus.Total,
OldestAgeSeconds: node.QueueStatus.OldestAge,
})
}
}
if len(queues) == 0 {
if instanceFilter != "" {
return NewTextResult(fmt.Sprintf("No queue data available for PMG instance '%s'.", instanceFilter)), nil
}
continue
}
response := MailQueuesResponse{
Instance: pmg.Name,
Queues: queues,
}
return NewJSONResult(response), nil
}
if instanceFilter != "" {
return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil
}
return NewTextResult("No mail queue data available from any PMG instance."), nil
}
func (e *PulseToolExecutor) executeGetSpamStats(_ context.Context, args map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
instanceFilter, _ := args["instance"].(string)
state := e.stateProvider.GetState()
if len(state.PMGInstances) == 0 {
return NewTextResult("No Proxmox Mail Gateway instances found. PMG monitoring may not be configured."), nil
}
for _, pmg := range state.PMGInstances {
if instanceFilter != "" && pmg.ID != instanceFilter && pmg.Name != instanceFilter {
continue
}
quarantine := PMGQuarantineSummary{}
if pmg.Quarantine != nil {
quarantine = PMGQuarantineSummary{
Spam: pmg.Quarantine.Spam,
Virus: pmg.Quarantine.Virus,
Attachment: pmg.Quarantine.Attachment,
Blacklisted: pmg.Quarantine.Blacklisted,
Total: pmg.Quarantine.Spam + pmg.Quarantine.Virus + pmg.Quarantine.Attachment + pmg.Quarantine.Blacklisted,
}
}
var distribution []PMGSpamBucketSummary
for _, bucket := range pmg.SpamDistribution {
distribution = append(distribution, PMGSpamBucketSummary{
Score: bucket.Score,
Count: bucket.Count,
})
}
response := SpamStatsResponse{
Instance: pmg.Name,
Quarantine: quarantine,
Distribution: distribution,
}
if response.Distribution == nil {
response.Distribution = []PMGSpamBucketSummary{}
}
return NewJSONResult(response), nil
}
if instanceFilter != "" {
return NewTextResult(fmt.Sprintf("PMG instance '%s' not found.", instanceFilter)), nil
}
return NewTextResult("No spam statistics available from any PMG instance."), nil
}

View File

@@ -1017,6 +1017,8 @@ func isReadOnlyByConstruction(cmdLower string) bool {
"wget -q", "wget --spider",
"docker ps", "docker logs", "docker inspect", "docker stats", "docker images", "docker info",
"systemctl status", "systemctl is-active", "systemctl is-enabled", "systemctl list", "systemctl show",
"pct list", "pct status",
"qm list", "qm status",
"ip addr", "ip route", "ip link",
// Kubectl read-only commands
"kubectl get", "kubectl describe", "kubectl logs", "kubectl top", "kubectl cluster-info",
@@ -1096,13 +1098,15 @@ func matchesWritePatterns(cmdLower string) string {
"echo ": "output (may redirect)", "printf ": "output (may redirect)",
"wget -O": "file download", "wget --output": "file download",
"tar -x": "archive extraction", "tar x": "archive extraction", "unzip ": "archive extraction", "gunzip ": "archive extraction",
"ln ": "link creation", "link ": "link creation",
}
for pattern, reason := range mediumRiskPatterns {
if strings.Contains(cmdLower, pattern) {
return reason
}
}
if containsCommandToken(cmdLower, "ln") || containsCommandToken(cmdLower, "link") {
return "link creation"
}
// Curl with mutation verbs
if strings.Contains(cmdLower, "curl") {
@@ -1117,6 +1121,21 @@ func matchesWritePatterns(cmdLower string) string {
return ""
}
func containsCommandToken(cmdLower, token string) bool {
for _, field := range strings.Fields(cmdLower) {
if field == token {
return true
}
if strings.HasPrefix(field, "/") || strings.HasPrefix(field, "./") ||
strings.HasPrefix(field, "../") || strings.HasPrefix(field, "~/") {
if strings.HasSuffix(field, "/"+token) {
return true
}
}
}
return false
}
// hasShellChainingOutsideQuotes checks if a command contains shell chaining operators
// (;, &&, ||) outside of quoted strings. This allows SQL statements like "SELECT 1;"
// while still catching shell command chaining like "ls; rm -rf /".
@@ -1801,7 +1820,7 @@ func logRoutingMismatchDebug(targetHost string, childKinds, childIDs []string) {
Msg("[RoutingValidation] Blocked operation targeting parent node when child recently referenced")
}
// registerQueryTools registers the consolidated pulse_query tool
// registerQueryTools registers the pulse_query tool
func (e *PulseToolExecutor) registerQueryTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
@@ -3143,6 +3162,43 @@ func (e *PulseToolExecutor) executeSearchResources(_ context.Context, args map[s
return NewJSONResult(response), nil
}
func (e *PulseToolExecutor) executeGetConnectionHealth(_ context.Context, _ map[string]interface{}) (CallToolResult, error) {
if e.stateProvider == nil {
return NewTextResult("State provider not available."), nil
}
state := e.stateProvider.GetState()
if len(state.ConnectionHealth) == 0 {
return NewTextResult("No connection health data available."), nil
}
var connections []ConnectionStatus
connected := 0
disconnected := 0
for instanceID, isConnected := range state.ConnectionHealth {
connections = append(connections, ConnectionStatus{
InstanceID: instanceID,
Connected: isConnected,
})
if isConnected {
connected++
} else {
disconnected++
}
}
response := ConnectionHealthResponse{
Connections: connections,
Total: len(connections),
Connected: connected,
Disconnected: disconnected,
}
return NewJSONResult(response), nil
}
// Helper to get int args with default
func intArg(args map[string]interface{}, key string, defaultVal int) int {
if v, ok := args[key]; ok {

File diff suppressed because it is too large Load Diff