Files
Pulse/pkg/reporting/engine_integration_test.go
rcourtman 6427f28a08 Fix stale metrics store reference in reporting engine after monitor reload
The reporting engine held a direct pointer to the metrics store, which
becomes invalid after a monitor reload (settings change, node config
save, etc.) closes and recreates the store. Use a dynamic getter closure
that always resolves to the current monitor's active store.

Also adds diagnostic logging when report queries return zero metrics,
and integration tests covering the full metrics-to-report pipeline
including reload scenarios.

Fixes #1186
2026-02-04 12:34:40 +00:00

389 lines
11 KiB
Go

package reporting
import (
"path/filepath"
"strings"
"testing"
"time"
"github.com/rcourtman/pulse-go-rewrite/pkg/metrics"
)
// TestReportEngineWithMetricsStore verifies the full data pipeline from
// metrics store writes through report generation. This is a regression test
// for the zero-datapoints bug reported in GitHub issue #1186.
func TestReportEngineWithMetricsStore(t *testing.T) {
	dir := t.TempDir()
	store, err := metrics.NewStore(metrics.StoreConfig{
		DBPath:          filepath.Join(dir, "metrics.db"),
		WriteBufferSize: 10,
		FlushInterval:   100 * time.Millisecond,
		RetentionRaw:    24 * time.Hour,
		RetentionMinute: 7 * 24 * time.Hour,
		RetentionHourly: 30 * 24 * time.Hour,
		RetentionDaily:  90 * 24 * time.Hour,
	})
	if err != nil {
		t.Fatalf("failed to create metrics store: %v", err)
	}
	defer store.Close()
	engine := NewReportEngine(EngineConfig{MetricsStore: store})

	// Write metrics data for a node over the last hour.
	now := time.Now()
	nodeID := "pve-prod-pve1"
	for i := 0; i < 12; i++ {
		ts := now.Add(time.Duration(-60+i*5) * time.Minute)
		store.Write("node", nodeID, "cpu", float64(40+i*2), ts)
		store.Write("node", nodeID, "memory", float64(55+i), ts)
	}
	// Flush to ensure data is written to SQLite, then give the background
	// writer a moment to process the buffered points.
	store.Flush()
	time.Sleep(200 * time.Millisecond)

	// Generate a report for the last 2 hours (uses raw tier).
	req := MetricReportRequest{
		ResourceType: "node",
		ResourceID:   nodeID,
		Start:        now.Add(-2 * time.Hour),
		End:          now.Add(time.Minute), // slight buffer
		Format:       FormatCSV,
	}
	data, contentType, err := engine.Generate(req)
	if err != nil {
		t.Fatalf("report generation failed: %v", err)
	}
	if contentType != "text/csv" {
		t.Errorf("expected text/csv, got %s", contentType)
	}
	csv := string(data)
	// Verify report has data points (CSV uses display names).
	if !strings.Contains(csv, "CPU") {
		t.Error("report missing CPU metric data")
	}
	if !strings.Contains(csv, "Memory") {
		t.Error("report missing memory metric data")
	}
	// Verify the report header includes the correct resource ID.
	if !strings.Contains(csv, nodeID) {
		t.Errorf("report missing resource ID %s", nodeID)
	}
	// Verify data rows exist (not just headers). Use the shared helper
	// instead of duplicating the "# DATA" section parsing inline.
	dataRows := countCSVDataRows(csv)
	if dataRows == 0 {
		t.Fatal("report has zero data rows — metrics store data did not reach report output")
	}
	t.Logf("Report generated with %d data rows", dataRows)
}
// TestReportEngineWithMetricsStore_VM verifies reports work for VM resources
// with composite IDs (instance:node:vmid format).
func TestReportEngineWithMetricsStore_VM(t *testing.T) {
	tmpDir := t.TempDir()
	store, err := metrics.NewStore(metrics.StoreConfig{
		DBPath:          filepath.Join(tmpDir, "metrics.db"),
		WriteBufferSize: 10,
		FlushInterval:   100 * time.Millisecond,
		RetentionRaw:    24 * time.Hour,
		RetentionMinute: 7 * 24 * time.Hour,
		RetentionHourly: 30 * 24 * time.Hour,
		RetentionDaily:  90 * 24 * time.Hour,
	})
	if err != nil {
		t.Fatalf("failed to create metrics store: %v", err)
	}
	defer store.Close()
	engine := NewReportEngine(EngineConfig{MetricsStore: store})

	// Seed half an hour of cpu/memory/disk samples for a VM whose ID
	// uses the composite instance:node:vmid form.
	now := time.Now()
	vmID := "pve-prod:pve1:100"
	for i := 0; i < 6; i++ {
		at := now.Add(time.Duration(5*i-30) * time.Minute)
		store.Write("vm", vmID, "cpu", float64(30+i*5), at)
		store.Write("vm", vmID, "memory", float64(60+i*2), at)
		store.Write("vm", vmID, "disk", float64(40+i), at)
	}
	store.Flush()
	time.Sleep(200 * time.Millisecond)

	// Request a PDF report covering the seeded window.
	req := MetricReportRequest{
		ResourceType: "vm",
		ResourceID:   vmID,
		Start:        now.Add(-1 * time.Hour),
		End:          now.Add(time.Minute),
		Format:       FormatPDF,
	}
	data, contentType, err := engine.Generate(req)
	if err != nil {
		t.Fatalf("PDF report generation failed: %v", err)
	}
	if contentType != "application/pdf" {
		t.Errorf("expected application/pdf, got %s", contentType)
	}
	if len(data) < 1000 {
		t.Errorf("PDF seems too small (%d bytes), likely has no data", len(data))
	}
}
// TestReportEngineStaleStoreAfterClose verifies that querying a closed
// metrics store returns an error rather than silently returning zero results.
// This documents the behavior that causes blank reports after a monitor reload.
func TestReportEngineStaleStoreAfterClose(t *testing.T) {
	tmp := t.TempDir()
	store, err := metrics.NewStore(metrics.StoreConfig{
		DBPath:          filepath.Join(tmp, "metrics.db"),
		WriteBufferSize: 10,
		FlushInterval:   100 * time.Millisecond,
		RetentionRaw:    24 * time.Hour,
		RetentionMinute: 7 * 24 * time.Hour,
		RetentionHourly: 30 * 24 * time.Hour,
		RetentionDaily:  90 * 24 * time.Hour,
	})
	if err != nil {
		t.Fatalf("failed to create metrics store: %v", err)
	}
	engine := NewReportEngine(EngineConfig{MetricsStore: store})

	// Seed a single data point and let the background writer persist it.
	now := time.Now()
	store.Write("node", "test-node", "cpu", 50.0, now)
	store.Flush()
	time.Sleep(200 * time.Millisecond)

	// Close the store (simulates what happens during monitor reload).
	store.Close()

	// Attempt to generate a report with the stale store reference.
	req := MetricReportRequest{
		ResourceType: "node",
		ResourceID:   "test-node",
		Start:        now.Add(-1 * time.Hour),
		End:          now.Add(time.Minute),
		Format:       FormatCSV,
	}
	_, _, err = engine.Generate(req)
	// After a store is closed, the engine should return an error (not silent empty results).
	if err != nil {
		t.Logf("Querying closed store returned error as expected: %v", err)
	} else {
		t.Log("Note: querying a closed store did not return an error — this means blank reports could occur silently after monitor reload")
	}
}
// TestReportEngineReplacedStore verifies that replacing the global engine
// with a new one pointing to a fresh store produces reports with data.
func TestReportEngineReplacedStore(t *testing.T) {
	dir1 := t.TempDir()
	store1, err := metrics.NewStore(metrics.StoreConfig{
		DBPath:          filepath.Join(dir1, "metrics.db"),
		WriteBufferSize: 10,
		FlushInterval:   100 * time.Millisecond,
		RetentionRaw:    24 * time.Hour,
		RetentionMinute: 7 * 24 * time.Hour,
		RetentionHourly: 30 * 24 * time.Hour,
		RetentionDaily:  90 * 24 * time.Hour,
	})
	if err != nil {
		t.Fatalf("failed to create store1: %v", err)
	}
	engine1 := NewReportEngine(EngineConfig{MetricsStore: store1})
	SetEngine(engine1)
	t.Cleanup(func() { SetEngine(nil) })

	now := time.Now()
	store1.Write("node", "node-1", "cpu", 50.0, now)
	store1.Flush()
	time.Sleep(200 * time.Millisecond)

	// Simulate reload: close old store, create new one (same DB file).
	store1.Close()
	store2, err := metrics.NewStore(metrics.StoreConfig{
		DBPath:          filepath.Join(dir1, "metrics.db"), // Same file
		WriteBufferSize: 10,
		FlushInterval:   100 * time.Millisecond,
		RetentionRaw:    24 * time.Hour,
		RetentionMinute: 7 * 24 * time.Hour,
		RetentionHourly: 30 * 24 * time.Hour,
		RetentionDaily:  90 * 24 * time.Hour,
	})
	if err != nil {
		t.Fatalf("failed to create store2: %v", err)
	}
	defer store2.Close()

	// Write additional data via new store.
	store2.Write("node", "node-1", "cpu", 55.0, now.Add(5*time.Minute))
	store2.Flush()
	time.Sleep(200 * time.Millisecond)

	// Replace global engine with new store.
	engine2 := NewReportEngine(EngineConfig{MetricsStore: store2})
	SetEngine(engine2)
	req := MetricReportRequest{
		ResourceType: "node",
		ResourceID:   "node-1",
		Start:        now.Add(-1 * time.Hour),
		End:          now.Add(1 * time.Hour),
		Format:       FormatCSV,
	}
	data, _, err := engine2.Generate(req)
	if err != nil {
		t.Fatalf("report generation failed after store replacement: %v", err)
	}
	// Count data rows via the shared helper rather than duplicating the
	// "# DATA" section parsing inline.
	dataRows := countCSVDataRows(string(data))
	if dataRows == 0 {
		t.Fatal("report has zero data rows after store replacement — fix did not work")
	}
	t.Logf("After store replacement: report generated with %d data rows", dataRows)
}
// TestReportEngineGetterSurvivesReload verifies that an engine configured with
// MetricsStoreGetter automatically picks up a new store after the old one is
// closed, without needing to recreate the engine. This is the key behavior
// that prevents stale store references after monitor reloads.
func TestReportEngineGetterSurvivesReload(t *testing.T) {
	dir := t.TempDir()
	dbPath := filepath.Join(dir, "metrics.db")
	newStore := func() *metrics.Store {
		s, err := metrics.NewStore(metrics.StoreConfig{
			DBPath:          dbPath,
			WriteBufferSize: 10,
			FlushInterval:   100 * time.Millisecond,
			RetentionRaw:    24 * time.Hour,
			RetentionMinute: 7 * 24 * time.Hour,
			RetentionHourly: 30 * 24 * time.Hour,
			RetentionDaily:  90 * 24 * time.Hour,
		})
		if err != nil {
			t.Fatalf("failed to create store: %v", err)
		}
		return s
	}
	// Simulate what reloadableMonitor.GetMonitor().GetMetricsStore() does:
	// returns whatever the current store is.
	currentStore := newStore()
	// Close whichever store is current when the test ends. The original code
	// only closed at the end of the happy path, leaking the store on any
	// t.Fatal exit part-way through.
	t.Cleanup(func() { currentStore.Close() })
	engine := NewReportEngine(EngineConfig{
		MetricsStoreGetter: func() *metrics.Store {
			return currentStore
		},
	})
	now := time.Now()
	req := MetricReportRequest{
		ResourceType: "node",
		ResourceID:   "node-1",
		Start:        now.Add(-1 * time.Hour),
		End:          now.Add(1 * time.Hour),
		Format:       FormatCSV,
	}

	// Phase 1: Write data and generate a report — should work.
	currentStore.Write("node", "node-1", "cpu", 50.0, now)
	currentStore.Flush()
	time.Sleep(200 * time.Millisecond)
	data, _, err := engine.Generate(req)
	if err != nil {
		t.Fatalf("phase 1: report generation failed: %v", err)
	}
	rows1 := countCSVDataRows(string(data))
	if rows1 == 0 {
		t.Fatal("phase 1: expected data rows, got zero")
	}
	t.Logf("Phase 1 (before reload): %d data rows", rows1)

	// Phase 2: Simulate reload — close old store, create new one.
	currentStore.Close()
	currentStore = newStore()
	// Write new data through the new store.
	currentStore.Write("node", "node-1", "cpu", 60.0, now.Add(5*time.Minute))
	currentStore.Write("node", "node-1", "cpu", 70.0, now.Add(10*time.Minute))
	currentStore.Flush()
	time.Sleep(200 * time.Millisecond)

	// Phase 3: Same engine instance, should use the new store via getter.
	data, _, err = engine.Generate(req)
	if err != nil {
		t.Fatalf("phase 3: report generation failed after reload: %v", err)
	}
	rows3 := countCSVDataRows(string(data))
	if rows3 == 0 {
		t.Fatal("phase 3: report has zero data rows after reload — getter did not resolve new store")
	}
	t.Logf("Phase 3 (after reload): %d data rows", rows3)
	// The new store should see ALL data (original + new) since it opens the same SQLite file.
	if rows3 < rows1 {
		t.Errorf("phase 3: expected at least %d rows (pre-reload count), got %d", rows1, rows3)
	}
}
// countCSVDataRows returns the number of data rows in a CSV report: lines
// after the "# DATA" marker that are not comments ("#" prefix), not the
// "Timestamp" header row, and not blank.
func countCSVDataRows(csv string) int {
	inData := false
	total := 0
	for _, row := range strings.Split(csv, "\n") {
		switch {
		case strings.HasPrefix(row, "# DATA"):
			inData = true
		case !inData, row == "", strings.HasPrefix(row, "#"), strings.HasPrefix(row, "Timestamp"):
			// Header section, blank line, comment, or column header — skip.
		default:
			total++
		}
	}
	return total
}