diff --git a/internal/ai/patrol.go b/internal/ai/patrol.go index 333aa6812..4e4a69807 100644 --- a/internal/ai/patrol.go +++ b/internal/ai/patrol.go @@ -2889,9 +2889,7 @@ func (p *PatrolService) aiResolveFindings(ctx context.Context) int { You have access to comprehensive infrastructure monitoring tools: **Current State:** -- pulse_get_topology - Full infrastructure snapshot (nodes, VMs, containers, storage) -- pulse_get_resource - Detailed info on a specific resource -- pulse_search_resources - Search for resources by name/type +- pulse_query - Query infrastructure (topology, search, resource details) **Deep Analysis:** - pulse_get_metrics - Time-series metrics to see trends (is it improving or worsening?) @@ -2950,19 +2948,56 @@ Begin your investigation now. Use multiple tools to build a complete picture bef // Execute with streaming - AI has access to MCP tools to query current infrastructure state var responseBuffer strings.Builder - _, err := aiService.ExecuteStream(ctx, ExecuteRequest{ - Prompt: prompt, - UseCase: "patrol", // Use patrol model for this background task - }, func(event StreamEvent) { - if event.Type == "content" { - if content, ok := event.Data.(string); ok { - responseBuffer.WriteString(content) + var executionErr error + + // Try chat service path first (50+ MCP tools, FSM safety, sessions) + if cs := aiService.GetChatService(); cs != nil { + log.Debug().Msg("AI Patrol: Using chat service path for finding review") + chatResp, chatErr := cs.ExecutePatrolStream(ctx, PatrolExecuteRequest{ + Prompt: prompt, + SystemPrompt: p.getPatrolSystemPrompt(), + SessionID: "patrol-findings-review", + UseCase: "patrol", + }, func(event ChatStreamEvent) { + if event.Type == "content" { + var contentData struct { + Text string `json:"text"` + } + if json.Unmarshal(event.Data, &contentData) == nil && contentData.Text != "" { + responseBuffer.WriteString(contentData.Text) + } + } + }) + if chatErr != nil { + log.Warn().Err(chatErr).Msg("AI Patrol: Chat service path failed for finding review, falling back") + responseBuffer.Reset() + } else { + // Use chat response content + if chatResp.Content != "" { + responseBuffer.Reset() + responseBuffer.WriteString(chatResp.Content) } } - }) + executionErr = chatErr + } - if err != nil { - log.Debug().Err(err).Msg("AI Patrol: Failed to get AI judgment on findings") + // Legacy fallback path + if responseBuffer.Len() == 0 { + _, legacyErr := aiService.ExecuteStream(ctx, ExecuteRequest{ + Prompt: prompt, + UseCase: "patrol", + }, func(event StreamEvent) { + if event.Type == "content" { + if content, ok := event.Data.(string); ok { + responseBuffer.WriteString(content) + } + } + }) + executionErr = legacyErr + } + + if executionErr != nil && responseBuffer.Len() == 0 { + log.Debug().Err(executionErr).Msg("AI Patrol: Failed to get AI judgment on findings") return 0 } @@ -4152,50 +4187,98 @@ func (p *PatrolService) runAIAnalysisWithPrompt(ctx context.Context, state model // Use streaming to broadcast updates in real-time var contentBuffer strings.Builder var inputTokens, outputTokens int + var finalContent string - resp, err := p.aiService.ExecuteStream(ctx, ExecuteRequest{ - Prompt: prompt, - SystemPrompt: p.getPatrolSystemPrompt(), - UseCase: "patrol", // Use patrol model for background analysis - }, func(event StreamEvent) { - switch event.Type { - case "content": - if content, ok := event.Data.(string); ok { - contentBuffer.WriteString(content) - p.appendStreamContent(content) + // Try chat service path first (50+ MCP tools, FSM safety, sessions) + if cs := p.aiService.GetChatService(); cs != nil { + log.Debug().Msg("AI Patrol: Using chat service execution path") + chatResp, chatErr := cs.ExecutePatrolStream(ctx, PatrolExecuteRequest{ + Prompt: prompt, + SystemPrompt: p.getPatrolSystemPrompt(), + SessionID: "patrol-main", + UseCase: "patrol", + }, func(event ChatStreamEvent) { + switch event.Type { + case "content": + // Chat service sends JSON-encoded content events + var contentData struct { + Text string `json:"text"` + } + if json.Unmarshal(event.Data, &contentData) == nil && contentData.Text != "" { + contentBuffer.WriteString(contentData.Text) + p.appendStreamContent(contentData.Text) + } + case "thinking": + var thinkingData struct { + Text string `json:"text"` + } + if json.Unmarshal(event.Data, &thinkingData) == nil && thinkingData.Text != "" { + p.broadcast(PatrolStreamEvent{ + Type: "thinking", + Content: thinkingData.Text, + }) + } } - case "thinking": - // Thinking chunks are for live streaming only - don't persist them - // They allow users to see the AI's reasoning in real-time, but the - // final stored analysis should only contain the actual findings - if thinking, ok := event.Data.(string); ok && thinking != "" { - // Broadcast for live viewing ONLY - don't add to contentBuffer - p.broadcast(PatrolStreamEvent{ - Type: "thinking", - Content: thinking, - }) + }) + + if chatErr == nil { + finalContent = chatResp.Content + if finalContent == "" { + finalContent = contentBuffer.String() } + inputTokens = chatResp.InputTokens + outputTokens = chatResp.OutputTokens + log.Info(). + Int("input_tokens", inputTokens). + Int("output_tokens", outputTokens). + Msg("AI Patrol: Chat service execution path succeeded") + } else { + log.Warn().Err(chatErr).Msg("AI Patrol: Chat service path failed, falling back to legacy path") + // Reset buffers for legacy path + contentBuffer.Reset() } - }) - - if err != nil { - p.setStreamPhase("idle") - p.broadcast(PatrolStreamEvent{Type: "error", Content: err.Error()}) - return nil, fmt.Errorf("LLM analysis failed: %w", err) } - // Use response content (streaming may have captured it already) - finalContent := resp.Content + // Legacy fallback path (3 tools: run_command, fetch_url, set_resource_url) if finalContent == "" { - finalContent = contentBuffer.String() + resp, err := p.aiService.ExecuteStream(ctx, ExecuteRequest{ + Prompt: prompt, + SystemPrompt: p.getPatrolSystemPrompt(), + UseCase: "patrol", + }, func(event StreamEvent) { + switch event.Type { + case "content": + if content, ok := event.Data.(string); ok { + contentBuffer.WriteString(content) + p.appendStreamContent(content) + } + case "thinking": + if thinking, ok := event.Data.(string); ok && thinking != "" { + p.broadcast(PatrolStreamEvent{ + Type: "thinking", + Content: thinking, + }) + } + } + }) + + if err != nil { + p.setStreamPhase("idle") + p.broadcast(PatrolStreamEvent{Type: "error", Content: err.Error()}) + return nil, fmt.Errorf("LLM analysis failed: %w", err) + } + + finalContent = resp.Content + if finalContent == "" { + finalContent = contentBuffer.String() + } + inputTokens = resp.InputTokens + outputTokens = resp.OutputTokens } // Clean any thinking tokens that might have leaked through from the provider finalContent = cleanThinkingTokens(finalContent) - inputTokens = resp.InputTokens - outputTokens = resp.OutputTokens - log.Debug(). Int("input_tokens", inputTokens). Int("output_tokens", outputTokens). diff --git a/internal/ai/service.go b/internal/ai/service.go index 26e8cbe4f..3a806eac3 100644 --- a/internal/ai/service.go +++ b/internal/ai/service.go @@ -67,10 +67,11 @@ type AgentServer interface { } // ChatServiceProvider defines the interface for accessing chat functionality -// This is used by the investigation orchestrator to run investigations +// This is used by the investigation orchestrator and patrol to run AI executions type ChatServiceProvider interface { CreateSession(ctx context.Context) (*ChatSession, error) ExecuteStream(ctx context.Context, req ChatExecuteRequest, callback ChatStreamCallback) error + ExecutePatrolStream(ctx context.Context, req PatrolExecuteRequest, callback ChatStreamCallback) (*PatrolStreamResponse, error) GetMessages(ctx context.Context, sessionID string) ([]ChatMessage, error) DeleteSession(ctx context.Context, sessionID string) error } @@ -103,6 +104,21 @@ type ChatMessage struct { Timestamp time.Time `json:"timestamp"` } +// PatrolExecuteRequest represents a patrol execution request via the chat service +type PatrolExecuteRequest struct { + Prompt string `json:"prompt"` + SystemPrompt string `json:"system_prompt"` + SessionID string `json:"session_id,omitempty"` + UseCase string `json:"use_case"` // "patrol" — for model selection +} + +// PatrolStreamResponse contains the results of a patrol execution via the chat service +type PatrolStreamResponse struct { + Content string `json:"content"` + InputTokens int `json:"input_tokens"` + OutputTokens int `json:"output_tokens"` +} + // Service orchestrates AI interactions type Service struct { mu sync.RWMutex diff --git a/internal/api/router.go b/internal/api/router.go index 2767dd52f..90a949e55 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -29,6 +29,7 @@ import ( "github.com/rcourtman/pulse-go-rewrite/internal/ai" "github.com/rcourtman/pulse-go-rewrite/internal/ai/adapters" "github.com/rcourtman/pulse-go-rewrite/internal/ai/baseline" + "github.com/rcourtman/pulse-go-rewrite/internal/ai/chat" "github.com/rcourtman/pulse-go-rewrite/internal/ai/circuit" "github.com/rcourtman/pulse-go-rewrite/internal/ai/forecast" "github.com/rcourtman/pulse-go-rewrite/internal/ai/knowledge" @@ -2490,6 +2491,9 @@ func (r *Router) StartAIChat(ctx context.Context) { // Wire up MCP tool providers so AI can access real data r.wireAIChatProviders() + // Wire chat service to AI service for patrol and investigation + r.wireChatServiceToAI() + // Wire up investigation orchestrator now that chat service is ready // This must happen after Start() because the orchestrator needs the chat service if r.aiSettingsHandler != nil { @@ -2510,6 +2514,35 @@ func (r *Router) StartAIChat(ctx context.Context) { } } +// wireChatServiceToAI wires the chat service adapter to the AI service, +// enabling patrol and investigation to use the chat service's execution path +// (50+ MCP tools, FSM safety, sessions) instead of the legacy 3-tool path. +func (r *Router) wireChatServiceToAI() { + if r.aiHandler == nil || r.aiSettingsHandler == nil { + return + } + + ctx := context.Background() + chatSvc := r.aiHandler.GetService(ctx) + if chatSvc == nil { + return + } + + chatService, ok := chatSvc.(*chat.Service) + if !ok { + log.Warn().Msg("Chat service is not *chat.Service, cannot create patrol adapter") + return + } + + aiService := r.aiSettingsHandler.GetAIService(ctx) + if aiService == nil { + return + } + + aiService.SetChatService(&chatServiceAdapter{svc: chatService}) + log.Info().Msg("Chat service wired to AI service for patrol and investigation") +} + // wireAIChatProviders wires up all MCP tool providers for AI chat func (r *Router) wireAIChatProviders() { if r.aiHandler == nil || !r.aiHandler.IsRunning(context.Background()) {