Add ExecutionIntent classification and NonInteractiveOnly enforcement

Implement safety layers for command execution:

ExecutionIntent classifies commands as:
- ObservationOnly: Pure read (status, logs, metrics)
- SideEffects: May change state (restart, write, delete)

NonInteractiveOnly enforces safe command forms:
- Blocks interactive commands (vim, top without -b, etc)
- Blocks unbounded streaming (tail -f without limit)
- Suggests safe alternatives in error messages

Add phantom execution detection:
- Catches when model claims actions without using tools
- Skips check when tools actually succeeded (fixes false positives)

Includes comprehensive tests for:
- Intent classification accuracy
- Interactive command blocking
- Strict resolution validation
This commit is contained in:
rcourtman
2026-01-28 16:49:00 +00:00
parent 6e739cea5c
commit b2e0ae3fdb
4 changed files with 3113 additions and 24 deletions

View File

@@ -17,17 +17,24 @@ import (
// AgenticLoop handles the tool-calling loop with streaming
type AgenticLoop struct {
provider providers.StreamingProvider
executor *tools.PulseToolExecutor
tools []providers.Tool
systemPrompt string
maxTurns int
provider providers.StreamingProvider
executor *tools.PulseToolExecutor
tools []providers.Tool
baseSystemPrompt string // Base prompt without mode context
maxTurns int
// Provider info for telemetry (e.g., "anthropic", "claude-3-sonnet")
providerName string
modelName string
// State for ongoing executions
mu sync.Mutex
aborted map[string]bool // sessionID -> aborted
pendingQs map[string]chan []QuestionAnswer // questionID -> answer channel
autonomousMode bool // When true, don't wait for approvals (for investigations)
// Per-session FSMs for workflow enforcement (set before each execution)
sessionFSM *SessionFSM
}
// NewAgenticLoop creates a new agentic loop
@@ -37,13 +44,13 @@ func NewAgenticLoop(provider providers.StreamingProvider, executor *tools.PulseT
providerTools := ConvertMCPToolsToProvider(mcpTools)
return &AgenticLoop{
provider: provider,
executor: executor,
tools: providerTools,
systemPrompt: systemPrompt,
maxTurns: MaxAgenticTurns,
aborted: make(map[string]bool),
pendingQs: make(map[string]chan []QuestionAnswer),
provider: provider,
executor: executor,
tools: providerTools,
baseSystemPrompt: systemPrompt,
maxTurns: MaxAgenticTurns,
aborted: make(map[string]bool),
pendingQs: make(map[string]chan []QuestionAnswer),
}
}
@@ -84,6 +91,7 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
var resultMessages []Message
turn := 0
toolsSucceededThisEpisode := false // Track if any tool executed successfully this episode
for turn < a.maxTurns {
// Check if aborted
@@ -92,8 +100,15 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
a.mu.Unlock()
return resultMessages, fmt.Errorf("session aborted")
}
providerName := a.providerName
modelName := a.modelName
a.mu.Unlock()
// Record telemetry for loop iteration
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordAgenticIteration(providerName, modelName)
}
// Check context
select {
case <-ctx.Done():
@@ -108,13 +123,38 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
Str("session_id", sessionID).
Msg("[AgenticLoop] Starting turn")
// Build the request
// Build the request with dynamic system prompt (includes current mode)
systemPrompt := a.getSystemPrompt()
req := providers.ChatRequest{
Messages: providerMessages,
System: a.systemPrompt,
System: systemPrompt,
Tools: tools,
}
// Determine tool_choice based on turn and intent
// We only force tool use when:
// 1. Tools are available
// 2. It's the first turn
// 3. The user's message indicates they need live data or an action
// This prevents forcing tool calls on conceptual questions like "What is TCP?"
if len(tools) > 0 {
if turn == 0 && requiresToolUse(providerMessages) {
// First turn with action intent: force the model to use a tool
req.ToolChoice = &providers.ToolChoice{Type: providers.ToolChoiceAny}
log.Debug().
Str("session_id", sessionID).
Msg("[AgenticLoop] First turn with action intent - forcing tool use")
} else {
// Conceptual questions or subsequent turns: let the model decide
req.ToolChoice = &providers.ToolChoice{Type: providers.ToolChoiceAuto}
if turn == 0 {
log.Debug().
Str("session_id", sessionID).
Msg("[AgenticLoop] First turn appears conceptual - using auto tool choice")
}
}
}
// Collect streaming response
var contentBuilder strings.Builder
var thinkingBuilder strings.Builder
@@ -122,7 +162,7 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
log.Debug().
Str("session_id", sessionID).
Int("system_prompt_len", len(a.systemPrompt)).
Int("system_prompt_len", len(systemPrompt)).
Msg("[AgenticLoop] Calling provider.ChatStream")
err := a.provider.ChatStream(ctx, req, func(event providers.StreamEvent) {
@@ -145,11 +185,35 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
case "tool_start":
if data, ok := event.Data.(providers.ToolStartEvent); ok {
// Convert input to JSON string for frontend
// Format input for frontend display
// For control tools, show a human-readable summary instead of raw JSON to avoid "hallucination" look
inputStr := "{}"
if data.Input != nil {
if inputBytes, err := json.Marshal(data.Input); err == nil {
inputStr = string(inputBytes)
// Special handling for command execution tools to avoid showing raw JSON
if data.Name == "pulse_control" || data.Name == "pulse_run_command" || data.Name == "control" {
if cmd, ok := data.Input["command"].(string); ok {
// Just show the command being run
inputStr = fmt.Sprintf("Running: %s", cmd)
} else if action, ok := data.Input["action"].(string); ok {
// Show action (e.g. for guest control)
target := ""
if t, ok := data.Input["guest_id"].(string); ok {
target = t
} else if t, ok := data.Input["container"].(string); ok {
target = t
}
inputStr = fmt.Sprintf("%s %s", action, target)
} else {
// Fallback to JSON
if inputBytes, err := json.Marshal(data.Input); err == nil {
inputStr = string(inputBytes)
}
}
} else {
// Standard JSON for other tools
if inputBytes, err := json.Marshal(data.Input); err == nil {
inputStr = string(inputBytes)
}
}
}
jsonData, _ := json.Marshal(ToolStartData{
@@ -227,8 +291,96 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
}
providerMessages = append(providerMessages, providerAssistant)
// If no tool calls, we're done
// If no tool calls, we're done - but first check FSM and phantom execution
if len(toolCalls) == 0 {
// === FSM ENFORCEMENT GATE 2: Check if final answer is allowed ===
a.mu.Lock()
fsm := a.sessionFSM
a.mu.Unlock()
if fsm != nil {
if fsmErr := fsm.CanFinalAnswer(); fsmErr != nil {
log.Warn().
Str("session_id", sessionID).
Str("state", string(fsm.State)).
Bool("wrote_this_episode", fsm.WroteThisEpisode).
Bool("read_after_write", fsm.ReadAfterWrite).
Msg("[AgenticLoop] FSM blocked final answer - must verify write first")
// Record telemetry for FSM final answer block
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordFSMFinalBlock(fsm.State)
}
// Inject a minimal, factual constraint - not a narrative or example.
// This tells the model what is required, not how to do it.
verifyPrompt := fmt.Sprintf(
"Verification required: perform a read or status check on %s before responding.",
fsm.LastWriteTool,
)
// Update the last assistant message to include verification constraint
if len(resultMessages) > 0 {
resultMessages[len(resultMessages)-1].Content = verifyPrompt
}
// Note: verification constraint is injected into resultMessages above (for the model).
// We intentionally do NOT emit this to the user callback — it's an internal protocol
// prompt that would appear as spam in the chat output.
// Mark that we completed verification (the next read will set ReadAfterWrite)
// and continue the loop to force a verification read
turn++
continue
}
// If we're completing successfully and there was a write, mark verification complete
if fsm.State == StateVerifying && fsm.ReadAfterWrite {
fsm.CompleteVerification()
log.Debug().
Str("session_id", sessionID).
Str("new_state", string(fsm.State)).
Msg("[AgenticLoop] FSM verification complete, transitioning to READING")
}
}
// Detect phantom execution: model claims to have done something without tool calls
// This is especially important for providers that can't force tool use (e.g., Ollama)
// IMPORTANT: Skip this check if tools already succeeded this episode - the model is
// legitimately summarizing tool results, not hallucinating.
log.Debug().
Bool("toolsSucceededThisEpisode", toolsSucceededThisEpisode).
Bool("hasPhantomExecution", hasPhantomExecution(assistantMsg.Content)).
Str("content_preview", truncateForLog(assistantMsg.Content, 200)).
Msg("[AgenticLoop] Phantom detection check")
if !toolsSucceededThisEpisode && hasPhantomExecution(assistantMsg.Content) {
log.Warn().
Str("session_id", sessionID).
Str("content_preview", truncateForLog(assistantMsg.Content, 200)).
Msg("[AgenticLoop] Phantom execution detected - model claims action without tool call")
// Record telemetry for phantom detection
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordPhantomDetected(providerName, modelName)
}
// Replace the response with a safe failure message
safeResponse := "I apologize, but I wasn't able to access the infrastructure tools needed to complete that request. This can happen when:\n\n" +
"1. The tools aren't available right now\n" +
"2. There was a connection issue\n" +
"3. The model I'm running on doesn't support function calling\n\n" +
"Please try again, or let me know if you have a question I can answer without checking live infrastructure."
// Update the last result message
if len(resultMessages) > 0 {
resultMessages[len(resultMessages)-1].Content = safeResponse
}
// Send corrected content to callback
jsonData, _ := json.Marshal(ContentData{Text: "\n\n---\n" + safeResponse})
callback(StreamEvent{Type: "content", Data: jsonData})
}
log.Debug().Msg("Agentic loop complete - no tool calls")
return resultMessages, nil
}
@@ -241,6 +393,7 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
a.mu.Unlock()
return resultMessages, fmt.Errorf("session aborted")
}
fsm := a.sessionFSM
a.mu.Unlock()
log.Debug().
@@ -248,6 +401,73 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
Str("id", tc.ID).
Msg("Executing tool")
// === FSM ENFORCEMENT GATE 1: Check if tool is allowed in current state ===
toolKind := ClassifyToolCall(tc.Name, tc.Input)
if fsm != nil {
if fsmErr := fsm.CanExecuteTool(toolKind, tc.Name); fsmErr != nil {
log.Warn().
Str("tool", tc.Name).
Str("kind", toolKind.String()).
Str("state", string(fsm.State)).
Err(fsmErr).
Msg("[AgenticLoop] FSM blocked tool execution")
// Record telemetry for FSM tool block
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordFSMToolBlock(fsm.State, tc.Name, toolKind)
}
// Return the FSM error as a tool result so the model can self-correct
fsmBlockedErr, ok := fsmErr.(*FSMBlockedError)
var recoveryHint string
if ok && fsmBlockedErr.Recoverable {
recoveryHint = " Use a discovery or read tool first, then retry."
// Track pending recovery for success correlation
fsm.TrackPendingRecovery("FSM_BLOCKED", tc.Name)
// Record auto-recovery attempt (model gets a chance to self-correct)
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordAutoRecoveryAttempt("FSM_BLOCKED", tc.Name)
}
}
// Send tool_end event with error
jsonData, _ := json.Marshal(ToolEndData{
ID: tc.ID,
Name: tc.Name,
Input: "",
Output: fsmErr.Error() + recoveryHint,
Success: false,
})
callback(StreamEvent{Type: "tool_end", Data: jsonData})
// Create tool result message with the error
toolResultMsg := Message{
ID: uuid.New().String(),
Role: "user",
Timestamp: time.Now(),
ToolResult: &ToolResult{
ToolUseID: tc.ID,
Content: fsmErr.Error() + recoveryHint,
IsError: true,
},
}
resultMessages = append(resultMessages, toolResultMsg)
// Add to provider messages for next turn
providerMessages = append(providerMessages, providers.Message{
Role: "user",
ToolResult: &providers.ToolResult{
ToolUseID: tc.ID,
Content: fsmErr.Error() + recoveryHint,
IsError: true,
},
})
// Skip execution but continue the loop to process other tool calls
continue
}
}
// Execute the tool
result, err := a.executor.ExecuteTool(ctx, tc.Name, tc.Input)
@@ -260,6 +480,77 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
} else {
resultText = FormatToolResult(result)
isError = result.IsError
if !isError {
toolsSucceededThisEpisode = true // Tool executed successfully
log.Debug().
Str("tool", tc.Name).
Msg("[AgenticLoop] Tool succeeded - toolsSucceededThisEpisode set to true")
}
}
// Track pending recovery for strict resolution blocks
// (FSM blocks are tracked above; strict resolution blocks come from the executor)
if isError && fsm != nil && strings.Contains(resultText, "STRICT_RESOLUTION") {
fsm.TrackPendingRecovery("STRICT_RESOLUTION", tc.Name)
log.Debug().
Str("tool", tc.Name).
Msg("[AgenticLoop] Tracking pending recovery for strict resolution block")
}
// === AUTO-RECOVERY FOR NONINTERACTIVEONLY BLOCKS ===
// If tool blocked with auto_recoverable=true and has a suggested_rewrite,
// automatically apply the rewrite and retry once.
// Note: err == nil means executor didn't throw, isError means the tool result indicates error/block
if err == nil && isError && strings.Contains(resultText, `"auto_recoverable":true`) {
// Result is a blocked response (not a hard error)
if suggestedRewrite, recoveryAttempted := tryAutoRecovery(result, tc, a.executor, ctx); suggestedRewrite != "" && !recoveryAttempted {
// This is a fresh recoverable block - attempt auto-recovery
log.Info().
Str("tool", tc.Name).
Str("original_command", getCommandFromInput(tc.Input)).
Str("suggested_rewrite", suggestedRewrite).
Msg("[AgenticLoop] Attempting auto-recovery with suggested rewrite")
// Record auto-recovery attempt
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordAutoRecoveryAttempt("READ_ONLY_VIOLATION", tc.Name)
}
// Apply the rewrite and retry
modifiedInput := make(map[string]interface{})
for k, v := range tc.Input {
modifiedInput[k] = v
}
modifiedInput["command"] = suggestedRewrite
modifiedInput["_auto_recovery_attempt"] = true // Prevent infinite loops
retryResult, retryErr := a.executor.ExecuteTool(ctx, tc.Name, modifiedInput)
if retryErr != nil {
log.Warn().
Err(retryErr).
Str("tool", tc.Name).
Msg("[AgenticLoop] Auto-recovery retry failed with error")
} else if !retryResult.IsError {
// Recovery succeeded!
log.Info().
Str("tool", tc.Name).
Msg("[AgenticLoop] Auto-recovery succeeded")
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordAutoRecoverySuccess("READ_ONLY_VIOLATION", tc.Name)
}
// Use the successful result
result = retryResult
resultText = FormatToolResult(result)
isError = false
} else {
log.Warn().
Str("tool", tc.Name).
Str("retry_error", FormatToolResult(retryResult)).
Msg("[AgenticLoop] Auto-recovery retry still blocked")
// Keep original error but note the failed recovery attempt
resultText = resultText + "\n\n[Auto-recovery attempted but failed. Please use the suggested command manually or switch to pulse_control.]"
}
}
}
// Check if this is an approval request
@@ -351,8 +642,30 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
// Convert input to JSON string for frontend display
inputStr := ""
if tc.Input != nil {
if inputBytes, err := json.Marshal(tc.Input); err == nil {
inputStr = string(inputBytes)
// Special handling for command execution tools to avoid showing raw JSON
if tc.Name == "pulse_control" || tc.Name == "pulse_run_command" || tc.Name == "control" {
if cmd, ok := tc.Input["command"].(string); ok {
// Just show the command being run
inputStr = fmt.Sprintf("Running: %s", cmd)
} else if action, ok := tc.Input["action"].(string); ok {
// Show action (e.g. for guest control)
target := ""
if t, ok := tc.Input["guest_id"].(string); ok {
target = t
} else if t, ok := tc.Input["container"].(string); ok {
target = t
}
inputStr = fmt.Sprintf("%s %s", action, target)
} else {
// Fallback to JSON
if inputBytes, err := json.Marshal(tc.Input); err == nil {
inputStr = string(inputBytes)
}
}
} else {
if inputBytes, err := json.Marshal(tc.Input); err == nil {
inputStr = string(inputBytes)
}
}
}
jsonData, _ := json.Marshal(ToolEndData{
@@ -364,6 +677,42 @@ func (a *AgenticLoop) executeWithTools(ctx context.Context, sessionID string, me
})
callback(StreamEvent{Type: "tool_end", Data: jsonData})
// === FSM STATE TRANSITION: Update FSM after successful tool execution ===
if fsm != nil && !isError {
fsm.OnToolSuccess(toolKind, tc.Name)
// If we just completed verification (read after write in VERIFYING), transition to READING
// This allows subsequent writes to proceed without being blocked
// CRITICAL: Must call this IMMEDIATELY after OnToolSuccess, not just when model gives final answer
if fsm.State == StateVerifying && fsm.ReadAfterWrite {
fsm.CompleteVerification()
log.Debug().
Str("tool", tc.Name).
Str("new_state", string(fsm.State)).
Msg("[AgenticLoop] FSM verification complete after read, transitioning to READING")
}
log.Debug().
Str("tool", tc.Name).
Str("kind", toolKind.String()).
Str("new_state", string(fsm.State)).
Bool("wrote_this_episode", fsm.WroteThisEpisode).
Bool("read_after_write", fsm.ReadAfterWrite).
Msg("[AgenticLoop] FSM state transition after tool success")
// Check if this success resolves a pending recovery
if pr := fsm.CheckRecoverySuccess(tc.Name); pr != nil {
log.Info().
Str("tool", tc.Name).
Str("error_code", pr.ErrorCode).
Str("recovery_id", pr.RecoveryID).
Msg("[AgenticLoop] Auto-recovery succeeded")
if metrics := GetAIMetrics(); metrics != nil {
metrics.RecordAutoRecoverySuccess(pr.ErrorCode, pr.Tool)
}
}
}
// Create tool result message
toolResultMsg := Message{
ID: uuid.New().String(),
@@ -410,6 +759,295 @@ func (a *AgenticLoop) SetAutonomousMode(enabled bool) {
a.mu.Unlock()
}
// SetSessionFSM sets the workflow FSM for the current session.
// This must be called before ExecuteWithTools to enable structural guarantees.
func (a *AgenticLoop) SetSessionFSM(fsm *SessionFSM) {
a.mu.Lock()
a.sessionFSM = fsm
a.mu.Unlock()
}
// SetProviderInfo sets the provider/model info for telemetry.
func (a *AgenticLoop) SetProviderInfo(provider, model string) {
a.mu.Lock()
a.providerName = provider
a.modelName = model
a.mu.Unlock()
}
// hasPhantomExecution detects when the model claims to have executed something
// but no actual tool calls were made. This catches models that "hallucinate"
// tool execution by writing about it instead of calling tools.
//
// We're intentionally conservative here to avoid false positives like:
// - "I checked the docs..." (not a tool)
// - "I ran through the logic..." (not a command)
//
// We only trigger when the model asserts:
// 1. Concrete system metrics/values (CPU %, memory usage, etc.)
// 2. Infrastructure state that requires live queries (running/stopped)
// 3. Fake tool call formatting
func hasPhantomExecution(content string) bool {
if content == "" {
return false
}
lower := strings.ToLower(content)
// Category 1: Concrete metrics/values that MUST come from tools
// These are specific enough that they can't be "general knowledge"
metricsPatterns := []string{
"cpu usage is ", "cpu is at ", "cpu at ",
"memory usage is ", "memory is at ", "memory at ",
"disk usage is ", "disk is at ", "storage at ",
"using % ", "% cpu", "% memory", "% disk",
"mb of ram", "gb of ram", "mb of memory", "gb of memory",
}
for _, pattern := range metricsPatterns {
if strings.Contains(lower, pattern) {
return true
}
}
// Category 2: Claims of infrastructure state that require live queries
// Must be specific claims about current state, not general discussion
statePatterns := []string{
"is currently running", "is currently stopped", "is currently down",
"is now running", "is now stopped", "is now restarted",
"the service is running", "the container is running",
"the service is stopped", "the container is stopped",
"the logs show", "the output shows", "the result shows",
"according to the logs", "according to the output",
}
for _, pattern := range statePatterns {
if strings.Contains(lower, pattern) {
return true
}
}
// Category 3: Fake tool call formatting (definite hallucination)
fakeToolPatterns := []string{
"```tool", "```json\n{\"tool", "tool_result:",
"function_call:", "<tool_call>", "</tool_call>",
"pulse_query(", "pulse_run_command(", "pulse_control(",
}
for _, pattern := range fakeToolPatterns {
if strings.Contains(lower, pattern) {
return true
}
}
// Category 4: Past tense claims of SPECIFIC infrastructure actions
// Only trigger if followed by concrete results (not "I checked and...")
actionResultPatterns := []string{
"i restarted the", "i stopped the", "i started the",
"i killed the", "i terminated the",
"successfully restarted", "successfully stopped", "successfully started",
"has been restarted", "has been stopped", "has been started",
}
for _, pattern := range actionResultPatterns {
if strings.Contains(lower, pattern) {
return true
}
}
return false
}
// truncateForLog truncates a string for logging, adding "..." if truncated
func truncateForLog(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}
// tryAutoRecovery checks if a tool result is auto-recoverable and returns the suggested rewrite.
// Returns (suggestedRewrite, alreadyAttempted) where:
// - suggestedRewrite is the command to retry with (empty if not recoverable)
// - alreadyAttempted is true if auto-recovery was already attempted for this call
func tryAutoRecovery(result tools.CallToolResult, tc providers.ToolCall, executor *tools.PulseToolExecutor, ctx context.Context) (string, bool) {
// Check if this is already a recovery attempt
if _, ok := tc.Input["_auto_recovery_attempt"]; ok {
return "", true // Already attempted, don't retry again
}
// Parse the result to check for auto_recoverable flag
resultStr := FormatToolResult(result)
// Look for the structured error response pattern
// The result should contain JSON with auto_recoverable and suggested_rewrite
if !strings.Contains(resultStr, `"auto_recoverable"`) {
return "", false
}
// Extract the JSON portion from the result
// Results are formatted as "Error: {json}" or just "{json}"
jsonStart := strings.Index(resultStr, "{")
if jsonStart == -1 {
return "", false
}
var parsed struct {
Error struct {
Details struct {
AutoRecoverable bool `json:"auto_recoverable"`
SuggestedRewrite string `json:"suggested_rewrite"`
Category string `json:"category"`
} `json:"details"`
} `json:"error"`
}
if err := json.Unmarshal([]byte(resultStr[jsonStart:]), &parsed); err != nil {
// Try alternative format where details are at top level
var altParsed struct {
AutoRecoverable bool `json:"auto_recoverable"`
SuggestedRewrite string `json:"suggested_rewrite"`
}
if err2 := json.Unmarshal([]byte(resultStr[jsonStart:]), &altParsed); err2 != nil {
return "", false
}
if altParsed.AutoRecoverable && altParsed.SuggestedRewrite != "" {
return altParsed.SuggestedRewrite, false
}
return "", false
}
if parsed.Error.Details.AutoRecoverable && parsed.Error.Details.SuggestedRewrite != "" {
return parsed.Error.Details.SuggestedRewrite, false
}
return "", false
}
// getCommandFromInput extracts the command from tool input for logging.
func getCommandFromInput(input map[string]interface{}) string {
if cmd, ok := input["command"].(string); ok {
return cmd
}
return "<unknown>"
}
// requiresToolUse determines if the user's message requires live data or an action.
// Returns true for messages that need infrastructure access (check status, restart, etc.)
// Returns false for conceptual questions (What is TCP?, How does Docker work?)
func requiresToolUse(messages []providers.Message) bool {
// Find the last user message
var lastUserContent string
for i := len(messages) - 1; i >= 0; i-- {
if messages[i].Role == "user" && messages[i].ToolResult == nil {
lastUserContent = strings.ToLower(messages[i].Content)
break
}
}
if lastUserContent == "" {
return false
}
// First, check for explicit conceptual question patterns
// These should NOT require tools even if they mention infrastructure terms
conceptualPatterns := []string{
"what is ", "what's the difference", "what are the",
"explain ", "how does ", "how do i ", "how to ",
"why do ", "why does ", "why is it ",
"tell me about ", "describe ",
"can you explain", "help me understand",
"difference between", "best way to", "best practice",
"is it hard", "is it difficult", "is it easy",
"should i ", "would you recommend", "what do you think",
}
for _, pattern := range conceptualPatterns {
if strings.Contains(lastUserContent, pattern) {
// Exception: questions about MY specific infrastructure state are action queries
// e.g., "what is the status of my server" or "what is my CPU usage"
hasMyInfra := strings.Contains(lastUserContent, "my ") ||
strings.Contains(lastUserContent, "on my") ||
strings.Contains(lastUserContent, "@")
hasStateQuery := strings.Contains(lastUserContent, "status") ||
strings.Contains(lastUserContent, "doing") ||
strings.Contains(lastUserContent, "running") ||
strings.Contains(lastUserContent, "using") ||
strings.Contains(lastUserContent, "usage")
if hasMyInfra && hasStateQuery {
break // Not conceptual, continue to action detection
}
return false
}
}
// Pattern 1: @mentions indicate infrastructure references
if strings.Contains(lastUserContent, "@") {
return true
}
// Pattern 2: Action verbs that require live data
// These are more specific to avoid matching conceptual discussions
actionPatterns := []string{
// Direct action commands
"restart ", "start ", "stop ", "reboot ", "shutdown ",
"kill ", "terminate ",
// Status checks (specific phrasing)
"check ", "check the", "status of", "is it running", "is it up", "is it down",
"is running", "is stopped", "is down",
// "is X running?" pattern
" running?", " up?", " down?", " stopped?",
// Live data queries
"show me the", "list my", "list the", "list all",
"what's the cpu", "what's the memory", "what's the disk",
"cpu usage", "memory usage", "disk usage", "storage usage",
"how much memory", "how much cpu", "how much disk",
// Logs and debugging
"show logs", "show the logs", "check logs", "view logs",
"why is my", "why did my", "troubleshoot my", "debug my", "diagnose my",
// Discovery of MY resources
"where is my", "which of my", "find my",
// Questions about "my" specific infrastructure
"my server", "my container", "my vm", "my host", "my infrastructure",
"my node", "my cluster", "my proxmox", "my docker",
}
for _, pattern := range actionPatterns {
if strings.Contains(lastUserContent, pattern) {
return true
}
}
// Default: assume conceptual question, don't force tools
return false
}
// getSystemPrompt builds the full system prompt including the current mode context.
// This is called at request time so the prompt reflects the current mode.
func (a *AgenticLoop) getSystemPrompt() string {
a.mu.Lock()
isAutonomous := a.autonomousMode
a.mu.Unlock()
var modeContext string
if isAutonomous {
modeContext = `
EXECUTION MODE: Autonomous
Commands execute immediately without user approval. Follow the Discover → Investigate → Act
workflow. Gather information before taking action. Use the tools freely to explore logs, check
status, and understand the situation before attempting fixes.`
} else {
modeContext = `
EXECUTION MODE: Controlled
Commands require user approval before execution. The system handles this automatically via a
confirmation prompt - you don't need to ask "Would you like me to...?" Just execute what's
needed and the system will prompt the user to approve if required.`
}
return a.baseSystemPrompt + modeContext
}
// AnswerQuestion provides an answer to a pending question
func (a *AgenticLoop) AnswerQuestion(questionID string, answers []QuestionAnswer) error {
a.mu.Lock()
@@ -484,7 +1122,8 @@ func truncateToolResultForModel(text string) string {
}
truncated := text[:MaxToolResultCharsLimit]
return fmt.Sprintf("%s\n...[truncated %d chars]...", truncated, len(text)-MaxToolResultCharsLimit)
truncatedChars := len(text) - MaxToolResultCharsLimit
return fmt.Sprintf("%s\n\n---\n[TRUNCATED: %d characters cut. The result was too large. If you need specific details that may have been cut, make a more targeted query (e.g., filter by specific resource or type).]", truncated, truncatedChars)
}
// convertToProviderMessages converts our messages to provider format

View File

@@ -169,9 +169,10 @@ func TestAgenticLoop_UpdateTools(t *testing.T) {
executor := tools.NewPulseToolExecutor(tools.ExecutorConfig{})
loop := NewAgenticLoop(mockProvider, executor, "test")
// After tool consolidation, pulse_metrics replaces pulse_get_metrics
hasMetrics := false
for _, tool := range loop.tools {
if tool.Name == "pulse_get_metrics" {
if tool.Name == "pulse_metrics" {
hasMetrics = true
break
}
@@ -183,7 +184,7 @@ func TestAgenticLoop_UpdateTools(t *testing.T) {
hasMetrics = false
for _, tool := range loop.tools {
if tool.Name == "pulse_get_metrics" {
if tool.Name == "pulse_metrics" {
hasMetrics = true
break
}
@@ -269,3 +270,128 @@ func TestWaitForApprovalDecision(t *testing.T) {
assert.Equal(t, approval.StatusApproved, decision.Status)
})
}
func TestRequiresToolUse(t *testing.T) {
tests := []struct {
name string
message string
expected bool
}{
// Should require tools - action requests
{"@mention", "@jellyfin status", true},
{"check status", "check the status of my server", true},
{"restart request", "please restart nginx", true},
{"status query", "is homepage running?", true},
{"logs request", "show me the logs for influxdb", true},
{"cpu query", "what's the cpu usage on delly?", true},
{"memory query", "how much memory is traefik using?", true},
{"container query", "list my docker containers", true},
{"my infrastructure", "what's happening on my server?", true},
{"troubleshoot my", "troubleshoot my plex server", true},
{"my docker", "show me my docker containers", true},
// Should NOT require tools (conceptual questions)
{"what is tcp", "what is tcp?", false},
{"explain docker", "explain how docker networking works", false},
{"general question", "how do i configure nginx?", false},
{"theory question", "what's the difference between lxc and vm?", false},
{"empty message", "", false},
{"greeting", "hello", false},
{"thanks", "thanks for your help!", false},
{"explain proxmox", "explain what proxmox is", false},
{"describe kubernetes", "describe how kubernetes pods work", false},
// Edge cases from feedback - these are conceptual despite mentioning infra terms
{"is docker hard", "is docker networking hard?", false},
{"best way cpu", "what's the best way to monitor CPU usage?", false},
{"best practice", "what are the best practices for container security?", false},
{"should i use", "should i use kubernetes or docker swarm?", false},
// Edge cases - conceptual patterns with action keywords should still be action
// ONLY when they reference MY specific infrastructure
{"what is status", "what is the status of my server", true},
{"what is running", "what is running on my host", true},
{"my cpu usage", "what is my cpu usage", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := []providers.Message{
{Role: "user", Content: tt.message},
}
result := requiresToolUse(messages)
assert.Equal(t, tt.expected, result, "message: %q", tt.message)
})
}
}
func TestHasPhantomExecution(t *testing.T) {
tests := []struct {
name string
content string
expected bool
}{
// Phantom execution patterns - concrete metrics
{"cpu percentage", "The CPU usage is at 85%", true},
{"memory usage", "Memory usage is 4.2GB", true},
{"disk at", "Disk is at 92% capacity", true},
// Phantom execution patterns - state claims
{"currently running", "The service is currently running", true},
{"currently stopped", "The container is currently stopped", true},
{"logs show", "The logs show several errors", true},
{"output shows", "The output shows the service failed", true},
// Phantom execution patterns - fake tool formatting
{"fake tool block", "```tool\npulse_query\n```", true},
{"fake function call", "pulse_query({\"type\": \"nodes\"})", true},
// Phantom execution patterns - action claims
{"restarted the", "I restarted the nginx service", true},
{"successfully restarted", "The service was successfully restarted", true},
{"has been stopped", "The container has been stopped", true},
// NOT phantom execution - these are safe
{"suggestion", "You should check the logs", false},
{"question", "Would you like me to restart it?", false},
{"explanation", "Docker containers run in isolated environments", false},
{"future tense", "I will check the status for you", false},
{"empty", "", false},
{"general info", "Proxmox uses LXC for containers", false},
// NOT phantom - these used to false-positive
{"checked docs", "I checked the documentation and found...", false},
{"ran through logic", "I ran through the logic and it seems...", false},
{"looked at code", "I looked at the configuration options", false},
{"verified understanding", "I verified my understanding of the issue", false},
{"found that general", "I found that Docker networking is complex", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := hasPhantomExecution(tt.content)
assert.Equal(t, tt.expected, result, "content: %q", tt.content)
})
}
}
func TestTruncateForLog(t *testing.T) {
tests := []struct {
name string
input string
maxLen int
expected string
}{
{"short string", "hello", 10, "hello"},
{"exact length", "hello", 5, "hello"},
{"needs truncation", "hello world", 5, "hello..."},
{"empty string", "", 10, ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := truncateForLog(tt.input, tt.maxLen)
assert.Equal(t, tt.expected, result)
})
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,434 @@
package tools
import (
"context"
"fmt"
"strings"
"github.com/rcourtman/pulse-go-rewrite/internal/agentexec"
"github.com/rs/zerolog/log"
)
// registerReadTools registers the read-only pulse_read tool
// This tool is ALWAYS classified as ToolKindRead and will never trigger VERIFYING state
func (e *PulseToolExecutor) registerReadTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
Name: "pulse_read",
Description: `Execute read-only operations on infrastructure. Use this for investigation and diagnostics.
This tool is for READ-ONLY operations only. It will reject any command that could modify state.
Actions:
- exec: Execute a read-only shell command (grep, cat, tail, journalctl, docker logs, ps, etc.)
- file: Read contents of a file
- find: Find files by pattern (glob)
- tail: Tail a file or log (last N lines)
- logs: Read logs from journalctl or docker
USE THIS TOOL FOR:
- Checking logs: action="exec", command="grep -i error /var/log/app/*.log"
- Reading files: action="file", path="/etc/config.yaml"
- Finding files: action="find", pattern="/var/log/*.log"
- Tailing logs: action="tail", path="/var/log/syslog", lines=100
- Docker logs: action="logs", source="docker", container="nginx"
- Journal logs: action="logs", source="journal", unit="nginx"
DO NOT use this for write operations - use pulse_control instead.
Routing: target_host can be a Proxmox host (delly), an LXC name (jellyfin), or a VM name.
Commands are automatically routed through the appropriate agent.`,
InputSchema: InputSchema{
Type: "object",
Properties: map[string]PropertySchema{
"action": {
Type: "string",
Description: "Read action: exec, file, find, tail, logs",
Enum: []string{"exec", "file", "find", "tail", "logs"},
},
"target_host": {
Type: "string",
Description: "Hostname to read from (Proxmox host, LXC name, or VM name)",
},
"command": {
Type: "string",
Description: "For exec: the read-only shell command to run",
},
"path": {
Type: "string",
Description: "For file/find/tail: the file path or glob pattern",
},
"pattern": {
Type: "string",
Description: "For find: glob pattern to search for",
},
"lines": {
Type: "integer",
Description: "For tail: number of lines (default 100)",
},
"source": {
Type: "string",
Description: "For logs: 'docker' or 'journal'",
Enum: []string{"docker", "journal"},
},
"container": {
Type: "string",
Description: "For logs with source=docker: container name",
},
"unit": {
Type: "string",
Description: "For logs with source=journal: systemd unit name",
},
"since": {
Type: "string",
Description: "For logs: time filter (e.g., '1h', '30m', '2024-01-01')",
},
"grep": {
Type: "string",
Description: "For logs/tail: filter output by pattern",
},
"docker_container": {
Type: "string",
Description: "Read from inside a Docker container (target_host is where Docker runs)",
},
},
Required: []string{"action", "target_host"},
},
},
Handler: func(ctx context.Context, exec *PulseToolExecutor, args map[string]interface{}) (CallToolResult, error) {
return exec.executeRead(ctx, args)
},
// Note: RequireControl is NOT set - this is a read-only tool
// It's available at all control levels including read_only
})
}
// executeRead routes to the appropriate read handler based on action
func (e *PulseToolExecutor) executeRead(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
action, _ := args["action"].(string)
switch action {
case "exec":
return e.executeReadExec(ctx, args)
case "file":
return e.executeReadFile(ctx, args)
case "find":
return e.executeReadFind(ctx, args)
case "tail":
return e.executeReadTail(ctx, args)
case "logs":
return e.executeReadLogs(ctx, args)
default:
return NewErrorResult(fmt.Errorf("unknown action: %s. Use: exec, file, find, tail, logs", action)), nil
}
}
// executeReadExec executes a read-only command
// This STRUCTURALLY ENFORCES read-only by rejecting non-read commands at the tool layer
func (e *PulseToolExecutor) executeReadExec(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
command, _ := args["command"].(string)
targetHost, _ := args["target_host"].(string)
dockerContainer, _ := args["docker_container"].(string)
if command == "" {
return NewErrorResult(fmt.Errorf("command is required for exec action")), nil
}
if targetHost == "" {
return NewErrorResult(fmt.Errorf("target_host is required")), nil
}
// STRUCTURAL ENFORCEMENT: Reject non-read-only commands at the tool layer
// This is enforced HERE, not in the model's prompt
// Uses ExecutionIntent: ReadOnlyCertain and ReadOnlyConditional are allowed;
// WriteOrUnknown is rejected.
intentResult := ClassifyExecutionIntent(command)
if intentResult.Intent == IntentWriteOrUnknown {
hint := GetReadOnlyViolationHint(command, intentResult)
alternative := "Use pulse_control type=command for write operations"
details := map[string]interface{}{
"command": truncateCommand(command, 100),
"reason": intentResult.Reason,
"hint": hint,
"alternative": alternative,
}
// If this is a NonInteractiveOnly block with a suggested rewrite,
// include auto-recovery information
if niBlock := intentResult.NonInteractiveBlock; niBlock != nil {
details["auto_recoverable"] = niBlock.AutoRecoverable
details["category"] = niBlock.Category
if niBlock.SuggestedCmd != "" {
details["suggested_rewrite"] = niBlock.SuggestedCmd
details["recovery_hint"] = fmt.Sprintf("Retry with: %s", niBlock.SuggestedCmd)
}
}
return NewToolResponseResult(NewToolBlockedError(
"READ_ONLY_VIOLATION",
fmt.Sprintf("Command '%s' is not read-only. Use pulse_control for write operations.", truncateCommand(command, 50)),
details,
)), nil
}
// Validate routing context - block if targeting a Proxmox host when child resources exist
// This prevents accidentally reading from the host when user meant to read from an LXC/VM
routingResult := e.validateRoutingContext(targetHost)
if routingResult.IsBlocked() {
return NewToolResponseResult(routingResult.RoutingError.ToToolResponse()), nil
}
// Validate resource is in resolved context
// For read-only exec, we allow if ANY resource has been discovered in the session
validation := e.validateResolvedResourceForExec(targetHost, command, true)
if validation.IsBlocked() {
return NewToolResponseResult(validation.StrictError.ToToolResponse()), nil
}
if e.agentServer == nil {
return NewErrorResult(fmt.Errorf("no agent server available")), nil
}
// Resolve target to the correct agent and routing info (with full provenance)
routing := e.resolveTargetForCommandFull(targetHost)
if routing.AgentID == "" {
if routing.TargetType == "container" || routing.TargetType == "vm" {
return NewErrorResult(fmt.Errorf("'%s' is a %s but no agent is available on its Proxmox host", targetHost, routing.TargetType)), nil
}
return NewErrorResult(fmt.Errorf("no agent available for target '%s'", targetHost)), nil
}
// Build command (with optional Docker wrapper)
execCommand := command
if dockerContainer != "" {
// Validate container name
if !isValidContainerName(dockerContainer) {
return NewErrorResult(fmt.Errorf("invalid docker_container name")), nil
}
execCommand = fmt.Sprintf("docker exec %s sh -c %s", shellEscape(dockerContainer), shellEscape(command))
}
log.Debug().
Str("command", truncateCommand(command, 100)).
Str("target", targetHost).
Str("agent", routing.AgentID).
Str("agent_host", routing.AgentHostname).
Str("target_type", routing.TargetType).
Str("target_id", routing.TargetID).
Str("transport", routing.Transport).
Str("resolved_kind", routing.ResolvedKind).
Msg("[pulse_read] Executing read-only command")
result, err := e.agentServer.ExecuteCommand(ctx, routing.AgentID, agentexec.ExecuteCommandPayload{
Command: execCommand,
TargetType: routing.TargetType,
TargetID: routing.TargetID,
})
if err != nil {
return NewErrorResult(fmt.Errorf("command execution failed: %w", err)), nil
}
output := result.Stdout
if result.Stderr != "" {
if output != "" {
output += "\n"
}
output += result.Stderr
}
if result.ExitCode != 0 {
return NewTextResult(fmt.Sprintf("Command exited with code %d:\n%s", result.ExitCode, output)), nil
}
if output == "" {
return NewTextResult("Command completed successfully (no output)"), nil
}
return NewTextResult(output), nil
}
// executeReadFile reads a file's contents
func (e *PulseToolExecutor) executeReadFile(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
path, _ := args["path"].(string)
targetHost, _ := args["target_host"].(string)
dockerContainer, _ := args["docker_container"].(string)
if path == "" {
return NewErrorResult(fmt.Errorf("path is required for file action")), nil
}
if targetHost == "" {
return NewErrorResult(fmt.Errorf("target_host is required")), nil
}
// Validate path is absolute
if !strings.HasPrefix(path, "/") {
return NewErrorResult(fmt.Errorf("path must be absolute (start with /)")), nil
}
// Note: routing validation is done inside executeFileRead
// Use the existing file read implementation
return e.executeFileRead(ctx, path, targetHost, dockerContainer)
}
// executeReadFind finds files by pattern
func (e *PulseToolExecutor) executeReadFind(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
pattern, _ := args["pattern"].(string)
path, _ := args["path"].(string)
targetHost, _ := args["target_host"].(string)
if pattern == "" && path == "" {
return NewErrorResult(fmt.Errorf("pattern or path is required for find action")), nil
}
if targetHost == "" {
return NewErrorResult(fmt.Errorf("target_host is required")), nil
}
// Use pattern if provided, otherwise use path as the pattern
searchPattern := pattern
if searchPattern == "" {
searchPattern = path
}
// Extract directory and filename pattern
dir := "/"
filePattern := searchPattern
if lastSlash := strings.LastIndex(searchPattern, "/"); lastSlash > 0 {
dir = searchPattern[:lastSlash]
filePattern = searchPattern[lastSlash+1:]
}
// Build a safe find command
// Use -maxdepth to prevent runaway searches
command := fmt.Sprintf("find %s -maxdepth 3 -name %s -type f 2>/dev/null | head -50",
shellEscape(dir), shellEscape(filePattern))
// Execute via read exec
return e.executeReadExec(ctx, map[string]interface{}{
"action": "exec",
"command": command,
"target_host": targetHost,
})
}
// executeReadTail tails a file
func (e *PulseToolExecutor) executeReadTail(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
path, _ := args["path"].(string)
targetHost, _ := args["target_host"].(string)
lines := intArg(args, "lines", 100)
grepPattern, _ := args["grep"].(string)
dockerContainer, _ := args["docker_container"].(string)
if path == "" {
return NewErrorResult(fmt.Errorf("path is required for tail action")), nil
}
if targetHost == "" {
return NewErrorResult(fmt.Errorf("target_host is required")), nil
}
// Validate path is absolute
if !strings.HasPrefix(path, "/") {
return NewErrorResult(fmt.Errorf("path must be absolute (start with /)")), nil
}
// Cap lines to prevent memory issues
if lines > 1000 {
lines = 1000
}
if lines < 1 {
lines = 100
}
// Build command
command := fmt.Sprintf("tail -n %d %s", lines, shellEscape(path))
if grepPattern != "" {
command += fmt.Sprintf(" | grep -i %s", shellEscape(grepPattern))
}
return e.executeReadExec(ctx, map[string]interface{}{
"action": "exec",
"command": command,
"target_host": targetHost,
"docker_container": dockerContainer,
})
}
// executeReadLogs reads logs from docker or journalctl
func (e *PulseToolExecutor) executeReadLogs(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
source, _ := args["source"].(string)
targetHost, _ := args["target_host"].(string)
container, _ := args["container"].(string)
unit, _ := args["unit"].(string)
since, _ := args["since"].(string)
grepPattern, _ := args["grep"].(string)
lines := intArg(args, "lines", 100)
if targetHost == "" {
return NewErrorResult(fmt.Errorf("target_host is required")), nil
}
// Cap lines
if lines > 1000 {
lines = 1000
}
if lines < 1 {
lines = 100
}
var command string
switch source {
case "docker":
if container == "" {
return NewErrorResult(fmt.Errorf("container is required for docker logs")), nil
}
if !isValidContainerName(container) {
return NewErrorResult(fmt.Errorf("invalid container name")), nil
}
command = fmt.Sprintf("docker logs --tail %d %s", lines, shellEscape(container))
if since != "" {
command = fmt.Sprintf("docker logs --since %s --tail %d %s", shellEscape(since), lines, shellEscape(container))
}
case "journal":
if unit == "" {
return NewErrorResult(fmt.Errorf("unit is required for journal logs")), nil
}
command = fmt.Sprintf("journalctl -u %s -n %d --no-pager", shellEscape(unit), lines)
if since != "" {
command = fmt.Sprintf("journalctl -u %s --since %s -n %d --no-pager", shellEscape(unit), shellEscape(since), lines)
}
default:
return NewErrorResult(fmt.Errorf("source must be 'docker' or 'journal'")), nil
}
// Add grep filter if provided
if grepPattern != "" {
command += fmt.Sprintf(" 2>&1 | grep -i %s", shellEscape(grepPattern))
}
return e.executeReadExec(ctx, map[string]interface{}{
"action": "exec",
"command": command,
"target_host": targetHost,
})
}
// truncateCommand truncates a command for display/logging
func truncateCommand(cmd string, maxLen int) string {
if len(cmd) <= maxLen {
return cmd
}
return cmd[:maxLen] + "..."
}
// isValidContainerName validates a container name (alphanumeric, _, -, .)
func isValidContainerName(name string) bool {
if name == "" {
return false
}
for _, c := range name {
if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '.') {
return false
}
}
return true
}