feat: implement staleness tracker for adaptive polling (Phase 2 Task 4)

Adds freshness metadata tracking for all monitored instances:
- StalenessTracker with per-instance last success/error/mutation timestamps
- Change hash detection using SHA1 for detecting data mutations
- Normalized staleness scoring (0-1 scale) based on age vs maxStale
- Integration with PollMetrics for authoritative last-success data
- Wired into all poll functions (PVE/PBS/PMG) via UpdateSuccess/UpdateError
- Connected to scheduler as StalenessSource implementation

Task 4 of 10 complete. Ready for adaptive interval logic.
This commit is contained in:
rcourtman
2025-10-20 11:14:56 +00:00
parent 57429900a6
commit c7d1abf874
5 changed files with 293 additions and 27 deletions

View File

@@ -1,4 +1,5 @@
import { createSignal, Show, For, createMemo, createEffect, onMount, onCleanup } from 'solid-js';
import { usePersistentSignal } from '@/hooks/usePersistentSignal';
import type { JSX } from 'solid-js';
import { EmailProviderSelect } from '@/components/Alerts/EmailProviderSelect';
import { WebhookConfig } from '@/components/Alerts/WebhookConfig';
@@ -3236,11 +3237,19 @@ function HistoryTab() {
const { state, activeAlerts } = useWebSocket();
// Filter states with localStorage persistence
const [timeFilter, setTimeFilter] = createSignal(
localStorage.getItem('alertHistoryTimeFilter') || '7d',
const [timeFilter, setTimeFilter] = usePersistentSignal<'24h' | '7d' | '30d' | 'all'>(
'alertHistoryTimeFilter',
'7d',
{
deserialize: (raw) => (raw === '24h' || raw === '7d' || raw === '30d' || raw === 'all' ? raw : '7d'),
},
);
const [severityFilter, setSeverityFilter] = createSignal(
localStorage.getItem('alertHistorySeverityFilter') || 'all',
const [severityFilter, setSeverityFilter] = usePersistentSignal<'all' | 'warning' | 'critical'>(
'alertHistorySeverityFilter',
'all',
{
deserialize: (raw) => (raw === 'warning' || raw === 'critical' ? raw : 'all'),
},
);
const [searchTerm, setSearchTerm] = createSignal('');
const [alertHistory, setAlertHistory] = createSignal<Alert[]>([]);
@@ -3304,13 +3313,6 @@ function HistoryTab() {
let searchInputRef: HTMLInputElement | undefined;
// Persist filter changes to localStorage
createEffect(() => {
localStorage.setItem('alertHistoryTimeFilter', timeFilter());
});
createEffect(() => {
localStorage.setItem('alertHistorySeverityFilter', severityFilter());
});
// Clear chart selection when high-level filters change
let lastTimeFilterValue: string | null = null;

View File

@@ -229,9 +229,9 @@ func (pm *PollMetrics) decrementPending() {
}
func (pm *PollMetrics) storeLastSuccess(instanceType, instance string, ts time.Time) {
pm.mu.Lock()
pm.lastSuccessByKey[pm.key(instanceType, instance)] = ts
pm.mu.Unlock()
pm.mu.Lock()
pm.lastSuccessByKey[pm.key(instanceType, instance)] = ts
pm.mu.Unlock()
}
func (pm *PollMetrics) lastSuccessFor(instanceType, instance string) (time.Time, bool) {

View File

@@ -257,6 +257,7 @@ type Monitor struct {
pmgClients map[string]*pmg.Client
pollMetrics *PollMetrics
scheduler *AdaptiveScheduler
stalenessTracker *StalenessTracker
tempCollector *TemperatureCollector // SSH-based temperature collector
mu sync.RWMutex
startTime time.Time
@@ -1314,13 +1315,16 @@ func New(cfg *config.Config) (*Monitor, error) {
// Security warning if running in container with SSH temperature monitoring
checkContainerizedTempMonitoring()
stalenessTracker := NewStalenessTracker(getPollMetrics())
stalenessTracker.SetBounds(cfg.AdaptivePollingBaseInterval, cfg.AdaptivePollingMaxInterval)
var scheduler *AdaptiveScheduler
if cfg.AdaptivePollingEnabled {
scheduler = NewAdaptiveScheduler(SchedulerConfig{
BaseInterval: cfg.AdaptivePollingBaseInterval,
MinInterval: cfg.AdaptivePollingMinInterval,
MaxInterval: cfg.AdaptivePollingMaxInterval,
}, nil, nil, nil)
}, stalenessTracker, nil, nil)
}
m := &Monitor{
@@ -1331,6 +1335,7 @@ func New(cfg *config.Config) (*Monitor, error) {
pmgClients: make(map[string]*pmg.Client),
pollMetrics: getPollMetrics(),
scheduler: scheduler,
stalenessTracker: stalenessTracker,
tempCollector: tempCollector,
startTime: time.Now(),
rateTracker: NewRateTracker(),
@@ -2196,6 +2201,15 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie
})
}()
}
if m.stalenessTracker != nil {
defer func() {
if pollErr == nil {
m.stalenessTracker.UpdateSuccess(InstanceTypePVE, instanceName, nil)
} else {
m.stalenessTracker.UpdateError(InstanceTypePVE, instanceName)
}
}()
}
// Check if context is cancelled
select {
@@ -3941,9 +3955,36 @@ func (m *Monitor) pollBackupTasks(ctx context.Context, instanceName string, clie
// pollPBSInstance polls a single PBS instance
func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, client *pbs.Client) {
start := time.Now()
var pollErr error
if m.pollMetrics != nil {
m.pollMetrics.IncInFlight("pbs")
defer m.pollMetrics.DecInFlight("pbs")
defer func() {
m.pollMetrics.RecordResult(PollResult{
InstanceName: instanceName,
InstanceType: "pbs",
Success: pollErr == nil,
Error: pollErr,
StartTime: start,
EndTime: time.Now(),
})
}()
}
if m.stalenessTracker != nil {
defer func() {
if pollErr == nil {
m.stalenessTracker.UpdateSuccess(InstanceTypePBS, instanceName, nil)
} else {
m.stalenessTracker.UpdateError(InstanceTypePBS, instanceName)
}
}()
}
// Check if context is cancelled
select {
case <-ctx.Done():
pollErr = ctx.Err()
log.Debug().Str("instance", instanceName).Msg("Polling cancelled")
return
default:
@@ -4240,6 +4281,15 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie
})
}()
}
if m.stalenessTracker != nil {
defer func() {
if pollErr == nil {
m.stalenessTracker.UpdateSuccess(InstanceTypePMG, instanceName, nil)
} else {
m.stalenessTracker.UpdateError(InstanceTypePMG, instanceName)
}
}()
}
select {
case <-ctx.Done():

View File

@@ -41,6 +41,13 @@ func (m *Monitor) describeInstancesForScheduler() []InstanceDescriptor {
desc.LastInterval = last.Interval
}
}
if m.stalenessTracker != nil {
if snap, ok := m.stalenessTracker.snapshot(InstanceTypePVE, name); ok {
desc.LastSuccess = snap.LastSuccess
desc.LastFailure = snap.LastError
desc.Metadata = map[string]any{"changeHash": snap.ChangeHash}
}
}
descriptors = append(descriptors, desc)
}
}
@@ -56,14 +63,21 @@ func (m *Monitor) describeInstancesForScheduler() []InstanceDescriptor {
Name: name,
Type: InstanceTypePBS,
}
if m.scheduler != nil {
if last, ok := m.scheduler.LastScheduled(InstanceTypePBS, name); ok {
desc.LastScheduled = last.NextRun
desc.LastInterval = last.Interval
}
if m.scheduler != nil {
if last, ok := m.scheduler.LastScheduled(InstanceTypePBS, name); ok {
desc.LastScheduled = last.NextRun
desc.LastInterval = last.Interval
}
descriptors = append(descriptors, desc)
}
if m.stalenessTracker != nil {
if snap, ok := m.stalenessTracker.snapshot(InstanceTypePBS, name); ok {
desc.LastSuccess = snap.LastSuccess
desc.LastFailure = snap.LastError
desc.Metadata = map[string]any{"changeHash": snap.ChangeHash}
}
}
descriptors = append(descriptors, desc)
}
}
if len(m.pmgClients) > 0 {
@@ -77,14 +91,21 @@ func (m *Monitor) describeInstancesForScheduler() []InstanceDescriptor {
Name: name,
Type: InstanceTypePMG,
}
if m.scheduler != nil {
if last, ok := m.scheduler.LastScheduled(InstanceTypePMG, name); ok {
desc.LastScheduled = last.NextRun
desc.LastInterval = last.Interval
}
if m.scheduler != nil {
if last, ok := m.scheduler.LastScheduled(InstanceTypePMG, name); ok {
desc.LastScheduled = last.NextRun
desc.LastInterval = last.Interval
}
descriptors = append(descriptors, desc)
}
if m.stalenessTracker != nil {
if snap, ok := m.stalenessTracker.snapshot(InstanceTypePMG, name); ok {
desc.LastSuccess = snap.LastSuccess
desc.LastFailure = snap.LastError
desc.Metadata = map[string]any{"changeHash": snap.ChangeHash}
}
}
descriptors = append(descriptors, desc)
}
}
return descriptors

View File

@@ -0,0 +1,193 @@
package monitoring
import (
"crypto/sha1"
"encoding/hex"
"sync"
"time"
)
// FreshnessSnapshot captures the most recent freshness metadata available for a target instance.
type FreshnessSnapshot struct {
InstanceType InstanceType
Instance string
LastSuccess time.Time
LastError time.Time
LastMutated time.Time
ChangeHash string
}
// StalenessTracker maintains freshness metadata and exposes normalized staleness scores.
type StalenessTracker struct {
mu sync.RWMutex
entries map[string]FreshnessSnapshot
baseTTL time.Duration
maxStale time.Duration
metrics *PollMetrics
}
// NewStalenessTracker builds a tracker wired to poll metrics for last-success signal and using default parameters.
func NewStalenessTracker(metrics *PollMetrics) *StalenessTracker {
return &StalenessTracker{
entries: make(map[string]FreshnessSnapshot),
baseTTL: 10 * time.Second,
maxStale: 5 * time.Minute,
metrics: metrics,
}
}
// SetBounds allows overriding score decay windows.
func (t *StalenessTracker) SetBounds(baseTTL, maxStale time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()
if baseTTL > 0 {
t.baseTTL = baseTTL
}
if maxStale > 0 {
t.maxStale = maxStale
}
}
// UpdateSuccess records a successful poll along with a change hash derived from the payload.
func (t *StalenessTracker) UpdateSuccess(instanceType InstanceType, instance string, payload []byte) {
if t == nil {
return
}
now := time.Now()
snapshot := FreshnessSnapshot{
InstanceType: instanceType,
Instance: instance,
LastSuccess: now,
}
if len(payload) > 0 {
sum := sha1.Sum(payload)
snapshot.ChangeHash = hex.EncodeToString(sum[:])
snapshot.LastMutated = now
}
t.setSnapshot(snapshot)
}
// UpdateError records the most recent error time for a target instance.
func (t *StalenessTracker) UpdateError(instanceType InstanceType, instance string) {
if t == nil {
return
}
snapshot := FreshnessSnapshot{
InstanceType: instanceType,
Instance: instance,
LastError: time.Now(),
}
t.mergeSnapshot(snapshot)
}
// SetChangeHash updates the change fingerprint without affecting success timestamps.
func (t *StalenessTracker) SetChangeHash(instanceType InstanceType, instance string, payload []byte) {
if t == nil || len(payload) == 0 {
return
}
now := time.Now()
sum := sha1.Sum(payload)
hash := hex.EncodeToString(sum[:])
t.mu.Lock()
defer t.mu.Unlock()
key := trackerKey(instanceType, instance)
snap := t.entries[key]
snap.InstanceType = instanceType
snap.Instance = instance
snap.ChangeHash = hash
snap.LastMutated = now
t.entries[key] = snap
}
// StalenessScore implements the StalenessSource interface and returns a normalized value in [0,1].
func (t *StalenessTracker) StalenessScore(instanceType InstanceType, instance string) (float64, bool) {
if t == nil {
return 0, false
}
snap, ok := t.snapshot(instanceType, instance)
if !ok {
return 0, false
}
if !snap.LastSuccess.IsZero() && t.metrics != nil {
if ts, ok := t.metrics.lastSuccessFor(string(instanceType), instance); ok {
snap.LastSuccess = ts
}
}
if snap.LastSuccess.IsZero() {
return 1, true
}
now := time.Now()
age := now.Sub(snap.LastSuccess)
if age <= 0 {
return 0, true
}
max := t.maxStale
if max <= 0 {
max = 5 * time.Minute
}
score := age.Seconds() / max.Seconds()
if score > 1 {
score = 1
}
if score < 0 {
score = 0
}
return score, true
}
func (t *StalenessTracker) setSnapshot(snapshot FreshnessSnapshot) {
t.mu.Lock()
defer t.mu.Unlock()
key := trackerKey(snapshot.InstanceType, snapshot.Instance)
t.entries[key] = snapshot
}
func (t *StalenessTracker) mergeSnapshot(snapshot FreshnessSnapshot) {
t.mu.Lock()
defer t.mu.Unlock()
key := trackerKey(snapshot.InstanceType, snapshot.Instance)
existing := t.entries[key]
if snapshot.LastSuccess.After(existing.LastSuccess) {
existing.LastSuccess = snapshot.LastSuccess
}
if snapshot.LastError.After(existing.LastError) {
existing.LastError = snapshot.LastError
}
if snapshot.LastMutated.After(existing.LastMutated) {
existing.LastMutated = snapshot.LastMutated
}
if snapshot.ChangeHash != "" {
existing.ChangeHash = snapshot.ChangeHash
}
existing.InstanceType = snapshot.InstanceType
existing.Instance = snapshot.Instance
t.entries[key] = existing
}
func (t *StalenessTracker) snapshot(instanceType InstanceType, instance string) (FreshnessSnapshot, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
snap, ok := t.entries[trackerKey(instanceType, instance)]
return snap, ok
}
func trackerKey(instanceType InstanceType, instance string) string {
return string(instanceType) + "::" + instance
}