refactor(ai): update patrol and service to use chat service adapter

- Update patrol.go to use chat service for AI execution
- Update service.go with chat service provider integration
- Add patrol streaming endpoint to router
This commit is contained in:
rcourtman
2026-01-28 21:24:34 +00:00
parent 2335099089
commit 03b5586ac8
3 changed files with 178 additions and 46 deletions

View File

@@ -2889,9 +2889,7 @@ func (p *PatrolService) aiResolveFindings(ctx context.Context) int {
You have access to comprehensive infrastructure monitoring tools:
**Current State:**
- pulse_get_topology - Full infrastructure snapshot (nodes, VMs, containers, storage)
- pulse_get_resource - Detailed info on a specific resource
- pulse_search_resources - Search for resources by name/type
- pulse_query - Query infrastructure (topology, search, resource details)
**Deep Analysis:**
- pulse_get_metrics - Time-series metrics to see trends (is it improving or worsening?)
@@ -2950,19 +2948,56 @@ Begin your investigation now. Use multiple tools to build a complete picture bef
// Execute with streaming - AI has access to MCP tools to query current infrastructure state
var responseBuffer strings.Builder
_, err := aiService.ExecuteStream(ctx, ExecuteRequest{
Prompt: prompt,
UseCase: "patrol", // Use patrol model for this background task
}, func(event StreamEvent) {
if event.Type == "content" {
if content, ok := event.Data.(string); ok {
responseBuffer.WriteString(content)
var executionErr error
// Try chat service path first (50+ MCP tools, FSM safety, sessions)
if cs := aiService.GetChatService(); cs != nil {
log.Debug().Msg("AI Patrol: Using chat service path for finding review")
chatResp, chatErr := cs.ExecutePatrolStream(ctx, PatrolExecuteRequest{
Prompt: prompt,
SystemPrompt: p.getPatrolSystemPrompt(),
SessionID: "patrol-findings-review",
UseCase: "patrol",
}, func(event ChatStreamEvent) {
if event.Type == "content" {
var contentData struct {
Text string `json:"text"`
}
if json.Unmarshal(event.Data, &contentData) == nil && contentData.Text != "" {
responseBuffer.WriteString(contentData.Text)
}
}
})
if chatErr != nil {
log.Warn().Err(chatErr).Msg("AI Patrol: Chat service path failed for finding review, falling back")
responseBuffer.Reset()
} else {
// Use chat response content
if chatResp.Content != "" {
responseBuffer.Reset()
responseBuffer.WriteString(chatResp.Content)
}
}
})
executionErr = chatErr
}
if err != nil {
log.Debug().Err(err).Msg("AI Patrol: Failed to get AI judgment on findings")
// Legacy fallback path
if responseBuffer.Len() == 0 {
_, legacyErr := aiService.ExecuteStream(ctx, ExecuteRequest{
Prompt: prompt,
UseCase: "patrol",
}, func(event StreamEvent) {
if event.Type == "content" {
if content, ok := event.Data.(string); ok {
responseBuffer.WriteString(content)
}
}
})
executionErr = legacyErr
}
if executionErr != nil && responseBuffer.Len() == 0 {
log.Debug().Err(executionErr).Msg("AI Patrol: Failed to get AI judgment on findings")
return 0
}
@@ -4152,50 +4187,98 @@ func (p *PatrolService) runAIAnalysisWithPrompt(ctx context.Context, state model
// Use streaming to broadcast updates in real-time
var contentBuffer strings.Builder
var inputTokens, outputTokens int
var finalContent string
resp, err := p.aiService.ExecuteStream(ctx, ExecuteRequest{
Prompt: prompt,
SystemPrompt: p.getPatrolSystemPrompt(),
UseCase: "patrol", // Use patrol model for background analysis
}, func(event StreamEvent) {
switch event.Type {
case "content":
if content, ok := event.Data.(string); ok {
contentBuffer.WriteString(content)
p.appendStreamContent(content)
// Try chat service path first (50+ MCP tools, FSM safety, sessions)
if cs := p.aiService.GetChatService(); cs != nil {
log.Debug().Msg("AI Patrol: Using chat service execution path")
chatResp, chatErr := cs.ExecutePatrolStream(ctx, PatrolExecuteRequest{
Prompt: prompt,
SystemPrompt: p.getPatrolSystemPrompt(),
SessionID: "patrol-main",
UseCase: "patrol",
}, func(event ChatStreamEvent) {
switch event.Type {
case "content":
// Chat service sends JSON-encoded content events
var contentData struct {
Text string `json:"text"`
}
if json.Unmarshal(event.Data, &contentData) == nil && contentData.Text != "" {
contentBuffer.WriteString(contentData.Text)
p.appendStreamContent(contentData.Text)
}
case "thinking":
var thinkingData struct {
Text string `json:"text"`
}
if json.Unmarshal(event.Data, &thinkingData) == nil && thinkingData.Text != "" {
p.broadcast(PatrolStreamEvent{
Type: "thinking",
Content: thinkingData.Text,
})
}
}
case "thinking":
// Thinking chunks are for live streaming only - don't persist them
// They allow users to see the AI's reasoning in real-time, but the
// final stored analysis should only contain the actual findings
if thinking, ok := event.Data.(string); ok && thinking != "" {
// Broadcast for live viewing ONLY - don't add to contentBuffer
p.broadcast(PatrolStreamEvent{
Type: "thinking",
Content: thinking,
})
})
if chatErr == nil {
finalContent = chatResp.Content
if finalContent == "" {
finalContent = contentBuffer.String()
}
inputTokens = chatResp.InputTokens
outputTokens = chatResp.OutputTokens
log.Info().
Int("input_tokens", inputTokens).
Int("output_tokens", outputTokens).
Msg("AI Patrol: Chat service execution path succeeded")
} else {
log.Warn().Err(chatErr).Msg("AI Patrol: Chat service path failed, falling back to legacy path")
// Reset buffers for legacy path
contentBuffer.Reset()
}
})
if err != nil {
p.setStreamPhase("idle")
p.broadcast(PatrolStreamEvent{Type: "error", Content: err.Error()})
return nil, fmt.Errorf("LLM analysis failed: %w", err)
}
// Use response content (streaming may have captured it already)
finalContent := resp.Content
// Legacy fallback path (3 tools: run_command, fetch_url, set_resource_url)
if finalContent == "" {
finalContent = contentBuffer.String()
resp, err := p.aiService.ExecuteStream(ctx, ExecuteRequest{
Prompt: prompt,
SystemPrompt: p.getPatrolSystemPrompt(),
UseCase: "patrol",
}, func(event StreamEvent) {
switch event.Type {
case "content":
if content, ok := event.Data.(string); ok {
contentBuffer.WriteString(content)
p.appendStreamContent(content)
}
case "thinking":
if thinking, ok := event.Data.(string); ok && thinking != "" {
p.broadcast(PatrolStreamEvent{
Type: "thinking",
Content: thinking,
})
}
}
})
if err != nil {
p.setStreamPhase("idle")
p.broadcast(PatrolStreamEvent{Type: "error", Content: err.Error()})
return nil, fmt.Errorf("LLM analysis failed: %w", err)
}
finalContent = resp.Content
if finalContent == "" {
finalContent = contentBuffer.String()
}
inputTokens = resp.InputTokens
outputTokens = resp.OutputTokens
}
// Clean any thinking tokens that might have leaked through from the provider
finalContent = cleanThinkingTokens(finalContent)
inputTokens = resp.InputTokens
outputTokens = resp.OutputTokens
log.Debug().
Int("input_tokens", inputTokens).
Int("output_tokens", outputTokens).

View File

@@ -67,10 +67,11 @@ type AgentServer interface {
}
// ChatServiceProvider defines the interface for accessing chat functionality
// This is used by the investigation orchestrator to run investigations
// This is used by the investigation orchestrator and patrol to run AI executions
type ChatServiceProvider interface {
CreateSession(ctx context.Context) (*ChatSession, error)
ExecuteStream(ctx context.Context, req ChatExecuteRequest, callback ChatStreamCallback) error
ExecutePatrolStream(ctx context.Context, req PatrolExecuteRequest, callback ChatStreamCallback) (*PatrolStreamResponse, error)
GetMessages(ctx context.Context, sessionID string) ([]ChatMessage, error)
DeleteSession(ctx context.Context, sessionID string) error
}
@@ -103,6 +104,21 @@ type ChatMessage struct {
Timestamp time.Time `json:"timestamp"`
}
// PatrolExecuteRequest represents a patrol execution request via the chat service
type PatrolExecuteRequest struct {
Prompt string `json:"prompt"`
SystemPrompt string `json:"system_prompt"`
SessionID string `json:"session_id,omitempty"`
UseCase string `json:"use_case"` // "patrol" — for model selection
}
// PatrolStreamResponse contains the results of a patrol execution via the chat service
type PatrolStreamResponse struct {
Content string `json:"content"`
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
}
// Service orchestrates AI interactions
type Service struct {
mu sync.RWMutex

View File

@@ -29,6 +29,7 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/ai"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/adapters"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/baseline"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/circuit"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/forecast"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/knowledge"
@@ -2490,6 +2491,9 @@ func (r *Router) StartAIChat(ctx context.Context) {
// Wire up MCP tool providers so AI can access real data
r.wireAIChatProviders()
// Wire chat service to AI service for patrol and investigation
r.wireChatServiceToAI()
// Wire up investigation orchestrator now that chat service is ready
// This must happen after Start() because the orchestrator needs the chat service
if r.aiSettingsHandler != nil {
@@ -2510,6 +2514,35 @@ func (r *Router) StartAIChat(ctx context.Context) {
}
}
// wireChatServiceToAI wires the chat service adapter to the AI service,
// enabling patrol and investigation to use the chat service's execution path
// (50+ MCP tools, FSM safety, sessions) instead of the legacy 3-tool path.
func (r *Router) wireChatServiceToAI() {
if r.aiHandler == nil || r.aiSettingsHandler == nil {
return
}
ctx := context.Background()
chatSvc := r.aiHandler.GetService(ctx)
if chatSvc == nil {
return
}
chatService, ok := chatSvc.(*chat.Service)
if !ok {
log.Warn().Msg("Chat service is not *chat.Service, cannot create patrol adapter")
return
}
aiService := r.aiSettingsHandler.GetAIService(ctx)
if aiService == nil {
return
}
aiService.SetChatService(&chatServiceAdapter{svc: chatService})
log.Info().Msg("Chat service wired to AI service for patrol and investigation")
}
// wireAIChatProviders wires up all MCP tool providers for AI chat
func (r *Router) wireAIChatProviders() {
if r.aiHandler == nil || !r.aiHandler.IsRunning(context.Background()) {