diff --git a/internal/api/diagnostics.go b/internal/api/diagnostics.go index e4dc1517e..6d5ed1d11 100644 --- a/internal/api/diagnostics.go +++ b/internal/api/diagnostics.go @@ -13,9 +13,11 @@ import ( "sort" "strconv" "strings" + "sync" "syscall" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/rcourtman/pulse-go-rewrite/internal/alerts" "github.com/rcourtman/pulse-go-rewrite/internal/config" "github.com/rcourtman/pulse-go-rewrite/internal/models" @@ -51,7 +53,7 @@ type DiagnosticsInfo struct { // MemorySourceStat aggregates memory-source usage per instance. type MemorySourceStat struct { - Instance string `json:"instance"` + Instance string `json:"instance"` Source string `json:"source"` NodeCount int `json:"nodeCount"` LastUpdated string `json:"lastUpdated"` @@ -67,6 +69,38 @@ func isFallbackMemorySource(source string) bool { } } +const diagnosticsCacheTTL = 45 * time.Second + +var ( + diagnosticsMetricsOnce sync.Once + + diagnosticsCacheMu sync.RWMutex + diagnosticsCache DiagnosticsInfo + diagnosticsCacheTimestamp time.Time + + diagnosticsCacheHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "pulse", + Subsystem: "diagnostics", + Name: "cache_hits_total", + Help: "Total number of diagnostics cache hits.", + }) + + diagnosticsCacheMisses = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "pulse", + Subsystem: "diagnostics", + Name: "cache_misses_total", + Help: "Total number of diagnostics cache misses.", + }) + + diagnosticsRefreshDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "pulse", + Subsystem: "diagnostics", + Name: "refresh_duration_seconds", + Help: "Duration of diagnostics refresh operations in seconds.", + Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 20, 30}, + }) +) + // NodeDiagnostic contains diagnostic info for a Proxmox node type NodeDiagnostic struct { ID string `json:"id"` @@ -256,9 +290,53 @@ type AlertsDiagnostic struct { // handleDiagnostics returns comprehensive diagnostic information func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { + diagnosticsMetricsOnce.Do(func() { + prometheus.MustRegister(diagnosticsCacheHits, diagnosticsCacheMisses, diagnosticsRefreshDuration) + }) + + now := time.Now() + + diagnosticsCacheMu.RLock() + cachedDiag := diagnosticsCache + cachedAt := diagnosticsCacheTimestamp + diagnosticsCacheMu.RUnlock() + + if !cachedAt.IsZero() && now.Sub(cachedAt) <= diagnosticsCacheTTL { + diagnosticsCacheHits.Inc() + writeDiagnosticsResponse(w, cachedDiag, cachedAt) + return + } + + diagnosticsCacheMisses.Inc() + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() + start := time.Now() + fresh := r.computeDiagnostics(ctx) + diagnosticsRefreshDuration.Observe(time.Since(start).Seconds()) + + diagnosticsCacheMu.Lock() + diagnosticsCache = fresh + diagnosticsCacheTimestamp = time.Now() + cachedAt = diagnosticsCacheTimestamp + diagnosticsCacheMu.Unlock() + + writeDiagnosticsResponse(w, fresh, cachedAt) +} + +func writeDiagnosticsResponse(w http.ResponseWriter, diag DiagnosticsInfo, cachedAt time.Time) { + w.Header().Set("Content-Type", "application/json") + if !cachedAt.IsZero() { + w.Header().Set("X-Diagnostics-Cached-At", cachedAt.UTC().Format(time.RFC3339)) + } + if err := json.NewEncoder(w).Encode(diag); err != nil { + log.Error().Err(err).Msg("Failed to encode diagnostics") + http.Error(w, "Failed to generate diagnostics", http.StatusInternalServerError) + } +} + +func (r *Router) computeDiagnostics(ctx context.Context) 
DiagnosticsInfo { diag := DiagnosticsInfo{ Errors: []string{}, } @@ -325,7 +403,6 @@ func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { nodeDiag.Connected = false nodeDiag.Error = err.Error() } else { - // Try to get nodes first (this should work for both clustered and standalone) nodes, err := client.GetNodes(ctx) if err != nil { nodeDiag.Connected = false @@ -333,13 +410,11 @@ func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { } else { nodeDiag.Connected = true - // Set node details if len(nodes) > 0 { nodeDiag.Details = &NodeDetails{ NodeCount: len(nodes), } - // Get version from first node if status, err := client.GetNodeStatus(ctx, nodes[0].Node); err == nil && status != nil { if status.PVEVersion != "" { nodeDiag.Details.Version = status.PVEVersion @@ -347,24 +422,14 @@ func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { } } - // Try to get cluster status (this may fail for standalone nodes, which is OK) if clusterStatus, err := client.GetClusterStatus(ctx); err == nil { - nodeDiag.ClusterInfo = &ClusterInfo{ - Nodes: len(clusterStatus), - } + nodeDiag.ClusterInfo = &ClusterInfo{Nodes: len(clusterStatus)} } else { - // Standalone node or cluster status not available - // This is not an error - standalone nodes don't have cluster status log.Debug().Str("node", node.Name).Msg("Cluster status not available (likely standalone node)") - nodeDiag.ClusterInfo = &ClusterInfo{ - Nodes: 1, // Standalone node - } + nodeDiag.ClusterInfo = &ClusterInfo{Nodes: 1} } - // Run VM disk monitoring check nodeDiag.VMDiskCheck = r.checkVMDiskMonitoring(ctx, client, node.Name) - - // Run physical disk check nodeDiag.PhysicalDisks = r.checkPhysicalDisks(ctx, client, node.Name) } } @@ -380,7 +445,6 @@ func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { Host: pbsNode.Host, } - // Test connection testCfg := pbs.ClientConfig{ Host: pbsNode.Host, User: pbsNode.User, @@ -396,15 +460,12 @@ func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { pbsDiag.Connected = false pbsDiag.Error = err.Error() } else { - // Try to get version if version, err := client.GetVersion(ctx); err != nil { pbsDiag.Connected = false pbsDiag.Error = "Connection established but version check failed: " + err.Error() } else { pbsDiag.Connected = true - pbsDiag.Details = &PBSDetails{ - Version: version.Version, - } + pbsDiag.Details = &PBSDetails{Version: version.Version} } } @@ -414,7 +475,6 @@ func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { diag.DockerAgents = buildDockerAgentDiagnostic(r.monitor, diag.Version) diag.Alerts = buildAlertsDiagnostic(r.monitor) - // Include cached monitor snapshots for memory diagnostics if available if r.monitor != nil { snapshots := r.monitor.GetDiagnosticSnapshots() if len(snapshots.Nodes) > 0 { @@ -473,14 +533,7 @@ func (r *Router) handleDiagnostics(w http.ResponseWriter, req *http.Request) { } } - // Add any recent errors from logs (this would need a log collector) - // For now, just check basic connectivity - - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(diag); err != nil { - log.Error().Err(err).Msg("Failed to encode diagnostics") - http.Error(w, "Failed to generate diagnostics", http.StatusInternalServerError) - } + return diag } func buildTemperatureProxyDiagnostic(cfg *config.Config, legacyDetected, recommendProxy bool) *TemperatureProxyDiagnostic { diff --git a/internal/api/http_metrics.go 
b/internal/api/http_metrics.go new file mode 100644 index 000000000..5ce0a5dc3 --- /dev/null +++ b/internal/api/http_metrics.go @@ -0,0 +1,152 @@ +package api + +import ( + "strconv" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + httpMetricsOnce sync.Once + + apiRequestDuration *prometheus.HistogramVec + apiRequestTotal *prometheus.CounterVec + apiRequestErrors *prometheus.CounterVec +) + +func initHTTPMetrics() { + apiRequestDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pulse", + Subsystem: "http", + Name: "request_duration_seconds", + Help: "HTTP request duration observed at the API layer.", + Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}, + }, + []string{"method", "route", "status"}, + ) + + apiRequestTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pulse", + Subsystem: "http", + Name: "requests_total", + Help: "Total number of HTTP requests handled by the API.", + }, + []string{"method", "route", "status"}, + ) + + apiRequestErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pulse", + Subsystem: "http", + Name: "request_errors_total", + Help: "Total number of HTTP errors surfaced to clients.", + }, + []string{"method", "route", "status_class"}, + ) + + prometheus.MustRegister(apiRequestDuration, apiRequestTotal, apiRequestErrors) +} + +func recordAPIRequest(method, route string, status int, elapsed time.Duration) { + httpMetricsOnce.Do(initHTTPMetrics) + + statusCode := strconv.Itoa(status) + + apiRequestDuration.WithLabelValues(method, route, statusCode).Observe(elapsed.Seconds()) + apiRequestTotal.WithLabelValues(method, route, statusCode).Inc() + + if status >= 400 { + apiRequestErrors.WithLabelValues(method, route, classifyStatus(status)).Inc() + } +} + +func classifyStatus(status int) string { + switch { + case status >= 500: + return "server_error" + case status >= 400: + return "client_error" + default: + return "none" + } +} + +func normalizeRoute(path string) string { + if path == "" || path == "/" { + return "/" + } + + // Strip query parameters. 
+ if idx := strings.Index(path, "?"); idx >= 0 { + path = path[:idx] + } + + segments := strings.Split(path, "/") + normSegments := make([]string, 0, len(segments)) + count := 0 + for _, seg := range segments { + if seg == "" { + continue + } + count++ + if count > 5 { + break + } + normSegments = append(normSegments, normalizeSegment(seg)) + } + + if len(normSegments) == 0 { + return "/" + } + + return "/" + strings.Join(normSegments, "/") +} + +func normalizeSegment(seg string) string { + if isNumeric(seg) { + return ":id" + } + if looksLikeUUID(seg) { + return ":uuid" + } + if len(seg) > 32 { + return ":token" + } + return seg +} + +func isNumeric(s string) bool { + if s == "" { + return false + } + for _, r := range s { + if r < '0' || r > '9' { + return false + } + } + return true +} + +func looksLikeUUID(s string) bool { + if len(s) != 36 { + return false + } + for i, r := range s { + switch { + case r == '-': + if i != 8 && i != 13 && i != 18 && i != 23 { + return false + } + case (r >= '0' && r <= '9') || (r >= 'a' && r <= 'f') || (r >= 'A' && r <= 'F'): + continue + default: + return false + } + } + return true +} diff --git a/internal/api/middleware.go b/internal/api/middleware.go index 46a546c6a..2832ac309 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -53,6 +53,15 @@ func ErrorHandler(next http.Handler) http.Handler { rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK} rw.Header().Set("X-Request-ID", requestID) + start := time.Now() + routeLabel := normalizeRoute(r.URL.Path) + method := r.Method + + defer func() { + elapsed := time.Since(start) + recordAPIRequest(method, routeLabel, rw.StatusCode(), elapsed) + }() + // Recover from panics defer func() { if err := recover(); err != nil { @@ -64,7 +73,7 @@ func ErrorHandler(next http.Handler) http.Handler { Bytes("stack", debug.Stack()). Msg("Panic recovered in API handler") - writeErrorResponse(w, http.StatusInternalServerError, "internal_error", + writeErrorResponse(rw, http.StatusInternalServerError, "internal_error", "An unexpected error occurred", nil) } }() @@ -163,6 +172,13 @@ func (rw *responseWriter) Write(b []byte) (int, error) { return rw.ResponseWriter.Write(b) } +func (rw *responseWriter) StatusCode() int { + if rw == nil { + return http.StatusInternalServerError + } + return rw.statusCode +} + // Hijack implements http.Hijacker interface func (rw *responseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { hijacker, ok := rw.ResponseWriter.(http.Hijacker) diff --git a/internal/logging/logging.go b/internal/logging/logging.go index b94b4c0db..be4705615 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -1,10 +1,13 @@ package logging import ( + "compress/gzip" "context" "fmt" "io" "os" + "path/filepath" + "strconv" "strings" "sync" "time" @@ -28,6 +31,10 @@ type Config struct { Format string // "json", "console", or "auto" Level string // "debug", "info", "warn", "error" Component string // optional component name + FilePath string // optional log file path + MaxSizeMB int // rotate after this size (MB) + MaxAgeDays int // keep rotated logs for this many days + Compress bool // gzip rotated logs } // Option customizes logger construction. 
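Returning to the route normalization added in http_metrics.go above, here is a minimal test-style sketch of how those rules are expected to collapse paths into bounded-cardinality labels (numeric segments become :id, 36-character hyphenated hex segments become :uuid, segments longer than 32 characters become :token, and only the first five segments are kept). The test name and sample paths are illustrative, not taken from the repository.

package api

import (
	"strings"
	"testing"
)

func TestNormalizeRouteSketch(t *testing.T) {
	cases := map[string]string{
		"/": "/",
		"/api/guests/12345/metrics":                       "/api/guests/:id/metrics",
		"/api/hosts/0f8fad5b-d9cb-469f-a165-70867728950e": "/api/hosts/:uuid",
		"/api/tokens/" + strings.Repeat("a", 40):          "/api/tokens/:token",
		"/a/b/c/d/e/f/g":                                  "/a/b/c/d/e", // capped at five segments
	}
	for in, want := range cases {
		if got := normalizeRoute(in); got != want {
			t.Errorf("normalizeRoute(%q) = %q, want %q", in, got, want)
		}
	}
}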
@@ -63,6 +70,11 @@ func Init(cfg Config) zerolog.Logger { zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack writer := selectWriter(cfg.Format) + if fileWriter, err := newRollingFileWriter(cfg); err != nil { + fmt.Fprintf(os.Stderr, "logging: unable to configure file output: %v\n", err) + } else if fileWriter != nil { + writer = io.MultiWriter(writer, fileWriter) + } component := strings.TrimSpace(cfg.Component) contextBuilder := zerolog.New(writer).With().Timestamp() @@ -93,6 +105,27 @@ func InitFromConfig(ctx context.Context, cfg Config) (zerolog.Logger, error) { if envFormat := os.Getenv("LOG_FORMAT"); envFormat != "" { cfg.Format = envFormat } + if envFile := os.Getenv("LOG_FILE"); envFile != "" { + cfg.FilePath = envFile + } + if envSize := os.Getenv("LOG_MAX_SIZE"); envSize != "" { + if size, err := strconv.Atoi(envSize); err == nil { + cfg.MaxSizeMB = size + } + } + if envAge := os.Getenv("LOG_MAX_AGE"); envAge != "" { + if age, err := strconv.Atoi(envAge); err == nil { + cfg.MaxAgeDays = age + } + } + if envCompress := os.Getenv("LOG_COMPRESS"); envCompress != "" { + switch strings.ToLower(strings.TrimSpace(envCompress)) { + case "0", "false", "no": + cfg.Compress = false + default: + cfg.Compress = true + } + } if !isValidLevel(cfg.Level) { return zerolog.Logger{}, fmt.Errorf("invalid log level %q: must be debug, info, warn, or error", cfg.Level) @@ -305,3 +338,168 @@ func enrichWithRequestID(logger zerolog.Logger, ctx context.Context) zerolog.Log } return logger } + +type rollingFileWriter struct { + mu sync.Mutex + path string + file *os.File + currentSize int64 + maxBytes int64 + maxAge time.Duration + compress bool +} + +func newRollingFileWriter(cfg Config) (io.Writer, error) { + path := strings.TrimSpace(cfg.FilePath) + if path == "" { + return nil, nil + } + + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("create log directory: %w", err) + } + + writer := &rollingFileWriter{ + path: path, + maxBytes: int64(cfg.MaxSizeMB) * 1024 * 1024, + maxAge: time.Duration(cfg.MaxAgeDays) * 24 * time.Hour, + compress: cfg.Compress, + } + + if writer.maxBytes <= 0 { + writer.maxBytes = 100 * 1024 * 1024 // default 100MB + } + + if err := writer.openOrCreateLocked(); err != nil { + return nil, err + } + writer.cleanupOldFiles() + return writer, nil +} + +func (w *rollingFileWriter) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + + if err := w.openOrCreateLocked(); err != nil { + return 0, err + } + + if w.maxBytes > 0 && w.currentSize+int64(len(p)) > w.maxBytes { + if err := w.rotateLocked(); err != nil { + return 0, err + } + } + + n, err := w.file.Write(p) + if n > 0 { + w.currentSize += int64(n) + } + return n, err +} + +func (w *rollingFileWriter) openOrCreateLocked() error { + if w.file != nil { + return nil + } + + file, err := os.OpenFile(w.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("open log file: %w", err) + } + w.file = file + + info, err := file.Stat() + if err != nil { + w.currentSize = 0 + return nil + } + w.currentSize = info.Size() + return nil +} + +func (w *rollingFileWriter) rotateLocked() error { + if err := w.closeLocked(); err != nil { + return err + } + + if _, err := os.Stat(w.path); err == nil { + rotated := fmt.Sprintf("%s.%s", w.path, time.Now().Format("20060102-150405")) + if err := os.Rename(w.path, rotated); err == nil { + if w.compress { + go compressAndRemove(rotated) + } + } + } + + w.cleanupOldFiles() + return w.openOrCreateLocked() +} 
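A minimal usage sketch for the new file-output fields on logging.Config, assuming Init's signature as shown in the hunk above. The path and retention values are example values, not project defaults, and the environment overrides noted in the comments (LOG_FILE, LOG_MAX_SIZE, LOG_MAX_AGE, LOG_COMPRESS) only apply when the logger is built through InitFromConfig.

package main

import "github.com/rcourtman/pulse-go-rewrite/internal/logging"

func main() {
	// Example values only. When MaxSizeMB is zero or negative the rolling
	// writer falls back to its built-in 100MB rotation threshold.
	logger := logging.Init(logging.Config{
		Level:      "info",
		Format:     "json",
		Component:  "api",
		FilePath:   "/var/log/pulse/pulse.log", // LOG_FILE
		MaxSizeMB:  100,                        // LOG_MAX_SIZE
		MaxAgeDays: 7,                          // LOG_MAX_AGE
		Compress:   true,                       // LOG_COMPRESS
	})
	logger.Info().Msg("file logging enabled")
}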
+ +func (w *rollingFileWriter) closeLocked() error { + if w.file == nil { + return nil + } + err := w.file.Close() + w.file = nil + w.currentSize = 0 + return err +} + +func (w *rollingFileWriter) cleanupOldFiles() { + if w.maxAge <= 0 { + return + } + + dir := filepath.Dir(w.path) + base := filepath.Base(w.path) + prefix := base + "." + cutoff := time.Now().Add(-w.maxAge) + + entries, err := os.ReadDir(dir) + if err != nil { + return + } + + for _, entry := range entries { + name := entry.Name() + if !strings.HasPrefix(name, prefix) { + continue + } + info, err := entry.Info() + if err != nil { + continue + } + if info.ModTime().Before(cutoff) { + _ = os.Remove(filepath.Join(dir, name)) + } + } +} + +func compressAndRemove(path string) { + in, err := os.Open(path) + if err != nil { + return + } + defer in.Close() + + outPath := path + ".gz" + out, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + return + } + + gw := gzip.NewWriter(out) + if _, err = io.Copy(gw, in); err != nil { + gw.Close() + out.Close() + return + } + if err := gw.Close(); err != nil { + out.Close() + return + } + out.Close() + _ = os.Remove(path) +} diff --git a/internal/monitoring/metrics.go b/internal/monitoring/metrics.go index f31bb0374..24fd449c6 100644 --- a/internal/monitoring/metrics.go +++ b/internal/monitoring/metrics.go @@ -3,6 +3,7 @@ package monitoring import ( stdErrors "errors" "fmt" + "strings" "sync" "time" @@ -19,9 +20,24 @@ type PollMetrics struct { staleness *prometheus.GaugeVec queueDepth prometheus.Gauge inflight *prometheus.GaugeVec + nodePollDuration *prometheus.HistogramVec + nodePollResults *prometheus.CounterVec + nodePollErrors *prometheus.CounterVec + nodeLastSuccess *prometheus.GaugeVec + nodeStaleness *prometheus.GaugeVec + schedulerQueueReady prometheus.Gauge + schedulerQueueDepthByType *prometheus.GaugeVec + schedulerQueueWait *prometheus.HistogramVec + schedulerDeadLetterDepth *prometheus.GaugeVec + schedulerBreakerState *prometheus.GaugeVec + schedulerBreakerFailureCount *prometheus.GaugeVec + schedulerBreakerRetrySeconds *prometheus.GaugeVec mu sync.RWMutex lastSuccessByKey map[string]time.Time + nodeLastSuccessByKey map[string]time.Time + lastQueueTypeKeys map[string]struct{} + lastDLQKeys map[string]struct{} pending int } @@ -102,7 +118,119 @@ func newPollMetrics() *PollMetrics { }, []string{"instance_type"}, ), - lastSuccessByKey: make(map[string]time.Time), + nodePollDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pulse", + Subsystem: "monitor", + Name: "node_poll_duration_seconds", + Help: "Duration of polling operations per node.", + Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 20, 30}, + }, + []string{"instance_type", "instance", "node"}, + ), + nodePollResults: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pulse", + Subsystem: "monitor", + Name: "node_poll_total", + Help: "Total polling attempts per node partitioned by result.", + }, + []string{"instance_type", "instance", "node", "result"}, + ), + nodePollErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pulse", + Subsystem: "monitor", + Name: "node_poll_errors_total", + Help: "Polling failures per node grouped by error type.", + }, + []string{"instance_type", "instance", "node", "error_type"}, + ), + nodeLastSuccess: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "monitor", + Name: "node_poll_last_success_timestamp", + Help: "Unix timestamp of the last successful 
poll for a node.", + }, + []string{"instance_type", "instance", "node"}, + ), + nodeStaleness: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "monitor", + Name: "node_poll_staleness_seconds", + Help: "Seconds since the last successful poll for a node. -1 indicates no successes yet.", + }, + []string{"instance_type", "instance", "node"}, + ), + schedulerQueueReady: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "scheduler", + Name: "queue_due_soon", + Help: "Number of tasks due to run within the immediate window (12s).", + }, + ), + schedulerQueueDepthByType: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "scheduler", + Name: "queue_depth", + Help: "Current scheduler queue depth partitioned by instance type.", + }, + []string{"instance_type"}, + ), + schedulerQueueWait: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pulse", + Subsystem: "scheduler", + Name: "queue_wait_seconds", + Help: "Observed wait time between task readiness and execution.", + Buckets: []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60}, + }, + []string{"instance_type"}, + ), + schedulerDeadLetterDepth: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "scheduler", + Name: "dead_letter_depth", + Help: "Number of tasks currently parked in the dead-letter queue per instance.", + }, + []string{"instance_type", "instance"}, + ), + schedulerBreakerState: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "scheduler", + Name: "breaker_state", + Help: "Circuit breaker state encoded as 0=closed, 1=half-open, 2=open, -1=unknown.", + }, + []string{"instance_type", "instance"}, + ), + schedulerBreakerFailureCount: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "scheduler", + Name: "breaker_failure_count", + Help: "Current consecutive failure count tracked by the circuit breaker.", + }, + []string{"instance_type", "instance"}, + ), + schedulerBreakerRetrySeconds: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pulse", + Subsystem: "scheduler", + Name: "breaker_retry_seconds", + Help: "Seconds until the circuit breaker will allow another attempt.", + }, + []string{"instance_type", "instance"}, + ), + lastSuccessByKey: make(map[string]time.Time), + nodeLastSuccessByKey: make(map[string]time.Time), + lastQueueTypeKeys: make(map[string]struct{}), + lastDLQKeys: make(map[string]struct{}), } prometheus.MustRegister( @@ -113,11 +241,209 @@ func newPollMetrics() *PollMetrics { pm.staleness, pm.queueDepth, pm.inflight, + pm.nodePollDuration, + pm.nodePollResults, + pm.nodePollErrors, + pm.nodeLastSuccess, + pm.nodeStaleness, + pm.schedulerQueueReady, + pm.schedulerQueueDepthByType, + pm.schedulerQueueWait, + pm.schedulerDeadLetterDepth, + pm.schedulerBreakerState, + pm.schedulerBreakerFailureCount, + pm.schedulerBreakerRetrySeconds, ) return pm } +// NodePollResult captures timing and outcome for a specific node within a poll cycle. +type NodePollResult struct { + InstanceName string + InstanceType string + NodeName string + Success bool + Error error + StartTime time.Time + EndTime time.Time +} + +// RecordNodeResult records metrics for an individual node poll. 
+func (pm *PollMetrics) RecordNodeResult(result NodePollResult) { + if pm == nil { + return + } + + nodeLabel := strings.TrimSpace(result.NodeName) + if nodeLabel == "" { + nodeLabel = "unknown-node" + } + + labels := prometheus.Labels{ + "instance_type": result.InstanceType, + "instance": result.InstanceName, + "node": nodeLabel, + } + + duration := result.EndTime.Sub(result.StartTime).Seconds() + if duration < 0 { + duration = 0 + } + pm.nodePollDuration.With(labels).Observe(duration) + + resultValue := "success" + if !result.Success { + resultValue = "error" + } + pm.nodePollResults.With(prometheus.Labels{ + "instance_type": result.InstanceType, + "instance": result.InstanceName, + "node": nodeLabel, + "result": resultValue, + }).Inc() + + if result.Success { + pm.nodeLastSuccess.With(labels).Set(float64(result.EndTime.Unix())) + pm.storeNodeLastSuccess(result.InstanceType, result.InstanceName, nodeLabel, result.EndTime) + pm.updateNodeStaleness(result.InstanceType, result.InstanceName, nodeLabel, 0) + return + } + + errType := pm.classifyError(result.Error) + pm.nodePollErrors.With(prometheus.Labels{ + "instance_type": result.InstanceType, + "instance": result.InstanceName, + "node": nodeLabel, + "error_type": errType, + }).Inc() + + if last, ok := pm.lastNodeSuccessFor(result.InstanceType, result.InstanceName, nodeLabel); ok && !last.IsZero() { + staleness := result.EndTime.Sub(last).Seconds() + if staleness < 0 { + staleness = 0 + } + pm.updateNodeStaleness(result.InstanceType, result.InstanceName, nodeLabel, staleness) + } else { + pm.updateNodeStaleness(result.InstanceType, result.InstanceName, nodeLabel, -1) + } +} + +// RecordQueueWait observes the time a task spent waiting in the scheduler queue. +func (pm *PollMetrics) RecordQueueWait(instanceType string, wait time.Duration) { + if pm == nil { + return + } + if wait < 0 { + wait = 0 + } + instanceType = strings.TrimSpace(instanceType) + if instanceType == "" { + instanceType = "unknown" + } + pm.schedulerQueueWait.WithLabelValues(instanceType).Observe(wait.Seconds()) +} + +// UpdateQueueSnapshot updates scheduler queue depth metrics. +func (pm *PollMetrics) UpdateQueueSnapshot(snapshot QueueSnapshot) { + if pm == nil { + return + } + + pm.schedulerQueueReady.Set(float64(snapshot.DueWithinSeconds)) + + current := make(map[string]struct{}, len(snapshot.PerType)) + for instanceType, depth := range snapshot.PerType { + key := strings.TrimSpace(instanceType) + if key == "" { + key = "unknown" + } + pm.schedulerQueueDepthByType.WithLabelValues(key).Set(float64(depth)) + current[key] = struct{}{} + } + + pm.mu.Lock() + for key := range pm.lastQueueTypeKeys { + if _, ok := current[key]; !ok { + pm.schedulerQueueDepthByType.WithLabelValues(key).Set(0) + } + } + pm.lastQueueTypeKeys = current + pm.mu.Unlock() +} + +// UpdateDeadLetterCounts refreshes dead-letter queue gauges based on the provided tasks. 
+func (pm *PollMetrics) UpdateDeadLetterCounts(tasks []DeadLetterTask) { + if pm == nil { + return + } + + current := make(map[string]float64) + for _, task := range tasks { + instType := strings.TrimSpace(task.Type) + if instType == "" { + instType = "unknown" + } + inst := strings.TrimSpace(task.Instance) + if inst == "" { + inst = "unknown" + } + key := instType + "::" + inst + current[key] = current[key] + 1 + } + + pm.mu.Lock() + prev := pm.lastDLQKeys + pm.lastDLQKeys = make(map[string]struct{}, len(current)) + pm.mu.Unlock() + + for key, count := range current { + instType, inst := splitInstanceKey(key) + pm.schedulerDeadLetterDepth.WithLabelValues(instType, inst).Set(count) + } + + pm.mu.Lock() + for key := range current { + pm.lastDLQKeys[key] = struct{}{} + } + for key := range prev { + if _, ok := current[key]; !ok { + instType, inst := splitInstanceKey(key) + pm.schedulerDeadLetterDepth.WithLabelValues(instType, inst).Set(0) + } + } + pm.mu.Unlock() +} + +// SetBreakerState updates circuit breaker metrics for a specific instance. +func (pm *PollMetrics) SetBreakerState(instanceType, instance, state string, failures int, retryAt time.Time) { + if pm == nil { + return + } + + instType := strings.TrimSpace(instanceType) + if instType == "" { + instType = "unknown" + } + inst := strings.TrimSpace(instance) + if inst == "" { + inst = "unknown" + } + + value := breakerStateToValue(state) + pm.schedulerBreakerState.WithLabelValues(instType, inst).Set(value) + pm.schedulerBreakerFailureCount.WithLabelValues(instType, inst).Set(float64(failures)) + + retrySeconds := 0.0 + if !retryAt.IsZero() { + retrySeconds = retryAt.Sub(time.Now()).Seconds() + if retrySeconds < 0 { + retrySeconds = 0 + } + } + pm.schedulerBreakerRetrySeconds.WithLabelValues(instType, inst).Set(retrySeconds) +} + // RecordResult records metrics for a polling result. 
func (pm *PollMetrics) RecordResult(result PollResult) { if pm == nil { @@ -249,6 +575,57 @@ func (pm *PollMetrics) key(instanceType, instance string) string { return fmt.Sprintf("%s::%s", instanceType, instance) } +func (pm *PollMetrics) storeNodeLastSuccess(instanceType, instance, node string, ts time.Time) { + pm.mu.Lock() + pm.nodeLastSuccessByKey[pm.nodeKey(instanceType, instance, node)] = ts + pm.mu.Unlock() +} + +func (pm *PollMetrics) lastNodeSuccessFor(instanceType, instance, node string) (time.Time, bool) { + pm.mu.RLock() + ts, ok := pm.nodeLastSuccessByKey[pm.nodeKey(instanceType, instance, node)] + pm.mu.RUnlock() + return ts, ok +} + +func (pm *PollMetrics) updateNodeStaleness(instanceType, instance, node string, value float64) { + pm.nodeStaleness.WithLabelValues(instanceType, instance, node).Set(value) +} + +func (pm *PollMetrics) nodeKey(instanceType, instance, node string) string { + return fmt.Sprintf("%s::%s::%s", instanceType, instance, node) +} + +func splitInstanceKey(key string) (string, string) { + parts := strings.SplitN(key, "::", 2) + if len(parts) == 2 { + if parts[0] == "" { + parts[0] = "unknown" + } + if parts[1] == "" { + parts[1] = "unknown" + } + return parts[0], parts[1] + } + if key == "" { + return "unknown", "unknown" + } + return "unknown", key +} + +func breakerStateToValue(state string) float64 { + switch strings.ToLower(state) { + case "closed": + return 0 + case "half_open", "half-open": + return 1 + case "open": + return 2 + default: + return -1 + } +} + func (pm *PollMetrics) classifyError(err error) string { if err == nil { return "none" diff --git a/internal/monitoring/monitor.go b/internal/monitoring/monitor.go index 619c7aea5..b02ced64c 100644 --- a/internal/monitoring/monitor.go +++ b/internal/monitoring/monitor.go @@ -22,6 +22,7 @@ import ( "github.com/rcourtman/pulse-go-rewrite/internal/config" "github.com/rcourtman/pulse-go-rewrite/internal/discovery" "github.com/rcourtman/pulse-go-rewrite/internal/errors" + "github.com/rcourtman/pulse-go-rewrite/internal/logging" "github.com/rcourtman/pulse-go-rewrite/internal/mock" "github.com/rcourtman/pulse-go-rewrite/internal/models" "github.com/rcourtman/pulse-go-rewrite/internal/notifications" @@ -30,6 +31,7 @@ import ( "github.com/rcourtman/pulse-go-rewrite/pkg/pbs" "github.com/rcourtman/pulse-go-rewrite/pkg/pmg" "github.com/rcourtman/pulse-go-rewrite/pkg/proxmox" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -109,7 +111,7 @@ func mergeNVMeTempsIntoDisks(disks []models.PhysicalDisk, nodes []models.Node) [ } nvmeTempsByNode := make(map[string][]models.NVMeTemp) - for _, node := range nodes { + for _, node := range nodes { if node.Temperature == nil || !node.Temperature.Available || len(node.Temperature.NVMe) == 0 { continue } @@ -294,10 +296,12 @@ func (r *realExecutor) Execute(ctx context.Context, task PollTask) { } r.monitor.pollPMGInstance(ctx, task.InstanceName, task.PMGClient) default: - log.Debug(). - Str("instance", task.InstanceName). - Str("type", task.InstanceType). - Msg("PollExecutor received unsupported task type") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug(). + Str("instance", task.InstanceName). + Str("type", task.InstanceType). 
+ Msg("PollExecutor received unsupported task type") + } } } @@ -598,7 +602,9 @@ func (m *Monitor) RemoveDockerHost(hostID string) (models.DockerHost, error) { host, removed := m.state.RemoveDockerHost(hostID) if !removed { - log.Debug().Str("dockerHostID", hostID).Msg("Docker host not present in state during removal; proceeding to clear alerts") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug().Str("dockerHostID", hostID).Msg("Docker host not present in state during removal; proceeding to clear alerts") + } host = models.DockerHost{ ID: hostID, Hostname: hostID, @@ -2230,12 +2236,16 @@ func (m *Monitor) poll(ctx context.Context, wsHub *websocket.Hub) { currentCount := atomic.AddInt32(&m.activePollCount, 1) if currentCount > 2 { atomic.AddInt32(&m.activePollCount, -1) - log.Debug().Int32("activePolls", currentCount-1).Msg("Too many concurrent polls, skipping") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug().Int32("activePolls", currentCount-1).Msg("Too many concurrent polls, skipping") + } return } defer atomic.AddInt32(&m.activePollCount, -1) - log.Debug().Msg("Starting polling cycle") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug().Msg("Starting polling cycle") + } startTime := time.Now() now := startTime @@ -2259,7 +2269,9 @@ func (m *Monitor) poll(ctx context.Context, wsHub *websocket.Hub) { m.pollCounter++ m.mu.Unlock() - log.Debug().Dur("duration", time.Since(startTime)).Msg("Polling cycle completed") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug().Dur("duration", time.Since(startTime)).Msg("Polling cycle completed") + } // Broadcasting is now handled by the timer in Start() } @@ -2268,7 +2280,9 @@ func (m *Monitor) poll(ctx context.Context, wsHub *websocket.Hub) { // This keeps WebSocket broadcasts aligned with in-memory acknowledgement updates. func (m *Monitor) syncAlertsToState() { if m.pruneStaleDockerAlerts() { - log.Debug().Msg("Pruned stale docker alerts during sync") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug().Msg("Pruned stale docker alerts during sync") + } } activeAlerts := m.alertManager.GetActiveAlerts() @@ -2290,7 +2304,7 @@ func (m *Monitor) syncAlertsToState() { AckTime: alert.AckTime, AckUser: alert.AckUser, }) - if alert.Acknowledged { + if alert.Acknowledged && logging.IsLevelEnabled(zerolog.DebugLevel) { log.Debug().Str("alertID", alert.ID).Interface("ackTime", alert.AckTime).Msg("Syncing acknowledged alert") } } @@ -2396,11 +2410,15 @@ func (m *Monitor) startTaskWorkers(ctx context.Context, workers int) { } func (m *Monitor) taskWorker(ctx context.Context, id int) { - log.Debug().Int("worker", id).Msg("Task worker started") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug().Int("worker", id).Msg("Task worker started") + } for { task, ok := m.taskQueue.WaitNext(ctx) if !ok { - log.Debug().Int("worker", id).Msg("Task worker stopping") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug().Int("worker", id).Msg("Task worker stopping") + } return } @@ -2413,13 +2431,30 @@ func (m *Monitor) taskWorker(ctx context.Context, id int) { func (m *Monitor) executeScheduledTask(ctx context.Context, task ScheduledTask) { if !m.allowExecution(task) { - log.Debug(). - Str("instance", task.InstanceName). - Str("type", string(task.InstanceType)). - Msg("Task blocked by circuit breaker") + if logging.IsLevelEnabled(zerolog.DebugLevel) { + log.Debug(). + Str("instance", task.InstanceName). + Str("type", string(task.InstanceType)). 
+ Msg("Task blocked by circuit breaker") + } return } + if m.pollMetrics != nil { + wait := time.Duration(0) + if !task.NextRun.IsZero() { + wait = time.Since(task.NextRun) + if wait < 0 { + wait = 0 + } + } + instanceType := string(task.InstanceType) + if strings.TrimSpace(instanceType) == "" { + instanceType = "unknown" + } + m.pollMetrics.RecordQueueWait(instanceType, wait) + } + executor := m.getExecutor() if executor == nil { log.Error(). @@ -2571,6 +2606,7 @@ func (m *Monitor) sendToDeadLetter(task ScheduledTask, err error) { next.Interval = 30 * time.Minute next.NextRun = time.Now().Add(next.Interval) m.deadLetterQueue.Upsert(next) + m.updateDeadLetterMetrics() key := schedulerKey(task.InstanceType, task.InstanceName) now := time.Now() @@ -2606,6 +2642,30 @@ func classifyDLQReason(err error) string { return "permanent_failure" } +func (m *Monitor) updateDeadLetterMetrics() { + if m.pollMetrics == nil || m.deadLetterQueue == nil { + return + } + + size := m.deadLetterQueue.Size() + if size <= 0 { + m.pollMetrics.UpdateDeadLetterCounts(nil) + return + } + + tasks := m.deadLetterQueue.PeekAll(size) + m.pollMetrics.UpdateDeadLetterCounts(tasks) +} + +func (m *Monitor) updateBreakerMetric(instanceType InstanceType, instance string, breaker *circuitBreaker) { + if m.pollMetrics == nil || breaker == nil { + return + } + + state, failures, retryAt, _, _ := breaker.stateDetails() + m.pollMetrics.SetBreakerState(string(instanceType), instance, state, failures, retryAt) +} + func (m *Monitor) randomFloat() float64 { if m.rng == nil { m.rng = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -2617,7 +2677,9 @@ func (m *Monitor) updateQueueDepthMetric() { if m.pollMetrics == nil || m.taskQueue == nil { return } - m.pollMetrics.SetQueueDepth(m.taskQueue.Size()) + snapshot := m.taskQueue.Snapshot() + m.pollMetrics.SetQueueDepth(snapshot.Depth) + m.pollMetrics.UpdateQueueSnapshot(snapshot) } func (m *Monitor) allowExecution(task ScheduledTask) bool { @@ -2626,7 +2688,9 @@ func (m *Monitor) allowExecution(task ScheduledTask) bool { } key := schedulerKey(task.InstanceType, task.InstanceName) breaker := m.ensureBreaker(key) - return breaker.allow(time.Now()) + allowed := breaker.allow(time.Now()) + m.updateBreakerMetric(task.InstanceType, task.InstanceName, breaker) + return allowed } func (m *Monitor) ensureBreaker(key string) *circuitBreaker { @@ -2691,6 +2755,7 @@ func (m *Monitor) recordTaskResult(instanceType InstanceType, instance string, p m.mu.Unlock() if breaker != nil { breaker.recordSuccess() + m.updateBreakerMetric(instanceType, instance, breaker) } return } @@ -2721,6 +2786,7 @@ func (m *Monitor) recordTaskResult(instanceType InstanceType, instance string, p m.mu.Unlock() if breaker != nil { breaker.recordFailure(now) + m.updateBreakerMetric(instanceType, instance, breaker) } } @@ -2751,6 +2817,9 @@ func (m *Monitor) SchedulerHealth() SchedulerHealthResponse { // Queue snapshot if m.taskQueue != nil { response.Queue = m.taskQueue.Snapshot() + if m.pollMetrics != nil { + m.pollMetrics.UpdateQueueSnapshot(response.Queue) + } } // Dead-letter queue snapshot @@ -2771,6 +2840,7 @@ func (m *Monitor) SchedulerHealth() SchedulerHealthResponse { Count: m.deadLetterQueue.Size(), Tasks: deadLetterTasks, } + m.updateDeadLetterMetrics() } // Circuit breaker snapshots @@ -2834,6 +2904,19 @@ func (m *Monitor) SchedulerHealth() SchedulerHealthResponse { } } m.mu.RUnlock() + for key, breaker := range breakerRefs { + instanceType := InstanceType("unknown") + instanceName := key + if parts := 
strings.SplitN(key, "::", 2); len(parts) == 2 { + if parts[0] != "" { + instanceType = InstanceType(parts[0]) + } + if parts[1] != "" { + instanceName = parts[1] + } + } + m.updateBreakerMetric(instanceType, instanceName, breaker) + } keySet := make(map[string]struct{}) for k := range instanceInfos { @@ -3010,6 +3093,7 @@ func isTransientError(err error) bool { // pollPVEInstance polls a single PVE instance func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, client PVEClientInterface) { start := time.Now() + debugEnabled := logging.IsLevelEnabled(zerolog.DebugLevel) var pollErr error if m.pollMetrics != nil { m.pollMetrics.IncInFlight("pve") @@ -3040,12 +3124,16 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie select { case <-ctx.Done(): pollErr = ctx.Err() - log.Debug().Str("instance", instanceName).Msg("Polling cancelled") + if debugEnabled { + log.Debug().Str("instance", instanceName).Msg("Polling cancelled") + } return default: } - log.Debug().Str("instance", instanceName).Msg("Polling PVE instance") + if debugEnabled { + log.Debug().Str("instance", instanceName).Msg("Polling PVE instance") + } // Get instance config var instanceCfg *config.PVEInstance @@ -3130,6 +3218,7 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie // Convert to models var modelNodes []models.Node for _, node := range nodes { + nodeStart := time.Now() displayName := getNodeDisplayName(instanceCfg, node.Node) modelNode := models.Node{ @@ -3581,7 +3670,49 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie } } - modelNodes = append(modelNodes, modelNode) + if m.pollMetrics != nil { + nodeNameLabel := strings.TrimSpace(node.Node) + if nodeNameLabel == "" { + nodeNameLabel = strings.TrimSpace(modelNode.DisplayName) + } + if nodeNameLabel == "" { + nodeNameLabel = "unknown-node" + } + + success := true + nodeErrReason := "" + health := strings.ToLower(strings.TrimSpace(modelNode.ConnectionHealth)) + if health != "" && health != "healthy" { + success = false + nodeErrReason = fmt.Sprintf("connection health %s", health) + } + + status := strings.ToLower(strings.TrimSpace(modelNode.Status)) + if success && status != "" && status != "online" { + success = false + nodeErrReason = fmt.Sprintf("status %s", status) + } + + var nodeErr error + if !success { + if nodeErrReason == "" { + nodeErrReason = "unknown node error" + } + nodeErr = fmt.Errorf(nodeErrReason) + } + + m.pollMetrics.RecordNodeResult(NodePollResult{ + InstanceName: instanceName, + InstanceType: "pve", + NodeName: nodeNameLabel, + Success: success, + Error: nodeErr, + StartTime: nodeStart, + EndTime: time.Now(), + }) + } + + modelNodes = append(modelNodes, modelNode) } if len(modelNodes) == 0 && len(prevInstanceNodes) > 0 { @@ -4781,6 +4912,7 @@ func (m *Monitor) pollBackupTasks(ctx context.Context, instanceName string, clie // pollPBSInstance polls a single PBS instance func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, client *pbs.Client) { start := time.Now() + debugEnabled := logging.IsLevelEnabled(zerolog.DebugLevel) var pollErr error if m.pollMetrics != nil { m.pollMetrics.IncInFlight("pbs") @@ -4809,24 +4941,30 @@ func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, clie // Check if context is cancelled select { - case <-ctx.Done(): + case <-ctx.Done(): pollErr = ctx.Err() - log.Debug().Str("instance", instanceName).Msg("Polling cancelled") - return - default: - } + if debugEnabled { 
+ log.Debug().Str("instance", instanceName).Msg("Polling cancelled") + } + return + default: + } - log.Debug().Str("instance", instanceName).Msg("Polling PBS instance") + if debugEnabled { + log.Debug().Str("instance", instanceName).Msg("Polling PBS instance") + } // Get instance config var instanceCfg *config.PBSInstance for _, cfg := range m.config.PBSInstances { if cfg.Name == instanceName { instanceCfg = &cfg - log.Debug(). - Str("instance", instanceName). - Bool("monitorDatastores", cfg.MonitorDatastores). - Msg("Found PBS instance config") + if debugEnabled { + log.Debug(). + Str("instance", instanceName). + Bool("monitorDatastores", cfg.MonitorDatastores). + Msg("Found PBS instance config") + } break } } @@ -4855,13 +4993,17 @@ func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, clie m.resetAuthFailures(instanceName, "pbs") m.state.SetConnectionHealth("pbs-"+instanceName, true) - log.Debug(). - Str("instance", instanceName). - Str("version", version.Version). - Bool("monitorDatastores", instanceCfg.MonitorDatastores). - Msg("PBS version retrieved successfully") - } else { - log.Debug().Err(versionErr).Str("instance", instanceName).Msg("Failed to get PBS version, trying fallback") + if debugEnabled { + log.Debug(). + Str("instance", instanceName). + Str("version", version.Version). + Bool("monitorDatastores", instanceCfg.MonitorDatastores). + Msg("PBS version retrieved successfully") + } + } else { + if debugEnabled { + log.Debug().Err(versionErr).Str("instance", instanceName).Msg("Failed to get PBS version, trying fallback") + } ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) defer cancel2() @@ -4893,7 +5035,9 @@ func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, clie // Get node status (CPU, memory, etc.) 
nodeStatus, err := client.GetNodeStatus(ctx) if err != nil { - log.Debug().Err(err).Str("instance", instanceName).Msg("Could not get PBS node status (may need Sys.Audit permission)") + if debugEnabled { + log.Debug().Err(err).Str("instance", instanceName).Msg("Could not get PBS node status (may need Sys.Audit permission)") + } } else if nodeStatus != nil { pbsInst.CPU = nodeStatus.CPU if nodeStatus.Memory.Total > 0 { @@ -5092,6 +5236,7 @@ func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, clie // pollPMGInstance polls a single Proxmox Mail Gateway instance func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, client *pmg.Client) { start := time.Now() + debugEnabled := logging.IsLevelEnabled(zerolog.DebugLevel) var pollErr error if m.pollMetrics != nil { m.pollMetrics.IncInFlight("pmg") @@ -5121,12 +5266,16 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie select { case <-ctx.Done(): pollErr = ctx.Err() - log.Debug().Str("instance", instanceName).Msg("PMG polling cancelled by context") + if debugEnabled { + log.Debug().Str("instance", instanceName).Msg("PMG polling cancelled by context") + } return default: } - log.Debug().Str("instance", instanceName).Msg("Polling PMG instance") + if debugEnabled { + log.Debug().Str("instance", instanceName).Msg("Polling PMG instance") + } var instanceCfg *config.PMGInstance for idx := range m.config.PMGInstances { @@ -5182,7 +5331,9 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie cluster, err := client.GetClusterStatus(ctx, true) if err != nil { - log.Debug().Err(err).Str("instance", instanceName).Msg("Failed to retrieve PMG cluster status") + if debugEnabled { + log.Debug().Err(err).Str("instance", instanceName).Msg("Failed to retrieve PMG cluster status") + } } backupNodes := make(map[string]struct{}) @@ -5204,10 +5355,12 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie // Fetch queue status for this node if queueData, qErr := client.GetQueueStatus(ctx, entry.Name); qErr != nil { - log.Debug().Err(qErr). - Str("instance", instanceName). - Str("node", entry.Name). - Msg("Failed to fetch PMG queue status") + if debugEnabled { + log.Debug().Err(qErr). + Str("instance", instanceName). + Str("node", entry.Name). + Msg("Failed to fetch PMG queue status") + } } else if queueData != nil { total := queueData.Active + queueData.Deferred + queueData.Hold + queueData.Incoming node.QueueStatus = &models.PMGQueueStatus{ @@ -5243,10 +5396,12 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie backups, backupErr := client.ListBackups(ctx, nodeName) if backupErr != nil { - log.Debug().Err(backupErr). - Str("instance", instanceName). - Str("node", nodeName). - Msg("Failed to list PMG configuration backups") + if debugEnabled { + log.Debug().Err(backupErr). + Str("instance", instanceName). + Str("node", nodeName). + Msg("Failed to list PMG configuration backups") + } continue } @@ -5268,10 +5423,12 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie } } - log.Debug(). - Str("instance", instanceName). - Int("backupCount", len(pmgBackups)). - Msg("PMG backups polled") + if debugEnabled { + log.Debug(). + Str("instance", instanceName). + Int("backupCount", len(pmgBackups)). 
+ Msg("PMG backups polled") + } if stats, err := client.GetMailStatistics(ctx, "day"); err != nil { log.Warn().Err(err).Str("instance", instanceName).Msg("Failed to fetch PMG mail statistics") @@ -5299,7 +5456,9 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie } if counts, err := client.GetMailCount(ctx, 24); err != nil { - log.Debug().Err(err).Str("instance", instanceName).Msg("Failed to fetch PMG mail count data") + if debugEnabled { + log.Debug().Err(err).Str("instance", instanceName).Msg("Failed to fetch PMG mail count data") + } } else if len(counts) > 0 { points := make([]models.PMGMailCountPoint, 0, len(counts)) for _, entry := range counts { @@ -5327,7 +5486,9 @@ func (m *Monitor) pollPMGInstance(ctx context.Context, instanceName string, clie } if scores, err := client.GetSpamScores(ctx); err != nil { - log.Debug().Err(err).Str("instance", instanceName).Msg("Failed to fetch PMG spam score distribution") + if debugEnabled { + log.Debug().Err(err).Str("instance", instanceName).Msg("Failed to fetch PMG spam score distribution") + } } else if len(scores) > 0 { buckets := make([]models.PMGSpamBucket, 0, len(scores)) for _, bucket := range scores {