fix(patrol): address multiple issues in patrol service

- Add missing KubernetesChecked field to persistence (data was being lost)
- Fix Duration field to properly convert between ms and nanoseconds
- Add automatic cleanup of stale stream subscribers (memory leak fix)
- Add error tracking for findings persistence with callback support
- Add GetPersistenceStatus() and SetOnSaveError() methods
- Add tests for new error tracking functionality
rcourtman
2026-01-02 12:45:00 +00:00
parent 3e6ebd593c
commit 3029cce172
6 changed files with 206 additions and 58 deletions

View File

@@ -28,4 +28,4 @@ You agree to indemnify and hold harmless the Author from and against all claims,
 This Agreement shall be governed by and construed in accordance with the laws of England and Wales. Any legal action or proceeding related to this Agreement shall be brought exclusively in the courts of England and Wales.
 ---
-*Pulse is a product by rcourtman.*
+*Pulse is a product of Pulse Monitoring Ltd (Company No. 16939106, registered in England and Wales).*

View File

@@ -122,6 +122,10 @@ type FindingsStore struct {
 	saveTimer    *time.Timer
 	savePending  bool
 	saveDebounce time.Duration
+	// Error tracking for persistence failures
+	lastSaveError error           // Last error from save operation
+	onSaveError   func(err error) // Optional callback for save errors
+	lastSaveTime  time.Time       // Last successful save time
 }
 
 // NewFindingsStore creates a new findings store
@@ -188,12 +192,25 @@ func (s *FindingsStore) scheduleSave() {
 			findingsCopy[id] = &copy
 		}
 		persistence := s.persistence
+		onError := s.onSaveError
 		s.mu.Unlock()
 
 		if persistence != nil {
 			if err := persistence.SaveFindings(findingsCopy); err != nil {
-				// Log error but don't fail - persistence is best-effort
-				// (log import would create circular dep, so we silently fail)
+				// Track the error for visibility
+				s.mu.Lock()
+				s.lastSaveError = err
+				s.mu.Unlock()
+				// Call error callback if set
+				if onError != nil {
+					onError(err)
+				}
+			} else {
+				// Clear error and update timestamp on success
+				s.mu.Lock()
+				s.lastSaveError = nil
+				s.lastSaveTime = time.Now()
+				s.mu.Unlock()
 			}
 		}
 	})
@@ -225,6 +242,24 @@ func (s *FindingsStore) ForceSave() error {
 	return nil
 }
 
+// SetOnSaveError sets a callback function that will be called when a save operation fails.
+// This allows external code (e.g., logging) to be notified of persistence errors.
+func (s *FindingsStore) SetOnSaveError(callback func(err error)) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.onSaveError = callback
+}
+
+// GetPersistenceStatus returns the current persistence state:
+//   - lastError: the most recent save error, or nil if the last save succeeded
+//   - lastSaveTime: when findings were last successfully saved
+//   - hasPersistence: whether a persistence layer is configured
+func (s *FindingsStore) GetPersistenceStatus() (lastError error, lastSaveTime time.Time, hasPersistence bool) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.lastSaveError, s.lastSaveTime, s.persistence != nil
+}
+
 // Add adds or updates a finding
 // If a finding with the same ID exists, it updates LastSeenAt and increments TimesRaised
 // If the finding is suppressed or dismissed, it may be skipped

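For illustration, a minimal sketch of how a caller might wire up the two new hooks. The import path and the log-based handler are assumptions for the sketch, not part of this commit:

package main

import (
	"fmt"
	"log"
	"time"

	"example.com/pulse/internal/patrol" // hypothetical import path
)

func main() {
	store := patrol.NewFindingsStore()

	// Surface persistence failures instead of dropping them silently.
	store.SetOnSaveError(func(err error) {
		log.Printf("patrol: failed to persist findings: %v", err)
	})

	// Elsewhere (e.g., a health endpoint), report the tracked state.
	lastErr, lastSave, hasPersistence := store.GetPersistenceStatus()
	if hasPersistence && lastErr != nil {
		fmt.Printf("findings not persisted since %s: %v\n",
			lastSave.Format(time.RFC3339), lastErr)
	}
}
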
View File

@@ -143,6 +143,102 @@ func TestFindingsStore_scheduleSave_SaveError(t *testing.T) {
 	case <-time.After(200 * time.Millisecond):
 		t.Fatal("timed out waiting for SaveFindings")
 	}
+
+	// Wait for the error to be recorded
+	time.Sleep(50 * time.Millisecond)
+
+	// Check that error is tracked
+	lastErr, _, _ := store.GetPersistenceStatus()
+	if lastErr == nil {
+		t.Error("expected lastSaveError to be set after save failure")
+	}
 }
+
+func TestFindingsStore_SetOnSaveError(t *testing.T) {
+	store := NewFindingsStore()
+	store.saveDebounce = 5 * time.Millisecond
+
+	saved := make(chan map[string]*Finding, 1)
+	errReceived := make(chan error, 1)
+	store.persistence = &recordingPersistence{
+		saveErr: errors.New("callback test error"),
+		saved:   saved,
+	}
+
+	// Set error callback
+	store.SetOnSaveError(func(err error) {
+		errReceived <- err
+	})
+
+	store.findings["real-1"] = &Finding{ID: "real-1", Title: "Real"}
+	store.scheduleSave()
+
+	select {
+	case <-saved:
+	case <-time.After(200 * time.Millisecond):
+		t.Fatal("timed out waiting for SaveFindings")
+	}
+
+	select {
+	case err := <-errReceived:
+		if err == nil {
+			t.Error("expected error to be passed to callback")
+		}
+		if err.Error() != "callback test error" {
+			t.Errorf("expected 'callback test error', got %v", err)
+		}
+	case <-time.After(200 * time.Millisecond):
+		t.Fatal("timed out waiting for error callback")
+	}
+}
+
+func TestFindingsStore_GetPersistenceStatus_NoPersistence(t *testing.T) {
+	store := NewFindingsStore()
+
+	lastErr, lastSaveTime, hasPersistence := store.GetPersistenceStatus()
+	if lastErr != nil {
+		t.Error("expected no error when no persistence")
+	}
+	if !lastSaveTime.IsZero() {
+		t.Error("expected zero time when no persistence")
+	}
+	if hasPersistence {
+		t.Error("expected hasPersistence to be false")
+	}
+}
+
+func TestFindingsStore_GetPersistenceStatus_WithPersistence(t *testing.T) {
+	store := NewFindingsStore()
+	store.saveDebounce = 5 * time.Millisecond
+
+	saved := make(chan map[string]*Finding, 1)
+	store.persistence = &recordingPersistence{saved: saved}
+
+	_, _, hasPersistence := store.GetPersistenceStatus()
+	if !hasPersistence {
+		t.Error("expected hasPersistence to be true")
+	}
+
+	// Trigger a successful save
+	store.findings["real-1"] = &Finding{ID: "real-1", Title: "Real"}
+	store.scheduleSave()
+
+	select {
+	case <-saved:
+	case <-time.After(200 * time.Millisecond):
+		t.Fatal("timed out waiting for SaveFindings")
+	}
+
+	// Wait for status to be updated
+	time.Sleep(50 * time.Millisecond)
+
+	lastErr, lastSaveTime, _ := store.GetPersistenceStatus()
+	if lastErr != nil {
+		t.Errorf("expected no error after successful save, got %v", lastErr)
+	}
+	if lastSaveTime.IsZero() {
+		t.Error("expected lastSaveTime to be set after successful save")
+	}
+}
 
 func TestFindingsStore_ForceSave_NoPersistenceStopsTimer(t *testing.T) {

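The tests above depend on a recordingPersistence test double that is defined elsewhere in the package and not shown in this diff. A minimal sketch of what it plausibly looks like, reconstructed from how the tests use it (the real persistence interface may include further methods, omitted here):

// recordingPersistence is a sketch of the test double used above:
// it returns saveErr from SaveFindings and signals each call on saved.
type recordingPersistence struct {
	saveErr error
	saved   chan map[string]*Finding
}

func (r *recordingPersistence) SaveFindings(findings map[string]*Finding) error {
	// Non-blocking send so a full channel never stalls the save path.
	select {
	case r.saved <- findings:
	default:
	}
	return r.saveErr
}
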
View File

@@ -625,17 +625,31 @@ func (p *PatrolService) UnsubscribeFromStream(ch chan PatrolStreamEvent) {
 }
 
 // broadcast sends an event to all subscribers
+// Subscribers with full channels are automatically removed to prevent memory leaks
 func (p *PatrolService) broadcast(event PatrolStreamEvent) {
-	p.streamMu.RLock()
-	defer p.streamMu.RUnlock()
+	p.streamMu.Lock()
+	defer p.streamMu.Unlock()
+
+	var staleChannels []chan PatrolStreamEvent
 	for ch := range p.streamSubscribers {
 		select {
 		case ch <- event:
 			// Successfully sent
 		default:
-			// Channel full, skip (don't block on slow consumers)
+			// Channel full - mark for removal (likely dead subscriber)
+			staleChannels = append(staleChannels, ch)
 		}
 	}
+
+	// Clean up stale subscribers
+	for _, ch := range staleChannels {
+		delete(p.streamSubscribers, ch)
+		// Close in a goroutine to avoid blocking if receiver is stuck
+		go func(c chan PatrolStreamEvent) {
+			defer func() { recover() }() // Ignore panic if already closed
+			close(c)
+		}(ch)
+	}
 }
 
 // appendStreamContent adds content to the current output and broadcasts it

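One behavioral consequence worth noting: broadcast can now close a subscriber's channel, so consumers must treat closure as eviction rather than assuming the channel lives until they unsubscribe. A hypothetical consumer loop (SubscribeToStream is assumed from context; only UnsubscribeFromStream appears in this diff):

// consumeStream drains patrol events until the service evicts this subscriber.
func consumeStream(p *PatrolService, handle func(PatrolStreamEvent)) {
	ch := p.SubscribeToStream() // assumed to return chan PatrolStreamEvent
	for event := range ch {     // the loop exits once broadcast close()s the channel
		handle(event) // keep this fast: a full buffer now means eviction, not just dropped events
	}
}

The recover() around close() guards against a double close if, for example, UnsubscribeFromStream also closes channels it removes.
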
View File

@@ -31,29 +31,30 @@ func (a *PatrolHistoryPersistenceAdapter) SavePatrolRunHistory(runs []PatrolRunR
 	records := make([]config.PatrolRunRecord, len(runs))
 	for i, r := range runs {
 		records[i] = config.PatrolRunRecord{
-			ID:               r.ID,
-			StartedAt:        r.StartedAt,
-			CompletedAt:      r.CompletedAt,
-			DurationMs:       int64(r.Duration),
-			Type:             r.Type,
-			ResourcesChecked: r.ResourcesChecked,
-			NodesChecked:     r.NodesChecked,
-			GuestsChecked:    r.GuestsChecked,
-			DockerChecked:    r.DockerChecked,
-			StorageChecked:   r.StorageChecked,
-			HostsChecked:     r.HostsChecked,
-			PBSChecked:       r.PBSChecked,
-			NewFindings:      r.NewFindings,
-			ExistingFindings: r.ExistingFindings,
-			ResolvedFindings: r.ResolvedFindings,
-			AutoFixCount:     r.AutoFixCount,
-			FindingsSummary:  r.FindingsSummary,
-			FindingIDs:       r.FindingIDs,
-			ErrorCount:       r.ErrorCount,
-			Status:           r.Status,
-			AIAnalysis:       r.AIAnalysis,
-			InputTokens:      r.InputTokens,
-			OutputTokens:     r.OutputTokens,
+			ID:                r.ID,
+			StartedAt:         r.StartedAt,
+			CompletedAt:       r.CompletedAt,
+			DurationMs:        int64(r.Duration / time.Millisecond), // Convert nanoseconds to milliseconds
+			Type:              r.Type,
+			ResourcesChecked:  r.ResourcesChecked,
+			NodesChecked:      r.NodesChecked,
+			GuestsChecked:     r.GuestsChecked,
+			DockerChecked:     r.DockerChecked,
+			StorageChecked:    r.StorageChecked,
+			HostsChecked:      r.HostsChecked,
+			PBSChecked:        r.PBSChecked,
+			KubernetesChecked: r.KubernetesChecked,
+			NewFindings:       r.NewFindings,
+			ExistingFindings:  r.ExistingFindings,
+			ResolvedFindings:  r.ResolvedFindings,
+			AutoFixCount:      r.AutoFixCount,
+			FindingsSummary:   r.FindingsSummary,
+			FindingIDs:        r.FindingIDs,
+			ErrorCount:        r.ErrorCount,
+			Status:            r.Status,
+			AIAnalysis:        r.AIAnalysis,
+			InputTokens:       r.InputTokens,
+			OutputTokens:      r.OutputTokens,
 		}
 	}
 	return a.config.SavePatrolRunHistory(records)
@@ -70,29 +71,30 @@ func (a *PatrolHistoryPersistenceAdapter) LoadPatrolRunHistory() ([]PatrolRunRec
 	runs := make([]PatrolRunRecord, len(data.Runs))
 	for i, r := range data.Runs {
 		runs[i] = PatrolRunRecord{
-			ID:               r.ID,
-			StartedAt:        r.StartedAt,
-			CompletedAt:      r.CompletedAt,
-			Duration:         time.Duration(r.DurationMs),
-			Type:             r.Type,
-			ResourcesChecked: r.ResourcesChecked,
-			NodesChecked:     r.NodesChecked,
-			GuestsChecked:    r.GuestsChecked,
-			DockerChecked:    r.DockerChecked,
-			StorageChecked:   r.StorageChecked,
-			HostsChecked:     r.HostsChecked,
-			PBSChecked:       r.PBSChecked,
-			NewFindings:      r.NewFindings,
-			ExistingFindings: r.ExistingFindings,
-			ResolvedFindings: r.ResolvedFindings,
-			AutoFixCount:     r.AutoFixCount,
-			FindingsSummary:  r.FindingsSummary,
-			FindingIDs:       r.FindingIDs,
-			ErrorCount:       r.ErrorCount,
-			Status:           r.Status,
-			AIAnalysis:       r.AIAnalysis,
-			InputTokens:      r.InputTokens,
-			OutputTokens:     r.OutputTokens,
+			ID:                r.ID,
+			StartedAt:         r.StartedAt,
+			CompletedAt:       r.CompletedAt,
+			Duration:          time.Duration(r.DurationMs) * time.Millisecond, // Convert milliseconds to nanoseconds
+			Type:              r.Type,
+			ResourcesChecked:  r.ResourcesChecked,
+			NodesChecked:      r.NodesChecked,
+			GuestsChecked:     r.GuestsChecked,
+			DockerChecked:     r.DockerChecked,
+			StorageChecked:    r.StorageChecked,
+			HostsChecked:      r.HostsChecked,
+			PBSChecked:        r.PBSChecked,
+			KubernetesChecked: r.KubernetesChecked,
+			NewFindings:       r.NewFindings,
+			ExistingFindings:  r.ExistingFindings,
+			ResolvedFindings:  r.ResolvedFindings,
+			AutoFixCount:      r.AutoFixCount,
+			FindingsSummary:   r.FindingsSummary,
+			FindingIDs:        r.FindingIDs,
+			ErrorCount:        r.ErrorCount,
+			Status:            r.Status,
+			AIAnalysis:        r.AIAnalysis,
+			InputTokens:       r.InputTokens,
+			OutputTokens:      r.OutputTokens,
 		}
 	}
 	return runs, nil

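The Duration fix deserves a closer look: time.Duration is an integer count of nanoseconds, so the old int64(r.Duration) wrote raw nanoseconds into a field named DurationMs, and any consumer reading that field as milliseconds saw values a million times too large. A self-contained sketch of the corrected round-trip:

package main

import (
	"fmt"
	"time"
)

func main() {
	d := 1500 * time.Millisecond // time.Duration counts nanoseconds

	// Old (buggy) save path: int64(d) is the raw nanosecond count.
	fmt.Println(int64(d)) // 1500000000 (not milliseconds)

	// Fixed save path: integer-divide by time.Millisecond for a ms count.
	ms := int64(d / time.Millisecond)
	fmt.Println(ms) // 1500

	// Fixed load path: scale the count back up into a time.Duration.
	restored := time.Duration(ms) * time.Millisecond
	fmt.Println(restored == d) // true (sub-millisecond precision is truncated)
}
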
View File

@@ -1631,12 +1631,13 @@ type PatrolRunRecord struct {
 	Type             string `json:"type"` // "quick" or "deep"
 	ResourcesChecked int    `json:"resources_checked"`
 	// Breakdown by resource type
-	NodesChecked   int `json:"nodes_checked"`
-	GuestsChecked  int `json:"guests_checked"`
-	DockerChecked  int `json:"docker_checked"`
-	StorageChecked int `json:"storage_checked"`
-	HostsChecked   int `json:"hosts_checked"`
-	PBSChecked     int `json:"pbs_checked"`
+	NodesChecked      int `json:"nodes_checked"`
+	GuestsChecked     int `json:"guests_checked"`
+	DockerChecked     int `json:"docker_checked"`
+	StorageChecked    int `json:"storage_checked"`
+	HostsChecked      int `json:"hosts_checked"`
+	PBSChecked        int `json:"pbs_checked"`
+	KubernetesChecked int `json:"kubernetes_checked"`
 	// Findings from this run
 	NewFindings      int `json:"new_findings"`
 	ExistingFindings int `json:"existing_findings"`
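
To confirm the wire format of the new field, a trimmed sketch (only two of the fields above, for brevity) marshaled with encoding/json:

package main

import (
	"encoding/json"
	"fmt"
)

// patrolRunRecord is an illustrative, trimmed copy of PatrolRunRecord.
type patrolRunRecord struct {
	PBSChecked        int `json:"pbs_checked"`
	KubernetesChecked int `json:"kubernetes_checked"`
}

func main() {
	b, err := json.Marshal(patrolRunRecord{PBSChecked: 2, KubernetesChecked: 5})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"pbs_checked":2,"kubernetes_checked":5}
}

Records persisted before this change simply lack the key and unmarshal with KubernetesChecked at its zero value, so no migration is needed.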