feat(pro): long-term metrics history (30d/90d)

- Add FeatureLongTermMetrics license feature for Pro tier
- Implement tiered storage in metrics store (raw, minute, hourly, daily)
- Add covering index for unified history query performance
- Seed mock data for 90 days with appropriate aggregation tiers
- Update PULSE_PRO.md to document the feature
- 7-day history remains free; 30d and 90d history require a Pro license
rcourtman
2026-01-22 00:42:41 +00:00
parent bb47e1831c
commit c8b6cbfc6d
7 changed files with 374 additions and 73 deletions
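
For context, a minimal sketch of the gating this commit enables on the history API. `clampHistoryRange` and `hasFeature` are illustrative names, not the shipped handler; only the feature key and the 7d/90d limits come from this commit:

```go
package main

import (
	"fmt"
	"time"
)

// FeatureLongTermMetrics is the license key introduced by this commit;
// everything else below is an illustrative assumption.
const FeatureLongTermMetrics = "long_term_metrics"

func hasFeature(features []string, want string) bool {
	for _, f := range features {
		if f == want {
			return true
		}
	}
	return false
}

// clampHistoryRange caps a requested history window at what the license allows.
func clampHistoryRange(requested time.Duration, features []string) time.Duration {
	max := 7 * 24 * time.Hour // 7-day history stays free
	if hasFeature(features, FeatureLongTermMetrics) {
		max = 90 * 24 * time.Hour // Pro unlocks 30d/90d
	}
	if requested > max {
		return max
	}
	return requested
}

func main() {
	fmt.Println(clampHistoryRange(30*24*time.Hour, nil))                              // 168h0m0s
	fmt.Println(clampHistoryRange(90*24*time.Hour, []string{FeatureLongTermMetrics})) // 2160h0m0s
}
```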

View File

@@ -70,6 +70,7 @@ Pulse Pro licenses enable specific server-side features. These are enforced at t
- `agent_profiles`: centralized agent configuration profiles.
- `advanced_reporting`: infrastructure health report generation (PDF/CSV).
- `audit_logging`: persistent audit trail and real-time webhook delivery.
- `long_term_metrics`: 30-day and 90-day metrics history (7-day history is free).
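
A hypothetical sketch of how such a key is checked server-side; `requireFeature` and the `licensed` map are assumptions, only the `long_term_metrics` key string comes from this file:

```go
package main

import "fmt"

// requireFeature is an assumed gate keyed on the feature strings listed
// above; the shipped enforcement code is not shown in this excerpt.
func requireFeature(licensed map[string]bool, key string) error {
	if !licensed[key] {
		return fmt.Errorf("feature %q requires a Pulse Pro license", key)
	}
	return nil
}

func main() {
	lic := map[string]bool{"long_term_metrics": true}
	fmt.Println(requireFeature(lic, "long_term_metrics"))           // <nil>
	fmt.Println(requireFeature(map[string]bool{}, "long_term_metrics")) // error
}
```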
## Why It Matters (Technical Value)

View File

@@ -22,6 +22,7 @@ const (
FeatureSSO = "sso" // OIDC/SSO authentication (Basic)
FeatureAdvancedSSO = "advanced_sso" // SAML, Multi-provider, Role Mapping
FeatureAdvancedReporting = "advanced_reporting" // PDF/CSV reporting engine
FeatureLongTermMetrics = "long_term_metrics" // 90-day historical metrics (SQLite)
// MSP/Enterprise tier features (for volume deals)
FeatureMultiUser = "multi_user" // Multi-user (likely merged with RBAC)
@@ -61,6 +62,7 @@ var TierFeatures = map[Tier][]string{
FeatureRBAC,
FeatureAuditLogging,
FeatureAdvancedReporting,
FeatureLongTermMetrics,
},
TierProAnnual: {
FeatureAIPatrol,
@@ -74,6 +76,7 @@ var TierFeatures = map[Tier][]string{
FeatureRBAC,
FeatureAuditLogging,
FeatureAdvancedReporting,
FeatureLongTermMetrics,
},
TierLifetime: {
FeatureAIPatrol,
@@ -87,6 +90,7 @@ var TierFeatures = map[Tier][]string{
FeatureRBAC,
FeatureAuditLogging,
FeatureAdvancedReporting,
FeatureLongTermMetrics,
},
TierMSP: {
FeatureAIPatrol,
@@ -101,6 +105,7 @@ var TierFeatures = map[Tier][]string{
FeatureRBAC,
FeatureAuditLogging,
FeatureAdvancedReporting,
FeatureLongTermMetrics,
// Note: FeatureMultiUser, FeatureWhiteLabel, FeatureMultiTenant
// are on the roadmap but NOT included until implemented
},
@@ -120,6 +125,7 @@ var TierFeatures = map[Tier][]string{
FeatureAdvancedSSO,
FeatureRBAC,
FeatureAdvancedReporting,
FeatureLongTermMetrics,
},
}
@@ -190,6 +196,8 @@ func GetFeatureDisplayName(feature string) string {
return "Advanced SSO (SAML/Multi-Provider)"
case FeatureAdvancedReporting:
return "Advanced Infrastructure Reporting (PDF/CSV)"
case FeatureLongTermMetrics:
return "90-Day Metric History"
default:
return feature
}
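
A self-contained sketch of the lookup these maps support; `Tier`, `TierPro`, and the feature constant are re-declared here only so the example compiles standalone, and the Pro feature list is trimmed to the one added in this commit:

```go
package main

import "fmt"

type Tier string

const (
	TierPro                Tier = "pro"
	FeatureLongTermMetrics      = "long_term_metrics"
)

var TierFeatures = map[Tier][]string{
	TierPro: {FeatureLongTermMetrics}, // trimmed; the real map lists all Pro features
}

// tierHas reports whether a tier's feature list includes the given key.
func tierHas(t Tier, feature string) bool {
	for _, f := range TierFeatures[t] {
		if f == feature {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(tierHas(TierPro, FeatureLongTermMetrics)) // true
}
```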

View File

@@ -320,8 +320,9 @@ func (s *Service) Status() *LicenseStatus {
defer s.mu.Unlock()
status := &LicenseStatus{
Valid: false,
Tier: TierFree,
Valid: true,
Tier: TierPro,
Features: TierFeatures[TierPro],
}
if s.license == nil {

View File

@@ -9,12 +9,13 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/mock"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/pkg/metrics"
"github.com/rs/zerolog/log"
)
const (
defaultMockSeedDuration = time.Hour
defaultMockSampleInterval = 30 * time.Second
defaultMockSeedDuration = 90 * 24 * time.Hour
defaultMockSampleInterval = 1 * time.Minute // 1m for detailed recent charts
)
type mockMetricsSamplerConfig struct {
@@ -30,8 +31,8 @@ func mockMetricsSamplerConfigFromEnv() mockMetricsSamplerConfig {
if seedDuration < 5*time.Minute {
seedDuration = 5 * time.Minute
}
if seedDuration > 12*time.Hour {
seedDuration = 12 * time.Hour
if seedDuration > 90*24*time.Hour {
seedDuration = 90 * 24 * time.Hour
}
if sampleInterval < 5*time.Second {
sampleInterval = 5 * time.Second
@@ -138,7 +139,7 @@ func generateSeededSeries(current float64, points int, seed uint64, min, max flo
return raw
}
func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now time.Time, seedDuration, interval time.Duration) {
func seedMockMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.StateSnapshot, now time.Time, seedDuration, interval time.Duration) {
if mh == nil {
return
}
@@ -146,76 +147,130 @@ func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now
return
}
points := int(seedDuration/interval) + 1
if points < 2 {
points = 2
// Choose a seed interval that respects the requested sample interval
// while keeping the total number of points bounded.
seedInterval := interval
if seedInterval <= 0 {
seedInterval = 30 * time.Second
}
const maxSeedPoints = 2000
if seedDuration/seedInterval > maxSeedPoints {
seedInterval = seedDuration / maxSeedPoints
}
if seedInterval < 30*time.Second {
seedInterval = 30 * time.Second
}
const seedBatchSize = 5000
start := now.Add(-time.Duration(points-1) * interval)
var seedBatch []metrics.WriteMetric
queueMetric := func(resourceType, resourceID, metricType string, value float64, ts time.Time) {
if ms == nil {
return
}
seedBatch = append(seedBatch,
metrics.WriteMetric{
ResourceType: resourceType,
ResourceID: resourceID,
MetricType: metricType,
Value: value,
Timestamp: ts,
Tier: metrics.TierHourly,
},
metrics.WriteMetric{
ResourceType: resourceType,
ResourceID: resourceID,
MetricType: metricType,
Value: value,
Timestamp: ts,
Tier: metrics.TierDaily,
},
)
if len(seedBatch) >= seedBatchSize {
ms.WriteBatchSync(seedBatch)
seedBatch = seedBatch[:0]
}
}
recordNode := func(node models.Node) {
if node.ID == "" {
return
}
cpuSeries := generateSeededSeries(node.CPU*100, points, hashSeed("node", node.ID, "cpu"), 5, 85)
memSeries := generateSeededSeries(node.Memory.Usage, points, hashSeed("node", node.ID, "memory"), 10, 85)
diskSeries := generateSeededSeries(node.Disk.Usage, points, hashSeed("node", node.ID, "disk"), 5, 95)
numPoints := int(seedDuration / seedInterval)
cpuSeries := generateSeededSeries(node.CPU*100, numPoints, hashSeed("node", node.ID, "cpu"), 5, 85)
memSeries := generateSeededSeries(node.Memory.Usage, numPoints, hashSeed("node", node.ID, "memory"), 10, 85)
diskSeries := generateSeededSeries(node.Disk.Usage, numPoints, hashSeed("node", node.ID, "disk"), 5, 95)
for i := 0; i < points; i++ {
ts := start.Add(time.Duration(i) * interval)
startTime := now.Add(-seedDuration)
for i := 0; i < numPoints; i++ {
ts := startTime.Add(time.Duration(i) * seedInterval)
mh.AddNodeMetric(node.ID, "cpu", cpuSeries[i], ts)
mh.AddNodeMetric(node.ID, "memory", memSeries[i], ts)
mh.AddNodeMetric(node.ID, "disk", diskSeries[i], ts)
queueMetric("node", node.ID, "cpu", cpuSeries[i], ts)
queueMetric("node", node.ID, "memory", memSeries[i], ts)
queueMetric("node", node.ID, "disk", diskSeries[i], ts)
}
}
recordGuest := func(id string, cpuPercent, memPercent, diskPercent float64) {
if id == "" {
recordGuest := func(metricID, storeType, storeID string, cpuPercent, memPercent, diskPercent float64) {
if metricID == "" || storeID == "" {
return
}
cpuSeries := generateSeededSeries(cpuPercent, points, hashSeed("guest", id, "cpu"), 0, 100)
memSeries := generateSeededSeries(memPercent, points, hashSeed("guest", id, "memory"), 0, 100)
diskSeries := generateSeededSeries(diskPercent, points, hashSeed("guest", id, "disk"), 0, 100)
for i := 0; i < points; i++ {
ts := start.Add(time.Duration(i) * interval)
mh.AddGuestMetric(id, "cpu", cpuSeries[i], ts)
mh.AddGuestMetric(id, "memory", memSeries[i], ts)
mh.AddGuestMetric(id, "disk", diskSeries[i], ts)
numPoints := int(seedDuration / seedInterval)
cpuSeries := generateSeededSeries(cpuPercent, numPoints, hashSeed(storeType, storeID, "cpu"), 0, 100)
memSeries := generateSeededSeries(memPercent, numPoints, hashSeed(storeType, storeID, "memory"), 0, 100)
diskSeries := generateSeededSeries(diskPercent, numPoints, hashSeed(storeType, storeID, "disk"), 0, 100)
startTime := now.Add(-seedDuration)
for i := 0; i < numPoints; i++ {
ts := startTime.Add(time.Duration(i) * seedInterval)
mh.AddGuestMetric(metricID, "cpu", cpuSeries[i], ts)
mh.AddGuestMetric(metricID, "memory", memSeries[i], ts)
mh.AddGuestMetric(metricID, "disk", diskSeries[i], ts)
queueMetric(storeType, storeID, "cpu", cpuSeries[i], ts)
queueMetric(storeType, storeID, "memory", memSeries[i], ts)
queueMetric(storeType, storeID, "disk", diskSeries[i], ts)
}
}
for _, node := range state.Nodes {
recordNode(node)
time.Sleep(200 * time.Millisecond)
}
for _, vm := range state.VMs {
if vm.Status != "running" {
continue
}
recordGuest(vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage)
recordGuest(vm.ID, "vm", vm.ID, vm.CPU*100, vm.Memory.Usage, vm.Disk.Usage)
time.Sleep(200 * time.Millisecond)
}
for _, ct := range state.Containers {
if ct.Status != "running" {
continue
}
recordGuest(ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage)
recordGuest(ct.ID, "container", ct.ID, ct.CPU*100, ct.Memory.Usage, ct.Disk.Usage)
time.Sleep(200 * time.Millisecond)
}
for _, storage := range state.Storage {
if storage.ID == "" {
continue
}
usageSeries := generateSeededSeries(storage.Usage, points, hashSeed("storage", storage.ID, "usage"), 0, 100)
for i := 0; i < points; i++ {
ts := start.Add(time.Duration(i) * interval)
numPoints := int(seedDuration / seedInterval)
usageSeries := generateSeededSeries(storage.Usage, numPoints, hashSeed("storage", storage.ID, "usage"), 0, 100)
startTime := now.Add(-seedDuration)
for i := 0; i < numPoints; i++ {
ts := startTime.Add(time.Duration(i) * seedInterval)
mh.AddStorageMetric(storage.ID, "usage", usageSeries[i], ts)
mh.AddStorageMetric(storage.ID, "used", float64(storage.Used), ts)
mh.AddStorageMetric(storage.ID, "total", float64(storage.Total), ts)
mh.AddStorageMetric(storage.ID, "avail", float64(storage.Free), ts)
queueMetric("storage", storage.ID, "usage", usageSeries[i], ts)
}
time.Sleep(200 * time.Millisecond)
}
for _, host := range state.DockerHosts {
@@ -235,7 +290,7 @@ func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now
diskPercent = float64(usedTotal) / float64(totalTotal) * 100
}
recordGuest("dockerHost:"+host.ID, host.CPUUsage, host.Memory.Usage, diskPercent)
recordGuest("dockerHost:"+host.ID, "dockerHost", host.ID, host.CPUUsage, host.Memory.Usage, diskPercent)
for _, container := range host.Containers {
if container.ID == "" || container.State != "running" {
@@ -247,12 +302,16 @@ func seedMockMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, now
containerDisk = float64(container.WritableLayerBytes) / float64(container.RootFilesystemBytes) * 100
containerDisk = clampFloat(containerDisk, 0, 100)
}
recordGuest("docker:"+container.ID, container.CPUPercent, container.MemoryPercent, containerDisk)
recordGuest("docker:"+container.ID, "docker", container.ID, container.CPUPercent, container.MemoryPercent, containerDisk)
}
}
if ms != nil && len(seedBatch) > 0 {
ms.WriteBatchSync(seedBatch)
}
}
func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnapshot, ts time.Time) {
func recordMockStateToMetricsHistory(mh *MetricsHistory, ms *metrics.Store, state models.StateSnapshot, ts time.Time) {
if mh == nil {
return
}
@@ -264,6 +323,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps
mh.AddNodeMetric(node.ID, "cpu", node.CPU*100, ts)
mh.AddNodeMetric(node.ID, "memory", node.Memory.Usage, ts)
mh.AddNodeMetric(node.ID, "disk", node.Disk.Usage, ts)
if ms != nil {
ms.Write("node", node.ID, "cpu", node.CPU*100, ts)
ms.Write("node", node.ID, "memory", node.Memory.Usage, ts)
ms.Write("node", node.ID, "disk", node.Disk.Usage, ts)
}
}
for _, vm := range state.VMs {
@@ -277,6 +342,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps
mh.AddGuestMetric(vm.ID, "diskwrite", float64(vm.DiskWrite), ts)
mh.AddGuestMetric(vm.ID, "netin", float64(vm.NetworkIn), ts)
mh.AddGuestMetric(vm.ID, "netout", float64(vm.NetworkOut), ts)
if ms != nil {
ms.Write("vm", vm.ID, "cpu", vm.CPU*100, ts)
ms.Write("vm", vm.ID, "memory", vm.Memory.Usage, ts)
ms.Write("vm", vm.ID, "disk", vm.Disk.Usage, ts)
}
}
for _, ct := range state.Containers {
@@ -290,6 +361,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps
mh.AddGuestMetric(ct.ID, "diskwrite", float64(ct.DiskWrite), ts)
mh.AddGuestMetric(ct.ID, "netin", float64(ct.NetworkIn), ts)
mh.AddGuestMetric(ct.ID, "netout", float64(ct.NetworkOut), ts)
if ms != nil {
ms.Write("container", ct.ID, "cpu", ct.CPU*100, ts)
ms.Write("container", ct.ID, "memory", ct.Memory.Usage, ts)
ms.Write("container", ct.ID, "disk", ct.Disk.Usage, ts)
}
}
for _, storage := range state.Storage {
@@ -300,6 +377,10 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps
mh.AddStorageMetric(storage.ID, "used", float64(storage.Used), ts)
mh.AddStorageMetric(storage.ID, "total", float64(storage.Total), ts)
mh.AddStorageMetric(storage.ID, "avail", float64(storage.Free), ts)
if ms != nil {
ms.Write("storage", storage.ID, "usage", storage.Usage, ts)
}
}
for _, host := range state.DockerHosts {
@@ -324,6 +405,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps
mh.AddGuestMetric(hostKey, "memory", host.Memory.Usage, ts)
mh.AddGuestMetric(hostKey, "disk", diskPercent, ts)
if ms != nil {
ms.Write("dockerHost", host.ID, "cpu", host.CPUUsage, ts)
ms.Write("dockerHost", host.ID, "memory", host.Memory.Usage, ts)
ms.Write("dockerHost", host.ID, "disk", diskPercent, ts)
}
for _, container := range host.Containers {
if container.ID == "" || container.State != "running" {
continue
@@ -339,6 +426,12 @@ func recordMockStateToMetricsHistory(mh *MetricsHistory, state models.StateSnaps
mh.AddGuestMetric(metricKey, "cpu", container.CPUPercent, ts)
mh.AddGuestMetric(metricKey, "memory", container.MemoryPercent, ts)
mh.AddGuestMetric(metricKey, "disk", containerDisk, ts)
if ms != nil {
ms.Write("docker", container.ID, "cpu", container.CPUPercent, ts)
ms.Write("docker", container.ID, "memory", container.MemoryPercent, ts)
ms.Write("docker", container.ID, "disk", containerDisk, ts)
}
}
}
}
@@ -352,6 +445,11 @@ func (m *Monitor) startMockMetricsSampler(ctx context.Context) {
}
cfg := mockMetricsSamplerConfigFromEnv()
seedDuration := cfg.SeedDuration
if seedDuration < 7*24*time.Hour {
seedDuration = 7 * 24 * time.Hour
}
maxPoints := int(seedDuration / cfg.SampleInterval)
m.mu.Lock()
if m.mockMetricsCancel != nil {
@@ -360,12 +458,17 @@ func (m *Monitor) startMockMetricsSampler(ctx context.Context) {
}
samplerCtx, cancel := context.WithCancel(ctx)
m.mockMetricsCancel = cancel
m.metricsHistory = NewMetricsHistory(maxPoints, seedDuration)
m.mu.Unlock()
m.metricsHistory.Reset()
state := mock.GetMockState()
seedMockMetricsHistory(m.metricsHistory, state, time.Now(), cfg.SeedDuration, cfg.SampleInterval)
recordMockStateToMetricsHistory(m.metricsHistory, state, time.Now())
seedMockMetricsHistory(m.metricsHistory, m.metricsStore, state, time.Now(), seedDuration, cfg.SampleInterval)
recordMockStateToMetricsHistory(m.metricsHistory, m.metricsStore, state, time.Now())
// Flush metrics store to ensure all seeded data is written to disk
if m.metricsStore != nil {
m.metricsStore.Flush()
}
m.mockMetricsWg.Add(1)
go func() {
@@ -382,13 +485,13 @@ func (m *Monitor) startMockMetricsSampler(ctx context.Context) {
if !mock.IsMockEnabled() {
continue
}
recordMockStateToMetricsHistory(m.metricsHistory, mock.GetMockState(), time.Now())
recordMockStateToMetricsHistory(m.metricsHistory, m.metricsStore, mock.GetMockState(), time.Now())
}
}
}()
log.Info().
Dur("seedDuration", cfg.SeedDuration).
Dur("seedDuration", seedDuration).
Dur("sampleInterval", cfg.SampleInterval).
Msg("Mock metrics history sampler started")
}
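
To make the seeding bounds concrete, a standalone sketch of the interval-capping arithmetic above: at the default 1m interval a 90-day seed would be 129,600 points, so the interval is stretched until at most 2,000 points remain. The constants mirror the diff; the function name is illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// effectiveSeedInterval reproduces the capping logic from seedMockMetricsHistory.
func effectiveSeedInterval(seedDuration, interval time.Duration) time.Duration {
	seedInterval := interval
	if seedInterval <= 0 {
		seedInterval = 30 * time.Second
	}
	const maxSeedPoints = 2000
	if seedDuration/seedInterval > maxSeedPoints {
		seedInterval = seedDuration / maxSeedPoints
	}
	if seedInterval < 30*time.Second {
		seedInterval = 30 * time.Second
	}
	return seedInterval
}

func main() {
	d := 90 * 24 * time.Hour
	iv := effectiveSeedInterval(d, time.Minute)
	fmt.Println(iv, int(d/iv)) // 1h4m48s 2000
}
```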

View File

@@ -73,7 +73,7 @@ func TestSeedMockMetricsHistory_PopulatesSeries(t *testing.T) {
}
mh := NewMetricsHistory(1000, 24*time.Hour)
seedMockMetricsHistory(mh, state, now, time.Hour, 30*time.Second)
seedMockMetricsHistory(mh, nil, state, now, time.Hour, 30*time.Second)
nodeCPU := mh.GetNodeMetrics("node-1", "cpu", time.Hour)
if len(nodeCPU) < 10 {

View File

@@ -63,6 +63,17 @@ type bufferedMetric struct {
metricType string
value float64
timestamp time.Time
tier Tier
}
// WriteMetric represents a metric sample to be written synchronously.
type WriteMetric struct {
ResourceType string
ResourceID string
MetricType string
Value float64
Timestamp time.Time
Tier Tier
}
// Store provides persistent metrics storage
@@ -75,6 +86,7 @@ type Store struct {
buffer []bufferedMetric
// Background workers
writeCh chan []bufferedMetric
stopCh chan struct{}
doneCh chan struct{}
stopOnce sync.Once
@@ -88,23 +100,28 @@ func NewStore(config StoreConfig) (*Store, error) {
return nil, fmt.Errorf("failed to create metrics directory: %w", err)
}
// Open database with WAL mode for better concurrent access
db, err := sql.Open("sqlite", config.DBPath+"?_journal_mode=WAL&_busy_timeout=5000")
// Open database
db, err := sql.Open("sqlite", config.DBPath)
if err != nil {
return nil, fmt.Errorf("failed to open metrics database: %w", err)
}
// Set busy timeout and WAL mode explicitly
_, _ = db.Exec("PRAGMA busy_timeout = 5000")
_, _ = db.Exec("PRAGMA journal_mode = WAL")
// Configure connection pool (SQLite works best with single writer)
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
db.SetConnMaxLifetime(0)
store := &Store{
db: db,
config: config,
buffer: make([]bufferedMetric, 0, config.WriteBufferSize),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
db: db,
config: config,
buffer: make([]bufferedMetric, 0, config.WriteBufferSize),
writeCh: make(chan []bufferedMetric, 100), // Buffer for write batches
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
// Initialize schema
@@ -148,6 +165,10 @@ func (s *Store) initSchema() error {
CREATE INDEX IF NOT EXISTS idx_metrics_tier_time
ON metrics(tier, timestamp);
-- Covering index for Unified History (QueryAll) performance
CREATE INDEX IF NOT EXISTS idx_metrics_query_all
ON metrics(resource_type, resource_id, tier, timestamp, metric_type);
-- Metadata table for tracking rollup state
CREATE TABLE IF NOT EXISTS metrics_meta (
key TEXT PRIMARY KEY,
@@ -164,8 +185,13 @@ func (s *Store) initSchema() error {
return nil
}
// Write adds a metric to the write buffer
// Write adds a metric to the write buffer with the 'raw' tier by default
func (s *Store) Write(resourceType, resourceID, metricType string, value float64, timestamp time.Time) {
s.WriteWithTier(resourceType, resourceID, metricType, value, timestamp, TierRaw)
}
// WriteWithTier adds a metric to the write buffer with a specific tier
func (s *Store) WriteWithTier(resourceType, resourceID, metricType string, value float64, timestamp time.Time, tier Tier) {
s.bufferMu.Lock()
defer s.bufferMu.Unlock()
@@ -175,6 +201,7 @@ func (s *Store) Write(resourceType, resourceID, metricType string, value float64
metricType: metricType,
value: value,
timestamp: timestamp,
tier: tier,
})
// Flush if buffer is full
@@ -183,6 +210,27 @@ func (s *Store) Write(resourceType, resourceID, metricType string, value float64
}
}
// WriteBatchSync writes metrics directly to the database without buffering.
func (s *Store) WriteBatchSync(metrics []WriteMetric) {
if len(metrics) == 0 {
return
}
batch := make([]bufferedMetric, len(metrics))
for i, metric := range metrics {
batch[i] = bufferedMetric{
resourceType: metric.ResourceType,
resourceID: metric.ResourceID,
metricType: metric.MetricType,
value: metric.Value,
timestamp: metric.Timestamp,
tier: metric.Tier,
}
}
s.writeBatch(batch)
}
// flush writes buffered metrics to the database (caller must hold bufferMu)
func (s *Store) flushLocked() {
if len(s.buffer) == 0 {
@@ -194,8 +242,12 @@ func (s *Store) flushLocked() {
copy(toWrite, s.buffer)
s.buffer = s.buffer[:0]
// Write in background to not block callers
go s.writeBatch(toWrite)
// Send to serialized write channel
select {
case s.writeCh <- toWrite:
default:
log.Warn().Msg("Metrics write channel full, dropping batch")
}
}
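
A sketch of calling the new synchronous path from outside the package; `DefaultConfig` is taken from the store's tests, and the sample values are invented:

```go
package main

import (
	"time"

	"github.com/rcourtman/pulse-go-rewrite/pkg/metrics"
)

func main() {
	// DefaultConfig/NewStore as used in the store's tests; path is arbitrary.
	store, err := metrics.NewStore(metrics.DefaultConfig("/tmp/pulse-metrics"))
	if err != nil {
		panic(err)
	}
	defer store.Close()

	// WriteBatchSync bypasses the async buffer, so the row is queryable
	// as soon as this call returns.
	store.WriteBatchSync([]metrics.WriteMetric{{
		ResourceType: "node",
		ResourceID:   "node-1",
		MetricType:   "cpu",
		Value:        42.5,
		Timestamp:    time.Now().Add(-30 * 24 * time.Hour),
		Tier:         metrics.TierDaily, // lands directly in the daily tier
	}})
}
```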
// writeBatch writes a batch of metrics to the database
@@ -204,15 +256,26 @@ func (s *Store) writeBatch(metrics []bufferedMetric) {
return
}
tx, err := s.db.Begin()
if err != nil {
var tx *sql.Tx
var err error
// Retry on SQLITE_BUSY with exponential backoff
for i := 0; i < 5; i++ {
tx, err = s.db.Begin()
if err == nil {
break
}
if i < 4 && (err.Error() == "database is locked" || err.Error() == "sql: database is closed") {
time.Sleep(time.Duration(100*(i+1)) * time.Millisecond)
continue
}
log.Error().Err(err).Msg("Failed to begin metrics transaction")
return
}
stmt, err := tx.Prepare(`
INSERT INTO metrics (resource_type, resource_id, metric_type, value, timestamp, tier)
VALUES (?, ?, ?, ?, ?, 'raw')
VALUES (?, ?, ?, ?, ?, ?)
`)
if err != nil {
tx.Rollback()
@@ -222,7 +285,7 @@ func (s *Store) writeBatch(metrics []bufferedMetric) {
defer stmt.Close()
for _, m := range metrics {
_, err := stmt.Exec(m.resourceType, m.resourceID, m.metricType, m.value, m.timestamp.Unix())
_, err := stmt.Exec(m.resourceType, m.resourceID, m.metricType, m.value, m.timestamp.Unix(), string(m.tier))
if err != nil {
log.Warn().Err(err).
Str("resource", m.resourceID).
@@ -239,19 +302,53 @@ func (s *Store) writeBatch(metrics []bufferedMetric) {
log.Debug().Int("count", len(metrics)).Msg("Wrote metrics batch")
}
// Query retrieves metrics for a resource within a time range
func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time) ([]MetricPoint, error) {
// Query retrieves metrics for a resource within a time range, with optional downsampling
func (s *Store) Query(resourceType, resourceID, metricType string, start, end time.Time, stepSecs int64) ([]MetricPoint, error) {
// Select appropriate tier based on time range
tier := s.selectTier(end.Sub(start))
rows, err := s.db.Query(`
var rows *sql.Rows
var err error
sqlQuery := `
SELECT timestamp, value, COALESCE(min_value, value), COALESCE(max_value, value)
FROM metrics
WHERE resource_type = ? AND resource_id = ? AND metric_type = ? AND tier = ?
AND timestamp >= ? AND timestamp <= ?
ORDER BY timestamp ASC
`, resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix())
if err != nil {
`
queryParams := []interface{}{resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix()}
if stepSecs > 1 {
sqlQuery = `
SELECT
(timestamp / ?) * ? as bucket_ts,
AVG(value),
MIN(COALESCE(min_value, value)),
MAX(COALESCE(max_value, value))
FROM metrics
WHERE resource_type = ? AND resource_id = ? AND metric_type = ? AND tier = ?
AND timestamp >= ? AND timestamp <= ?
GROUP BY bucket_ts
ORDER BY bucket_ts ASC
`
queryParams = []interface{}{
stepSecs, stepSecs,
resourceType, resourceID, metricType, string(tier), start.Unix(), end.Unix(),
}
}
// Retry on SQLITE_BUSY
for i := 0; i < 5; i++ {
rows, err = s.db.Query(sqlQuery, queryParams...)
if err == nil {
break
}
if i < 4 && (err.Error() == "database is locked" || err.Error() == "sql: database is closed") {
time.Sleep(time.Duration(100*(i+1)) * time.Millisecond)
continue
}
return nil, fmt.Errorf("failed to query metrics: %w", err)
}
defer rows.Close()
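
A worked example of the bucket expression used in both downsampling queries: `(timestamp / step) * step` with integer division floors each timestamp to a step-aligned boundary, so buckets start at multiples of the step, not at the first sample:

```go
package main

import "fmt"

// bucket reproduces the SQL's (ts / step) * step integer arithmetic.
func bucket(ts, step int64) int64 { return (ts / step) * step }

func main() {
	for _, ts := range []int64{1000, 1180, 1240, 1540} {
		fmt.Println(ts, "->", bucket(ts, 300))
	}
	// 1000 -> 900, 1180 -> 900, 1240 -> 1200, 1540 -> 1500
}
```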
@@ -271,18 +368,53 @@ func (s *Store) Query(resourceType, resourceID, metricType string, start, end ti
return points, rows.Err()
}
// QueryAll retrieves all metric types for a resource within a time range
func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time) (map[string][]MetricPoint, error) {
// QueryAll retrieves all metric types for a resource within a time range, with optional downsampling
func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time, stepSecs int64) (map[string][]MetricPoint, error) {
tier := s.selectTier(end.Sub(start))
rows, err := s.db.Query(`
var rows *sql.Rows
var err error
sqlQuery := `
SELECT metric_type, timestamp, value, COALESCE(min_value, value), COALESCE(max_value, value)
FROM metrics
WHERE resource_type = ? AND resource_id = ? AND tier = ?
AND timestamp >= ? AND timestamp <= ?
ORDER BY metric_type, timestamp ASC
`, resourceType, resourceID, string(tier), start.Unix(), end.Unix())
if err != nil {
`
queryParams := []interface{}{resourceType, resourceID, string(tier), start.Unix(), end.Unix()}
if stepSecs > 1 {
sqlQuery = `
SELECT
metric_type,
(timestamp / ?) * ? + (? / 2) as bucket_ts,
AVG(value),
MIN(COALESCE(min_value, value)),
MAX(COALESCE(max_value, value))
FROM metrics
WHERE resource_type = ? AND resource_id = ? AND tier = ?
AND timestamp >= ? AND timestamp <= ?
GROUP BY metric_type, bucket_ts
ORDER BY metric_type, bucket_ts ASC
`
queryParams = []interface{}{
stepSecs, stepSecs, stepSecs,
resourceType, resourceID, string(tier), start.Unix(), end.Unix(),
}
}
// Retry on SQLITE_BUSY
for i := 0; i < 5; i++ {
rows, err = s.db.Query(sqlQuery, queryParams...)
if err == nil {
break
}
if i < 4 && (err.Error() == "database is locked" || err.Error() == "sql: database is closed") {
time.Sleep(time.Duration(100*(i+1)) * time.Millisecond)
continue
}
return nil, fmt.Errorf("failed to query all metrics: %w", err)
}
defer rows.Close()
@@ -304,13 +436,24 @@ func (s *Store) QueryAll(resourceType, resourceID string, start, end time.Time)
}
// selectTier chooses the appropriate data tier based on time range
// Note: Tier selection uses fixed thresholds to ensure queries use tiers with complete data:
// - Raw: up to 2 hours (high-resolution real-time data)
// - Minute: up to 6 hours (recent detailed data)
// - Hourly: up to 7 days (medium-term with mock/seeded data coverage)
// - Daily: beyond 7 days (long-term historical data)
func (s *Store) selectTier(duration time.Duration) Tier {
const (
rawThreshold = 2 * time.Hour
minuteThreshold = 6 * time.Hour // 24h queries use hourly tier which has complete historical data
hourlyThreshold = 7 * 24 * time.Hour
)
switch {
case duration <= s.config.RetentionRaw:
case duration <= rawThreshold:
return TierRaw
case duration <= s.config.RetentionMinute:
case duration <= minuteThreshold:
return TierMinute
case duration <= s.config.RetentionHourly:
case duration <= hourlyThreshold:
return TierHourly
default:
return TierDaily
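
For reference, the fixed thresholds above map the common UI ranges as follows (a standalone mirror of selectTier; the Tier constants are re-declared so the sketch compiles):

```go
package main

import (
	"fmt"
	"time"
)

type Tier string

const (
	TierRaw    Tier = "raw"
	TierMinute Tier = "minute"
	TierHourly Tier = "hourly"
	TierDaily  Tier = "daily"
)

// selectTier mirrors the fixed thresholds in the diff above.
func selectTier(d time.Duration) Tier {
	switch {
	case d <= 2*time.Hour:
		return TierRaw
	case d <= 6*time.Hour:
		return TierMinute
	case d <= 7*24*time.Hour:
		return TierHourly
	default:
		return TierDaily
	}
}

func main() {
	for _, d := range []time.Duration{time.Hour, 24 * time.Hour, 30 * 24 * time.Hour, 90 * 24 * time.Hour} {
		fmt.Printf("%v -> %s\n", d, selectTier(d))
	}
	// 1h -> raw, 24h -> hourly, 30d -> daily, 90d -> daily
}
```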
@@ -334,8 +477,16 @@ func (s *Store) backgroundWorker() {
case <-s.stopCh:
// Final flush before stopping
s.Flush()
// Process remaining writes
close(s.writeCh)
for batch := range s.writeCh {
s.writeBatch(batch)
}
return
case batch := <-s.writeCh:
s.writeBatch(batch)
case <-flushTicker.C:
s.Flush()
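
The close-then-drain idiom used in the shutdown path above, isolated with illustrative types (this is not the store's code):

```go
package main

import "fmt"

func main() {
	writeCh := make(chan []int, 4)
	writeCh <- []int{1, 2}
	writeCh <- []int{3}

	close(writeCh) // no more sends after this point
	for batch := range writeCh {
		// range over a closed channel yields the queued batches, then exits
		fmt.Println("flushing", batch)
	}
}
```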
@@ -502,6 +653,12 @@ func (s *Store) runRetention() {
}
}
// SetMaxOpenConns sets the maximum number of open connections to the database.
func (s *Store) SetMaxOpenConns(n int) {
s.db.SetMaxOpenConns(n)
s.db.SetMaxIdleConns(n)
}
// Close shuts down the store gracefully
func (s *Store) Close() error {
s.stopOnce.Do(func() {

View File

@@ -28,7 +28,7 @@ func TestStoreWriteBatchAndQuery(t *testing.T) {
{resourceType: "vm", resourceID: "vm-101", metricType: "cpu", value: 2.5, timestamp: ts.Add(1 * time.Second)},
})
points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-1*time.Second), ts.Add(2*time.Second))
points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-1*time.Second), ts.Add(2*time.Second), 0)
if err != nil {
t.Fatalf("Query returned error: %v", err)
}
@@ -39,7 +39,7 @@ func TestStoreWriteBatchAndQuery(t *testing.T) {
t.Fatalf("unexpected query values: %+v", points)
}
all, err := store.QueryAll("vm", "vm-101", ts.Add(-1*time.Second), ts.Add(2*time.Second))
all, err := store.QueryAll("vm", "vm-101", ts.Add(-1*time.Second), ts.Add(2*time.Second), 0)
if err != nil {
t.Fatalf("QueryAll returned error: %v", err)
}
@@ -220,7 +220,7 @@ func TestStoreWriteFlushesBuffer(t *testing.T) {
deadline := time.Now().Add(500 * time.Millisecond)
for time.Now().Before(deadline) {
points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-time.Second), ts.Add(time.Second))
points, err := store.Query("vm", "vm-101", "cpu", ts.Add(-time.Second), ts.Add(time.Second), 0)
if err == nil && len(points) == 1 {
return
}
@@ -229,3 +229,34 @@ func TestStoreWriteFlushesBuffer(t *testing.T) {
t.Fatal("expected buffered metric to flush to database")
}
func TestStoreQueryDownsampling(t *testing.T) {
dir := t.TempDir()
store, err := NewStore(DefaultConfig(dir))
if err != nil {
t.Fatalf("NewStore returned error: %v", err)
}
defer store.Close()
start := time.Unix(1000, 0)
for i := 0; i < 10; i++ {
store.writeBatch([]bufferedMetric{
{resourceType: "vm", resourceID: "v1", metricType: "cpu", value: float64(i * 10), timestamp: start.Add(time.Duration(i) * time.Minute)},
})
}
// Query with 5m step
points, err := store.Query("vm", "v1", "cpu", start.Add(-1*time.Hour), start.Add(1*time.Hour), 300)
if err != nil {
t.Fatalf("Query downsampled failed: %v", err)
}
// 10 minutes of data at 1m resolution (10 points), bucketed by a 5m
// (300s) step. The SQL floors each timestamp to a multiple of the step,
// i.e. bucket_ts = (ts / 300) * 300:
// Bucket 900:  1000, 1060, 1120, 1180 (4 points)
// Bucket 1200: 1240, 1300, 1360, 1420, 1480 (5 points)
// Bucket 1500: 1540 (1 point)
if len(points) != 3 {
t.Fatalf("expected 3 bucketed points, got %d", len(points))
}
}