refactor(api): Update handlers for native AI chat service

Adapts API handlers to use the new native chat service:

ai_handler.go:
- Replace opencode.Service with chat.Service
- Add AIService interface for testability
- Add factory function for service creation (mockable)
- Update provider wiring to use tools package types

ai_handlers.go:
- Add Notable field to model list response
- Simplify command approval - execution handled by agentic loop
- Remove inline command execution from approval endpoint

router.go:
- Update imports: mcp -> tools, opencode -> chat
- Add monitor wrapper types for cleaner dependency injection
- Update patrol wiring for new chat service

agent_profiles:
- Rename agent_profiles_mcp.go -> agent_profiles_tools.go
- Update imports for tools package

monitor_wrappers.go:
- New file with wrapper types for alert/notification monitors
- Enables interface-based dependency injection
This commit is contained in:
rcourtman
2026-01-19 19:20:00 +00:00
parent 3cdc5ec6c5
commit ffb8928dbf
5 changed files with 289 additions and 193 deletions

View File

@@ -7,7 +7,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/mcp"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/tools"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/license"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
@@ -155,7 +155,7 @@ func (m *MCPAgentProfileManager) AssignProfile(_ context.Context, agentID, profi
return profile.Name, nil
}
func (m *MCPAgentProfileManager) GetAgentScope(_ context.Context, agentID string) (*mcp.AgentScope, error) {
func (m *MCPAgentProfileManager) GetAgentScope(_ context.Context, agentID string) (*tools.AgentScope, error) {
if err := m.requireLicense(); err != nil {
return nil, err
}
@@ -190,7 +190,7 @@ func (m *MCPAgentProfileManager) GetAgentScope(_ context.Context, agentID string
for _, profile := range profiles {
if profile.ID == assignment.ProfileID {
return &mcp.AgentScope{
return &tools.AgentScope{
AgentID: agentID,
ProfileID: profile.ID,
ProfileName: profile.Name,

View File

@@ -9,26 +9,73 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/agentexec"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/approval"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/opencode"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rs/zerolog/log"
)
// AIHandler handles all AI endpoints using OpenCode
// AIPersistence interface for loading/saving AI config
type AIPersistence interface {
LoadAIConfig() (*config.AIConfig, error)
}
// AIService interface for the AI chat service - enables mocking in tests
type AIService interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
Restart(ctx context.Context, newCfg *config.AIConfig) error
IsRunning() bool
Execute(ctx context.Context, req chat.ExecuteRequest) (map[string]interface{}, error)
ExecuteStream(ctx context.Context, req chat.ExecuteRequest, callback chat.StreamCallback) error
ListSessions(ctx context.Context) ([]chat.Session, error)
CreateSession(ctx context.Context) (*chat.Session, error)
DeleteSession(ctx context.Context, sessionID string) error
GetMessages(ctx context.Context, sessionID string) ([]chat.Message, error)
AbortSession(ctx context.Context, sessionID string) error
SummarizeSession(ctx context.Context, sessionID string) (map[string]interface{}, error)
GetSessionDiff(ctx context.Context, sessionID string) (map[string]interface{}, error)
ForkSession(ctx context.Context, sessionID string) (*chat.Session, error)
RevertSession(ctx context.Context, sessionID string) (map[string]interface{}, error)
UnrevertSession(ctx context.Context, sessionID string) (map[string]interface{}, error)
AnswerQuestion(ctx context.Context, questionID string, answers []chat.QuestionAnswer) error
SetAlertProvider(provider chat.MCPAlertProvider)
SetFindingsProvider(provider chat.MCPFindingsProvider)
SetBaselineProvider(provider chat.MCPBaselineProvider)
SetPatternProvider(provider chat.MCPPatternProvider)
SetMetricsHistory(provider chat.MCPMetricsHistoryProvider)
SetAgentProfileManager(manager chat.AgentProfileManager)
SetStorageProvider(provider chat.MCPStorageProvider)
SetBackupProvider(provider chat.MCPBackupProvider)
SetDiskHealthProvider(provider chat.MCPDiskHealthProvider)
SetUpdatesProvider(provider chat.MCPUpdatesProvider)
SetFindingsManager(manager chat.FindingsManager)
SetMetadataUpdater(updater chat.MetadataUpdater)
UpdateControlSettings(cfg *config.AIConfig)
GetBaseURL() string
}
// AIHandler handles all AI endpoints using direct AI integration
type AIHandler struct {
config *config.Config
persistence *config.ConfigPersistence
service *opencode.Service
persistence AIPersistence
service AIService
agentServer *agentexec.Server
}
// newChatService is the factory function for creating the AI service.
// Can be swapped in tests for mocking.
var newChatService = func(cfg chat.Config) AIService {
return chat.NewService(cfg)
}
// NewAIHandler creates a new AI handler
func NewAIHandler(cfg *config.Config, persistence *config.ConfigPersistence, agentServer *agentexec.Server) *AIHandler {
func NewAIHandler(cfg *config.Config, persistence AIPersistence, agentServer *agentexec.Server) *AIHandler {
return &AIHandler{
config: cfg,
persistence: persistence,
agentServer: agentServer,
// service will be initialized in Start()
}
}
@@ -37,7 +84,7 @@ type AIStateProvider interface {
GetState() models.StateSnapshot
}
// Start initializes and starts the OpenCode service
// Start initializes and starts the AI chat service
func (h *AIHandler) Start(ctx context.Context, stateProvider AIStateProvider) error {
log.Info().Msg("AIHandler.Start called")
aiCfg := h.loadAIConfig()
@@ -50,24 +97,26 @@ func (h *AIHandler) Start(ctx context.Context, stateProvider AIStateProvider) er
return nil
}
log.Info().Bool("enabled", aiCfg.Enabled).Str("model", aiCfg.Model).Msg("Starting OpenCode service")
h.service = opencode.NewService(opencode.Config{
// Determine data directory
dataDir := aiCfg.OpenCodeDataDir
if dataDir == "" {
dataDir = "/tmp/pulse-ai"
}
log.Info().Bool("enabled", aiCfg.Enabled).Str("model", aiCfg.Model).Msg("Starting AI chat service")
h.service = newChatService(chat.Config{
AIConfig: aiCfg,
StateProvider: stateProvider,
AgentServer: h.agentServer,
DataDir: dataDir,
})
if err := h.service.Start(ctx); err != nil {
log.Error().Err(err).Msg("Failed to start OpenCode service")
log.Error().Err(err).Msg("Failed to start AI chat service")
return err
}
// Initialize approval store for command approval workflow
dataDir := aiCfg.OpenCodeDataDir
if dataDir == "" {
dataDir = "/tmp/pulse-opencode"
}
approvalStore, err := approval.NewStore(approval.StoreConfig{
DataDir: dataDir,
DefaultTimeout: 5 * time.Minute,
@@ -81,11 +130,11 @@ func (h *AIHandler) Start(ctx context.Context, stateProvider AIStateProvider) er
log.Info().Str("data_dir", dataDir).Msg("Approval store initialized")
}
log.Info().Msg("Pulse AI started (powered by OpenCode)")
log.Info().Msg("Pulse AI started (direct integration)")
return nil
}
// Stop stops the OpenCode service
// Stop stops the AI chat service
func (h *AIHandler) Stop(ctx context.Context) error {
if h.service != nil {
return h.service.Stop(ctx)
@@ -93,7 +142,7 @@ func (h *AIHandler) Stop(ctx context.Context) error {
return nil
}
// Restart restarts the OpenCode service with updated configuration
// Restart restarts the AI chat service with updated configuration
// Call this when model or other settings change
func (h *AIHandler) Restart(ctx context.Context) error {
if h.service == nil || !h.service.IsRunning() {
@@ -109,8 +158,8 @@ func (h *AIHandler) IsRunning() bool {
return h.service != nil && h.service.IsRunning()
}
// GetService returns the underlying OpenCode service for direct access
func (h *AIHandler) GetService() *opencode.Service {
// GetService returns the underlying AI chat service
func (h *AIHandler) GetService() AIService {
return h.service
}
@@ -174,6 +223,15 @@ func (h *AIHandler) HandleChat(w http.ResponseWriter, r *http.Request) {
return
}
preview := req.Prompt
if len(preview) > 100 {
preview = preview[:100] + "..."
}
log.Info().
Str("sessionId", req.SessionID).
Str("prompt_preview", preview).
Msg("AIHandler: Received chat request")
// Set up SSE
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
@@ -223,7 +281,7 @@ func (h *AIHandler) HandleChat(w http.ResponseWriter, r *http.Request) {
defer close(heartbeatDone)
// Write helper
writeEvent := func(event opencode.StreamEvent) {
writeEvent := func(event chat.StreamEvent) {
if clientDisconnected.Load() {
return
}
@@ -240,31 +298,27 @@ func (h *AIHandler) HandleChat(w http.ResponseWriter, r *http.Request) {
flusher.Flush()
}
// Stream from OpenCode
err := h.service.ExecuteStream(ctx, opencode.ExecuteRequest{
// Stream from AI chat service
err := h.service.ExecuteStream(ctx, chat.ExecuteRequest{
Prompt: req.Prompt,
SessionID: req.SessionID,
Model: req.Model,
}, func(event opencode.StreamEvent) {
}, func(event chat.StreamEvent) {
writeEvent(event)
})
if err != nil {
log.Error().Err(err).Msg("Chat stream error")
errData, _ := json.Marshal(err.Error())
writeEvent(opencode.StreamEvent{Type: "error", Data: errData})
writeEvent(chat.StreamEvent{Type: "error", Data: errData})
}
// Send done
writeEvent(opencode.StreamEvent{Type: "done", Data: nil})
writeEvent(chat.StreamEvent{Type: "done", Data: nil})
}
// HandleSessions handles GET /api/ai/sessions - list sessions
func (h *AIHandler) HandleSessions(w http.ResponseWriter, r *http.Request) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -282,10 +336,6 @@ func (h *AIHandler) HandleSessions(w http.ResponseWriter, r *http.Request) {
// HandleCreateSession handles POST /api/ai/sessions - create session
func (h *AIHandler) HandleCreateSession(w http.ResponseWriter, r *http.Request) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -303,10 +353,6 @@ func (h *AIHandler) HandleCreateSession(w http.ResponseWriter, r *http.Request)
// HandleDeleteSession handles DELETE /api/ai/sessions/{id}
func (h *AIHandler) HandleDeleteSession(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -322,10 +368,6 @@ func (h *AIHandler) HandleDeleteSession(w http.ResponseWriter, r *http.Request,
// HandleMessages handles GET /api/ai/sessions/{id}/messages
func (h *AIHandler) HandleMessages(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -343,10 +385,6 @@ func (h *AIHandler) HandleMessages(w http.ResponseWriter, r *http.Request, sessi
// HandleAbort handles POST /api/ai/sessions/{id}/abort
func (h *AIHandler) HandleAbort(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -362,13 +400,9 @@ func (h *AIHandler) HandleAbort(w http.ResponseWriter, r *http.Request, sessionI
// HandleStatus handles GET /api/ai/status
func (h *AIHandler) HandleStatus(w http.ResponseWriter, r *http.Request) {
if !CheckAuth(h.config, w, r) {
return
}
status := map[string]interface{}{
"running": h.IsRunning(),
"engine": "opencode",
"engine": "direct",
}
w.Header().Set("Content-Type", "application/json")
@@ -378,10 +412,6 @@ func (h *AIHandler) HandleStatus(w http.ResponseWriter, r *http.Request) {
// HandleSummarize handles POST /api/ai/sessions/{id}/summarize
// Compresses context when nearing model limits
func (h *AIHandler) HandleSummarize(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -400,10 +430,6 @@ func (h *AIHandler) HandleSummarize(w http.ResponseWriter, r *http.Request, sess
// HandleDiff handles GET /api/ai/sessions/{id}/diff
// Returns file changes made during the session
func (h *AIHandler) HandleDiff(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -422,10 +448,6 @@ func (h *AIHandler) HandleDiff(w http.ResponseWriter, r *http.Request, sessionID
// HandleFork handles POST /api/ai/sessions/{id}/fork
// Creates a branch point in the conversation
func (h *AIHandler) HandleFork(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -444,10 +466,6 @@ func (h *AIHandler) HandleFork(w http.ResponseWriter, r *http.Request, sessionID
// HandleRevert handles POST /api/ai/sessions/{id}/revert
// Reverts file changes from the session
func (h *AIHandler) HandleRevert(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -466,10 +484,6 @@ func (h *AIHandler) HandleRevert(w http.ResponseWriter, r *http.Request, session
// HandleUnrevert handles POST /api/ai/sessions/{id}/unrevert
// Restores previously reverted changes
func (h *AIHandler) HandleUnrevert(w http.ResponseWriter, r *http.Request, sessionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -495,10 +509,6 @@ type AnswerQuestionRequest struct {
// HandleAnswerQuestion handles POST /api/ai/question/{questionID}/answer
func (h *AIHandler) HandleAnswerQuestion(w http.ResponseWriter, r *http.Request, questionID string) {
if !CheckAuth(h.config, w, r) {
return
}
if !h.IsRunning() {
http.Error(w, "AI is not running", http.StatusServiceUnavailable)
return
@@ -510,15 +520,20 @@ func (h *AIHandler) HandleAnswerQuestion(w http.ResponseWriter, r *http.Request,
return
}
// Convert to opencode.QuestionAnswer
answers := make([]opencode.QuestionAnswer, len(req.Answers))
// Convert to chat.QuestionAnswer
answers := make([]chat.QuestionAnswer, len(req.Answers))
for i, a := range req.Answers {
answers[i] = opencode.QuestionAnswer{
answers[i] = chat.QuestionAnswer{
ID: a.ID,
Value: a.Value,
}
}
log.Info().
Str("questionID", questionID).
Int("answers_count", len(answers)).
Msg("AIHandler: Received answer to question")
if err := h.service.AnswerQuestion(r.Context(), questionID, answers); err != nil {
log.Error().Err(err).Str("questionID", questionID).Msg("Failed to answer question")
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -529,36 +544,92 @@ func (h *AIHandler) HandleAnswerQuestion(w http.ResponseWriter, r *http.Request,
}
// SetAlertProvider sets the alert provider for MCP tools
func (h *AIHandler) SetAlertProvider(provider opencode.MCPAlertProvider) {
func (h *AIHandler) SetAlertProvider(provider chat.MCPAlertProvider) {
if h.service != nil {
h.service.SetAlertProvider(provider)
}
}
// SetFindingsProvider sets the findings provider for MCP tools
func (h *AIHandler) SetFindingsProvider(provider opencode.MCPFindingsProvider) {
func (h *AIHandler) SetFindingsProvider(provider chat.MCPFindingsProvider) {
if h.service != nil {
h.service.SetFindingsProvider(provider)
}
}
// SetBaselineProvider sets the baseline provider for MCP tools
func (h *AIHandler) SetBaselineProvider(provider opencode.MCPBaselineProvider) {
func (h *AIHandler) SetBaselineProvider(provider chat.MCPBaselineProvider) {
if h.service != nil {
h.service.SetBaselineProvider(provider)
}
}
// SetPatternProvider sets the pattern provider for MCP tools
func (h *AIHandler) SetPatternProvider(provider opencode.MCPPatternProvider) {
func (h *AIHandler) SetPatternProvider(provider chat.MCPPatternProvider) {
if h.service != nil {
h.service.SetPatternProvider(provider)
}
}
// SetMetricsHistory sets the metrics history provider for MCP tools
func (h *AIHandler) SetMetricsHistory(provider opencode.MCPMetricsHistoryProvider) {
func (h *AIHandler) SetMetricsHistory(provider chat.MCPMetricsHistoryProvider) {
if h.service != nil {
h.service.SetMetricsHistory(provider)
}
}
// SetAgentProfileManager sets the agent profile manager for MCP tools
func (h *AIHandler) SetAgentProfileManager(manager chat.AgentProfileManager) {
if h.service != nil {
h.service.SetAgentProfileManager(manager)
}
}
// SetStorageProvider sets the storage provider for MCP tools
func (h *AIHandler) SetStorageProvider(provider chat.MCPStorageProvider) {
if h.service != nil {
h.service.SetStorageProvider(provider)
}
}
// SetBackupProvider sets the backup provider for MCP tools
func (h *AIHandler) SetBackupProvider(provider chat.MCPBackupProvider) {
if h.service != nil {
h.service.SetBackupProvider(provider)
}
}
// SetDiskHealthProvider sets the disk health provider for MCP tools
func (h *AIHandler) SetDiskHealthProvider(provider chat.MCPDiskHealthProvider) {
if h.service != nil {
h.service.SetDiskHealthProvider(provider)
}
}
// SetUpdatesProvider sets the updates provider for MCP tools
func (h *AIHandler) SetUpdatesProvider(provider chat.MCPUpdatesProvider) {
if h.service != nil {
h.service.SetUpdatesProvider(provider)
}
}
// SetFindingsManager sets the findings manager for MCP tools
func (h *AIHandler) SetFindingsManager(manager chat.FindingsManager) {
if h.service != nil {
h.service.SetFindingsManager(manager)
}
}
// SetMetadataUpdater sets the metadata updater for MCP tools
func (h *AIHandler) SetMetadataUpdater(updater chat.MetadataUpdater) {
if h.service != nil {
h.service.SetMetadataUpdater(updater)
}
}
// UpdateControlSettings updates control settings in the service
func (h *AIHandler) UpdateControlSettings(cfg *config.AIConfig) {
if h.service != nil {
h.service.UpdateControlSettings(cfg)
}
}

View File

@@ -859,6 +859,7 @@ func (h *AISettingsHandler) HandleListModels(w http.ResponseWriter, r *http.Requ
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Notable bool `json:"notable"`
}
type Response struct {
@@ -882,14 +883,21 @@ func (h *AISettingsHandler) HandleListModels(w http.ResponseWriter, r *http.Requ
// Convert provider models to response format
responseModels := make([]ModelInfo, 0, len(models))
notableCount := 0
for _, m := range models {
if m.Notable {
notableCount++
}
responseModels = append(responseModels, ModelInfo{
ID: m.ID,
Name: m.Name,
Description: m.Description,
Notable: m.Notable,
})
}
log.Debug().Int("total", len(responseModels)).Int("notable", notableCount).Msg("Returning AI models")
resp := Response{
Models: responseModels,
Cached: cached,
@@ -3841,73 +3849,16 @@ func (h *AISettingsHandler) HandleApproveCommand(w http.ResponseWriter, r *http.
LogAuditEvent("ai_command_approved", username, GetClientIP(r), r.URL.Path, true,
fmt.Sprintf("Approved command: %s", truncateForLog(req.Command, 100)))
// Execute the approved command
var execResult *agentexec.CommandResultPayload
var execErr error
if h.agentServer != nil && req.TargetName != "" {
// Debug: list connected agents
connectedAgents := h.agentServer.GetConnectedAgents()
agentHostnames := make([]string, len(connectedAgents))
for i, a := range connectedAgents {
agentHostnames[i] = a.Hostname
}
log.Debug().
Str("target_name", req.TargetName).
Strs("connected_agents", agentHostnames).
Int("agent_count", len(connectedAgents)).
Msg("Looking for agent to execute approved command")
// Find agent for the target host
agentID, found := h.agentServer.GetAgentForHost(req.TargetName)
if found {
ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
defer cancel()
execResult, execErr = h.agentServer.ExecuteCommand(ctx, agentID, agentexec.ExecuteCommandPayload{
Command: req.Command,
TargetType: req.TargetType,
TargetID: req.TargetID,
})
if execErr == nil {
LogAuditEvent("ai_command_executed", username, GetClientIP(r), r.URL.Path, true,
fmt.Sprintf("Executed command: %s (exit: %d)", truncateForLog(req.Command, 100), execResult.ExitCode))
} else {
LogAuditEvent("ai_command_failed", username, GetClientIP(r), r.URL.Path, false,
fmt.Sprintf("Failed to execute command: %s - %v", truncateForLog(req.Command, 100), execErr))
}
} else {
// Agent not found - provide helpful error message
connectedAgents := h.agentServer.GetConnectedAgents()
hostnames := make([]string, len(connectedAgents))
for i, a := range connectedAgents {
hostnames[i] = a.Hostname
}
execErr = fmt.Errorf("no exec agent connected for host '%s'. Connected agents: %v. The host agent may need to be updated to support remote execution. See /api/ai/agents for details", req.TargetName, hostnames)
}
}
// Note: Command execution is handled by the agentic loop after it detects approval.
// The loop will re-execute the tool with the approval_id, and the tool will
// check the approval status and execute if approved.
// This avoids double execution (once here, once in agentic loop).
response := map[string]interface{}{
"approved": true,
"request": req,
}
if execResult != nil {
response["executed"] = true
response["result"] = map[string]interface{}{
"stdout": execResult.Stdout,
"stderr": execResult.Stderr,
"exit_code": execResult.ExitCode,
}
response["message"] = "Command approved and executed."
} else if execErr != nil {
response["executed"] = false
response["error"] = execErr.Error()
response["message"] = "Command approved but execution failed."
} else {
response["executed"] = false
response["message"] = "Command approved. No agent available for execution."
"approved": true,
"request": req,
"approval_id": req.ID,
"message": "Command approved. The AI will now execute it.",
}
w.Header().Set("Content-Type", "application/json")

View File

@@ -0,0 +1,70 @@
package api
import (
"github.com/rcourtman/pulse-go-rewrite/internal/ai/memory"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/internal/monitoring"
"github.com/rcourtman/pulse-go-rewrite/internal/notifications"
)
// AlertMonitorWrapper wraps *monitoring.Monitor to satisfy AlertMonitor interface.
type AlertMonitorWrapper struct {
m *monitoring.Monitor
}
// NewAlertMonitorWrapper creates a new wrapper for AlertMonitor.
func NewAlertMonitorWrapper(m *monitoring.Monitor) AlertMonitor {
if m == nil {
return nil
}
return &AlertMonitorWrapper{m: m}
}
func (w *AlertMonitorWrapper) GetAlertManager() AlertManager {
return w.m.GetAlertManager()
}
func (w *AlertMonitorWrapper) GetConfigPersistence() ConfigPersistence {
return w.m.GetConfigPersistence()
}
func (w *AlertMonitorWrapper) GetIncidentStore() *memory.IncidentStore {
return w.m.GetIncidentStore()
}
func (w *AlertMonitorWrapper) GetNotificationManager() *notifications.NotificationManager {
return w.m.GetNotificationManager()
}
func (w *AlertMonitorWrapper) SyncAlertState() {
w.m.SyncAlertState()
}
func (w *AlertMonitorWrapper) GetState() models.StateSnapshot {
return w.m.GetState()
}
// NotificationMonitorWrapper wraps *monitoring.Monitor to satisfy NotificationMonitor interface.
type NotificationMonitorWrapper struct {
m *monitoring.Monitor
}
// NewNotificationMonitorWrapper creates a new wrapper for NotificationMonitor.
func NewNotificationMonitorWrapper(m *monitoring.Monitor) NotificationMonitor {
if m == nil {
return nil
}
return &NotificationMonitorWrapper{m: m}
}
func (w *NotificationMonitorWrapper) GetNotificationManager() NotificationManager {
return w.m.GetNotificationManager()
}
func (w *NotificationMonitorWrapper) GetConfigPersistence() NotificationConfigPersistence {
return w.m.GetConfigPersistence()
}
func (w *NotificationMonitorWrapper) GetState() models.StateSnapshot {
return w.m.GetState()
}

View File

@@ -28,8 +28,8 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/agentbinaries"
"github.com/rcourtman/pulse-go-rewrite/internal/agentexec"
"github.com/rcourtman/pulse-go-rewrite/internal/ai"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/mcp"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/opencode"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/tools"
"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/license"
@@ -200,8 +200,8 @@ func NewRouter(cfg *config.Config, monitor *monitoring.Monitor, wsHub *websocket
// setupRoutes configures all routes
func (r *Router) setupRoutes() {
// Create handlers
r.alertHandlers = NewAlertHandlers(r.monitor, r.wsHub)
r.notificationHandlers = NewNotificationHandlers(r.monitor)
r.alertHandlers = NewAlertHandlers(NewAlertMonitorWrapper(r.monitor), r.wsHub)
r.notificationHandlers = NewNotificationHandlers(NewNotificationMonitorWrapper(r.monitor))
r.notificationQueueHandlers = NewNotificationQueueHandlers(r.monitor)
guestMetadataHandler := NewGuestMetadataHandler(r.config.DataPath)
dockerMetadataHandler := NewDockerMetadataHandler(r.config.DataPath)
@@ -1718,13 +1718,13 @@ func (r *Router) Handler() http.Handler {
func (r *Router) SetMonitor(m *monitoring.Monitor) {
r.monitor = m
if r.alertHandlers != nil {
r.alertHandlers.SetMonitor(m)
r.alertHandlers.SetMonitor(NewAlertMonitorWrapper(m))
}
if r.configHandlers != nil {
r.configHandlers.SetMonitor(m)
}
if r.notificationHandlers != nil {
r.notificationHandlers.SetMonitor(m)
r.notificationHandlers.SetMonitor(NewNotificationMonitorWrapper(m))
}
if r.dockerAgentHandlers != nil {
r.dockerAgentHandlers.SetMonitor(m)
@@ -1972,19 +1972,23 @@ func (r *Router) StartOpenCodeAI(ctx context.Context) {
// Wire up MCP tool providers so AI can access real data
r.wireOpenCodeProviders()
// Wire up OpenCode patrol if UseOpenCode is enabled
// Wire up AI patrol if AI is running
aiCfg := r.aiHandler.GetAIConfig()
if aiCfg != nil && aiCfg.UseOpenCode && r.aiHandler.IsRunning() {
if aiCfg != nil && r.aiHandler.IsRunning() {
service := r.aiHandler.GetService()
if service != nil {
// Create OpenCode patrol service
ocPatrol := opencode.NewPatrolService(service)
// Create patrol service - need concrete type for patrol
chatService, ok := service.(*chat.Service)
if !ok {
log.Warn().Msg("AI service is not a *chat.Service, patrol disabled")
}
aiPatrol := chat.NewPatrolService(chatService)
// Wire to existing patrol service
if r.aiSettingsHandler != nil {
if patrolSvc := r.aiSettingsHandler.GetAIService().GetPatrolService(); patrolSvc != nil {
patrolSvc.SetOpenCodePatrol(ocPatrol, true)
log.Info().Msg("OpenCode patrol integration enabled")
patrolSvc.SetChatPatrol(aiPatrol, true)
log.Info().Msg("AI patrol integration enabled")
}
}
}
@@ -2005,7 +2009,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire alert provider
if r.monitor != nil {
if alertManager := r.monitor.GetAlertManager(); alertManager != nil {
alertAdapter := mcp.NewAlertManagerMCPAdapter(alertManager)
alertAdapter := tools.NewAlertManagerMCPAdapter(alertManager)
if alertAdapter != nil {
service.SetAlertProvider(alertAdapter)
log.Debug().Msg("OpenCode: Alert provider wired")
@@ -2034,7 +2038,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire storage provider
if r.monitor != nil {
storageAdapter := mcp.NewStorageMCPAdapter(r.monitor)
storageAdapter := tools.NewStorageMCPAdapter(r.monitor)
if storageAdapter != nil {
service.SetStorageProvider(storageAdapter)
log.Debug().Msg("OpenCode: Storage provider wired")
@@ -2043,7 +2047,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire backup provider
if r.monitor != nil {
backupAdapter := mcp.NewBackupMCPAdapter(r.monitor)
backupAdapter := tools.NewBackupMCPAdapter(r.monitor)
if backupAdapter != nil {
service.SetBackupProvider(backupAdapter)
log.Debug().Msg("OpenCode: Backup provider wired")
@@ -2052,7 +2056,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire disk health provider
if r.monitor != nil {
diskHealthAdapter := mcp.NewDiskHealthMCPAdapter(r.monitor)
diskHealthAdapter := tools.NewDiskHealthMCPAdapter(r.monitor)
if diskHealthAdapter != nil {
service.SetDiskHealthProvider(diskHealthAdapter)
log.Debug().Msg("OpenCode: Disk health provider wired")
@@ -2061,7 +2065,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire updates provider for Docker container updates
if r.monitor != nil {
updatesAdapter := mcp.NewUpdatesMCPAdapter(r.monitor, &updatesConfigWrapper{cfg: r.config})
updatesAdapter := tools.NewUpdatesMCPAdapter(r.monitor, &updatesConfigWrapper{cfg: r.config})
if updatesAdapter != nil {
service.SetUpdatesProvider(updatesAdapter)
log.Debug().Msg("OpenCode: Updates provider wired")
@@ -2071,7 +2075,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire metrics history provider
if r.monitor != nil {
if metricsHistory := r.monitor.GetMetricsHistory(); metricsHistory != nil {
metricsAdapter := mcp.NewMetricsHistoryMCPAdapter(
metricsAdapter := tools.NewMetricsHistoryMCPAdapter(
r.monitor,
&metricsSourceWrapper{history: metricsHistory},
)
@@ -2086,7 +2090,7 @@ func (r *Router) wireOpenCodeProviders() {
if r.aiSettingsHandler != nil {
if patrolSvc := r.aiSettingsHandler.GetAIService().GetPatrolService(); patrolSvc != nil {
if baselineStore := patrolSvc.GetBaselineStore(); baselineStore != nil {
baselineAdapter := mcp.NewBaselineMCPAdapter(&baselineSourceWrapper{store: baselineStore})
baselineAdapter := tools.NewBaselineMCPAdapter(&baselineSourceWrapper{store: baselineStore})
if baselineAdapter != nil {
service.SetBaselineProvider(baselineAdapter)
log.Debug().Msg("OpenCode: Baseline provider wired")
@@ -2099,7 +2103,7 @@ func (r *Router) wireOpenCodeProviders() {
if r.aiSettingsHandler != nil {
if patrolSvc := r.aiSettingsHandler.GetAIService().GetPatrolService(); patrolSvc != nil {
if patternDetector := patrolSvc.GetPatternDetector(); patternDetector != nil {
patternAdapter := mcp.NewPatternMCPAdapter(
patternAdapter := tools.NewPatternMCPAdapter(
&patternSourceWrapper{detector: patternDetector},
r.monitor,
)
@@ -2114,7 +2118,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire findings manager
if r.aiSettingsHandler != nil {
if patrolSvc := r.aiSettingsHandler.GetAIService().GetPatrolService(); patrolSvc != nil {
findingsManagerAdapter := mcp.NewFindingsManagerMCPAdapter(patrolSvc)
findingsManagerAdapter := tools.NewFindingsManagerMCPAdapter(patrolSvc)
if findingsManagerAdapter != nil {
service.SetFindingsManager(findingsManagerAdapter)
log.Debug().Msg("OpenCode: Findings manager wired")
@@ -2124,7 +2128,7 @@ func (r *Router) wireOpenCodeProviders() {
// Wire metadata updater
if r.aiSettingsHandler != nil {
metadataAdapter := mcp.NewMetadataUpdaterMCPAdapter(r.aiSettingsHandler.GetAIService())
metadataAdapter := tools.NewMetadataUpdaterMCPAdapter(r.aiSettingsHandler.GetAIService())
if metadataAdapter != nil {
service.SetMetadataUpdater(metadataAdapter)
log.Debug().Msg("OpenCode: Metadata updater wired")
@@ -2134,34 +2138,34 @@ func (r *Router) wireOpenCodeProviders() {
log.Info().Msg("OpenCode MCP tool providers wired")
}
// metricsSourceWrapper wraps monitoring.MetricsHistory to implement mcp.MetricsSource
// metricsSourceWrapper wraps monitoring.MetricsHistory to implement tools.MetricsSource
type metricsSourceWrapper struct {
history *monitoring.MetricsHistory
}
func (w *metricsSourceWrapper) GetGuestMetrics(guestID string, metricType string, duration time.Duration) []mcp.RawMetricPoint {
func (w *metricsSourceWrapper) GetGuestMetrics(guestID string, metricType string, duration time.Duration) []tools.RawMetricPoint {
points := w.history.GetGuestMetrics(guestID, metricType, duration)
return convertMetricPoints(points)
}
func (w *metricsSourceWrapper) GetNodeMetrics(nodeID string, metricType string, duration time.Duration) []mcp.RawMetricPoint {
func (w *metricsSourceWrapper) GetNodeMetrics(nodeID string, metricType string, duration time.Duration) []tools.RawMetricPoint {
points := w.history.GetNodeMetrics(nodeID, metricType, duration)
return convertMetricPoints(points)
}
func (w *metricsSourceWrapper) GetAllGuestMetrics(guestID string, duration time.Duration) map[string][]mcp.RawMetricPoint {
func (w *metricsSourceWrapper) GetAllGuestMetrics(guestID string, duration time.Duration) map[string][]tools.RawMetricPoint {
metricsMap := w.history.GetAllGuestMetrics(guestID, duration)
result := make(map[string][]mcp.RawMetricPoint, len(metricsMap))
result := make(map[string][]tools.RawMetricPoint, len(metricsMap))
for key, points := range metricsMap {
result[key] = convertMetricPoints(points)
}
return result
}
func convertMetricPoints(points []monitoring.MetricPoint) []mcp.RawMetricPoint {
result := make([]mcp.RawMetricPoint, len(points))
func convertMetricPoints(points []monitoring.MetricPoint) []tools.RawMetricPoint {
result := make([]tools.RawMetricPoint, len(points))
for i, p := range points {
result[i] = mcp.RawMetricPoint{
result[i] = tools.RawMetricPoint{
Value: p.Value,
Timestamp: p.Timestamp,
}
@@ -2169,7 +2173,7 @@ func convertMetricPoints(points []monitoring.MetricPoint) []mcp.RawMetricPoint {
return result
}
// baselineSourceWrapper wraps baseline.Store to implement mcp.BaselineSource
// baselineSourceWrapper wraps baseline.Store to implement tools.BaselineSource
type baselineSourceWrapper struct {
store *ai.BaselineStore
}
@@ -2185,7 +2189,7 @@ func (w *baselineSourceWrapper) GetBaseline(resourceID, metric string) (mean, st
return baseline.Mean, baseline.StdDev, baseline.SampleCount, true
}
func (w *baselineSourceWrapper) GetAllBaselines() map[string]map[string]mcp.BaselineData {
func (w *baselineSourceWrapper) GetAllBaselines() map[string]map[string]tools.BaselineData {
if w.store == nil {
return nil
}
@@ -2194,7 +2198,7 @@ func (w *baselineSourceWrapper) GetAllBaselines() map[string]map[string]mcp.Base
return nil
}
result := make(map[string]map[string]mcp.BaselineData)
result := make(map[string]map[string]tools.BaselineData)
for key, flat := range allFlat {
// key format is "resourceID:metric"
parts := strings.SplitN(key, ":", 2)
@@ -2204,9 +2208,9 @@ func (w *baselineSourceWrapper) GetAllBaselines() map[string]map[string]mcp.Base
resourceID, metric := parts[0], parts[1]
if result[resourceID] == nil {
result[resourceID] = make(map[string]mcp.BaselineData)
result[resourceID] = make(map[string]tools.BaselineData)
}
result[resourceID][metric] = mcp.BaselineData{
result[resourceID][metric] = tools.BaselineData{
Mean: flat.Mean,
StdDev: flat.StdDev,
SampleCount: flat.Samples,
@@ -2215,12 +2219,12 @@ func (w *baselineSourceWrapper) GetAllBaselines() map[string]map[string]mcp.Base
return result
}
// patternSourceWrapper wraps patterns.Detector to implement mcp.PatternSource
// patternSourceWrapper wraps patterns.Detector to implement tools.PatternSource
type patternSourceWrapper struct {
detector *ai.PatternDetector
}
func (w *patternSourceWrapper) GetPatterns() []mcp.PatternData {
func (w *patternSourceWrapper) GetPatterns() []tools.PatternData {
if w.detector == nil {
return nil
}
@@ -2230,12 +2234,12 @@ func (w *patternSourceWrapper) GetPatterns() []mcp.PatternData {
return nil
}
result := make([]mcp.PatternData, 0, len(patterns))
result := make([]tools.PatternData, 0, len(patterns))
for _, p := range patterns {
if p == nil {
continue
}
result = append(result, mcp.PatternData{
result = append(result, tools.PatternData{
ResourceID: p.ResourceID,
PatternType: string(p.EventType),
Description: fmt.Sprintf("%s pattern with %d occurrences", p.EventType, p.Occurrences),
@@ -2246,7 +2250,7 @@ func (w *patternSourceWrapper) GetPatterns() []mcp.PatternData {
return result
}
func (w *patternSourceWrapper) GetPredictions() []mcp.PredictionData {
func (w *patternSourceWrapper) GetPredictions() []tools.PredictionData {
if w.detector == nil {
return nil
}
@@ -2256,9 +2260,9 @@ func (w *patternSourceWrapper) GetPredictions() []mcp.PredictionData {
return nil
}
result := make([]mcp.PredictionData, 0, len(predictions))
result := make([]tools.PredictionData, 0, len(predictions))
for _, p := range predictions {
result = append(result, mcp.PredictionData{
result = append(result, tools.PredictionData{
ResourceID: p.ResourceID,
IssueType: string(p.EventType),
PredictedTime: p.PredictedAt,
@@ -2269,7 +2273,7 @@ func (w *patternSourceWrapper) GetPredictions() []mcp.PredictionData {
return result
}
// updatesConfigWrapper wraps config.Config to implement mcp.UpdatesConfig
// updatesConfigWrapper wraps config.Config to implement tools.UpdatesConfig
type updatesConfigWrapper struct {
cfg *config.Config
}