mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-02-18 00:17:39 +01:00
refactor(42-background-queues): guard shutdown writes in pkg/metrics
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
@@ -92,6 +93,7 @@ type Store struct {
|
||||
stopCh chan struct{}
|
||||
doneCh chan struct{}
|
||||
stopOnce sync.Once
|
||||
stopping atomic.Bool
|
||||
}
|
||||
|
||||
// NewStore creates a new metrics store with the given configuration
|
||||
@@ -197,9 +199,17 @@ func (s *Store) Write(resourceType, resourceID, metricType string, value float64
|
||||
|
||||
// WriteWithTier adds a metric to the write buffer with a specific tier
|
||||
func (s *Store) WriteWithTier(resourceType, resourceID, metricType string, value float64, timestamp time.Time, tier Tier) {
|
||||
if s.stopping.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
s.bufferMu.Lock()
|
||||
defer s.bufferMu.Unlock()
|
||||
|
||||
if s.stopping.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
s.buffer = append(s.buffer, bufferedMetric{
|
||||
resourceType: resourceType,
|
||||
resourceID: resourceID,
|
||||
@@ -811,6 +821,7 @@ func (s *Store) SetMaxOpenConns(n int) {
|
||||
// Close shuts down the store gracefully
|
||||
func (s *Store) Close() error {
|
||||
s.stopOnce.Do(func() {
|
||||
s.stopping.Store(true)
|
||||
close(s.stopCh)
|
||||
})
|
||||
|
||||
|
||||
@@ -111,3 +111,35 @@ func TestStoreGetMetaIntInvalid(t *testing.T) {
|
||||
t.Fatalf("expected getMetaInt to fail for invalid int, got %d", val)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreWriteAfterCloseDoesNotPanic(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
cfg := DefaultConfig(dir)
|
||||
cfg.DBPath = filepath.Join(dir, "metrics-after-close.db")
|
||||
cfg.WriteBufferSize = 1
|
||||
cfg.FlushInterval = time.Hour
|
||||
|
||||
store, err := NewStore(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("NewStore returned error: %v", err)
|
||||
}
|
||||
|
||||
if err := store.Close(); err != nil {
|
||||
t.Fatalf("Close returned error: %v", err)
|
||||
}
|
||||
|
||||
var panicked bool
|
||||
func() {
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
panicked = true
|
||||
}
|
||||
}()
|
||||
store.Write("vm", "v1", "cpu", 1.0, time.Now())
|
||||
store.Flush()
|
||||
}()
|
||||
|
||||
if panicked {
|
||||
t.Fatal("expected writes after close to be ignored without panic")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user