diff --git a/internal/monitoring/diagnostic_snapshots.go b/internal/monitoring/diagnostic_snapshots.go
index 9f02aa210..db8a93707 100644
--- a/internal/monitoring/diagnostic_snapshots.go
+++ b/internal/monitoring/diagnostic_snapshots.go
@@ -21,6 +21,9 @@ type NodeMemoryRaw struct {
 	Cached             uint64 `json:"cached"`
 	Shared             uint64 `json:"shared"`
 	EffectiveAvailable uint64 `json:"effectiveAvailable"`
+	RRDAvailable       uint64 `json:"rrdAvailable,omitempty"`
+	RRDUsed            uint64 `json:"rrdUsed,omitempty"`
+	RRDTotal           uint64 `json:"rrdTotal,omitempty"`
 	TotalMinusUsed     uint64 `json:"totalMinusUsed,omitempty"`
 	FallbackTotal      uint64 `json:"fallbackTotal,omitempty"`
 	FallbackUsed       uint64 `json:"fallbackUsed,omitempty"`
@@ -138,6 +141,15 @@ func (m *Monitor) logNodeMemorySource(instance, node string, snapshot NodeMemory
 	if snapshot.Raw.TotalMinusUsed > 0 {
 		evt = evt.Uint64("rawTotalMinusUsed", snapshot.Raw.TotalMinusUsed)
 	}
+	if snapshot.Raw.RRDAvailable > 0 {
+		evt = evt.Uint64("rrdAvailable", snapshot.Raw.RRDAvailable)
+	}
+	if snapshot.Raw.RRDUsed > 0 {
+		evt = evt.Uint64("rrdUsed", snapshot.Raw.RRDUsed)
+	}
+	if snapshot.Raw.RRDTotal > 0 {
+		evt = evt.Uint64("rrdTotal", snapshot.Raw.RRDTotal)
+	}
 	if snapshot.Memory.Total > 0 {
 		evt = evt.Int64("total", snapshot.Memory.Total)
 	}
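
The new rrdAvailable/rrdUsed/rrdTotal fields are tagged `omitempty`, so snapshots from nodes that never hit the RRD fallback serialize exactly as before. A minimal sketch of that behavior, using a trimmed-down copy of the struct (illustrative only, not part of the patch):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeMemoryRaw is a trimmed copy of NodeMemoryRaw kept only to show
// how the omitempty tags behave; field names and tags match the diff.
type nodeMemoryRaw struct {
	EffectiveAvailable uint64 `json:"effectiveAvailable"`
	RRDAvailable       uint64 `json:"rrdAvailable,omitempty"`
	RRDUsed            uint64 `json:"rrdUsed,omitempty"`
	RRDTotal           uint64 `json:"rrdTotal,omitempty"`
}

func main() {
	// Fallback never ran: no rrd* keys appear in the diagnostic payload.
	a, _ := json.Marshal(nodeMemoryRaw{EffectiveAvailable: 4 << 30})
	fmt.Println(string(a)) // {"effectiveAvailable":4294967296}

	// Fallback populated used/total only: rrdAvailable stays omitted.
	b, _ := json.Marshal(nodeMemoryRaw{RRDUsed: 2 << 30, RRDTotal: 4 << 30})
	fmt.Println(string(b)) // {"effectiveAvailable":0,"rrdUsed":2147483648,"rrdTotal":4294967296}
}
```
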
diff --git a/internal/monitoring/monitor.go b/internal/monitoring/monitor.go
index 48d1bd1ad..6f597e9ce 100644
--- a/internal/monitoring/monitor.go
+++ b/internal/monitoring/monitor.go
@@ -475,7 +475,9 @@ type Monitor struct {
 }
 
 type rrdMemCacheEntry struct {
-	value     uint64
+	available uint64
+	used      uint64
+	total     uint64
 	fetchedAt time.Time
 }
 
@@ -569,9 +571,9 @@ type taskOutcome struct {
 	recordedAt time.Time
 }
 
-func (m *Monitor) getNodeRRDMemAvailable(ctx context.Context, client PVEClientInterface, nodeName string) (uint64, error) {
+func (m *Monitor) getNodeRRDMetrics(ctx context.Context, client PVEClientInterface, nodeName string) (rrdMemCacheEntry, error) {
 	if client == nil || nodeName == "" {
-		return 0, fmt.Errorf("invalid arguments for RRD lookup")
+		return rrdMemCacheEntry{}, fmt.Errorf("invalid arguments for RRD lookup")
 	}
 
 	now := time.Now()
@@ -579,51 +581,67 @@ func (m *Monitor) getNodeRRDMemAvailable(ctx context.Context, client PVEClientIn
 	m.rrdCacheMu.RLock()
 	if entry, ok := m.nodeRRDMemCache[nodeName]; ok && now.Sub(entry.fetchedAt) < nodeRRDCacheTTL {
 		m.rrdCacheMu.RUnlock()
-		return entry.value, nil
+		return entry, nil
 	}
 	m.rrdCacheMu.RUnlock()
 
 	requestCtx, cancel := context.WithTimeout(ctx, nodeRRDRequestTimeout)
 	defer cancel()
 
-	points, err := client.GetNodeRRDData(requestCtx, nodeName, "hour", "AVERAGE", []string{"memavailable", "memtotal"})
+	points, err := client.GetNodeRRDData(requestCtx, nodeName, "hour", "AVERAGE", []string{"memavailable", "memused", "memtotal"})
 	if err != nil {
-		return 0, err
+		return rrdMemCacheEntry{}, err
 	}
 
 	var memAvailable uint64
+	var memUsed uint64
 	var memTotal uint64
 	for i := len(points) - 1; i >= 0; i-- {
 		point := points[i]
-		if point.MemTotal != nil && !math.IsNaN(*point.MemTotal) && *point.MemTotal > 0 {
+
+		if memTotal == 0 && point.MemTotal != nil && !math.IsNaN(*point.MemTotal) && *point.MemTotal > 0 {
 			memTotal = uint64(math.Round(*point.MemTotal))
 		}
-		if point.MemAvailable == nil || math.IsNaN(*point.MemAvailable) || *point.MemAvailable <= 0 {
-			continue
+		if memAvailable == 0 && point.MemAvailable != nil && !math.IsNaN(*point.MemAvailable) && *point.MemAvailable > 0 {
+			memAvailable = uint64(math.Round(*point.MemAvailable))
 		}
-		memAvailable = uint64(math.Round(*point.MemAvailable))
-		break
+		if memUsed == 0 && point.MemUsed != nil && !math.IsNaN(*point.MemUsed) && *point.MemUsed > 0 {
+			memUsed = uint64(math.Round(*point.MemUsed))
+		}
+
+		if memTotal > 0 && (memAvailable > 0 || memUsed > 0) {
+			break
+		}
 	}
 
-	if memAvailable == 0 {
-		return 0, fmt.Errorf("rrd memavailable not present")
+	if memTotal > 0 {
+		if memAvailable > memTotal {
+			memAvailable = memTotal
+		}
+		if memUsed > memTotal {
+			memUsed = memTotal
+		}
 	}
 
-	if memTotal > 0 && memAvailable > memTotal {
-		memAvailable = memTotal
+	if memAvailable == 0 && memUsed == 0 {
+		return rrdMemCacheEntry{}, fmt.Errorf("rrd mem metrics not present")
+	}
+
+	entry := rrdMemCacheEntry{
+		available: memAvailable,
+		used:      memUsed,
+		total:     memTotal,
+		fetchedAt: now,
 	}
 
 	m.rrdCacheMu.Lock()
-	m.nodeRRDMemCache[nodeName] = rrdMemCacheEntry{
-		value:     memAvailable,
-		fetchedAt: now,
-	}
+	m.nodeRRDMemCache[nodeName] = entry
 	m.rrdCacheMu.Unlock()
 
-	return memAvailable, nil
+	return entry, nil
 }
 
 // RemoveDockerHost removes a docker host from the shared state and clears related alerts.
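
The rewritten loop walks the RRD series newest-first and latches each metric independently, since the freshest slots of a Proxmox RRD series are routinely padded with nil or NaN samples. A self-contained sketch of that newest-valid scan for a single metric (hypothetical helper, simplified from the multi-metric loop in the hunk above):

```go
package main

import (
	"fmt"
	"math"
)

// latestValid returns the newest positive, non-NaN sample in a
// newest-last series, or 0 if none exists. It mirrors the per-metric
// latching in getNodeRRDMetrics, reduced to one metric.
func latestValid(series []*float64) uint64 {
	for i := len(series) - 1; i >= 0; i-- {
		if v := series[i]; v != nil && !math.IsNaN(*v) && *v > 0 {
			return uint64(math.Round(*v))
		}
	}
	return 0
}

func main() {
	nan := math.NaN()
	a, b := 3.2e9, 3.4e9
	// The freshest slots are nil/NaN (still being filled), so the scan
	// falls back to the next-newest valid sample.
	fmt.Println(latestValid([]*float64{&a, &b, &nan, nil})) // 3400000000
}
```
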
Float64("usage", safePercentage(float64(actualUsed), float64(nodeInfo.Memory.Total))) - if usedRRDFallback { + if usedRRDAvailableFallback { + if haveRRDMetrics && rrdMetrics.available > 0 { + logCtx = logCtx.Uint64("rrdAvailable", rrdMetrics.available) + } logCtx.Msg("Node memory: using RRD memavailable fallback (excludes reclaimable cache)") nodeMemorySource = "rrd-memavailable" nodeFallbackReason = "rrd-memavailable" @@ -3775,20 +3807,44 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie nodeMemorySource = "derived-free-buffers-cached" } default: - // Fallback to traditional used memory if no cache-aware data is exposed - actualUsed = nodeInfo.Memory.Used - if actualUsed > nodeInfo.Memory.Total { - actualUsed = nodeInfo.Memory.Total + switch { + case rrdMemUsedFallback && haveRRDMetrics && rrdMetrics.used > 0: + actualUsed = rrdMetrics.used + if actualUsed > nodeInfo.Memory.Total { + actualUsed = nodeInfo.Memory.Total + } + log.Debug(). + Str("node", node.Node). + Uint64("total", nodeInfo.Memory.Total). + Uint64("rrdUsed", rrdMetrics.used). + Msg("Node memory: using RRD memused fallback (excludes reclaimable cache)") + nodeMemorySource = "rrd-memused" + if nodeFallbackReason == "" { + nodeFallbackReason = "rrd-memused" + } + nodeSnapshotRaw.FallbackCalculated = true + nodeSnapshotRaw.ProxmoxMemorySource = "rrd-memused" + default: + // Fallback to traditional used memory if no cache-aware data is exposed + actualUsed = nodeInfo.Memory.Used + if actualUsed > nodeInfo.Memory.Total { + actualUsed = nodeInfo.Memory.Total + } + log.Debug(). + Str("node", node.Node). + Uint64("total", nodeInfo.Memory.Total). + Uint64("used", actualUsed). + Msg("Node memory: no cache-aware metrics - using traditional calculation (includes cache)") + nodeMemorySource = "node-status-used" } - log.Debug(). - Str("node", node.Node). - Uint64("total", nodeInfo.Memory.Total). - Uint64("used", actualUsed). 
- Msg("Node memory: no cache-aware metrics - using traditional calculation (includes cache)") - nodeMemorySource = "node-status-used" } nodeSnapshotRaw.EffectiveAvailable = effectiveAvailable + if haveRRDMetrics { + nodeSnapshotRaw.RRDAvailable = rrdMetrics.available + nodeSnapshotRaw.RRDUsed = rrdMetrics.used + nodeSnapshotRaw.RRDTotal = rrdMetrics.total + } free := int64(nodeInfo.Memory.Total - actualUsed) if free < 0 { diff --git a/internal/monitoring/monitor_memory_test.go b/internal/monitoring/monitor_memory_test.go new file mode 100644 index 000000000..2f3507048 --- /dev/null +++ b/internal/monitoring/monitor_memory_test.go @@ -0,0 +1,178 @@ +package monitoring + +import ( + "context" + "math" + "testing" + "time" + + "github.com/rcourtman/pulse-go-rewrite/internal/alerts" + "github.com/rcourtman/pulse-go-rewrite/internal/config" + "github.com/rcourtman/pulse-go-rewrite/internal/models" + "github.com/rcourtman/pulse-go-rewrite/internal/notifications" + "github.com/rcourtman/pulse-go-rewrite/pkg/proxmox" +) + +type stubPVEClient struct { + nodes []proxmox.Node + nodeStatus *proxmox.NodeStatus + rrdPoints []proxmox.NodeRRDPoint +} + +var _ PVEClientInterface = (*stubPVEClient)(nil) + +func (s *stubPVEClient) GetNodes(ctx context.Context) ([]proxmox.Node, error) { + return s.nodes, nil +} + +func (s *stubPVEClient) GetNodeStatus(ctx context.Context, node string) (*proxmox.NodeStatus, error) { + return s.nodeStatus, nil +} + +func (s *stubPVEClient) GetNodeRRDData(ctx context.Context, node string, timeframe string, cf string, ds []string) ([]proxmox.NodeRRDPoint, error) { + return s.rrdPoints, nil +} + +func (s *stubPVEClient) GetVMs(ctx context.Context, node string) ([]proxmox.VM, error) { + return nil, nil +} +func (s *stubPVEClient) GetContainers(ctx context.Context, node string) ([]proxmox.Container, error) { + return nil, nil +} +func (s *stubPVEClient) GetStorage(ctx context.Context, node string) ([]proxmox.Storage, error) { + return nil, nil +} +func (s *stubPVEClient) GetAllStorage(ctx context.Context) ([]proxmox.Storage, error) { + return nil, nil +} +func (s *stubPVEClient) GetBackupTasks(ctx context.Context) ([]proxmox.Task, error) { return nil, nil } +func (s *stubPVEClient) GetStorageContent(ctx context.Context, node, storage string) ([]proxmox.StorageContent, error) { + return nil, nil +} +func (s *stubPVEClient) GetVMSnapshots(ctx context.Context, node string, vmid int) ([]proxmox.Snapshot, error) { + return nil, nil +} +func (s *stubPVEClient) GetContainerSnapshots(ctx context.Context, node string, vmid int) ([]proxmox.Snapshot, error) { + return nil, nil +} +func (s *stubPVEClient) GetVMStatus(ctx context.Context, node string, vmid int) (*proxmox.VMStatus, error) { + return nil, nil +} +func (s *stubPVEClient) GetContainerStatus(ctx context.Context, node string, vmid int) (*proxmox.Container, error) { + return nil, nil +} +func (s *stubPVEClient) GetClusterResources(ctx context.Context, resourceType string) ([]proxmox.ClusterResource, error) { + return nil, nil +} +func (s *stubPVEClient) IsClusterMember(ctx context.Context) (bool, error) { return false, nil } +func (s *stubPVEClient) GetVMFSInfo(ctx context.Context, node string, vmid int) ([]proxmox.VMFileSystem, error) { + return nil, nil +} +func (s *stubPVEClient) GetVMNetworkInterfaces(ctx context.Context, node string, vmid int) ([]proxmox.VMNetworkInterface, error) { + return nil, nil +} +func (s *stubPVEClient) GetVMAgentInfo(ctx context.Context, node string, vmid int) (map[string]interface{}, error) { + return 
+func (s *stubPVEClient) GetZFSPoolStatus(ctx context.Context, node string) ([]proxmox.ZFSPoolStatus, error) {
+	return nil, nil
+}
+func (s *stubPVEClient) GetZFSPoolsWithDetails(ctx context.Context, node string) ([]proxmox.ZFSPoolInfo, error) {
+	return nil, nil
+}
+func (s *stubPVEClient) GetDisks(ctx context.Context, node string) ([]proxmox.Disk, error) {
+	return nil, nil
+}
+func (s *stubPVEClient) GetCephStatus(ctx context.Context) (*proxmox.CephStatus, error) {
+	return nil, nil
+}
+func (s *stubPVEClient) GetCephDF(ctx context.Context) (*proxmox.CephDF, error) { return nil, nil }
+
+func floatPtr(v float64) *float64 { return &v }
+
+func TestPollPVEInstanceUsesRRDMemUsedFallback(t *testing.T) {
+	t.Setenv("PULSE_DATA_DIR", t.TempDir())
+
+	total := uint64(16 * 1024 * 1024 * 1024)
+	actualUsed := total / 3
+
+	client := &stubPVEClient{
+		nodes: []proxmox.Node{
+			{
+				Node:    "node1",
+				Status:  "online",
+				CPU:     0.15,
+				MaxCPU:  8,
+				Mem:     total,
+				MaxMem:  total,
+				Disk:    0,
+				MaxDisk: 0,
+				Uptime:  3600,
+			},
+		},
+		nodeStatus: &proxmox.NodeStatus{
+			Memory: &proxmox.MemoryStatus{
+				Total: total,
+				Used:  total,
+				Free:  0,
+			},
+		},
+		rrdPoints: []proxmox.NodeRRDPoint{
+			{
+				MemTotal: floatPtr(float64(total)),
+				MemUsed:  floatPtr(float64(actualUsed)),
+			},
+		},
+	}
+
+	mon := &Monitor{
+		config: &config.Config{
+			PVEInstances: []config.PVEInstance{
+				{
+					Name: "test",
+					Host: "https://pve",
+				},
+			},
+		},
+		state:                models.NewState(),
+		alertManager:         alerts.NewManager(),
+		notificationMgr:      notifications.NewNotificationManager(""),
+		metricsHistory:       NewMetricsHistory(32, time.Hour),
+		nodeSnapshots:        make(map[string]NodeMemorySnapshot),
+		guestSnapshots:       make(map[string]GuestMemorySnapshot),
+		nodeRRDMemCache:      make(map[string]rrdMemCacheEntry),
+		lastClusterCheck:     make(map[string]time.Time),
+		lastPhysicalDiskPoll: make(map[string]time.Time),
+	}
+	defer mon.alertManager.Stop()
+
+	mon.pollPVEInstance(context.Background(), "test", client)
+
+	snapshot := mon.state.GetSnapshot()
+	if len(snapshot.Nodes) != 1 {
+		t.Fatalf("expected one node in state, got %d", len(snapshot.Nodes))
+	}
+
+	node := snapshot.Nodes[0]
+	expectedUsage := (float64(actualUsed) / float64(total)) * 100
+	if diff := math.Abs(node.Memory.Usage - expectedUsage); diff > 0.5 {
+		t.Fatalf("memory usage mismatch: got %.2f want %.2f (diff %.2f)", node.Memory.Usage, expectedUsage, diff)
+	}
+	if node.Memory.Used != int64(actualUsed) {
+		t.Fatalf("memory used mismatch: got %d want %d", node.Memory.Used, actualUsed)
+	}
+
+	snapKey := makeNodeSnapshotKey("test", "node1")
+	mon.diagMu.RLock()
+	snap, ok := mon.nodeSnapshots[snapKey]
+	mon.diagMu.RUnlock()
+	if !ok {
+		t.Fatal("expected node snapshot entry to be recorded")
+	}
+	if snap.MemorySource != "rrd-memused" {
+		t.Fatalf("expected memory source rrd-memused, got %q", snap.MemorySource)
+	}
+	if snap.Raw.RRDUsed != actualUsed {
+		t.Fatalf("expected snapshot RRD used %d, got %d", actualUsed, snap.Raw.RRDUsed)
+	}
+}
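
The stub's compile-time assertion (`var _ PVEClientInterface = (*stubPVEClient)(nil)`) guarantees it tracks the full interface, and `t.Setenv("PULSE_DATA_DIR", t.TempDir())` points the monitor at a throwaway directory so the test should never touch real state. The regression test can be run in isolation with `go test ./internal/monitoring -run TestPollPVEInstanceUsesRRDMemUsedFallback`.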