fix: ship alerting hotfixes and prepare 5.1.4

rcourtman
2026-02-07 22:05:55 +00:00
parent 3d0082c07e
commit d1e61d8a8a
8 changed files with 367 additions and 30 deletions

View File

@@ -1 +1 @@
-5.1.3
+5.1.4

View File

@@ -543,7 +543,7 @@ export const HistoryChart: Component<HistoryChartProps> = (props) => {
 }`}
 title={source() === 'live' ? 'Live sample shown because history is not available yet.' : 'In-memory buffer shown while history is warming up.'}
 >
-{source() === 'live' ? 'Live' : 'Memory'}
+{source() === 'live' ? 'Live' : 'Buffer'}
 </span>
 </Show>
 </div>

View File

@@ -534,7 +534,7 @@ export const UnifiedHistoryChart: Component<UnifiedHistoryChartProps> = (props)
 }`}
 title={source() === 'live' ? 'Live sample shown because history is not available yet.' : 'In-memory buffer shown while history is warming up.'}
 >
-{source() === 'live' ? 'Live' : 'Memory'}
+{source() === 'live' ? 'Live' : 'Buffer'}
 </span>
 </Show>
 <div class="flex items-center gap-2">

View File

@@ -9036,6 +9036,12 @@ func (m *Manager) checkEscalations() {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
+	// Respect global alert and activation controls before escalating.
+	// Escalations should never bypass a user disabling alerts.
+	if !m.config.Enabled || m.config.ActivationState != ActivationActive {
+		return
+	}
+
 	if !m.config.Schedule.Escalation.Enabled {
 		return
 	}
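The guard lands before the escalation-specific toggle, so a globally disabled or non-active alert manager can never escalate, whatever the schedule says. A minimal standalone sketch of the same ordering, with simplified stand-in types (the real Manager, config fields, and ActivationState values are defined elsewhere in this repository):

package main

import "fmt"

type ActivationState string

const (
	ActivationActive  ActivationState = "active"
	ActivationPending ActivationState = "pending"
	ActivationSnoozed ActivationState = "snoozed"
)

// shouldEscalate mirrors the guard order added to checkEscalations:
// global enable and activation state are checked first, then the
// escalation schedule toggle, so escalations never bypass a user
// disabling or snoozing alerts.
func shouldEscalate(enabled bool, state ActivationState, escalationOn bool) bool {
	if !enabled || state != ActivationActive {
		return false
	}
	return escalationOn
}

func main() {
	fmt.Println(shouldEscalate(false, ActivationActive, true)) // false: alerts globally disabled
	fmt.Println(shouldEscalate(true, ActivationSnoozed, true)) // false: alerts snoozed
	fmt.Println(shouldEscalate(true, ActivationActive, true))  // true: escalation may proceed
}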

View File

@@ -9919,6 +9919,7 @@ func TestCheckEscalations(t *testing.T) {
 		oldTime := time.Now().Add(-2 * time.Hour)
 		m.mu.Lock()
+		m.config.ActivationState = ActivationActive
 		m.config.Schedule.Escalation.Enabled = false
 		m.config.Schedule.Escalation.Levels = []EscalationLevel{
 			{After: 30, Notify: "email"},
@@ -9941,12 +9942,100 @@ func TestCheckEscalations(t *testing.T) {
 		}
 	})
 
+	t.Run("does nothing when alerts are globally disabled", func(t *testing.T) {
+		m := newTestManager(t)
+		oldTime := time.Now().Add(-2 * time.Hour)
+		m.mu.Lock()
+		m.config.Enabled = false
+		m.config.ActivationState = ActivationActive
+		m.config.Schedule.Escalation.Enabled = true
+		m.config.Schedule.Escalation.Levels = []EscalationLevel{
+			{After: 30, Notify: "email"},
+		}
+		m.activeAlerts["global-disabled-alert"] = &Alert{
+			ID:             "global-disabled-alert",
+			StartTime:      oldTime,
+			LastEscalation: 0,
+		}
+		m.mu.Unlock()
+
+		m.checkEscalations()
+
+		m.mu.RLock()
+		alert := m.activeAlerts["global-disabled-alert"]
+		m.mu.RUnlock()
+		if alert.LastEscalation != 0 {
+			t.Errorf("expected no escalation when alerts are globally disabled, got %d", alert.LastEscalation)
+		}
+	})
+
+	t.Run("does nothing when activation state is pending", func(t *testing.T) {
+		m := newTestManager(t)
+		oldTime := time.Now().Add(-2 * time.Hour)
+		m.mu.Lock()
+		m.config.Enabled = true
+		m.config.ActivationState = ActivationPending
+		m.config.Schedule.Escalation.Enabled = true
+		m.config.Schedule.Escalation.Levels = []EscalationLevel{
+			{After: 30, Notify: "email"},
+		}
+		m.activeAlerts["pending-alert"] = &Alert{
+			ID:             "pending-alert",
+			StartTime:      oldTime,
+			LastEscalation: 0,
+		}
+		m.mu.Unlock()
+
+		m.checkEscalations()
+
+		m.mu.RLock()
+		alert := m.activeAlerts["pending-alert"]
+		m.mu.RUnlock()
+		if alert.LastEscalation != 0 {
+			t.Errorf("expected no escalation when activation is pending, got %d", alert.LastEscalation)
+		}
+	})
+
+	t.Run("does nothing when activation state is snoozed", func(t *testing.T) {
+		m := newTestManager(t)
+		oldTime := time.Now().Add(-2 * time.Hour)
+		m.mu.Lock()
+		m.config.Enabled = true
+		m.config.ActivationState = ActivationSnoozed
+		m.config.Schedule.Escalation.Enabled = true
+		m.config.Schedule.Escalation.Levels = []EscalationLevel{
+			{After: 30, Notify: "email"},
+		}
+		m.activeAlerts["snoozed-alert"] = &Alert{
+			ID:             "snoozed-alert",
+			StartTime:      oldTime,
+			LastEscalation: 0,
+		}
+		m.mu.Unlock()
+
+		m.checkEscalations()
+
+		m.mu.RLock()
+		alert := m.activeAlerts["snoozed-alert"]
+		m.mu.RUnlock()
+		if alert.LastEscalation != 0 {
+			t.Errorf("expected no escalation when activation is snoozed, got %d", alert.LastEscalation)
+		}
+	})
+
 	t.Run("skips acknowledged alerts", func(t *testing.T) {
 		// t.Parallel()
 		m := newTestManager(t)
 		oldTime := time.Now().Add(-2 * time.Hour)
 		m.mu.Lock()
+		m.config.ActivationState = ActivationActive
 		m.config.Schedule.Escalation.Enabled = true
 		m.config.Schedule.Escalation.Levels = []EscalationLevel{
 			{After: 30, Notify: "email"},
@@ -9976,6 +10065,7 @@ func TestCheckEscalations(t *testing.T) {
 		oldTime := time.Now().Add(-45 * time.Minute) // 45 minutes ago
 		m.mu.Lock()
+		m.config.ActivationState = ActivationActive
 		m.config.Schedule.Escalation.Enabled = true
 		m.config.Schedule.Escalation.Levels = []EscalationLevel{
 			{After: 30, Notify: "email"}, // 30 minutes
@@ -10008,6 +10098,7 @@ func TestCheckEscalations(t *testing.T) {
 		oldTime := time.Now().Add(-90 * time.Minute) // 90 minutes ago
 		m.mu.Lock()
+		m.config.ActivationState = ActivationActive
 		m.config.Schedule.Escalation.Enabled = true
 		m.config.Schedule.Escalation.Levels = []EscalationLevel{
 			{After: 30, Notify: "email"}, // 30 minutes
@@ -10040,6 +10131,7 @@ func TestCheckEscalations(t *testing.T) {
 		oldTime := time.Now().Add(-45 * time.Minute)
 		m.mu.Lock()
+		m.config.ActivationState = ActivationActive
 		m.config.Schedule.Escalation.Enabled = true
 		m.config.Schedule.Escalation.Levels = []EscalationLevel{
 			{After: 30, Notify: "email"},
@@ -10072,6 +10164,7 @@ func TestCheckEscalations(t *testing.T) {
 		recentTime := time.Now().Add(-10 * time.Minute) // Only 10 minutes ago
 		m.mu.Lock()
+		m.config.ActivationState = ActivationActive
 		m.config.Schedule.Escalation.Enabled = true
 		m.config.Schedule.Escalation.Levels = []EscalationLevel{
 			{After: 30, Notify: "email"}, // 30 minutes threshold
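The timing scenarios these tests exercise all follow from `After` being minutes since the alert's StartTime. A hypothetical helper (not part of this codebase) that computes the highest escalation level due, assuming a second level at 60 minutes as the 90-minute scenario suggests:

package main

import (
	"fmt"
	"time"
)

// highestDueLevel returns the 1-based index of the last escalation
// level whose After threshold (in minutes) the alert's age has
// passed, or 0 if no level is due yet.
func highestDueLevel(afterMinutes []int, age time.Duration) int {
	due := 0
	for i, after := range afterMinutes {
		if age >= time.Duration(after)*time.Minute {
			due = i + 1
		}
	}
	return due
}

func main() {
	levels := []int{30, 60}
	fmt.Println(highestDueLevel(levels, 45*time.Minute)) // 1: first level due
	fmt.Println(highestDueLevel(levels, 90*time.Minute)) // 2: both levels due
	fmt.Println(highestDueLevel(levels, 10*time.Minute)) // 0: nothing due yet
}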

View File

@@ -1,6 +1,9 @@
 package monitoring
 
-import "testing"
+import (
+	"errors"
+	"testing"
+)
 
 func TestShouldPreserveBackups(t *testing.T) {
 	t.Parallel()
@@ -71,34 +74,46 @@ func TestShouldPreservePBSBackups(t *testing.T) {
 	t.Parallel()
 
 	tests := []struct {
-		name             string
-		datastoreCount   int
-		datastoreFetches int
-		want             bool
+		name                     string
+		datastoreCount           int
+		datastoreFetches         int
+		datastoreTerminalFailure int
+		want                     bool
 	}{
 		{
-			name:             "all datastores failed",
-			datastoreCount:   3,
-			datastoreFetches: 0,
-			want:             true,
+			name:                     "all datastores failed transiently",
+			datastoreCount:           3,
+			datastoreFetches:         0,
+			datastoreTerminalFailure: 0,
+			want:                     true,
 		},
 		{
-			name:             "no datastores skips preservation",
-			datastoreCount:   0,
-			datastoreFetches: 0,
-			want:             false,
+			name:                     "all datastores failed with terminal errors",
+			datastoreCount:           3,
+			datastoreFetches:         0,
+			datastoreTerminalFailure: 3,
+			want:                     false,
 		},
 		{
-			name:             "some datastores succeeded",
-			datastoreCount:   3,
-			datastoreFetches: 2,
-			want:             false,
+			name:                     "no datastores skips preservation",
+			datastoreCount:           0,
+			datastoreFetches:         0,
+			datastoreTerminalFailure: 0,
+			want:                     false,
 		},
 		{
-			name:             "all datastores succeeded",
-			datastoreCount:   3,
-			datastoreFetches: 3,
-			want:             false,
+			name:                     "some datastores succeeded",
+			datastoreCount:           3,
+			datastoreFetches:         2,
+			datastoreTerminalFailure: 0,
+			want:                     false,
 		},
+		{
+			name:                     "all datastores succeeded",
+			datastoreCount:           3,
+			datastoreFetches:         3,
+			datastoreTerminalFailure: 0,
+			want:                     false,
+		},
 	}
@@ -106,9 +121,56 @@ func TestShouldPreservePBSBackups(t *testing.T) {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
 			t.Parallel()
-			got := shouldPreservePBSBackups(tt.datastoreCount, tt.datastoreFetches)
+			got := shouldPreservePBSBackupsWithTerminal(tt.datastoreCount, tt.datastoreFetches, tt.datastoreTerminalFailure)
 			if got != tt.want {
-				t.Fatalf("shouldPreservePBSBackups() = %v, want %v", got, tt.want)
+				t.Fatalf("shouldPreservePBSBackupsWithTerminal() = %v, want %v", got, tt.want)
 			}
 		})
 	}
 }
+
+func TestShouldReuseCachedPBSBackups(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{
+			name: "404 datastore missing should not reuse cache",
+			err:  errors.New("API error 404: datastore 'archive' does not exist"),
+			want: false,
+		},
+		{
+			name: "400 namespace missing should not reuse cache",
+			err:  errors.New("API error 400: namespace '/old' not found"),
+			want: false,
+		},
+		{
+			name: "400 invalid backup group should not reuse cache",
+			err:  errors.New("API error 400: invalid backup group"),
+			want: false,
+		},
+		{
+			name: "500 server error should reuse cache",
+			err:  errors.New("API error 500: internal server error"),
+			want: true,
+		},
+		{
+			name: "timeout should reuse cache",
+			err:  errors.New("Get \"https://pbs/api2/json\": context deadline exceeded"),
+			want: true,
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			got := shouldReuseCachedPBSBackups(tt.err)
+			if got != tt.want {
+				t.Fatalf("shouldReuseCachedPBSBackups() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

View File

@@ -9437,13 +9437,50 @@ func shouldPreserveBackups(nodeCount int, hadSuccessfulNode bool, storagesWithBa
 }
 
 func shouldPreservePBSBackups(datastoreCount, datastoreFetches int) bool {
+	return shouldPreservePBSBackupsWithTerminal(datastoreCount, datastoreFetches, 0)
+}
+
+func shouldPreservePBSBackupsWithTerminal(datastoreCount, datastoreFetches, datastoreTerminalFailures int) bool {
 	// If there are datastores but all fetches failed, preserve existing backups
-	if datastoreCount > 0 && datastoreFetches == 0 {
+	if datastoreCount > 0 && datastoreFetches == 0 && datastoreTerminalFailures == 0 {
 		return true
 	}
 	return false
 }
 
+func shouldReuseCachedPBSBackups(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	msg := strings.ToLower(err.Error())
+	if strings.Contains(msg, "api error 404") || strings.Contains(msg, "status 404") {
+		return false
+	}
+	if strings.Contains(msg, "api error 400") || strings.Contains(msg, "status 400") {
+		if strings.Contains(msg, "datastore") ||
+			strings.Contains(msg, "namespace") ||
+			strings.Contains(msg, "backup group") ||
+			strings.Contains(msg, "not found") ||
+			strings.Contains(msg, "does not exist") ||
+			strings.Contains(msg, "invalid") {
+			return false
+		}
+	}
+	if strings.Contains(msg, "does not exist") && (strings.Contains(msg, "datastore") || strings.Contains(msg, "namespace")) {
+		return false
+	}
+	if strings.Contains(msg, "not found") && (strings.Contains(msg, "datastore") || strings.Contains(msg, "namespace") || strings.Contains(msg, "backup")) {
+		return false
+	}
+	return true
+}
+
 func storageNamesForNode(instanceName, nodeName string, snapshot models.StateSnapshot) []string {
 	if nodeName == "" {
 		return nil
@@ -10230,7 +10267,8 @@ func (m *Monitor) pollPBSBackups(ctx context.Context, instanceName string, clien
 	var allBackups []models.PBSBackup
 	datastoreCount := len(datastores) // Number of datastores to query
 	datastoreFetches := 0             // Number of successful datastore fetches
-	datastoreErrors := 0              // Number of failed datastore fetches
+	datastoreErrors := 0              // Number of transiently failed datastore fetches
+	datastoreTerminalFailures := 0    // Number of terminal datastore failures
 
 	// Process each datastore
 	for _, ds := range datastores {
@@ -10251,6 +10289,7 @@ func (m *Monitor) pollPBSBackups(ctx context.Context, instanceName string, clien
 			Msg("Processing datastore namespaces")
 
 		datastoreHadSuccess := false
+		datastoreHadTerminalFailure := false
 		groupsReused := 0
 		groupsRequested := 0
@@ -10264,6 +10303,17 @@ func (m *Monitor) pollPBSBackups(ctx context.Context, instanceName string, clien
 			groups, err := client.ListBackupGroups(ctx, ds.Name, namespace)
 			if err != nil {
+				if !shouldReuseCachedPBSBackups(err) {
+					datastoreHadTerminalFailure = true
+					log.Warn().
+						Err(err).
+						Str("instance", instanceName).
+						Str("datastore", ds.Name).
+						Str("namespace", namespace).
+						Msg("PBS backup groups returned terminal error; stale cache will be dropped for this datastore")
+					continue
+				}
+
 				log.Error().
 					Err(err).
 					Str("instance", instanceName).
@@ -10353,6 +10403,15 @@ func (m *Monitor) pollPBSBackups(ctx context.Context, instanceName string, clien
 				Int("groups_refreshed", groupsRequested).
 				Msg("PBS datastore processed")
 		} else {
+			if datastoreHadTerminalFailure {
+				datastoreTerminalFailures++
+				log.Warn().
+					Str("instance", instanceName).
+					Str("datastore", ds.Name).
+					Msg("Skipping cached PBS backups due to terminal datastore errors")
+				continue
+			}
+
 			// Preserve cached data for this datastore if we couldn't fetch anything new.
 			log.Warn().
 				Str("instance", instanceName).
@@ -10374,7 +10433,7 @@ func (m *Monitor) pollPBSBackups(ctx context.Context, instanceName string, clien
 		Msg("PBS backups fetched")
 
 	// Decide whether to keep existing backups when all queries failed
-	if shouldPreservePBSBackups(datastoreCount, datastoreFetches) {
+	if shouldPreservePBSBackupsWithTerminal(datastoreCount, datastoreFetches, datastoreTerminalFailures) {
 		log.Warn().
 			Str("instance", instanceName).
 			Int("datastores", datastoreCount).
@@ -10523,8 +10582,16 @@ func (m *Monitor) fetchPBSBackupSnapshots(ctx context.Context, client *pbs.Clien
 					Str("id", req.group.BackupID).
 					Msg("Failed to list PBS backup snapshots")
-				if len(req.cached.snapshots) > 0 {
+				if len(req.cached.snapshots) > 0 && shouldReuseCachedPBSBackups(err) {
 					results <- req.cached.snapshots
+				} else if len(req.cached.snapshots) > 0 {
+					log.Warn().
+						Str("instance", instanceName).
+						Str("datastore", req.datastore).
+						Str("namespace", req.namespace).
+						Str("type", req.group.BackupType).
+						Str("id", req.group.BackupID).
+						Msg("Discarding cached PBS snapshots due to terminal API error")
 				}
 				return
 			}
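Putting the pieces together, the preserve-or-drop decision at the end of a poll reduces to the helper's truth table. A short self-contained sketch that restates shouldPreservePBSBackupsWithTerminal from the diff above and walks the same cases the table-driven test covers:

package main

import "fmt"

// Local copy of the helper from the diff above: keep the cache only
// when every fetch failed and none of the failures was terminal.
func shouldPreservePBSBackupsWithTerminal(datastoreCount, datastoreFetches, datastoreTerminalFailures int) bool {
	if datastoreCount > 0 && datastoreFetches == 0 && datastoreTerminalFailures == 0 {
		return true
	}
	return false
}

func main() {
	fmt.Println(shouldPreservePBSBackupsWithTerminal(3, 0, 0)) // true: all fetches failed transiently, keep cache
	fmt.Println(shouldPreservePBSBackupsWithTerminal(3, 0, 3)) // false: failures were terminal, drop cache
	fmt.Println(shouldPreservePBSBackupsWithTerminal(3, 2, 0)) // false: some datastores returned fresh data
	fmt.Println(shouldPreservePBSBackupsWithTerminal(0, 0, 0)) // false: no datastores, nothing to preserve
}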

View File

@@ -0,0 +1,109 @@
+package monitoring
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/rcourtman/pulse-go-rewrite/internal/models"
+	"github.com/rcourtman/pulse-go-rewrite/pkg/pbs"
+)
+
+func TestPollPBSBackups_DropsStaleCacheOnTerminalDatastoreError(t *testing.T) {
+	t.Parallel()
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if strings.Contains(r.URL.Path, "/admin/datastore/archive/groups") {
+			http.Error(w, `{"errors":"datastore does not exist"}`, http.StatusNotFound)
+			return
+		}
+		http.NotFound(w, r)
+	}))
+	defer server.Close()
+
+	client, err := pbs.NewClient(pbs.ClientConfig{
+		Host:       server.URL,
+		TokenName:  "root@pam!token",
+		TokenValue: "secret",
+	})
+	if err != nil {
+		t.Fatalf("failed to create PBS client: %v", err)
+	}
+
+	m := &Monitor{state: models.NewState()}
+	m.state.UpdatePBSBackups("pbs1", []models.PBSBackup{
+		{
+			ID:         "pbs-pbs1-archive--vm-100-1700000000",
+			Instance:   "pbs1",
+			Datastore:  "archive",
+			Namespace:  "",
+			BackupType: "vm",
+			VMID:       "100",
+			BackupTime: time.Unix(1700000000, 0),
+		},
+	})
+
+	m.pollPBSBackups(context.Background(), "pbs1", client, []models.PBSDatastore{
+		{Name: "archive"},
+	})
+
+	snapshot := m.state.GetSnapshot()
+	for _, backup := range snapshot.PBSBackups {
+		if backup.Instance == "pbs1" {
+			t.Fatalf("expected stale backups to be removed after terminal error, found: %+v", backup)
+		}
+	}
+}
+
+func TestPollPBSBackups_PreservesCacheOnTransientDatastoreError(t *testing.T) {
+	t.Parallel()
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if strings.Contains(r.URL.Path, "/admin/datastore/archive/groups") {
+			http.Error(w, `{"errors":"temporary server issue"}`, http.StatusInternalServerError)
+			return
+		}
+		http.NotFound(w, r)
+	}))
+	defer server.Close()
+
+	client, err := pbs.NewClient(pbs.ClientConfig{
+		Host:       server.URL,
+		TokenName:  "root@pam!token",
+		TokenValue: "secret",
+	})
+	if err != nil {
+		t.Fatalf("failed to create PBS client: %v", err)
+	}
+
+	m := &Monitor{state: models.NewState()}
+	original := models.PBSBackup{
+		ID:         "pbs-pbs1-archive--vm-100-1700000000",
+		Instance:   "pbs1",
+		Datastore:  "archive",
+		Namespace:  "",
+		BackupType: "vm",
+		VMID:       "100",
+		BackupTime: time.Unix(1700000000, 0),
+	}
+	m.state.UpdatePBSBackups("pbs1", []models.PBSBackup{original})
+
+	m.pollPBSBackups(context.Background(), "pbs1", client, []models.PBSDatastore{
+		{Name: "archive"},
+	})
+
+	snapshot := m.state.GetSnapshot()
+	var found bool
+	for _, backup := range snapshot.PBSBackups {
+		if backup.Instance == "pbs1" && backup.ID == original.ID {
+			found = true
+			break
+		}
+	}
+	if !found {
+		t.Fatal("expected cached backup to be preserved on transient error")
+	}
+}