Add recovery notifications and grouping controls

rcourtman
2025-11-21 22:07:00 +00:00
parent 3b85436c0f
commit 255357d2fe
7 changed files with 660 additions and 58 deletions

View File

@@ -280,11 +280,13 @@ export const createDefaultCooldown = (): CooldownConfig => ({
export const createDefaultGrouping = (): GroupingConfig => ({
enabled: true,
window: 5,
window: 1,
byNode: true,
byGuest: false,
});
export const createDefaultResolveNotifications = (): boolean => true;
const createDefaultAppriseConfig = (): UIAppriseConfig => ({
enabled: false,
mode: 'cli',
@@ -519,6 +521,9 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
const [scheduleEscalation, setScheduleEscalation] =
createSignal<EscalationConfig>(createDefaultEscalation());
const [notifyOnResolve, setNotifyOnResolve] =
createSignal<boolean>(createDefaultResolveNotifications());
// Set up destinationsRef.emailConfig function immediately
destinationsRef.emailConfig = () => {
const config = emailConfig();
@@ -1082,28 +1087,36 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
});
}
if (config.schedule.grouping) {
if (config.schedule.grouping || config.schedule.groupingWindow !== undefined) {
const groupingConfig = config.schedule.grouping;
const rawGroupingWindowSeconds =
typeof groupingConfig?.window === 'number'
? groupingConfig.window
: typeof config.schedule.groupingWindow === 'number'
? config.schedule.groupingWindow
: 30;
const normalizedGroupingWindowSeconds = Math.max(0, rawGroupingWindowSeconds);
const groupingWindowMinutes = Math.round(normalizedGroupingWindowSeconds / 60);
setScheduleGrouping({
enabled: config.schedule.grouping.enabled || false,
window: Math.floor((config.schedule.grouping.window || 300) / 60),
enabled:
groupingConfig?.enabled !== undefined
? Boolean(groupingConfig.enabled)
: normalizedGroupingWindowSeconds > 0,
window: groupingWindowMinutes,
byNode:
config.schedule.grouping.byNode !== undefined
? config.schedule.grouping.byNode
: true,
groupingConfig?.byNode !== undefined ? groupingConfig.byNode : true,
byGuest:
config.schedule.grouping.byGuest !== undefined
? config.schedule.grouping.byGuest
: false,
});
} else if (config.schedule.groupingWindow !== undefined) {
setScheduleGrouping({
enabled: config.schedule.groupingWindow > 0,
window: Math.floor(config.schedule.groupingWindow / 60),
byNode: true,
byGuest: false,
groupingConfig?.byGuest !== undefined ? groupingConfig.byGuest : false,
});
}
if (config.schedule.notifyOnResolve !== undefined) {
setNotifyOnResolve(Boolean(config.schedule.notifyOnResolve));
} else {
setNotifyOnResolve(createDefaultResolveNotifications());
}
if (config.schedule.escalation) {
const rawLevels = config.schedule.escalation.levels || [];
const levels = rawLevels.map((level) => ({
@@ -1555,6 +1568,12 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
? clampCooldownMinutes(scheduleCooldown().minutes)
: 0;
const normalizedMaxAlertsHour = clampMaxAlertsPerHour(scheduleCooldown().maxAlerts);
const groupingState = scheduleGrouping();
const groupingWindowSeconds =
groupingState.enabled && groupingState.window >= 0
? groupingState.window * 60
: 0;
const groupingEnabled = groupingState.enabled && groupingWindowSeconds > 0;
const alertConfig = {
enabled: true,
@@ -1636,17 +1655,15 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
schedule: {
quietHours: scheduleQuietHours(),
cooldown: normalizedCooldownMinutes,
groupingWindow:
scheduleGrouping().enabled && scheduleGrouping().window
? scheduleGrouping().window * 60
: 30, // Convert minutes to seconds
groupingWindow: groupingWindowSeconds,
notifyOnResolve: notifyOnResolve(),
maxAlertsHour: normalizedMaxAlertsHour,
escalation: scheduleEscalation(),
grouping: {
enabled: scheduleGrouping().enabled,
window: scheduleGrouping().window * 60, // Convert minutes to seconds
byNode: scheduleGrouping().byNode,
byGuest: scheduleGrouping().byGuest,
enabled: groupingEnabled,
window: groupingWindowSeconds, // Convert minutes to seconds
byNode: groupingState.byNode,
byGuest: groupingState.byGuest,
},
},
// Add missing required fields
@@ -1951,6 +1968,8 @@ const [appriseConfig, setAppriseConfig] = createSignal<UIAppriseConfig>(
setCooldown={setScheduleCooldown}
grouping={scheduleGrouping}
setGrouping={setScheduleGrouping}
notifyOnResolve={notifyOnResolve}
setNotifyOnResolve={setNotifyOnResolve}
escalation={scheduleEscalation}
setEscalation={setScheduleEscalation}
/>
@@ -3042,6 +3061,8 @@ interface ScheduleTabProps {
setCooldown: (value: CooldownConfig) => void;
grouping: () => GroupingConfig;
setGrouping: (value: GroupingConfig) => void;
notifyOnResolve: () => boolean;
setNotifyOnResolve: (value: boolean) => void;
escalation: () => EscalationConfig;
setEscalation: (value: EscalationConfig) => void;
}
@@ -3054,12 +3075,15 @@ function ScheduleTab(props: ScheduleTabProps) {
const setCooldown = props.setCooldown;
const grouping = props.grouping;
const setGrouping = props.setGrouping;
const notifyOnResolve = props.notifyOnResolve;
const setNotifyOnResolve = props.setNotifyOnResolve;
const escalation = props.escalation;
const setEscalation = props.setEscalation;
const resetToDefaults = () => {
setQuietHours(createDefaultQuietHours());
setCooldown(createDefaultCooldown());
setGrouping(createDefaultGrouping());
setNotifyOnResolve(createDefaultResolveNotifications());
setEscalation(createDefaultEscalation());
props.setHasUnsavedChanges(true);
};
@@ -3526,7 +3550,7 @@ function ScheduleTab(props: ScheduleTabProps) {
<div class="flex items-center gap-3">
<input
type="range"
min="1"
min="0"
max="30"
value={grouping().window}
onChange={(e) => {
@@ -3540,7 +3564,7 @@ function ScheduleTab(props: ScheduleTabProps) {
</div>
</div>
<p class={`${formHelpText} mt-1`}>
Alerts within this window are grouped together.
Alerts within this window are grouped together. Set to 0 to send immediately.
</p>
</div>
@@ -3634,6 +3658,32 @@ function ScheduleTab(props: ScheduleTabProps) {
</Show>
</SettingsPanel>
{/* Recovery notifications */}
<SettingsPanel
title="Recovery notifications"
description="Send a follow-up when an alert returns to normal."
action={
<Toggle
checked={notifyOnResolve()}
onChange={(e) => {
setNotifyOnResolve(e.currentTarget.checked);
props.setHasUnsavedChanges(true);
}}
containerClass="sm:self-start"
label={
<span class="text-xs font-medium text-gray-600 dark:text-gray-400">
{notifyOnResolve() ? 'Enabled' : 'Disabled'}
</span>
}
/>
}
class="space-y-3"
>
<p class={formHelpText}>
Sends on the same channels as live alerts to confirm when a condition clears.
</p>
</SettingsPanel>
{/* Escalation Rules */}
<SettingsPanel
title="Alert escalation"
@@ -3811,6 +3861,9 @@ function ScheduleTab(props: ScheduleTabProps) {
</Show>
</p>
</Show>
<Show when={notifyOnResolve()}>
<p> Recovery notifications enabled when alerts clear</p>
</Show>
<Show when={escalation().enabled && escalation().levels.length > 0}>
<p>
{escalation().levels.length} escalation level

View File

@@ -185,6 +185,7 @@ export interface AlertConfig {
cooldown?: number;
groupingWindow?: number;
maxAlertsHour?: number;
notifyOnResolve?: boolean;
grouping?: {
enabled: boolean;
window: number;

View File

@@ -218,12 +218,13 @@ type GroupingConfig struct {
// ScheduleConfig represents alerting schedule configuration
type ScheduleConfig struct {
QuietHours QuietHours `json:"quietHours"`
Cooldown int `json:"cooldown"` // minutes
GroupingWindow int `json:"groupingWindow"` // seconds (deprecated, use Grouping.Window)
MaxAlertsHour int `json:"maxAlertsHour"` // max alerts per hour per resource
Escalation EscalationConfig `json:"escalation"`
Grouping GroupingConfig `json:"grouping"`
QuietHours QuietHours `json:"quietHours"`
Cooldown int `json:"cooldown"` // minutes
GroupingWindow int `json:"groupingWindow"` // seconds (deprecated, use Grouping.Window)
MaxAlertsHour int `json:"maxAlertsHour"` // max alerts per hour per resource
NotifyOnResolve bool `json:"notifyOnResolve"` // Send notification when alert clears
Escalation EscalationConfig `json:"escalation"`
Grouping GroupingConfig `json:"grouping"`
}
// FilterCondition represents a single filter condition
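
For reference, the schedule payload now carries notifyOnResolve alongside the legacy groupingWindow (seconds on the wire; the UI edits the window in minutes and multiplies by 60 before saving). A minimal standalone sketch using a trimmed stand-in struct rather than the full ScheduleConfig:

package main

import (
	"encoding/json"
	"fmt"
)

// scheduleSketch mirrors only the scalar json tags shown above; the real
// ScheduleConfig also carries quietHours, escalation, and grouping.
type scheduleSketch struct {
	Cooldown        int  `json:"cooldown"`       // minutes
	GroupingWindow  int  `json:"groupingWindow"` // seconds (legacy mirror of grouping.window)
	MaxAlertsHour   int  `json:"maxAlertsHour"`
	NotifyOnResolve bool `json:"notifyOnResolve"`
}

func main() {
	out, _ := json.Marshal(scheduleSketch{Cooldown: 5, GroupingWindow: 30, MaxAlertsHour: 10, NotifyOnResolve: true})
	fmt.Println(string(out))
	// {"cooldown":5,"groupingWindow":30,"maxAlertsHour":10,"notifyOnResolve":true}
}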
@@ -604,9 +605,10 @@ func NewManager() *Manager {
},
Suppress: QuietHoursSuppression{},
},
Cooldown: 5, // ON - 5 minutes prevents spam
GroupingWindow: 30, // ON - 30 seconds groups related alerts
MaxAlertsHour: 10, // ON - 10 alerts/hour prevents flooding
Cooldown: 5, // ON - 5 minutes prevents spam
GroupingWindow: 30, // ON - 30 seconds groups related alerts
MaxAlertsHour: 10, // ON - 10 alerts/hour prevents flooding
NotifyOnResolve: true,
Escalation: EscalationConfig{
Enabled: false, // OFF - requires user configuration
Levels: []EscalationLevel{
@@ -904,16 +906,37 @@ func (m *Manager) UpdateConfig(config AlertConfig) {
config.StorageDefault.Clear = 80
}
// Initialize Docker defaults if missing/zero
if config.DockerDefaults.CPU.Trigger <= 0 {
config.DockerDefaults.CPU = HysteresisThreshold{Trigger: 80, Clear: 75}
}
if config.DockerDefaults.Memory.Trigger <= 0 {
config.DockerDefaults.Memory = HysteresisThreshold{Trigger: 85, Clear: 80}
}
if config.DockerDefaults.Disk.Trigger <= 0 {
config.DockerDefaults.Disk = HysteresisThreshold{Trigger: 85, Clear: 80}
// Initialize Docker defaults while allowing explicit disables (trigger == 0)
normalizeDockerThreshold := func(th HysteresisThreshold, defaultTrigger float64, metricName string) HysteresisThreshold {
normalized := th
// Negative triggers are treated as unset and replaced with defaults.
if normalized.Trigger < 0 {
normalized.Trigger = defaultTrigger
}
// Explicit disable: keep trigger at 0 and clamp clear to 0.
if normalized.Trigger == 0 {
if normalized.Clear < 0 {
normalized.Clear = 0
}
return normalized
}
if normalized.Clear <= 0 {
normalized.Clear = normalized.Trigger - 5
if normalized.Clear < 0 {
normalized.Clear = 0
}
}
ensureValidHysteresis(&normalized, metricName)
return normalized
}
config.DockerDefaults.CPU = normalizeDockerThreshold(config.DockerDefaults.CPU, 80, "docker.cpu")
config.DockerDefaults.Memory = normalizeDockerThreshold(config.DockerDefaults.Memory, 85, "docker.memory")
config.DockerDefaults.Disk = normalizeDockerThreshold(config.DockerDefaults.Disk, 85, "docker.disk")
if config.DockerDefaults.RestartCount <= 0 {
config.DockerDefaults.RestartCount = 3
}
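
The normalization above boils down to three rules: a negative trigger is treated as unset and replaced with the default, a zero trigger is an explicit disable and is preserved (with clear clamped to 0), and a missing clear value is derived 5 points below the trigger. A standalone sketch of those rules (local stand-in types; ensureValidHysteresis omitted):

package main

import "fmt"

type threshold struct{ Trigger, Clear float64 }

func normalize(th threshold, defaultTrigger float64) threshold {
	if th.Trigger < 0 { // unset: fall back to the default
		th.Trigger = defaultTrigger
	}
	if th.Trigger == 0 { // explicit disable: keep 0, clamp clear
		if th.Clear < 0 {
			th.Clear = 0
		}
		return th
	}
	if th.Clear <= 0 { // derive clear 5 below trigger, never negative
		th.Clear = th.Trigger - 5
		if th.Clear < 0 {
			th.Clear = 0
		}
	}
	return th
}

func main() {
	fmt.Println(normalize(threshold{Trigger: -1}, 85)) // {85 80}: unset -> defaults applied
	fmt.Println(normalize(threshold{Trigger: 0}, 85))  // {0 0}: explicitly disabled, preserved
	fmt.Println(normalize(threshold{Trigger: 90}, 85)) // {90 85}: clear derived from trigger
}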
@@ -1399,8 +1422,16 @@ func (m *Manager) reevaluateActiveAlertsLocked() {
alertsToResolve = append(alertsToResolve, alertID)
continue
}
thresholds := m.config.GuestDefaults
thresholds := ThresholdConfig{
CPU: cloneThreshold(&m.config.DockerDefaults.CPU),
Memory: cloneThreshold(&m.config.DockerDefaults.Memory),
Disk: cloneThreshold(&m.config.DockerDefaults.Disk),
}
if override, exists := m.config.Overrides[resourceID]; exists {
if override.Disabled {
alertsToResolve = append(alertsToResolve, alertID)
continue
}
thresholds = m.applyThresholdOverride(thresholds, override)
}
threshold = getThresholdForMetric(thresholds, metricType)
@@ -5663,6 +5694,22 @@ func (m *Manager) GetRecentlyResolved() []models.ResolvedAlert {
return resolved
}
// GetResolvedAlert returns a copy of a recently resolved alert by ID.
func (m *Manager) GetResolvedAlert(alertID string) *ResolvedAlert {
m.resolvedMutex.RLock()
defer m.resolvedMutex.RUnlock()
resolved, ok := m.recentlyResolved[alertID]
if !ok || resolved == nil || resolved.Alert == nil {
return nil
}
return &ResolvedAlert{
Alert: resolved.Alert.Clone(),
ResolvedTime: resolved.ResolvedTime,
}
}
// GetAlertHistory returns alert history
func (m *Manager) GetAlertHistory(limit int) []Alert {
return m.historyManager.GetAllHistory(limit)

View File

@@ -1772,3 +1772,76 @@ func TestDisableAllStorageClearsExistingAlerts(t *testing.T) {
t.Fatalf("expected pending alert entry to be cleared after disabling all storage")
}
}
func TestUpdateConfigPreservesZeroDockerThresholds(t *testing.T) {
t.Helper()
m := NewManager()
config := m.GetConfig()
config.DockerDefaults.Memory = HysteresisThreshold{Trigger: 0, Clear: 0}
m.UpdateConfig(config)
m.mu.RLock()
defer m.mu.RUnlock()
if m.config.DockerDefaults.Memory.Trigger != 0 {
t.Fatalf("expected docker memory trigger to remain 0 when disabled, got %.1f", m.config.DockerDefaults.Memory.Trigger)
}
if m.config.DockerDefaults.Memory.Clear != 0 {
t.Fatalf("expected docker memory clear to remain 0 when disabled, got %.1f", m.config.DockerDefaults.Memory.Clear)
}
}
func TestReevaluateClearsDockerContainerAlertWhenOverrideDisabled(t *testing.T) {
m := NewManager()
resourceID := "docker:host-1/container-1"
alertID := resourceID + "-memory"
resolved := make(chan string, 1)
m.SetResolvedCallback(func(id string) {
resolved <- id
})
m.mu.Lock()
m.activeAlerts[alertID] = &Alert{
ID: alertID,
Type: "memory",
ResourceID: resourceID,
ResourceName: "qbittorrent",
Instance: "Docker",
Metadata: map[string]interface{}{
"resourceType": "Docker Container",
},
Threshold: 80,
Value: 90,
}
m.mu.Unlock()
config := m.GetConfig()
config.Overrides = map[string]ThresholdConfig{
resourceID: {
Disabled: true,
},
}
config.ActivationState = ActivationActive
m.UpdateConfig(config)
select {
case got := <-resolved:
if got != alertID {
t.Fatalf("resolved callback fired for unexpected alert %s", got)
}
case <-time.After(200 * time.Millisecond):
t.Fatalf("expected alert to be resolved when docker container override is disabled")
}
m.mu.RLock()
_, exists := m.activeAlerts[alertID]
m.mu.RUnlock()
if exists {
t.Fatalf("expected docker container alert to be cleared when override is disabled")
}
}

View File

@@ -91,6 +91,7 @@ func (h *AlertHandlers) UpdateAlertConfig(w http.ResponseWriter, r *http.Request
config.Schedule.Grouping.ByNode,
config.Schedule.Grouping.ByGuest,
)
h.monitor.GetNotificationManager().SetNotifyOnResolve(config.Schedule.NotifyOnResolve)
// Save to persistent storage
if err := h.monitor.GetConfigPersistence().SaveAlertConfig(config); err != nil {

View File

@@ -3865,6 +3865,7 @@ func New(cfg *config.Config) (*Monitor, error) {
alertConfig.Schedule.Grouping.ByNode,
alertConfig.Schedule.Grouping.ByGuest,
)
m.notificationMgr.SetNotifyOnResolve(alertConfig.Schedule.NotifyOnResolve)
} else {
log.Warn().Err(err).Msg("Failed to load alert configuration")
}
@@ -4288,6 +4289,11 @@ func (m *Monitor) Start(ctx context.Context, wsHub *websocket.Hub) {
m.alertManager.SetResolvedCallback(func(alertID string) {
wsHub.BroadcastAlertResolved(alertID)
m.notificationMgr.CancelAlert(alertID)
if m.notificationMgr.GetNotifyOnResolve() {
if resolved := m.alertManager.GetResolvedAlert(alertID); resolved != nil {
go m.notificationMgr.SendResolvedAlert(resolved)
}
}
// Don't broadcast full state here - it causes a cascade with many guests.
// The frontend will get the updated alerts through the regular broadcast ticker.
})

View File

@@ -6,6 +6,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"html"
"io"
"math"
"net"
@@ -42,6 +43,18 @@ const (
WebhookRateLimitMax = 10 // Max requests per window per webhook
)
const (
queueTypeSuffixResolved = "_resolved"
metadataResolvedAt = "resolvedAt"
)
type notificationEvent string
const (
eventAlert notificationEvent = "alert"
eventResolved notificationEvent = "resolved"
)
// createSecureWebhookClient creates an HTTP client with security controls
func (n *NotificationManager) createSecureWebhookClient(timeout time.Duration) *http.Client {
redirectCount := 0
@@ -112,6 +125,7 @@ type NotificationManager struct {
appriseConfig AppriseConfig
enabled bool
cooldown time.Duration
notifyOnResolve bool
lastNotified map[string]notificationRecord
groupWindow time.Duration
pendingAlerts []*alerts.Alert
@@ -178,6 +192,17 @@ func copyAppriseConfig(cfg AppriseConfig) AppriseConfig {
return copy
}
// annotateResolvedMetadata stores the resolution timestamp on the alert metadata for queue persistence.
func annotateResolvedMetadata(alert *alerts.Alert, resolvedAt time.Time) {
if alert == nil {
return
}
if alert.Metadata == nil {
alert.Metadata = make(map[string]interface{})
}
alert.Metadata[metadataResolvedAt] = resolvedAt.Format(time.RFC3339)
}
// NormalizeAppriseConfig cleans and normalizes Apprise configuration values.
func NormalizeAppriseConfig(cfg AppriseConfig) AppriseConfig {
normalized := cfg
@@ -334,10 +359,11 @@ func NewNotificationManager(publicURL string) *NotificationManager {
}
nm := &NotificationManager{
enabled: true,
cooldown: 5 * time.Minute,
lastNotified: make(map[string]notificationRecord),
webhooks: []WebhookConfig{},
enabled: true,
cooldown: 5 * time.Minute,
notifyOnResolve: true,
lastNotified: make(map[string]notificationRecord),
webhooks: []WebhookConfig{},
appriseConfig: AppriseConfig{
Enabled: false,
Mode: AppriseModeCLI,
@@ -442,6 +468,25 @@ func (n *NotificationManager) SetCooldown(minutes int) {
log.Info().Int("minutes", minutes).Msg("Updated notification cooldown")
}
// SetNotifyOnResolve toggles whether resolved alerts send notifications.
func (n *NotificationManager) SetNotifyOnResolve(enabled bool) {
n.mu.Lock()
was := n.notifyOnResolve
n.notifyOnResolve = enabled
n.mu.Unlock()
if was != enabled {
log.Info().Bool("enabled", enabled).Msg("Updated resolved alert notifications")
}
}
// GetNotifyOnResolve returns whether resolved alerts trigger notifications.
func (n *NotificationManager) GetNotifyOnResolve() bool {
n.mu.RLock()
defer n.mu.RUnlock()
return n.notifyOnResolve
}
// SetGroupingWindow updates the grouping window duration
func (n *NotificationManager) SetGroupingWindow(seconds int) {
n.mu.Lock()
@@ -580,6 +625,48 @@ func (n *NotificationManager) SendAlert(alert *alerts.Alert) {
}
}
// SendResolvedAlert delivers notifications for a resolved alert immediately.
func (n *NotificationManager) SendResolvedAlert(resolved *alerts.ResolvedAlert) {
if resolved == nil || resolved.Alert == nil {
return
}
// Clone the alert so downstream goroutines cannot mutate shared state.
alertCopy := resolved.Alert.Clone()
if alertCopy == nil {
return
}
resolvedAt := resolved.ResolvedTime
if resolvedAt.IsZero() {
resolvedAt = time.Now()
}
annotateResolvedMetadata(alertCopy, resolvedAt)
n.mu.RLock()
enabled := n.enabled && n.notifyOnResolve
emailConfig := copyEmailConfig(n.emailConfig)
webhooks := copyWebhookConfigs(n.webhooks)
appriseConfig := copyAppriseConfig(n.appriseConfig)
queue := n.queue
n.mu.RUnlock()
if !enabled {
log.Debug().
Str("alertID", alertCopy.ID).
Msg("Resolved notifications disabled, skipping")
return
}
alertsToSend := []*alerts.Alert{alertCopy}
if queue != nil {
n.enqueueResolvedNotifications(queue, emailConfig, webhooks, appriseConfig, alertsToSend, resolvedAt)
} else {
n.sendResolvedNotificationsDirect(emailConfig, webhooks, appriseConfig, alertsToSend, resolvedAt)
}
}
// CancelAlert removes pending notifications for a resolved alert
func (n *NotificationManager) CancelAlert(alertID string) {
n.mu.Lock()
@@ -765,6 +852,86 @@ func (n *NotificationManager) enqueueNotifications(emailConfig EmailConfig, webh
}
}
// enqueueResolvedNotifications adds resolved notifications to the persistent queue.
func (n *NotificationManager) enqueueResolvedNotifications(queue *NotificationQueue, emailConfig EmailConfig, webhooks []WebhookConfig, appriseConfig AppriseConfig, alertsToSend []*alerts.Alert, resolvedAt time.Time) {
if queue == nil {
return
}
anyFailed := false
if emailConfig.Enabled {
configJSON, err := json.Marshal(emailConfig)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal email config for resolved queue")
} else {
notif := &QueuedNotification{
Type: "email" + queueTypeSuffixResolved,
Alerts: alertsToSend,
Config: configJSON,
MaxAttempts: 3,
}
if err := queue.Enqueue(notif); err != nil {
log.Error().Err(err).Msg("Failed to enqueue resolved email notification - falling back to direct send")
anyFailed = true
go n.sendResolvedEmail(emailConfig, alertsToSend, resolvedAt)
} else {
log.Debug().Int("alertCount", len(alertsToSend)).Msg("Enqueued resolved email notification")
}
}
}
for _, webhook := range webhooks {
if !webhook.Enabled {
continue
}
webhookCopy := webhook
configJSON, err := json.Marshal(webhookCopy)
if err != nil {
log.Error().Err(err).Str("webhookName", webhookCopy.Name).Msg("Failed to marshal webhook config for resolved queue")
continue
}
notif := &QueuedNotification{
Type: "webhook" + queueTypeSuffixResolved,
Alerts: alertsToSend,
Config: configJSON,
MaxAttempts: 3,
}
if err := queue.Enqueue(notif); err != nil {
log.Error().Err(err).Str("webhookName", webhookCopy.Name).Msg("Failed to enqueue resolved webhook notification - falling back to direct send")
anyFailed = true
go n.sendResolvedWebhook(webhookCopy, alertsToSend, resolvedAt)
} else {
log.Debug().Str("webhookName", webhookCopy.Name).Int("alertCount", len(alertsToSend)).Msg("Enqueued resolved webhook notification")
}
}
if appriseConfig.Enabled {
configJSON, err := json.Marshal(appriseConfig)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal apprise config for resolved queue")
} else {
notif := &QueuedNotification{
Type: "apprise" + queueTypeSuffixResolved,
Alerts: alertsToSend,
Config: configJSON,
MaxAttempts: 3,
}
if err := queue.Enqueue(notif); err != nil {
log.Error().Err(err).Msg("Failed to enqueue resolved Apprise notification - falling back to direct send")
anyFailed = true
go n.sendResolvedApprise(appriseConfig, alertsToSend, resolvedAt)
} else {
log.Debug().Int("alertCount", len(alertsToSend)).Msg("Enqueued resolved Apprise notification")
}
}
}
if anyFailed {
log.Debug().Msg("At least one resolved notification enqueue failed; direct sends were triggered")
}
}
// sendNotificationsDirect sends notifications without using the queue (fallback)
func (n *NotificationManager) sendNotificationsDirect(emailConfig EmailConfig, webhooks []WebhookConfig, appriseConfig AppriseConfig, alertsToSend []*alerts.Alert) {
// Send notifications using the captured snapshots outside the lock to avoid blocking writers
@@ -794,6 +961,44 @@ func (n *NotificationManager) sendNotificationsDirect(emailConfig EmailConfig, w
}
}
// sendResolvedNotificationsDirect delivers resolved notifications without queue persistence.
func (n *NotificationManager) sendResolvedNotificationsDirect(emailConfig EmailConfig, webhooks []WebhookConfig, appriseConfig AppriseConfig, alertsToSend []*alerts.Alert, resolvedAt time.Time) {
if len(alertsToSend) == 0 {
return
}
if emailConfig.Enabled {
go func() {
if err := n.sendResolvedEmail(emailConfig, alertsToSend, resolvedAt); err != nil {
log.Error().Err(err).Msg("Failed to send resolved email notification")
}
}()
}
for _, webhook := range webhooks {
if !webhook.Enabled {
continue
}
webhookCopy := webhook
go func() {
if err := n.sendResolvedWebhook(webhookCopy, alertsToSend, resolvedAt); err != nil {
log.Error().
Err(err).
Str("webhookName", webhookCopy.Name).
Msg("Failed to send resolved webhook notification")
}
}()
}
if appriseConfig.Enabled {
go func() {
if err := n.sendResolvedApprise(appriseConfig, alertsToSend, resolvedAt); err != nil {
log.Error().Err(err).Msg("Failed to send resolved Apprise notification")
}
}()
}
}
// sendGroupedEmail sends a grouped email notification
func (n *NotificationManager) sendGroupedEmail(config EmailConfig, alertList []*alerts.Alert) error {
@@ -807,6 +1012,19 @@ func (n *NotificationManager) sendGroupedEmail(config EmailConfig, alertList []*
return n.sendHTMLEmailWithError(subject, htmlBody, textBody, config)
}
func (n *NotificationManager) sendResolvedEmail(config EmailConfig, alertList []*alerts.Alert, resolvedAt time.Time) error {
if len(alertList) == 0 {
return fmt.Errorf("no alerts to send")
}
subject, htmlBody, textBody := buildResolvedNotificationContent(alertList, resolvedAt, n.publicURL)
if subject == "" && textBody == "" {
return fmt.Errorf("failed to build resolved email content")
}
return n.sendHTMLEmailWithError(subject, htmlBody, textBody, config)
}
func (n *NotificationManager) sendGroupedApprise(config AppriseConfig, alertList []*alerts.Alert) error {
if len(alertList) == 0 {
return fmt.Errorf("no alerts to send")
@@ -891,6 +1109,81 @@ func buildApprisePayload(alertList []*alerts.Alert, publicURL string) (string, s
return title, bodyBuilder.String(), resolveAppriseNotificationType(validAlerts)
}
func buildResolvedNotificationContent(alertList []*alerts.Alert, resolvedAt time.Time, publicURL string) (string, string, string) {
validAlerts := make([]*alerts.Alert, 0, len(alertList))
var primary *alerts.Alert
for _, alert := range alertList {
if alert == nil {
continue
}
if primary == nil {
primary = alert
}
validAlerts = append(validAlerts, alert)
}
if len(validAlerts) == 0 || primary == nil {
return "", "", ""
}
if resolvedAt.IsZero() {
resolvedAt = time.Now()
}
resolvedLabel := resolvedAt.Format(time.RFC3339)
title := fmt.Sprintf("Pulse alert resolved: %s", primary.ResourceName)
if len(validAlerts) > 1 {
title = fmt.Sprintf("Pulse alerts resolved (%d)", len(validAlerts))
}
var bodyBuilder strings.Builder
bodyBuilder.WriteString("Resolved at ")
bodyBuilder.WriteString(resolvedLabel)
bodyBuilder.WriteString("\n\n")
for _, alert := range validAlerts {
bodyBuilder.WriteString(fmt.Sprintf("[%s] %s\n", strings.ToUpper(string(alert.Level)), alert.ResourceName))
if alert.Message != "" {
bodyBuilder.WriteString(alert.Message)
bodyBuilder.WriteString("\n")
}
if !alert.StartTime.IsZero() {
bodyBuilder.WriteString("Started: ")
bodyBuilder.WriteString(alert.StartTime.Format(time.RFC3339))
bodyBuilder.WriteString("\n")
}
bodyBuilder.WriteString("Cleared: ")
bodyBuilder.WriteString(resolvedLabel)
bodyBuilder.WriteString("\n")
if alert.Node != "" {
bodyBuilder.WriteString("Node: ")
bodyBuilder.WriteString(alert.Node)
bodyBuilder.WriteString("\n")
}
if alert.Instance != "" && alert.Instance != alert.Node {
bodyBuilder.WriteString("Instance: ")
bodyBuilder.WriteString(alert.Instance)
bodyBuilder.WriteString("\n")
}
if alert.Threshold != 0 || alert.Value != 0 {
bodyBuilder.WriteString(fmt.Sprintf("Last value %.2f (threshold %.2f)\n", alert.Value, alert.Threshold))
}
bodyBuilder.WriteString("\n")
}
if publicURL != "" {
bodyBuilder.WriteString("Dashboard: ")
bodyBuilder.WriteString(publicURL)
bodyBuilder.WriteString("\n")
}
textBody := bodyBuilder.String()
htmlBody := "<pre style=\"font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, \\\"Liberation Mono\\\", \\\"Courier New\\\", monospace\">" +
html.EscapeString(textBody) + "</pre>"
return title, htmlBody, textBody
}
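
With a single alert and a public URL configured, the subject and text body built above look roughly like this (all values here are made up for illustration):

Pulse alert resolved: qbittorrent

Resolved at 2025-11-21T22:07:00Z

[WARNING] qbittorrent
Memory usage above threshold
Started: 2025-11-21T21:40:00Z
Cleared: 2025-11-21T22:07:00Z
Node: docker-host-1
Last value 62.00 (threshold 85.00)

Dashboard: https://pulse.example.com

The HTML variant wraps the same text in a monospace pre block.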
func resolveAppriseNotificationType(alertList []*alerts.Alert) string {
notifyType := "info"
for _, alert := range alertList {
@@ -1044,6 +1337,45 @@ func (n *NotificationManager) sendAppriseViaHTTP(cfg AppriseConfig, title, body,
return nil
}
func (n *NotificationManager) sendResolvedApprise(config AppriseConfig, alertList []*alerts.Alert, resolvedAt time.Time) error {
if len(alertList) == 0 {
return fmt.Errorf("no alerts to send")
}
cfg := NormalizeAppriseConfig(config)
if !cfg.Enabled {
return fmt.Errorf("apprise not enabled")
}
title, _, body := buildResolvedNotificationContent(alertList, resolvedAt, n.publicURL)
if title == "" && body == "" {
return fmt.Errorf("failed to build resolved apprise payload")
}
switch cfg.Mode {
case AppriseModeHTTP:
if err := n.sendAppriseViaHTTP(cfg, title, body, "info"); err != nil {
log.Warn().
Err(err).
Str("mode", string(cfg.Mode)).
Str("serverUrl", cfg.ServerURL).
Msg("Failed to send resolved Apprise notification via API")
return fmt.Errorf("apprise HTTP send failed: %w", err)
}
default:
if err := n.sendAppriseViaCLI(cfg, title, body); err != nil {
log.Warn().
Err(err).
Str("mode", string(cfg.Mode)).
Str("cliPath", cfg.CLIPath).
Strs("targets", cfg.Targets).
Msg("Failed to send resolved Apprise notification")
return fmt.Errorf("apprise CLI send failed: %w", err)
}
}
return nil
}
// sendEmail sends an email notification
func (n *NotificationManager) sendEmail(alert *alerts.Alert) {
n.mu.RLock()
@@ -1406,6 +1738,49 @@ func (n *NotificationManager) sendGroupedWebhook(webhook WebhookConfig, alertLis
return n.sendWebhookRequest(webhook, jsonData, "grouped")
}
func (n *NotificationManager) sendResolvedWebhook(webhook WebhookConfig, alertList []*alerts.Alert, resolvedAt time.Time) error {
if len(alertList) == 0 {
return fmt.Errorf("no alerts to send")
}
if !webhook.Enabled {
return fmt.Errorf("webhook is disabled")
}
if resolvedAt.IsZero() {
resolvedAt = time.Now()
}
payload := map[string]interface{}{
"event": string(eventResolved),
"alerts": alertList,
"count": len(alertList),
"resolvedAt": resolvedAt.Unix(),
"resolvedAtIso": resolvedAt.Format(time.RFC3339),
"source": "pulse-monitoring",
}
if n.publicURL != "" {
payload["dashboard"] = n.publicURL
}
if len(alertList) == 1 && alertList[0] != nil {
payload["alertId"] = alertList[0].ID
}
jsonData, err := json.Marshal(payload)
if err != nil {
log.Error().
Err(err).
Str("webhook", webhook.Name).
Int("alertCount", len(alertList)).
Msg("Failed to marshal resolved webhook payload")
return fmt.Errorf("failed to marshal resolved webhook payload: %w", err)
}
return n.sendWebhookRequest(webhook, jsonData, "resolved")
}
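
Webhook consumers can tell resolved deliveries apart from live alerts by the event field. A small sketch of the payload shape assembled above, with made-up values and the alerts array omitted to keep it self-contained:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

func main() {
	resolvedAt := time.Date(2025, 11, 21, 22, 7, 0, 0, time.UTC)
	// Hypothetical resolved webhook payload; the real one also carries the full alerts slice.
	payload := map[string]interface{}{
		"event":         "resolved",
		"count":         1,
		"alertId":       "docker:host-1/container-1-memory",
		"resolvedAt":    resolvedAt.Unix(),
		"resolvedAtIso": resolvedAt.Format(time.RFC3339),
		"source":        "pulse-monitoring",
		"dashboard":     "https://pulse.example.com",
	}
	out, _ := json.MarshalIndent(payload, "", "  ")
	fmt.Println(string(out))
}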
// checkWebhookRateLimit checks if a webhook can be sent based on rate limits
func (n *NotificationManager) checkWebhookRateLimit(webhookURL string) bool {
n.webhookRateMu.Lock()
@@ -2476,43 +2851,89 @@ func (n *NotificationManager) SendTestNotificationWithConfig(method string, conf
}
}
func normalizeQueueType(notifType string) (string, notificationEvent) {
if strings.HasSuffix(notifType, queueTypeSuffixResolved) {
return strings.TrimSuffix(notifType, queueTypeSuffixResolved), eventResolved
}
return notifType, eventAlert
}
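
The "_resolved" suffix lets resolved notifications reuse the existing queue types without a schema change. A quick round-trip of the convention (standalone sketch mirroring normalizeQueueType above):

package main

import (
	"fmt"
	"strings"
)

func main() {
	for _, queued := range []string{"email", "webhook_resolved", "apprise_resolved"} {
		base, event := queued, "alert"
		if strings.HasSuffix(queued, "_resolved") {
			base = strings.TrimSuffix(queued, "_resolved")
			event = "resolved"
		}
		fmt.Printf("%-17s -> base=%s event=%s\n", queued, base, event)
	}
}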
func resolvedTimeFromAlerts(alerts []*alerts.Alert) time.Time {
for _, alert := range alerts {
if alert == nil || alert.Metadata == nil {
continue
}
raw, ok := alert.Metadata[metadataResolvedAt]
if !ok {
continue
}
switch ts := raw.(type) {
case string:
if parsed, err := time.Parse(time.RFC3339, ts); err == nil {
return parsed
}
case float64:
if ts > 0 {
return time.Unix(int64(ts), 0)
}
}
}
return time.Now()
}
// ProcessQueuedNotification processes a notification from the persistent queue
func (n *NotificationManager) ProcessQueuedNotification(notif *QueuedNotification) error {
baseType, event := normalizeQueueType(notif.Type)
log.Debug().
Str("notificationID", notif.ID).
Str("type", notif.Type).
Str("type", baseType).
Str("event", string(event)).
Int("alertCount", len(notif.Alerts)).
Msg("Processing queued notification")
var err error
switch notif.Type {
switch baseType {
case "email":
var emailConfig EmailConfig
if err = json.Unmarshal(notif.Config, &emailConfig); err != nil {
return fmt.Errorf("failed to unmarshal email config: %w", err)
}
err = n.sendGroupedEmail(emailConfig, notif.Alerts)
if event == eventResolved {
err = n.sendResolvedEmail(emailConfig, notif.Alerts, resolvedTimeFromAlerts(notif.Alerts))
} else {
err = n.sendGroupedEmail(emailConfig, notif.Alerts)
}
case "webhook":
var webhookConfig WebhookConfig
if err = json.Unmarshal(notif.Config, &webhookConfig); err != nil {
return fmt.Errorf("failed to unmarshal webhook config: %w", err)
}
err = n.sendGroupedWebhook(webhookConfig, notif.Alerts)
if event == eventResolved {
err = n.sendResolvedWebhook(webhookConfig, notif.Alerts, resolvedTimeFromAlerts(notif.Alerts))
} else {
err = n.sendGroupedWebhook(webhookConfig, notif.Alerts)
}
case "apprise":
var appriseConfig AppriseConfig
if err = json.Unmarshal(notif.Config, &appriseConfig); err != nil {
return fmt.Errorf("failed to unmarshal apprise config: %w", err)
}
err = n.sendGroupedApprise(appriseConfig, notif.Alerts)
if event == eventResolved {
err = n.sendResolvedApprise(appriseConfig, notif.Alerts, resolvedTimeFromAlerts(notif.Alerts))
} else {
err = n.sendGroupedApprise(appriseConfig, notif.Alerts)
}
default:
return fmt.Errorf("unknown notification type: %s", notif.Type)
return fmt.Errorf("unknown notification type: %s", baseType)
}
// Mark cooldown after successful send
if err == nil {
// Mark cooldown after successful send for active alerts only
if err == nil && event == eventAlert {
n.mu.Lock()
now := time.Now()
for _, alert := range notif.Alerts {