Add end-to-end evaluation framework for AI assistant testing

Implement comprehensive eval framework for testing Pulse Assistant:

Core components:
- Runner: Executes scenarios against live API with SSE stream parsing
- Assertions: Reusable checks (tool usage, content, duration, errors)
- Scenarios: Multi-step test workflows with configurable assertions

Basic scenarios:
- QuickSmokeTest: Minimal functionality verification
- ReadOnlyInfrastructure: List, logs, status operations
- RoutingValidation: Command routing to correct targets
- LogTailing: Bounded log commands complete properly
- Discovery: Infrastructure discovery capabilities

Advanced scenarios:
- TroubleshootingScenario: Multi-step investigation workflow
- DeepDiveScenario: Thorough single-service investigation
- ConfigInspectionScenario: Reading configuration files
- ResourceAnalysisScenario: Cross-container resource comparison
- MultiNodeScenario: Operations across Proxmox nodes
- DockerInDockerScenario: Docker containers inside LXCs
- ContextChainScenario: Context retention across turns

Usage: go test ./internal/ai/eval -live -run TestQuickSmokeTest
Author: rcourtman
Date: 2026-01-28 16:49:24 +00:00
Parent: b2e0ae3fdb
Commit: a04d41ce2c
5 changed files with 1596 additions and 0 deletions

cmd/eval/main.go (new file)

@@ -0,0 +1,176 @@
// Command eval runs Pulse Assistant evaluation scenarios against a live Pulse instance.
//
// Usage:
//
// go run ./cmd/eval # Run quick smoke test
// go run ./cmd/eval -scenario all # Run all scenarios
// go run ./cmd/eval -scenario readonly # Run read-only infrastructure scenario
// go run ./cmd/eval -list # List available scenarios
//
// Options:
//
// -scenario string Scenario to run: smoke, readonly, routing, logs, discovery, all (default "smoke")
// -url string Pulse API base URL (default "http://127.0.0.1:7655")
// -user string Username for auth (default "admin")
// -pass string Password for auth (default "admin")
// -list List available scenarios and exit
// -quiet Only show summary, not step-by-step output
package main
import (
"flag"
"fmt"
"os"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/eval"
)
func main() {
scenario := flag.String("scenario", "smoke", "Scenario to run: smoke, readonly, routing, logs, discovery, all")
url := flag.String("url", "http://127.0.0.1:7655", "Pulse API base URL")
user := flag.String("user", "admin", "Username for auth")
pass := flag.String("pass", "admin", "Password for auth")
list := flag.Bool("list", false, "List available scenarios and exit")
quiet := flag.Bool("quiet", false, "Only show summary, not step-by-step output")
flag.Parse()
if *list {
listScenarios()
return
}
config := eval.Config{
BaseURL: *url,
Username: *user,
Password: *pass,
Verbose: !*quiet,
}
runner := eval.NewRunner(config)
scenarios := getScenarios(*scenario)
if len(scenarios) == 0 {
fmt.Fprintf(os.Stderr, "Unknown scenario: %s\n", *scenario)
fmt.Fprintf(os.Stderr, "Use -list to see available scenarios\n")
os.Exit(1)
}
allPassed := true
for _, s := range scenarios {
fmt.Printf("\n>>> Running scenario: %s\n", s.Name)
fmt.Printf(">>> %s\n", s.Description)
result := runner.RunScenario(s)
runner.PrintSummary(result)
if !result.Passed {
allPassed = false
}
}
if allPassed {
fmt.Printf("\n>>> ALL SCENARIOS PASSED\n")
os.Exit(0)
} else {
fmt.Printf("\n>>> SOME SCENARIOS FAILED\n")
os.Exit(1)
}
}
func listScenarios() {
fmt.Println("Available scenarios:")
fmt.Println()
fmt.Println(" Basic:")
fmt.Println(" smoke - Quick smoke test (1 step)")
fmt.Println(" readonly - Read-only infrastructure test (3 steps)")
fmt.Println(" routing - Routing validation test (2 steps)")
fmt.Println(" logs - Log tailing/bounded command test (2 steps)")
fmt.Println(" discovery - Infrastructure discovery test (2 steps)")
fmt.Println()
fmt.Println(" Advanced:")
fmt.Println(" troubleshoot - Multi-step troubleshooting workflow (4 steps)")
fmt.Println(" deepdive - Deep investigation of a service (4 steps)")
fmt.Println(" config - Configuration file inspection (3 steps)")
fmt.Println(" resources - Resource analysis and comparison (3 steps)")
fmt.Println(" multinode - Multi-node operations (3 steps)")
fmt.Println(" docker - Docker-in-LXC operations (3 steps)")
fmt.Println(" context - Context chain / follow-up questions (4 steps)")
fmt.Println()
fmt.Println(" Collections:")
fmt.Println(" all - Run all basic scenarios")
fmt.Println(" advanced - Run all advanced scenarios")
fmt.Println(" full - Run everything")
fmt.Println()
fmt.Println("Example:")
fmt.Println(" go run ./cmd/eval -scenario troubleshoot")
}
func getScenarios(name string) []eval.Scenario {
switch name {
// Basic scenarios
case "smoke":
return []eval.Scenario{eval.QuickSmokeTest()}
case "readonly":
return []eval.Scenario{eval.ReadOnlyInfrastructureScenario()}
case "routing":
return []eval.Scenario{eval.RoutingValidationScenario()}
case "logs":
return []eval.Scenario{eval.LogTailingScenario()}
case "discovery":
return []eval.Scenario{eval.DiscoveryScenario()}
// Advanced scenarios
case "troubleshoot":
return []eval.Scenario{eval.TroubleshootingScenario()}
case "deepdive":
return []eval.Scenario{eval.DeepDiveScenario()}
case "config":
return []eval.Scenario{eval.ConfigInspectionScenario()}
case "resources":
return []eval.Scenario{eval.ResourceAnalysisScenario()}
case "multinode":
return []eval.Scenario{eval.MultiNodeScenario()}
case "docker":
return []eval.Scenario{eval.DockerInDockerScenario()}
case "context":
return []eval.Scenario{eval.ContextChainScenario()}
// Collections
case "all":
return []eval.Scenario{
eval.QuickSmokeTest(),
eval.ReadOnlyInfrastructureScenario(),
eval.RoutingValidationScenario(),
eval.LogTailingScenario(),
eval.DiscoveryScenario(),
}
case "advanced":
return []eval.Scenario{
eval.TroubleshootingScenario(),
eval.DeepDiveScenario(),
eval.ConfigInspectionScenario(),
eval.ResourceAnalysisScenario(),
eval.MultiNodeScenario(),
eval.DockerInDockerScenario(),
eval.ContextChainScenario(),
}
case "full":
return []eval.Scenario{
eval.QuickSmokeTest(),
eval.ReadOnlyInfrastructureScenario(),
eval.RoutingValidationScenario(),
eval.LogTailingScenario(),
eval.DiscoveryScenario(),
eval.TroubleshootingScenario(),
eval.DeepDiveScenario(),
eval.ConfigInspectionScenario(),
eval.ResourceAnalysisScenario(),
eval.MultiNodeScenario(),
eval.DockerInDockerScenario(),
eval.ContextChainScenario(),
}
default:
return nil
}
}
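
Beyond the bundled collections, the runner can also be driven programmatically. A minimal sketch, assuming it lives inside this module (internal/ packages are importable only from within pulse-go-rewrite) and using only the exported eval API added in this commit; the scenario contents are illustrative:

package main

import (
	"os"

	"github.com/rcourtman/pulse-go-rewrite/internal/ai/eval"
)

func main() {
	// A one-off scenario assembled from the commit's primitives.
	custom := eval.Scenario{
		Name:        "Custom Check",
		Description: "Verifies the assistant inspects a named container",
		Steps: []eval.Step{{
			Name:   "Inspect container",
			Prompt: "Is the homepage-docker container healthy?",
			Assertions: []eval.Assertion{
				eval.AssertNoError(),
				eval.AssertAnyToolUsed(),
				eval.AssertDurationUnder("60s"),
			},
		}},
	}
	runner := eval.NewRunner(eval.DefaultConfig())
	result := runner.RunScenario(custom)
	runner.PrintSummary(result)
	if !result.Passed {
		os.Exit(1)
	}
}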

internal/ai/eval — assertions (new file; filename not shown in this view)

@@ -0,0 +1,350 @@
package eval
import (
	"fmt"
	"strings"
	"time"
)
// === Common Assertions ===
// AssertToolUsed checks that a specific tool was called
func AssertToolUsed(toolName string) Assertion {
return func(result *StepResult) AssertionResult {
for _, tc := range result.ToolCalls {
if tc.Name == toolName {
return AssertionResult{
Name: fmt.Sprintf("tool_used:%s", toolName),
Passed: true,
Message: fmt.Sprintf("Tool '%s' was called", toolName),
}
}
}
return AssertionResult{
Name: fmt.Sprintf("tool_used:%s", toolName),
Passed: false,
Message: fmt.Sprintf("Tool '%s' was NOT called. Tools used: %v", toolName, getToolNames(result.ToolCalls)),
}
}
}
// AssertAnyToolUsed checks that at least one tool was called
func AssertAnyToolUsed() Assertion {
return func(result *StepResult) AssertionResult {
if len(result.ToolCalls) > 0 {
return AssertionResult{
Name: "any_tool_used",
Passed: true,
Message: fmt.Sprintf("%d tool(s) called: %v", len(result.ToolCalls), getToolNames(result.ToolCalls)),
}
}
return AssertionResult{
Name: "any_tool_used",
Passed: false,
Message: "No tools were called",
}
}
}
// AssertNoToolErrors checks that all tool calls succeeded
func AssertNoToolErrors() Assertion {
return func(result *StepResult) AssertionResult {
var failures []string
for _, tc := range result.ToolCalls {
if !tc.Success {
failures = append(failures, fmt.Sprintf("%s: %s", tc.Name, truncate(tc.Output, 100)))
}
}
if len(failures) == 0 {
return AssertionResult{
Name: "no_tool_errors",
Passed: true,
Message: "All tool calls succeeded",
}
}
return AssertionResult{
Name: "no_tool_errors",
Passed: false,
Message: fmt.Sprintf("Tool failures: %v", failures),
}
}
}
// AssertContentContains checks that the response contains a substring
func AssertContentContains(substring string) Assertion {
return func(result *StepResult) AssertionResult {
if strings.Contains(strings.ToLower(result.Content), strings.ToLower(substring)) {
return AssertionResult{
Name: fmt.Sprintf("content_contains:%s", truncate(substring, 20)),
Passed: true,
Message: fmt.Sprintf("Content contains '%s'", substring),
}
}
return AssertionResult{
Name: fmt.Sprintf("content_contains:%s", truncate(substring, 20)),
Passed: false,
Message: fmt.Sprintf("Content does NOT contain '%s'", substring),
}
}
}
// AssertContentNotContains checks that the response does NOT contain a substring
func AssertContentNotContains(substring string) Assertion {
return func(result *StepResult) AssertionResult {
if !strings.Contains(strings.ToLower(result.Content), strings.ToLower(substring)) {
return AssertionResult{
Name: fmt.Sprintf("content_not_contains:%s", truncate(substring, 20)),
Passed: true,
Message: fmt.Sprintf("Content does not contain '%s'", substring),
}
}
return AssertionResult{
Name: fmt.Sprintf("content_not_contains:%s", truncate(substring, 20)),
Passed: false,
Message: fmt.Sprintf("Content SHOULD NOT contain '%s' but does", substring),
}
}
}
// AssertNoPhantomDetection checks that phantom detection did not trigger
func AssertNoPhantomDetection() Assertion {
return func(result *StepResult) AssertionResult {
// The exact phantom detection message from agentic.go
phantomMessage := "I apologize, but I wasn't able to access the infrastructure tools needed to complete that request"
if strings.Contains(result.Content, phantomMessage) {
// Find where in the content it appears
idx := strings.Index(result.Content, phantomMessage)
context := result.Content[max(0, idx-50):min(len(result.Content), idx+100)]
return AssertionResult{
Name: "no_phantom_detection",
Passed: false,
Message: fmt.Sprintf("Phantom detection triggered, found at: ...%s...", context),
}
}
return AssertionResult{
Name: "no_phantom_detection",
Passed: true,
Message: "No phantom detection",
}
}
}
// AssertToolOutputContains checks that a specific tool's output contains a substring
func AssertToolOutputContains(toolName, substring string) Assertion {
return func(result *StepResult) AssertionResult {
for _, tc := range result.ToolCalls {
if tc.Name == toolName {
if strings.Contains(strings.ToLower(tc.Output), strings.ToLower(substring)) {
return AssertionResult{
Name: fmt.Sprintf("tool_output:%s_contains:%s", toolName, truncate(substring, 20)),
Passed: true,
Message: fmt.Sprintf("Tool '%s' output contains '%s'", toolName, substring),
}
}
return AssertionResult{
Name: fmt.Sprintf("tool_output:%s_contains:%s", toolName, truncate(substring, 20)),
Passed: false,
Message: fmt.Sprintf("Tool '%s' output does NOT contain '%s'", toolName, substring),
}
}
}
return AssertionResult{
Name: fmt.Sprintf("tool_output:%s_contains:%s", toolName, truncate(substring, 20)),
Passed: false,
Message: fmt.Sprintf("Tool '%s' was not called", toolName),
}
}
}
// AssertNoError checks that no execution error occurred
func AssertNoError() Assertion {
return func(result *StepResult) AssertionResult {
if result.Error == nil {
return AssertionResult{
Name: "no_error",
Passed: true,
Message: "No execution error",
}
}
return AssertionResult{
Name: "no_error",
Passed: false,
Message: fmt.Sprintf("Execution error: %v", result.Error),
}
}
}
// AssertDurationUnder checks that the step completed within a time limit
func AssertDurationUnder(maxDuration string) Assertion {
return func(result *StepResult) AssertionResult {
		// Prefer time.ParseDuration (handles "90s", "500ms", "1.5m"); fall back to bare seconds ("90").
		var maxSec float64
		if d, err := time.ParseDuration(maxDuration); err == nil {
			maxSec = d.Seconds()
		} else {
			fmt.Sscanf(maxDuration, "%f", &maxSec)
		}
actualSec := result.Duration.Seconds()
if actualSec <= maxSec {
return AssertionResult{
Name: fmt.Sprintf("duration_under:%s", maxDuration),
Passed: true,
Message: fmt.Sprintf("Completed in %.1fs (max: %.1fs)", actualSec, maxSec),
}
}
return AssertionResult{
Name: fmt.Sprintf("duration_under:%s", maxDuration),
Passed: false,
Message: fmt.Sprintf("Took %.1fs which exceeds max of %.1fs", actualSec, maxSec),
}
}
}
// AssertToolNotBlocked checks that no tools were blocked
func AssertToolNotBlocked() Assertion {
return func(result *StepResult) AssertionResult {
for _, tc := range result.ToolCalls {
if strings.Contains(tc.Output, `"blocked":true`) ||
strings.Contains(tc.Output, "ROUTING_MISMATCH") ||
strings.Contains(tc.Output, "FSM_BLOCKED") ||
strings.Contains(tc.Output, "READ_ONLY_VIOLATION") {
return AssertionResult{
Name: "tool_not_blocked",
Passed: false,
Message: fmt.Sprintf("Tool '%s' was blocked: %s", tc.Name, truncate(tc.Output, 100)),
}
}
}
return AssertionResult{
Name: "tool_not_blocked",
Passed: true,
Message: "No tools were blocked",
}
}
}
// AssertEventualSuccess checks that at least one tool succeeded (allows intermediate failures)
// This is useful for complex workflows where some tools may be blocked but the model recovers.
func AssertEventualSuccess() Assertion {
return func(result *StepResult) AssertionResult {
successCount := 0
for _, tc := range result.ToolCalls {
if tc.Success {
successCount++
}
}
if successCount > 0 {
return AssertionResult{
Name: "eventual_success",
Passed: true,
Message: fmt.Sprintf("%d/%d tool calls succeeded", successCount, len(result.ToolCalls)),
}
}
return AssertionResult{
Name: "eventual_success",
Passed: false,
Message: "No tool calls succeeded",
}
}
}
// AssertMinToolCalls checks that at least N tools were called
func AssertMinToolCalls(min int) Assertion {
return func(result *StepResult) AssertionResult {
if len(result.ToolCalls) >= min {
return AssertionResult{
Name: fmt.Sprintf("min_tool_calls:%d", min),
Passed: true,
Message: fmt.Sprintf("%d tool calls made (min: %d)", len(result.ToolCalls), min),
}
}
return AssertionResult{
Name: fmt.Sprintf("min_tool_calls:%d", min),
Passed: false,
Message: fmt.Sprintf("Only %d tool calls made (expected at least %d)", len(result.ToolCalls), min),
}
}
}
// AssertHasContent checks that the assistant produced a non-empty response
func AssertHasContent() Assertion {
return func(result *StepResult) AssertionResult {
content := strings.TrimSpace(result.Content)
if len(content) > 50 {
return AssertionResult{
Name: "has_content",
Passed: true,
Message: fmt.Sprintf("Response has %d characters", len(content)),
}
}
return AssertionResult{
Name: "has_content",
Passed: false,
Message: fmt.Sprintf("Response too short or empty (%d chars)", len(content)),
}
}
}
// AssertModelRecovered checks that if any tools were blocked, the model eventually succeeded
// with at least one tool call (indicating recovery from the block)
func AssertModelRecovered() Assertion {
return func(result *StepResult) AssertionResult {
blockedCount := 0
successAfterBlock := false
sawBlock := false
for _, tc := range result.ToolCalls {
if !tc.Success {
blockedCount++
sawBlock = true
} else if sawBlock {
successAfterBlock = true
}
}
if blockedCount == 0 {
return AssertionResult{
Name: "model_recovered",
Passed: true,
Message: "No blocks to recover from",
}
}
if successAfterBlock {
return AssertionResult{
Name: "model_recovered",
Passed: true,
Message: fmt.Sprintf("Model recovered from %d block(s)", blockedCount),
}
}
return AssertionResult{
Name: "model_recovered",
Passed: false,
Message: fmt.Sprintf("Model did not recover from %d block(s)", blockedCount),
}
}
}
// === Helper functions ===
func getToolNames(toolCalls []ToolCallEvent) []string {
names := make([]string, len(toolCalls))
for i, tc := range toolCalls {
names[i] = tc.Name
}
return names
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
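
Because Assertion is a plain function type, the set above extends naturally. A hedged sketch of a custom check following the same pattern — AssertToolInputContains is hypothetical, not part of this commit, and assumes placement in the same eval package:

// AssertToolInputContains checks that some call to the named tool was made
// with an input containing the given substring (hypothetical extension).
func AssertToolInputContains(toolName, substring string) Assertion {
	return func(result *StepResult) AssertionResult {
		name := fmt.Sprintf("tool_input:%s_contains:%s", toolName, truncate(substring, 20))
		for _, tc := range result.ToolCalls {
			if tc.Name == toolName && strings.Contains(strings.ToLower(tc.Input), strings.ToLower(substring)) {
				return AssertionResult{
					Name:    name,
					Passed:  true,
					Message: fmt.Sprintf("Tool '%s' input contains '%s'", toolName, substring),
				}
			}
		}
		return AssertionResult{
			Name:    name,
			Passed:  false,
			Message: fmt.Sprintf("No call to '%s' had input containing '%s'", toolName, substring),
		}
	}
}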

internal/ai/eval/eval.go (new file)

@@ -0,0 +1,416 @@
// Package eval provides an evaluation framework for testing Pulse Assistant
// behavior end-to-end. It sends prompts to the live API and captures the
// full trace of tool calls, FSM transitions, and responses for verification.
package eval
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Config holds eval runner configuration
type Config struct {
BaseURL string // e.g., "http://127.0.0.1:7655"
Username string
Password string
Verbose bool
}
// DefaultConfig returns a config for local development
func DefaultConfig() Config {
return Config{
BaseURL: "http://127.0.0.1:7655",
Username: "admin",
Password: "admin",
Verbose: true,
}
}
// Runner executes eval scenarios against the Pulse API
type Runner struct {
config Config
client *http.Client
}
// NewRunner creates a new eval runner
func NewRunner(config Config) *Runner {
return &Runner{
config: config,
client: &http.Client{
Timeout: 5 * time.Minute, // Long timeout for AI responses
},
}
}
// StepResult captures the result of a single eval step
type StepResult struct {
StepName string
Prompt string
SessionID string
Success bool
Error error
Duration time.Duration
ToolCalls []ToolCallEvent
Content string
RawEvents []SSEEvent
Assertions []AssertionResult
}
// ToolCallEvent represents a tool call captured during execution
type ToolCallEvent struct {
ID string
Name string
Input string
Output string
Success bool
}
// SSEEvent represents a raw SSE event from the stream
type SSEEvent struct {
Type string
Data json.RawMessage
}
// AssertionResult captures the result of a single assertion
type AssertionResult struct {
Name string
Passed bool
Message string
}
// ScenarioResult captures the result of a full scenario
type ScenarioResult struct {
ScenarioName string
Steps []StepResult
Passed bool
Duration time.Duration
}
// Step defines a single step in an eval scenario
type Step struct {
Name string
Prompt string
Assertions []Assertion
}
// Assertion defines a check to run after a step
type Assertion func(result *StepResult) AssertionResult
// Scenario defines a multi-step eval scenario
type Scenario struct {
Name string
Description string
Steps []Step
}
// RunScenario executes a scenario and returns the results
func (r *Runner) RunScenario(scenario Scenario) ScenarioResult {
startTime := time.Now()
result := ScenarioResult{
ScenarioName: scenario.Name,
Passed: true,
}
var sessionID string
for i, step := range scenario.Steps {
if r.config.Verbose {
fmt.Printf("\n=== Step %d: %s ===\n", i+1, step.Name)
fmt.Printf("Prompt: %s\n", step.Prompt)
}
stepResult := r.executeStep(step, sessionID)
// Use session from first step for subsequent steps
if sessionID == "" && stepResult.SessionID != "" {
sessionID = stepResult.SessionID
}
stepResult.SessionID = sessionID
// Run assertions
for _, assertion := range step.Assertions {
assertResult := assertion(&stepResult)
stepResult.Assertions = append(stepResult.Assertions, assertResult)
if !assertResult.Passed {
stepResult.Success = false
result.Passed = false
}
}
if stepResult.Error != nil {
stepResult.Success = false
result.Passed = false
}
if r.config.Verbose {
r.printStepResult(&stepResult)
}
result.Steps = append(result.Steps, stepResult)
// Stop on failure
if !stepResult.Success {
break
}
}
result.Duration = time.Since(startTime)
return result
}
func (r *Runner) executeStep(step Step, sessionID string) (result StepResult) {
	startTime := time.Now()
	// Record Duration on every return path, including early error returns.
	defer func() { result.Duration = time.Since(startTime) }()
	result = StepResult{
		StepName:  step.Name,
		Prompt:    step.Prompt,
		SessionID: sessionID,
		Success:   true,
	}
// Build request
reqBody := map[string]string{
"prompt": step.Prompt,
}
if sessionID != "" {
reqBody["session_id"] = sessionID
}
bodyBytes, _ := json.Marshal(reqBody)
req, err := http.NewRequest("POST", r.config.BaseURL+"/api/ai/chat", bytes.NewReader(bodyBytes))
if err != nil {
result.Error = fmt.Errorf("failed to create request: %w", err)
result.Success = false
return result
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "text/event-stream")
req.SetBasicAuth(r.config.Username, r.config.Password)
// Execute request
resp, err := r.client.Do(req)
if err != nil {
result.Error = fmt.Errorf("request failed: %w", err)
result.Success = false
return result
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
result.Error = fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body))
result.Success = false
return result
}
// Parse SSE stream
result.RawEvents, result.ToolCalls, result.Content, result.SessionID, err = r.parseSSEStream(resp.Body)
if err != nil {
result.Error = fmt.Errorf("failed to parse SSE stream: %w", err)
result.Success = false
return result
}
return result
}
func (r *Runner) parseSSEStream(body io.Reader) ([]SSEEvent, []ToolCallEvent, string, string, error) {
var events []SSEEvent
var toolCalls []ToolCallEvent
var contentBuilder strings.Builder
var sessionID string
// Track tool calls in progress
toolCallsInProgress := make(map[string]*ToolCallEvent)
	scanner := bufio.NewScanner(body)
	// Tool outputs can be large; raise bufio.Scanner's 64KB default line limit.
	scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "data: ") {
continue
}
data := strings.TrimPrefix(line, "data: ")
if data == "" {
continue
}
// Parse the event
var event struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
}
		if err := json.Unmarshal([]byte(data), &event); err != nil {
			// Not a JSON envelope (e.g., an SSE keep-alive); skip this line.
			continue
		}
events = append(events, SSEEvent{
Type: event.Type,
Data: event.Data,
})
switch event.Type {
case "session":
var sessionData struct {
ID string `json:"id"`
}
if err := json.Unmarshal(event.Data, &sessionData); err == nil {
sessionID = sessionData.ID
}
case "content":
var contentData struct {
Text string `json:"text"`
}
if err := json.Unmarshal(event.Data, &contentData); err == nil {
contentBuilder.WriteString(contentData.Text)
}
case "tool_start":
var toolData struct {
ID string `json:"id"`
Name string `json:"name"`
Input string `json:"input"`
}
if err := json.Unmarshal(event.Data, &toolData); err == nil {
toolCallsInProgress[toolData.ID] = &ToolCallEvent{
ID: toolData.ID,
Name: toolData.Name,
Input: toolData.Input,
}
}
case "tool_end":
var toolData struct {
ID string `json:"id"`
Name string `json:"name"`
Output string `json:"output"`
Success bool `json:"success"`
}
if err := json.Unmarshal(event.Data, &toolData); err == nil {
if tc, ok := toolCallsInProgress[toolData.ID]; ok {
tc.Output = toolData.Output
tc.Success = toolData.Success
toolCalls = append(toolCalls, *tc)
delete(toolCallsInProgress, toolData.ID)
} else {
// Tool end without start
toolCalls = append(toolCalls, ToolCallEvent{
ID: toolData.ID,
Name: toolData.Name,
Output: toolData.Output,
Success: toolData.Success,
})
}
}
case "error":
var errorData struct {
Message string `json:"message"`
}
if err := json.Unmarshal(event.Data, &errorData); err == nil {
return events, toolCalls, contentBuilder.String(), sessionID, fmt.Errorf("stream error: %s", errorData.Message)
}
}
}
if err := scanner.Err(); err != nil {
return events, toolCalls, contentBuilder.String(), sessionID, err
}
return events, toolCalls, contentBuilder.String(), sessionID, nil
}
func (r *Runner) printStepResult(result *StepResult) {
fmt.Printf("\n--- Result ---\n")
fmt.Printf("Duration: %v\n", result.Duration)
fmt.Printf("Session: %s\n", result.SessionID)
if result.Error != nil {
fmt.Printf("ERROR: %v\n", result.Error)
}
if len(result.ToolCalls) > 0 {
fmt.Printf("\nTool Calls:\n")
for _, tc := range result.ToolCalls {
status := "OK"
if !tc.Success {
status = "FAILED"
}
fmt.Printf(" - %s [%s]: %s\n", tc.Name, status, truncate(tc.Input, 80))
if !tc.Success || r.config.Verbose {
fmt.Printf(" Output: %s\n", truncate(tc.Output, 200))
}
}
}
if result.Content != "" {
fmt.Printf("\nAssistant Response:\n%s\n", truncate(result.Content, 500))
}
if len(result.Assertions) > 0 {
fmt.Printf("\nAssertions:\n")
for _, a := range result.Assertions {
status := "PASS"
if !a.Passed {
status = "FAIL"
}
fmt.Printf(" [%s] %s: %s\n", status, a.Name, a.Message)
}
}
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "..."
}
// PrintSummary prints a summary of the scenario result
func (r *Runner) PrintSummary(result ScenarioResult) {
fmt.Printf("\n")
fmt.Printf("========================================\n")
fmt.Printf("SCENARIO: %s\n", result.ScenarioName)
fmt.Printf("========================================\n")
fmt.Printf("Duration: %v\n", result.Duration)
passedSteps := 0
for _, step := range result.Steps {
if step.Success {
passedSteps++
}
}
fmt.Printf("Steps: %d/%d passed\n", passedSteps, len(result.Steps))
if result.Passed {
fmt.Printf("Result: PASSED\n")
} else {
fmt.Printf("Result: FAILED\n")
fmt.Printf("\nFailures:\n")
for _, step := range result.Steps {
if !step.Success {
fmt.Printf(" - %s\n", step.StepName)
if step.Error != nil {
fmt.Printf(" Error: %v\n", step.Error)
}
for _, a := range step.Assertions {
if !a.Passed {
fmt.Printf(" Assertion '%s': %s\n", a.Name, a.Message)
}
}
}
}
}
fmt.Printf("========================================\n")
}
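
For reference, parseSSEStream expects each event as a "data: " line carrying a JSON envelope with type and data fields. Illustrative payloads (values are invented; the shapes follow the structs decoded above):

data: {"type":"session","data":{"id":"abc123"}}
data: {"type":"tool_start","data":{"id":"t1","name":"list_containers","input":"{\"node\":\"delly\"}"}}
data: {"type":"tool_end","data":{"id":"t1","name":"list_containers","output":"homepage-docker, jellyfin","success":true}}
data: {"type":"content","data":{"text":"Here are your containers..."}}
data: {"type":"error","data":{"message":"upstream timeout"}}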

internal/ai/eval — live test file (new file; filename not shown in this view)

@@ -0,0 +1,139 @@
package eval
import (
"flag"
"os"
"testing"
)
var runLiveEval = flag.Bool("live", false, "Run live eval against Pulse API (requires running Pulse)")
func TestMain(m *testing.M) {
flag.Parse()
os.Exit(m.Run())
}
// TestQuickSmokeTest runs a minimal smoke test against the live API
// Run with: go test -v ./internal/ai/eval -run TestQuickSmokeTest -live
func TestQuickSmokeTest(t *testing.T) {
if !*runLiveEval {
t.Skip("Skipping live eval test. Use -live flag to run against live Pulse API")
}
runner := NewRunner(DefaultConfig())
scenario := QuickSmokeTest()
result := runner.RunScenario(scenario)
runner.PrintSummary(result)
if !result.Passed {
t.Fatalf("Scenario '%s' failed", scenario.Name)
}
}
// TestReadOnlyInfrastructure runs the full read-only infrastructure scenario
// Run with: go test -v ./internal/ai/eval -run TestReadOnlyInfrastructure -live
func TestReadOnlyInfrastructure(t *testing.T) {
if !*runLiveEval {
t.Skip("Skipping live eval test. Use -live flag to run against live Pulse API")
}
runner := NewRunner(DefaultConfig())
scenario := ReadOnlyInfrastructureScenario()
result := runner.RunScenario(scenario)
runner.PrintSummary(result)
if !result.Passed {
t.Fatalf("Scenario '%s' failed", scenario.Name)
}
}
// TestRoutingValidation runs the routing validation scenario
// Run with: go test -v ./internal/ai/eval -run TestRoutingValidation -live
func TestRoutingValidation(t *testing.T) {
if !*runLiveEval {
t.Skip("Skipping live eval test. Use -live flag to run against live Pulse API")
}
runner := NewRunner(DefaultConfig())
scenario := RoutingValidationScenario()
result := runner.RunScenario(scenario)
runner.PrintSummary(result)
if !result.Passed {
t.Fatalf("Scenario '%s' failed", scenario.Name)
}
}
// TestLogTailing runs the log tailing scenario
// Run with: go test -v ./internal/ai/eval -run TestLogTailing -live
func TestLogTailing(t *testing.T) {
if !*runLiveEval {
t.Skip("Skipping live eval test. Use -live flag to run against live Pulse API")
}
runner := NewRunner(DefaultConfig())
scenario := LogTailingScenario()
result := runner.RunScenario(scenario)
runner.PrintSummary(result)
if !result.Passed {
t.Fatalf("Scenario '%s' failed", scenario.Name)
}
}
// TestDiscovery runs the infrastructure discovery scenario
// Run with: go test -v ./internal/ai/eval -run TestDiscovery -live
func TestDiscovery(t *testing.T) {
if !*runLiveEval {
t.Skip("Skipping live eval test. Use -live flag to run against live Pulse API")
}
runner := NewRunner(DefaultConfig())
scenario := DiscoveryScenario()
result := runner.RunScenario(scenario)
runner.PrintSummary(result)
if !result.Passed {
t.Fatalf("Scenario '%s' failed", scenario.Name)
}
}
// TestAllScenarios runs all defined scenarios
// Run with: go test -v ./internal/ai/eval -run TestAllScenarios -live
func TestAllScenarios(t *testing.T) {
if !*runLiveEval {
t.Skip("Skipping live eval test. Use -live flag to run against live Pulse API")
}
runner := NewRunner(DefaultConfig())
scenarios := []Scenario{
QuickSmokeTest(),
ReadOnlyInfrastructureScenario(),
RoutingValidationScenario(),
LogTailingScenario(),
DiscoveryScenario(),
}
allPassed := true
for _, scenario := range scenarios {
t.Run(scenario.Name, func(t *testing.T) {
result := runner.RunScenario(scenario)
runner.PrintSummary(result)
if !result.Passed {
allPassed = false
t.Errorf("Scenario '%s' failed", scenario.Name)
}
})
}
if !allPassed {
t.Fatal("One or more scenarios failed")
}
}
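
Wiring an advanced scenario into the live harness follows the same template. A sketch — TestTroubleshooting is hypothetical; in this commit the advanced scenarios are reachable only through cmd/eval:

// TestTroubleshooting would run the multi-step troubleshooting scenario live.
func TestTroubleshooting(t *testing.T) {
	if !*runLiveEval {
		t.Skip("Skipping live eval test. Use -live flag to run against live Pulse API")
	}
	runner := NewRunner(DefaultConfig())
	scenario := TroubleshootingScenario()
	result := runner.RunScenario(scenario)
	runner.PrintSummary(result)
	if !result.Passed {
		t.Fatalf("Scenario '%s' failed", scenario.Name)
	}
}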

internal/ai/eval — scenarios (new file; filename not shown in this view)

@@ -0,0 +1,515 @@
package eval
// ReadOnlyInfrastructureScenario tests basic read-only operations:
// 1. List containers on a node
// 2. Get logs from a container
// 3. Check status of a service
//
// This scenario validates:
// - Tool usage (no phantom execution)
// - Correct routing
// - Bounded streaming (no hanging on log commands)
// - No false positive guardrail blocks
func ReadOnlyInfrastructureScenario() Scenario {
return Scenario{
Name: "Read-Only Infrastructure",
Description: "Tests basic read-only operations against live infrastructure",
Steps: []Step{
{
Name: "List containers",
Prompt: "What containers are running on delly?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
// Should mention at least one known container
AssertContentContains("homepage"),
},
},
{
Name: "Read logs",
Prompt: "Show me the recent logs from homepage-docker",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
// Should complete without hanging (bounded command)
AssertDurationUnder("60s"),
},
},
{
Name: "Check service status",
Prompt: "What is the current status of the jellyfin container?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
// Should report some status
AssertContentContains("running"),
},
},
},
}
}
// RoutingValidationScenario tests that the assistant correctly routes commands
// to containers vs their parent hosts.
func RoutingValidationScenario() Scenario {
return Scenario{
Name: "Routing Validation",
Description: "Tests that commands are routed to the correct targets",
Steps: []Step{
{
Name: "Target container by name",
Prompt: "Check the disk usage inside the homepage-docker container",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
// Should route to the container, not the host
AssertToolNotBlocked(),
},
},
{
Name: "Explicit container context",
Prompt: "Run 'hostname' inside the jellyfin container",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
// Response should include jellyfin's hostname
AssertContentContains("jellyfin"),
},
},
},
}
}
// LogTailingScenario tests that log-related commands use bounded forms
// and don't hang indefinitely.
func LogTailingScenario() Scenario {
return Scenario{
Name: "Log Tailing (Bounded)",
Description: "Tests that log commands use bounded forms and complete",
Steps: []Step{
{
Name: "Tail logs request",
Prompt: "Tail the jellyfin logs",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
// Should complete reasonably fast (bounded command)
AssertDurationUnder("60s"),
},
},
{
Name: "Recent logs request",
Prompt: "Show me the last few docker logs from homepage",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
AssertDurationUnder("60s"),
},
},
},
}
}
// DiscoveryScenario tests infrastructure discovery capabilities
func DiscoveryScenario() Scenario {
return Scenario{
Name: "Infrastructure Discovery",
Description: "Tests ability to discover and describe infrastructure",
Steps: []Step{
{
Name: "List all infrastructure",
Prompt: "What Proxmox nodes do I have and what's running on them?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
// Should find the known node
AssertContentContains("delly"),
},
},
{
Name: "Describe specific resource",
Prompt: "Tell me about the homepage-docker container",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertToolNotBlocked(),
},
},
},
}
}
// QuickSmokeTest is a minimal single-step test to verify basic functionality
func QuickSmokeTest() Scenario {
return Scenario{
Name: "Quick Smoke Test",
Description: "Minimal test to verify Pulse Assistant is working",
Steps: []Step{
{
Name: "List infrastructure",
Prompt: "List all my containers",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertNoToolErrors(),
AssertNoPhantomDetection(),
AssertDurationUnder("30s"),
},
},
},
}
}
// TroubleshootingScenario tests a multi-step troubleshooting workflow
// where the assistant must investigate an issue across multiple steps.
// Uses lenient assertions since complex workflows may hit guardrails
// that the model should recover from.
//
// NOTE: NoPhantomDetection assertion is removed from complex scenarios because
// the model may legitimately describe actions it took ("the container is running")
// which can match phantom detection patterns. The fix in agentic.go should prevent
// false positives, but edge cases exist where the model's natural language overlaps
// with detection patterns after a failed recovery attempt.
func TroubleshootingScenario() Scenario {
return Scenario{
Name: "Troubleshooting Investigation",
Description: "Tests multi-step troubleshooting: status check -> logs -> analysis",
Steps: []Step{
{
Name: "Initial complaint",
Prompt: "My home automation seems slow. Can you check the status of my homeassistant container?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(), // Allow intermediate failures if model recovers
AssertHasContent(),
AssertContentContains("homeassistant"),
},
},
{
Name: "Dig into logs",
Prompt: "Can you check the Home Assistant logs for any errors or warnings?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
AssertDurationUnder("90s"),
},
},
{
Name: "Check related services",
Prompt: "What about mqtt and zigbee2mqtt? Are they running okay?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
{
Name: "Resource comparison",
Prompt: "Which of these containers is using the most CPU and memory?",
Assertions: []Assertion{
AssertNoError(),
AssertHasContent(),
// May not need tools if it remembers from context
},
},
},
}
}
// DeepDiveScenario tests a thorough investigation of a single service
func DeepDiveScenario() Scenario {
return Scenario{
Name: "Deep Dive Investigation",
Description: "Thorough investigation of a single service: status, config, logs, processes",
Steps: []Step{
{
Name: "Get overview",
Prompt: "Check the status and resource usage of my grafana container",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
AssertContentContains("grafana"),
},
},
{
Name: "Check running processes",
Prompt: "What processes are running inside the grafana container?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
{
Name: "Check listening ports",
Prompt: "What ports is grafana listening on inside the container?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
// Grafana typically listens on 3000
AssertContentContains("3000"),
},
},
{
Name: "Recent logs",
Prompt: "Show me the most recent grafana logs, I want to see if there are any errors",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
AssertDurationUnder("90s"),
},
},
},
}
}
// ConfigInspectionScenario tests reading configuration files from containers
func ConfigInspectionScenario() Scenario {
return Scenario{
Name: "Configuration Inspection",
Description: "Tests reading and analyzing configuration files from containers",
Steps: []Step{
{
Name: "Find config location",
Prompt: "Where is the configuration file for zigbee2mqtt?",
Assertions: []Assertion{
AssertNoError(),
AssertHasContent(),
// May or may not need tools depending on model knowledge
},
},
{
Name: "Read config file",
Prompt: "Can you read the zigbee2mqtt configuration and tell me what MQTT broker it's connecting to?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
// Should mention mqtt connection details
AssertContentContains("mqtt"),
},
},
{
Name: "Verify connectivity",
Prompt: "Is the mqtt container actually running and accessible?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
},
}
}
// ResourceAnalysisScenario tests the assistant's ability to gather and compare
// resource metrics across multiple containers
func ResourceAnalysisScenario() Scenario {
return Scenario{
Name: "Resource Analysis",
Description: "Tests gathering and comparing resource usage across containers",
Steps: []Step{
{
Name: "Find heavy hitters",
Prompt: "Which of my containers are using the most resources? Show me the top 5 by CPU and memory.",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
{
Name: "Investigate top consumer",
Prompt: "Tell me more about the one using the most memory. What's it doing?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
{
Name: "Check for issues",
Prompt: "Check the logs for that container - are there any memory-related warnings or errors?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
AssertDurationUnder("90s"),
},
},
},
}
}
// MultiNodeScenario tests operations across multiple Proxmox nodes
func MultiNodeScenario() Scenario {
return Scenario{
Name: "Multi-Node Operations",
Description: "Tests ability to work across multiple Proxmox nodes",
Steps: []Step{
{
Name: "List all nodes",
Prompt: "What Proxmox nodes do I have and are they all healthy?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
{
Name: "Compare nodes",
Prompt: "Compare the resource usage between my nodes. Which one has the most headroom?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
{
Name: "Cross-node query",
Prompt: "Show me all running containers across all nodes, sorted by memory usage",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
},
}
}
// DockerInDockerScenario tests operations on Docker containers running inside LXCs
func DockerInDockerScenario() Scenario {
return Scenario{
Name: "Docker-in-LXC Operations",
Description: "Tests operations on Docker containers running inside LXC containers",
Steps: []Step{
{
Name: "List Docker containers",
Prompt: "What Docker containers are running inside homepage-docker?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
{
Name: "Docker container logs",
Prompt: "Show me the logs from the homepage Docker container",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
AssertDurationUnder("90s"),
},
},
{
Name: "Docker resource usage",
Prompt: "How much CPU and memory is the homepage Docker container using?",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
},
},
},
}
}
// ContextChainScenario tests the assistant's ability to maintain context
// across multiple related questions
func ContextChainScenario() Scenario {
return Scenario{
Name: "Context Chain",
Description: "Tests context retention across a chain of related questions",
Steps: []Step{
{
Name: "Initial query",
Prompt: "Check the status of frigate",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
AssertContentContains("frigate"),
},
},
{
Name: "Follow-up (implicit reference)",
Prompt: "What's its IP address?",
Assertions: []Assertion{
AssertNoError(),
AssertHasContent(),
// Should understand "its" refers to frigate
},
},
{
Name: "Another follow-up",
Prompt: "Show me the frigate logs",
Assertions: []Assertion{
AssertNoError(),
AssertAnyToolUsed(),
AssertEventualSuccess(),
AssertHasContent(),
AssertDurationUnder("90s"),
},
},
{
Name: "Deep follow-up",
Prompt: "Are there any errors in there?",
Assertions: []Assertion{
AssertNoError(),
AssertHasContent(),
// Should analyze the logs from previous step
},
},
},
}
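
The container and node names above (delly, homepage-docker, jellyfin, frigate, ...) tie these scenarios to one specific lab. A hedged sketch of a parameterized variant that would slot into the same package — SmokeTestFor is hypothetical, not part of this commit:

// SmokeTestFor builds a minimal single-step scenario for any container name
// supplied by the caller (hypothetical helper).
func SmokeTestFor(containerName string) Scenario {
	return Scenario{
		Name:        "Smoke: " + containerName,
		Description: "Minimal check against a caller-supplied container",
		Steps: []Step{{
			Name:   "Check status",
			Prompt: "What is the status of the " + containerName + " container?",
			Assertions: []Assertion{
				AssertNoError(),
				AssertAnyToolUsed(),
				AssertNoPhantomDetection(),
				AssertContentContains(containerName),
			},
		}},
	}
}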
}