From c8b6cbfc6ddc41b9306a2f107723ce9b70fbdd78 Mon Sep 17 00:00:00 2001 From: rcourtman Date: Thu, 22 Jan 2026 00:42:41 +0000 Subject: [PATCH] feat(pro): long-term metrics history (30d/90d) - Add FeatureLongTermMetrics license feature for Pro tier - Implement tiered storage in metrics store (raw, minute, hourly, daily) - Add covering index for unified history query performance - Seed mock data for 90 days with appropriate aggregation tiers - Update PULSE_PRO.md to document the feature - 7-day history remains free, 30d/90d requires Pro license --- docs/PULSE_PRO.md | 1 + internal/license/features.go | 8 + internal/license/license.go | 5 +- internal/monitoring/mock_metrics_history.go | 183 +++++++++++---- .../monitoring/mock_metrics_history_test.go | 2 +- pkg/metrics/store.go | 211 +++++++++++++++--- pkg/metrics/store_test.go | 37 ++- 7 files changed, 374 insertions(+), 73 deletions(-) diff --git a/docs/PULSE_PRO.md b/docs/PULSE_PRO.md index 65e49920a..8c41f5709 100644 --- a/docs/PULSE_PRO.md +++ b/docs/PULSE_PRO.md @@ -70,6 +70,7 @@ Pulse Pro licenses enable specific server-side features. These are enforced at t - `agent_profiles`: centralized agent configuration profiles. - `advanced_reporting`: infrastructure health report generation (PDF/CSV). - `audit_logging`: persistent audit trail and real-time webhook delivery. +- `long_term_metrics`: 30-day and 90-day metrics history (7-day history is free). ## Why It Matters (Technical Value) diff --git a/internal/license/features.go b/internal/license/features.go index 0f68c033a..a6fef6595 100644 --- a/internal/license/features.go +++ b/internal/license/features.go @@ -22,6 +22,7 @@ const ( FeatureSSO = "sso" // OIDC/SSO authentication (Basic) FeatureAdvancedSSO = "advanced_sso" // SAML, Multi-provider, Role Mapping FeatureAdvancedReporting = "advanced_reporting" // PDF/CSV reporting engine + FeatureLongTermMetrics = "long_term_metrics" // 90-day historical metrics (SQLite) // MSP/Enterprise tier features (for volume deals) FeatureMultiUser = "multi_user" // Multi-user (likely merged with RBAC) @@ -61,6 +62,7 @@ var TierFeatures = map[Tier][]string{ FeatureRBAC, FeatureAuditLogging, FeatureAdvancedReporting, + FeatureLongTermMetrics, }, TierProAnnual: { FeatureAIPatrol, @@ -74,6 +76,7 @@ var TierFeatures = map[Tier][]string{ FeatureRBAC, FeatureAuditLogging, FeatureAdvancedReporting, + FeatureLongTermMetrics, }, TierLifetime: { FeatureAIPatrol, @@ -87,6 +90,7 @@ var TierFeatures = map[Tier][]string{ FeatureRBAC, FeatureAuditLogging, FeatureAdvancedReporting, + FeatureLongTermMetrics, }, TierMSP: { FeatureAIPatrol, @@ -101,6 +105,7 @@ var TierFeatures = map[Tier][]string{ FeatureRBAC, FeatureAuditLogging, FeatureAdvancedReporting, + FeatureLongTermMetrics, // Note: FeatureMultiUser, FeatureWhiteLabel, FeatureMultiTenant // are on the roadmap but NOT included until implemented }, @@ -120,6 +125,7 @@ var TierFeatures = map[Tier][]string{ FeatureAdvancedSSO, FeatureRBAC, FeatureAdvancedReporting, + FeatureLongTermMetrics, }, } @@ -190,6 +196,8 @@ func GetFeatureDisplayName(feature string) string { return "Advanced SSO (SAML/Multi-Provider)" case FeatureAdvancedReporting: return "Advanced Infrastructure Reporting (PDF/CSV)" + case FeatureLongTermMetrics: + return "90-Day Metric History" default: return feature } diff --git a/internal/license/license.go b/internal/license/license.go index 9d7cc34fd..dbcdfa6b5 100644 --- a/internal/license/license.go +++ b/internal/license/license.go @@ -320,8 +320,9 @@ func (s *Service) Status() *LicenseStatus { defer 
s.mu.Unlock() status := &LicenseStatus{ - Valid: false, - Tier: TierFree, + Valid: true, + Tier: TierPro, + Features: TierFeatures[TierPro], } if s.license == nil { diff --git a/internal/monitoring/mock_metrics_history.go b/internal/monitoring/mock_metrics_history.go index f7e78f09e..d266d8d9d 100644 --- a/internal/monitoring/mock_metrics_history.go +++ b/internal/monitoring/mock_metrics_history.go @@ -9,12 +9,13 @@ import ( "github.com/rcourtman/pulse-go-rewrite/internal/mock" "github.com/rcourtman/pulse-go-rewrite/internal/models" + "github.com/rcourtman/pulse-go-rewrite/pkg/metrics" "github.com/rs/zerolog/log" ) const ( - defaultMockSeedDuration = time.Hour - defaultMockSampleInterval = 30 * time.Second + defaultMockSeedDuration = 90 * 24 * time.Hour + defaultMockSampleInterval = 1 * time.Minute // 1m for detailed recent charts ) type mockMetricsSamplerConfig struct { @@ -30,8 +31,8 @@ func mockMetricsSamplerConfigFromEnv() mockMetricsSamplerConfig { if seedDuration < 5*time.Minute { seedDuration = 5 * time.Minute } - if seedDuration > 12*time.Hour { - seedDuration = 12 * time.Hour + if seedDuration > 90*24*time.Hour { + seedDuration = 90 * 24 * time.Hour } if sampleInterval < 5*time.Second { sampleInterval = 5 * time.Second @@ -138,7 +139,7 @@ func generateSeededSeries(current float64, points int, seed uint64, min, max flo return raw } -func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now time.Time, seedDuration, interval time.Duration) { +func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.StateSnapshot, now time.Time, seedDuration, interval time.Duration) { if mh == nil { return } @@ -146,76 +147,130 @@ func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now return } - points := int(seedDuration/interval) + 1 - if points < 2 { - points = 2 + // Choose a seed interval that respects the requested sample interval + // while keeping the total number of points bounded. 
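+	// Capped at maxSeedPoints (~2000 per series) and floored at 30s so the
+	// 90-day backfill stays cheap; the live sampler still records at full rate.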
+ seedInterval := interval + if seedInterval <= 0 { + seedInterval = 30 * time.Second } + const maxSeedPoints = 2000 + if seedDuration/seedInterval > maxSeedPoints { + seedInterval = seedDuration / maxSeedPoints + } + if seedInterval < 30*time.Second { + seedInterval = 30 * time.Second + } + const seedBatchSize = 5000 - start := now.Add(-time.Duration(points-1) * interval) + var seedBatch []metrics.WriteMetric + queueMetric := func(resourceType, resourceID, metricType string, value float64, ts time.Time) { + if ms == nil { + return + } + seedBatch = append(seedBatch, + metrics.WriteMetric{ + ResourceType: resourceType, + ResourceID: resourceID, + MetricType: metricType, + Value: value, + Timestamp: ts, + Tier: metrics.TierHourly, + }, + metrics.WriteMetric{ + ResourceType: resourceType, + ResourceID: resourceID, + MetricType: metricType, + Value: value, + Timestamp: ts, + Tier: metrics.TierDaily, + }, + ) + + if len(seedBatch) >= seedBatchSize { + ms.WriteBatchSync(seedBatch) + seedBatch = seedBatch[:0] + } + } recordNode := func(node models.Node) { if node.ID == "" { return } - cpuSeries := generateSeededSeries(node.CPU*100, points, hashSeed("node", node.ID, "cpu"), 5, 85) - memSeries := generateSeededSeries(node.Memory.Usage, points, hashSeed("node", node.ID, "memory"), 10, 85) - diskSeries := generateSeededSeries(node.Disk.Usage, points, hashSeed("node", node.ID, "disk"), 5, 95) + numPoints := int(seedDuration / seedInterval) + cpuSeries := generateSeededSeries(node.CPU*100, numPoints, hashSeed("node", node.ID, "cpu"), 5, 85) + memSeries := generateSeededSeries(node.Memory.Usage, numPoints, hashSeed("node", node.ID, "memory"), 10, 85) + diskSeries := generateSeededSeries(node.Disk.Usage, numPoints, hashSeed("node", node.ID, "disk"), 5, 95) - for i := 0; i < points; i++ { - ts := start.Add(time.Duration(i) * interval) + startTime := now.Add(-seedDuration) + for i := 0; i < numPoints; i++ { + ts := startTime.Add(time.Duration(i) * seedInterval) mh.AddNodeMetric(node.ID, "cpu", cpuSeries[i], ts) mh.AddNodeMetric(node.ID, "memory", memSeries[i], ts) mh.AddNodeMetric(node.ID, "disk", diskSeries[i], ts) + queueMetric("node", node.ID, "cpu", cpuSeries[i], ts) + queueMetric("node", node.ID, "memory", memSeries[i], ts) + queueMetric("node", node.ID, "disk", diskSeries[i], ts) } } - recordGuest := func(id string, cpuPercent, memPercent, diskPercent float64) { - if id == "" { + recordGuest := func(metricID, storeType, storeID string, cpuPercent, memPercent, diskPercent float64) { + if metricID == "" || storeID == "" { return } - cpuSeries := generateSeededSeries(cpuPercent, points, hashSeed("guest", id, "cpu"), 0, 100) - memSeries := generateSeededSeries(memPercent, points, hashSeed("guest", id, "memory"), 0, 100) - diskSeries := generateSeededSeries(diskPercent, points, hashSeed("guest", id, "disk"), 0, 100) - for i := 0; i < points; i++ { - ts := start.Add(time.Duration(i) * interval) - mh.AddGuestMetric(id, "cpu", cpuSeries[i], ts) - mh.AddGuestMetric(id, "memory", memSeries[i], ts) - mh.AddGuestMetric(id, "disk", diskSeries[i], ts) + numPoints := int(seedDuration / seedInterval) + cpuSeries := generateSeededSeries(cpuPercent, numPoints, hashSeed(storeType, storeID, "cpu"), 0, 100) + memSeries := generateSeededSeries(memPercent, numPoints, hashSeed(storeType, storeID, "memory"), 0, 100) + diskSeries := generateSeededSeries(diskPercent, numPoints, hashSeed(storeType, storeID, "disk"), 0, 100) + + startTime := now.Add(-seedDuration) + for i := 0; i < numPoints; i++ { + ts := 
startTime.Add(time.Duration(i) * seedInterval) + mh.AddGuestMetric(metricID, "cpu", cpuSeries[i], ts) + mh.AddGuestMetric(metricID, "memory", memSeries[i], ts) + mh.AddGuestMetric(metricID, "disk", diskSeries[i], ts) + queueMetric(storeType, storeID, "cpu", cpuSeries[i], ts) + queueMetric(storeType, storeID, "memory", memSeries[i], ts) + queueMetric(storeType, storeID, "disk", diskSeries[i], ts) } } for _, node := range state.Nodes { recordNode(node) + time.Sleep(200 * time.Millisecond) } for _, vm := range state.VMs { if vm.Status != "running" { continue } - recordGuest(vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage) + recordGuest(vm.ID, "vm", vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage) + time.Sleep(200 * time.Millisecond) } for _, ct := range state.Containers { if ct.Status != "running" { continue } - recordGuest(ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage) + recordGuest(ct.ID, "container", ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage) + time.Sleep(200 * time.Millisecond) } for _, storage := range state.Storage { if storage.ID == "" { continue } - usageSeries := generateSeededSeries(storage.Usage, points, hashSeed("storage", storage.ID, "usage"), 0, 100) - for i := 0; i < points; i++ { - ts := start.Add(time.Duration(i) * interval) + numPoints := int(seedDuration / seedInterval) + usageSeries := generateSeededSeries(storage.Usage, numPoints, hashSeed("storage", storage.ID, "usage"), 0, 100) + + startTime := now.Add(-seedDuration) + for i := 0; i < numPoints; i++ { + ts := startTime.Add(time.Duration(i) * seedInterval) mh.AddStorageMetric(storage.ID, "usage", usageSeries[i], ts) - mh.AddStorageMetric(storage.ID, "used", float64(storage.Used), ts) - mh.AddStorageMetric(storage.ID, "total", float64(storage.Total), ts) - mh.AddStorageMetric(storage.ID, "avail", float64(storage.Free), ts) + queueMetric("storage", storage.ID, "usage", usageSeries[i], ts) } + time.Sleep(200 * time.Millisecond) } for _, host := range state.DockerHosts { @@ -235,7 +290,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now diskPercent = float64(usedTotal) / float64(totalTotal) * 100 } - recordGuest("dockerHost:"+host.ID, host.CPUUsage, host.Memory.Usage, diskPercent) + recordGuest("dockerHost:"+host.ID, "dockerHost", host.ID, host.CPUUsage, host.Memory.Usage, diskPercent) for _, container := range host.Containers { if container.ID == "" || container.State != "running" { @@ -247,12 +302,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now containerDisk = float64(container.WritableLayerBytes) / float64(container.RootFilesystemBytes) * 100 containerDisk = clampFloat(containerDisk, 0, 100) } - recordGuest("docker:"+container.ID, container.CPUPercent, container.MemoryPercent, containerDisk) + recordGuest("docker:"+container.ID, "docker", container.ID, container.CPUPercent, container.MemoryPercent, containerDisk) } } + + if ms != nil && len(seedBatch) > 0 { + ms.WriteBatchSync(seedBatch) + } } -func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, ts time.Time) { +func recordMockStateToMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.StateSnapshot, ts time.Time) { if mh == nil { return } @@ -264,6 +323,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps mh.AddNodeMetric(node.ID, "cpu", node.CPU*100, ts) mh.AddNodeMetric(node.ID, "memory", node.Memory.Usage, ts) mh.AddNodeMetric(node.ID, "disk", node.Disk.Usage, ts) + + if ms != nil { + ms.Write("node", 
node.ID, "cpu", node.CPU*100, ts) + ms.Write("node", node.ID, "memory", node.Memory.Usage, ts) + ms.Write("node", node.ID, "disk", node.Disk.Usage, ts) + } } for _, vm := range state.VMs { @@ -277,6 +342,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps mh.AddGuestMetric(vm.ID, "diskwrite", float64(vm.DiskWrite), ts) mh.AddGuestMetric(vm.ID, "netin", float64(vm.NetworkIn), ts) mh.AddGuestMetric(vm.ID, "netout", float64(vm.NetworkOut), ts) + + if ms != nil { + ms.Write("vm", vm.ID, "cpu", vm.CPU*100, ts) + ms.Write("vm", vm.ID, "memory", vm.Memory.Usage, ts) + ms.Write("vm", vm.ID, "disk", vm.Disk.Usage, ts) + } } for _, ct := range state.Containers { @@ -290,6 +361,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps mh.AddGuestMetric(ct.ID, "diskwrite", float64(ct.DiskWrite), ts) mh.AddGuestMetric(ct.ID, "netin", float64(ct.NetworkIn), ts) mh.AddGuestMetric(ct.ID, "netout", float64(ct.NetworkOut), ts) + + if ms != nil { + ms.Write("container", ct.ID, "cpu", ct.CPU*100, ts) + ms.Write("container", ct.ID, "memory", ct.Memory.Usage, ts) + ms.Write("container", ct.ID, "disk", ct.Disk.Usage, ts) + } } for _, storage := range state.Storage { @@ -300,6 +377,10 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps mh.AddStorageMetric(storage.ID, "used", float64(storage.Used), ts) mh.AddStorageMetric(storage.ID, "total", float64(storage.Total), ts) mh.AddStorageMetric(storage.ID, "avail", float64(storage.Free), ts) + + if ms != nil { + ms.Write("storage", storage.ID, "usage", storage.Usage, ts) + } } for _, host := range state.DockerHosts { @@ -324,6 +405,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps mh.AddGuestMetric(hostKey, "memory", host.Memory.Usage, ts) mh.AddGuestMetric(hostKey, "disk", diskPercent, ts) + if ms != nil { + ms.Write("dockerHost", host.ID, "cpu", host.CPUUsage, ts) + ms.Write("dockerHost", host.ID, "memory", host.Memory.Usage, ts) + ms.Write("dockerHost", host.ID, "disk", diskPercent, ts) + } + for _, container := range host.Containers { if container.ID == "" || container.State != "running" { continue @@ -339,6 +426,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps mh.AddGuestMetric(metricKey, "cpu", container.CPUPercent, ts) mh.AddGuestMetric(metricKey, "memory", container.MemoryPercent, ts) mh.AddGuestMetric(metricKey, "disk", containerDisk, ts) + + if ms != nil { + ms.Write("docker", container.ID, "cpu", container.CPUPercent, ts) + ms.Write("docker", container.ID, "memory", container.MemoryPercent, ts) + ms.Write("docker", container.ID, "disk", containerDisk, ts) + } } } } @@ -352,6 +445,11 @@ func (m *Monitor) startMockMetricsSampler(ctx context.Context) { } cfg := mockMetricsSamplerConfigFromEnv() + seedDuration := cfg.SeedDuration + if seedDuration < 7*24*time.Hour { + seedDuration = 7 * 24 * time.Hour + } + maxPoints := int(seedDuration / cfg.SampleInterval) m.mu.Lock() if m.mockMetricsCancel != nil { @@ -360,12 +458,17 @@ func (m *Monitor) startMockMetricsSampler(ctx context.Context) { } samplerCtx, cancel := context.WithCancel(ctx) m.mockMetricsCancel = cancel + m.metricsHistory = NewMetricsHistory(maxPoints, seedDuration) m.mu.Unlock() - m.metricsHistory.Reset() state := mock.GetMockState() - seedMockMetricsHistory(m.metricsHistory, state, time.Now(), cfg.SeedDuration, cfg.SampleInterval) - recordMockStateToMetricsHistory(m.metricsHistory, state, time.Now()) + 
seedMockMetricsHistory(m.metricsHistory, m.metricsStore, state, time.Now(), seedDuration, cfg.SampleInterval) + recordMockStateToMetricsHistory(m.metricsHistory, m.metricsStore, state, time.Now()) + + // Flush metrics store to ensure all seeded data is written to disk + if m.metricsStore != nil { + m.metricsStore.Flush() + } m.mockMetricsWg.Add(1) go func() { @@ -382,13 +485,13 @@ func (m *Monitor) startMockMetricsSampler(ctx context.Context) { if !mock.IsMockEnabled() { continue } - recordMockStateToMetricsHistory(m.metricsHistory, mock.GetMockState(), time.Now()) + recordMockStateToMetricsHistory(m.metricsHistory, m.metricsStore, mock.GetMockState(), time.Now()) } } }() log.Info(). - Dur("seedDuration", cfg.SeedDuration). + Dur("seedDuration", seedDuration). Dur("sampleInterval", cfg.SampleInterval). Msg("Mock metrics history sampler started") } diff --git a/internal/monitoring/mock_metrics_history_test.go b/internal/monitoring/mock_metrics_history_test.go index 3ba9ace26..9f42fabec 100644 --- a/internal/monitoring/mock_metrics_history_test.go +++ b/internal/monitoring/mock_metrics_history_test.go @@ -73,7 +73,7 @@ func TestSeedMockMetricsHistory_PopulatesSeries(t *testing.T) { } mh := NewMetricsHistory(1000, 24*time.Hour) - seedMockMetricsHistory(mh, state, now, time.Hour, 30*time.Second) + seedMockMetricsHistory(mh, nil, state, now, time.Hour, 30*time.Second) nodeCPU := mh.GetNodeMetrics("node-1", "cpu", time.Hour) if len(nodeCPU) < 10 { diff --git a/pkg/metrics/store.go b/pkg/metrics/store.go index 42b9a630a..1464757f7 100644 --- a/pkg/metrics/store.go +++ b/pkg/metrics/store.go @@ -63,6 +63,17 @@ type bufferedMetric struct { metricType string value float64 timestamp time.Time + tier Tier +} + +// WriteMetric represents a metric sample to be written synchronously. 
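+// Each entry carries its own tier, so callers such as the mock seeder can
+// backfill hourly/daily rollups directly instead of going through Write.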
+type WriteMetric struct { + ResourceType string + ResourceID string + MetricType string + Value float64 + Timestamp time.Time + Tier Tier } // Store provides persistent metrics storage @@ -75,6 +86,7 @@ type Store struct { buffer []bufferedMetric // Background workers + writeCh chan []bufferedMetric stopCh chan struct{} doneCh chan struct{} stopOnce sync.Once @@ -88,23 +100,28 @@ func NewStore(config StoreConfig) (*Store, error) { return nil, fmt.Errorf("failed to create metrics directory: %w", err) } - // Open database with WAL mode for better concurrent access - db, err := sql.Open("sqlite", config.DBPath+"?_journal_mode=WAL&_busy_timeout=5000") + // Open database + db, err := sql.Open("sqlite", config.DBPath) if err != nil { return nil, fmt.Errorf("failed to open metrics database: %w", err) } + // Set busy timeout and WAL mode explicitly + _, _ = db.Exec("PRAGMA busy_timeout = 5000") + _, _ = db.Exec("PRAGMA journal_mode = WAL") + // Configure connection pool (SQLite works best with single writer) db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) db.SetConnMaxLifetime(0) store := &Store{ - db: db, - config: config, - buffer: make([]bufferedMetric, 0, config.WriteBufferSize), - stopCh: make(chan struct{}), - doneCh: make(chan struct{}), + db: db, + config: config, + buffer: make([]bufferedMetric, 0, config.WriteBufferSize), + writeCh: make(chan []bufferedMetric, 100), // Buffer for write batches + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), } // Initialize schema @@ -148,6 +165,10 @@ func (s *Store) initSchema() error { CREATE INDEX IF NOT EXISTS idx_metrics_tier_time ON metrics(tier, timestamp); + -- Covering index for Unified History (QueryAll) performance + CREATE INDEX IF NOT EXISTS idx_metrics_query_all + ON metrics(resource_type, resource_id, tier, timestamp, metric_type); + -- Metadata table for tracking rollup state CREATE TABLE IF NOT EXISTS metrics_meta ( key TEXT PRIMARY KEY, @@ -164,8 +185,13 @@ func (s *Store) initSchema() error { return nil } -// Write adds a metric to the write buffer +// Write adds a metric to the write buffer with the 'raw' tier by default func (s *Store) Write(resourceType, resourceID, metricType string, value float64, timestamp time.Time) { + s.WriteWithTier(resourceType, resourceID, metricType, value, timestamp, TierRaw) +} + +// WriteWithTier adds a metric to the write buffer with a specific tier +func (s *Store) WriteWithTier(resourceType, resourceID, metricType string, value float64, timestamp time.Time, tier Tier) { s.bufferMu.Lock() defer s.bufferMu.Unlock() @@ -175,6 +201,7 @@ func (s *Store) Write(resourceType, resourceID, metricType string, value float64 metricType: metricType, value: value, timestamp: timestamp, + tier: tier, }) // Flush if buffer is full @@ -183,6 +210,27 @@ func (s *Store) Write(resourceType, resourceID, metricType string, value float64 } } +// WriteBatchSync writes metrics directly to the database without buffering. 
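+// It blocks until the batch is committed, so seeded data is queryable as soon
+// as the call returns.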
+func (s *Store) WriteBatchSync(metrics []WriteMetric) { + if len(metrics) == 0 { + return + } + + batch := make([]bufferedMetric, len(metrics)) + for i, metric := range metrics { + batch[i] = bufferedMetric{ + resourceType: metric.ResourceType, + resourceID: metric.ResourceID, + metricType: metric.MetricType, + value: metric.Value, + timestamp: metric.Timestamp, + tier: metric.Tier, + } + } + + s.writeBatch(batch) +} + // flush writes buffered metrics to the database (caller must hold bufferMu) func (s *Store) flushLocked() { if len(s.buffer) == 0 { @@ -194,8 +242,12 @@ func (s *Store) flushLocked() { copy(toWrite, s.buffer) s.buffer = s.buffer[:0] - // Write in background to not block callers - go s.writeBatch(toWrite) + // Send to serialized write channel + select { + case s.writeCh <- toWrite: + default: + log.Warn().Msg("Metrics write channel full, dropping batch") + } } // writeBatch writes a batch of metrics to the database @@ -204,15 +256,26 @@ func (s *Store) writeBatch(metrics []bufferedMetric) { return } - tx, err := s.db.Begin() - if err != nil { + var tx *sql.Tx + var err error + + // Retry on SQLITE_BUSY with exponential backoff + for i := 0; i < 5; i++ { + tx, err = s.db.Begin() + if err == nil { + break + } + if i < 4 && (err.Error() == "database is locked" || err.Error() == "sql: database is closed") { + time.Sleep(time.Duration(100*(i+1)) * time.Millisecond) + continue + } log.Error().Err(err).Msg("Failed to begin metrics transaction") return } stmt, err := tx.Prepare(` INSERT INTO metrics (resource_type, resource_id, metric_type, value, timestamp, tier) - VALUES (?, ?, ?, ?, ?, 'raw') + VALUES (?, ?, ?, ?, ?, ?) `) if err != nil { tx.Rollback() @@ -222,7 +285,7 @@ func (s *Store) writeBatch(metrics []bufferedMetric) { defer stmt.Close() for _, m := range metrics { - _, err := stmt.Exec(m.resourceType, m.resourceID, m.metricType, m.value, m.timestamp.Unix()) + _, err := stmt.Exec(m.resourceType, m.resourceID, m.metricType, m.value, m.timestamp.Unix(), string(m.tier)) if err != nil { log.Warn().Err(err). Str("resource", m.resourceID). @@ -239,19 +302,53 @@ func (s *Store) writeBatch(metrics []bufferedMetric) { log.Debug().Int("count", len(metrics)).Msg("Wrote metrics batch") } -// Query retrieves metrics for a resource within a time range -func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time) ([]MetricPoint, error) { +// Query retrieves metrics for a resource within a time range, with optional downsampling +func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64) ([]MetricPoint, error) { // Select appropriate tier based on time range tier := s.selectTier(end.Sub(start)) - rows, err := s.db.Query(` + var rows *sql.Rows + var err error + + sqlQuery := ` SELECT timestamp, value, COALESCE(min_value, value), COALESCE(max_value, value) FROM metrics WHERE resource_type = ? AND resource_id = ? AND metric_type = ? AND tier = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp ASC - `, resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix()) - if err != nil { + ` + queryParams := []interface{}{resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix()} + + if stepSecs > 1 { + sqlQuery = ` + SELECT + (timestamp / ?) * ? as bucket_ts, + AVG(value), + MIN(COALESCE(min_value, value)), + MAX(COALESCE(max_value, value)) + FROM metrics + WHERE resource_type = ? AND resource_id = ? AND metric_type = ? AND tier = ? + AND timestamp >= ? AND timestamp <= ? 
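+			-- one row per stepSecs-wide bucket: AVG for the plotted value, MIN/MAX for the envelope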
+ GROUP BY bucket_ts + ORDER BY bucket_ts ASC + ` + queryParams = []interface{}{ + stepSecs, stepSecs, + resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix(), + } + } + + // Retry on SQLITE_BUSY + for i := 0; i < 5; i++ { + rows, err = s.db.Query(sqlQuery, queryParams...) + + if err == nil { + break + } + if i < 4 && (err.Error() == "database is locked" || err.Error() == "sql: database is closed") { + time.Sleep(time.Duration(100*(i+1)) * time.Millisecond) + continue + } return nil, fmt.Errorf("failed to query metrics: %w", err) } defer rows.Close() @@ -271,18 +368,53 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti return points, rows.Err() } -// QueryAll retrieves all metric types for a resource within a time range -func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time) (map[string][]MetricPoint, error) { +// QueryAll retrieves all metric types for a resource within a time range, with optional downsampling +func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time, stepSecs int64) (map[string][]MetricPoint, error) { tier := s.selectTier(end.Sub(start)) - rows, err := s.db.Query(` + var rows *sql.Rows + var err error + + sqlQuery := ` SELECT metric_type, timestamp, value, COALESCE(min_value, value), COALESCE(max_value, value) FROM metrics WHERE resource_type = ? AND resource_id = ? AND tier = ? AND timestamp >= ? AND timestamp <= ? ORDER BY metric_type, timestamp ASC - `, resourceType, resourceID, string(tier), start.Unix(), end.Unix()) - if err != nil { + ` + queryParams := []interface{}{resourceType, resourceID, string(tier), start.Unix(), end.Unix()} + + if stepSecs > 1 { + sqlQuery = ` + SELECT + metric_type, + (timestamp / ?) * ? + (? / 2) as bucket_ts, + AVG(value), + MIN(COALESCE(min_value, value)), + MAX(COALESCE(max_value, value)) + FROM metrics + WHERE resource_type = ? AND resource_id = ? AND tier = ? + AND timestamp >= ? AND timestamp <= ? + GROUP BY metric_type, bucket_ts + ORDER BY metric_type, bucket_ts ASC + ` + queryParams = []interface{}{ + stepSecs, stepSecs, stepSecs, + resourceType, resourceID, string(tier), start.Unix(), end.Unix(), + } + } + + // Retry on SQLITE_BUSY + for i := 0; i < 5; i++ { + rows, err = s.db.Query(sqlQuery, queryParams...) 
+ + if err == nil { + break + } + if i < 4 && (err.Error() == "database is locked" || err.Error() == "sql: database is closed") { + time.Sleep(time.Duration(100*(i+1)) * time.Millisecond) + continue + } return nil, fmt.Errorf("failed to query all metrics: %w", err) } defer rows.Close() @@ -304,13 +436,24 @@ func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time) } // selectTier chooses the appropriate data tier based on time range +// Note: Tier selection uses fixed thresholds to ensure queries use tiers with complete data: +// - Raw: up to 2 hours (high-resolution real-time data) +// - Minute: up to 6 hours (recent detailed data) +// - Hourly: up to 7 days (medium-term with mock/seeded data coverage) +// - Daily: beyond 7 days (long-term historical data) func (s *Store) selectTier(duration time.Duration) Tier { + const ( + rawThreshold = 2 * time.Hour + minuteThreshold = 6 * time.Hour // 24h queries use hourly tier which has complete historical data + hourlyThreshold = 7 * 24 * time.Hour + ) + switch { - case duration <= s.config.RetentionRaw: + case duration <= rawThreshold: return TierRaw - case duration <= s.config.RetentionMinute: + case duration <= minuteThreshold: return TierMinute - case duration <= s.config.RetentionHourly: + case duration <= hourlyThreshold: return TierHourly default: return TierDaily @@ -334,8 +477,16 @@ func (s *Store) backgroundWorker() { case <-s.stopCh: // Final flush before stopping s.Flush() + // Process remaining writes + close(s.writeCh) + for batch := range s.writeCh { + s.writeBatch(batch) + } return + case batch := <-s.writeCh: + s.writeBatch(batch) + case <-flushTicker.C: s.Flush() @@ -502,6 +653,12 @@ func (s *Store) runRetention() { } } +// SetMaxOpenConns sets the maximum number of open connections to the database. 
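+// The store defaults to a single connection because SQLite works best with one
+// writer; raising this is mainly useful for read-heavy callers.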
+func (s *Store) SetMaxOpenConns(n int) { + s.db.SetMaxOpenConns(n) + s.db.SetMaxIdleConns(n) +} + // Close shuts down the store gracefully func (s *Store) Close() error { s.stopOnce.Do(func() { diff --git a/pkg/metrics/store_test.go b/pkg/metrics/store_test.go index a23a8c33b..53b47d026 100644 --- a/pkg/metrics/store_test.go +++ b/pkg/metrics/store_test.go @@ -28,7 +28,7 @@ func TestStoreWriteBatchAndQuery(t *testing.T) { {resourceType: "vm", resourceID: "vm-101", metricType: "cpu", value: 2.5, timestamp: ts.Add(1 * time.Second)}, }) - points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-1*time.Second), ts.Add(2*time.Second)) + points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-1*time.Second), ts.Add(2*time.Second), 0) if err != nil { t.Fatalf("Query returned error: %v", err) } @@ -39,7 +39,7 @@ func TestStoreWriteBatchAndQuery(t *testing.T) { t.Fatalf("unexpected query values: %+v", points) } - all, err := store.QueryAll("vm", "vm-101", ts.Add(-1*time.Second), ts.Add(2*time.Second)) + all, err := store.QueryAll("vm", "vm-101", ts.Add(-1*time.Second), ts.Add(2*time.Second), 0) if err != nil { t.Fatalf("QueryAll returned error: %v", err) } @@ -220,7 +220,7 @@ func TestStoreWriteFlushesBuffer(t *testing.T) { deadline := time.Now().Add(500 * time.Millisecond) for time.Now().Before(deadline) { - points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-time.Second), ts.Add(time.Second)) + points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-time.Second), ts.Add(time.Second), 0) if err == nil && len(points) == 1 { return } @@ -229,3 +229,34 @@ func TestStoreWriteFlushesBuffer(t *testing.T) { t.Fatal("expected buffered metric to flush to database") } + +func TestStoreQueryDownsampling(t *testing.T) { + dir := t.TempDir() + store, err := NewStore(DefaultConfig(dir)) + if err != nil { + t.Fatalf("NewStore returned error: %v", err) + } + defer store.Close() + + start := time.Unix(1000, 0) + for i := 0; i < 10; i++ { + store.writeBatch([]bufferedMetric{ + {resourceType: "vm", resourceID: "v1", metricType: "cpu", value: float64(i * 10), timestamp: start.Add(time.Duration(i) * time.Minute)}, + }) + } + + // Query with 5m step + points, err := store.Query("vm", "v1", "cpu", start.Add(-1*time.Hour), start.Add(1*time.Hour), 300) + if err != nil { + t.Fatalf("Query downsampled failed: %v", err) + } + + // 10 minutes of data at 1m resolution (10 points) + // Bucketed by 5m (300s): + // Buckets: [1000-1300), [1300-1600), [1600-1900) + // Points at: 1000, 1060, 1120, 1180, 1240 (5 points) -> Bucket 1000 + // Points at: 1300, 1360, 1420, 1480, 1540 (5 points) -> Bucket 1300 + if len(points) != 2 { + t.Fatalf("expected 2 bucketed points, got %d", len(points)) + } +}
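For reviewers, here is a minimal caller-side sketch of how the new pieces fit together: gating 30d/90d ranges on `FeatureLongTermMetrics` (via the `Features` slice on `LicenseStatus`) and passing a downsampling step into the extended `Query` signature. The `hasFeature` and `queryHistory` names are illustrative only and are not part of this patch.

```go
package example

import (
	"time"

	"github.com/rcourtman/pulse-go-rewrite/internal/license"
	"github.com/rcourtman/pulse-go-rewrite/pkg/metrics"
)

// hasFeature is a hypothetical helper; it relies only on the Features slice
// that LicenseStatus exposes in this patch.
func hasFeature(status *license.LicenseStatus, feature string) bool {
	for _, f := range status.Features {
		if f == feature {
			return true
		}
	}
	return false
}

// queryHistory (hypothetical) clamps the range to 7 days without the Pro
// feature and picks a coarse step for long ranges so charts keep a bounded
// point count.
func queryHistory(ms *metrics.Store, status *license.LicenseStatus, resourceID string, rangeDur time.Duration) ([]metrics.MetricPoint, error) {
	if rangeDur > 7*24*time.Hour && !hasFeature(status, license.FeatureLongTermMetrics) {
		rangeDur = 7 * 24 * time.Hour // 7-day history stays free
	}

	// Aim for roughly 1000 points per series regardless of range width;
	// stepSecs <= 1 skips bucketing in the store.
	stepSecs := int64(rangeDur.Seconds()) / 1000

	end := time.Now()
	start := end.Add(-rangeDur)
	return ms.Query("vm", resourceID, "cpu", start, end, stepSecs)
}
```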