Fix offline buffering: add tests, remove unused config, fix flaky test

- Add unit tests for internal/buffer package
- Fix misleading "ring buffer" comment (it's a bounded FIFO queue)
- Remove unused BufferCapacity config field from both agents
- Rewrite flaky integration test to use polling instead of fixed sleeps
Author: rcourtman
Date: 2025-12-02 22:23:29 +00:00
parent caf0c10206
commit b4a33c4f2d
5 changed files with 251 additions and 75 deletions

View File

@@ -4,7 +4,7 @@ import (
"sync"
)
// Queue is a thread-safe ring buffer for storing failed reports.
// Queue is a thread-safe bounded FIFO queue. When full, the oldest item is dropped.
type Queue[T any] struct {
mu sync.Mutex
data []T
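
The hunk above only touches the doc comment and shows a fragment of the struct; the rest of queue.go is not part of this diff. A minimal sketch consistent with the new comment and with the tests added below (Peek, Len and IsEmpty omitted) might look like the following; the eviction-by-reslice detail is an assumption, not the repository's actual code:

package buffer

import "sync"

// Queue is a thread-safe bounded FIFO queue. When full, the oldest item is dropped.
type Queue[T any] struct {
    mu       sync.Mutex
    data     []T
    capacity int
}

// New returns an empty queue holding at most capacity items.
func New[T any](capacity int) *Queue[T] {
    return &Queue[T]{capacity: capacity}
}

// Push appends v, evicting the oldest item first if the queue is already full.
func (q *Queue[T]) Push(v T) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.data) >= q.capacity {
        q.data = q.data[1:] // drop-oldest, as TestPushDropsOldest expects
    }
    q.data = append(q.data, v)
}

// Pop removes and returns the oldest item; ok is false when the queue is empty.
func (q *Queue[T]) Pop() (v T, ok bool) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.data) == 0 {
        return v, false
    }
    v, q.data = q.data[0], q.data[1:]
    return v, true
}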

View File

@@ -0,0 +1,165 @@
package buffer
import (
"sync"
"testing"
)
func TestNew(t *testing.T) {
q := New[int](5)
if q.capacity != 5 {
t.Errorf("expected capacity 5, got %d", q.capacity)
}
if q.Len() != 0 {
t.Errorf("expected empty queue, got len %d", q.Len())
}
}
func TestPushPop(t *testing.T) {
q := New[int](3)
q.Push(1)
q.Push(2)
q.Push(3)
if q.Len() != 3 {
t.Errorf("expected len 3, got %d", q.Len())
}
val, ok := q.Pop()
if !ok || val != 1 {
t.Errorf("expected (1, true), got (%d, %v)", val, ok)
}
val, ok = q.Pop()
if !ok || val != 2 {
t.Errorf("expected (2, true), got (%d, %v)", val, ok)
}
val, ok = q.Pop()
if !ok || val != 3 {
t.Errorf("expected (3, true), got (%d, %v)", val, ok)
}
val, ok = q.Pop()
if ok {
t.Errorf("expected (_, false), got (%d, %v)", val, ok)
}
}
func TestPushDropsOldest(t *testing.T) {
q := New[int](3)
q.Push(1)
q.Push(2)
q.Push(3)
q.Push(4) // should drop 1
if q.Len() != 3 {
t.Errorf("expected len 3, got %d", q.Len())
}
val, ok := q.Pop()
if !ok || val != 2 {
t.Errorf("expected (2, true), got (%d, %v)", val, ok)
}
val, ok = q.Pop()
if !ok || val != 3 {
t.Errorf("expected (3, true), got (%d, %v)", val, ok)
}
val, ok = q.Pop()
if !ok || val != 4 {
t.Errorf("expected (4, true), got (%d, %v)", val, ok)
}
}
func TestPeek(t *testing.T) {
q := New[string](2)
_, ok := q.Peek()
if ok {
t.Error("expected Peek on empty queue to return false")
}
q.Push("a")
q.Push("b")
val, ok := q.Peek()
if !ok || val != "a" {
t.Errorf("expected (a, true), got (%s, %v)", val, ok)
}
// Peek should not remove
if q.Len() != 2 {
t.Errorf("Peek should not modify queue, len is %d", q.Len())
}
}
func TestIsEmpty(t *testing.T) {
q := New[int](2)
if !q.IsEmpty() {
t.Error("new queue should be empty")
}
q.Push(1)
if q.IsEmpty() {
t.Error("queue with item should not be empty")
}
q.Pop()
if !q.IsEmpty() {
t.Error("queue after pop should be empty")
}
}
func TestConcurrentAccess(t *testing.T) {
q := New[int](100)
var wg sync.WaitGroup
// Concurrent pushes
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for j := 0; j < 20; j++ {
q.Push(n*20 + j)
}
}(i)
}
// Concurrent pops
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
q.Pop()
}
}()
}
wg.Wait()
// Should not panic and queue should be in valid state
_ = q.Len()
_ = q.IsEmpty()
}
func TestCapacityOne(t *testing.T) {
q := New[int](1)
q.Push(1)
q.Push(2) // drops 1
if q.Len() != 1 {
t.Errorf("expected len 1, got %d", q.Len())
}
val, ok := q.Pop()
if !ok || val != 2 {
t.Errorf("expected (2, true), got (%d, %v)", val, ok)
}
}
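
Per the commit message these tests live in internal/buffer, so they can be run on their own; using the race detector is what makes TestConcurrentAccess meaningful (path assumed relative to the repository root):

go test -race ./internal/buffer/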

View File

@@ -62,7 +62,6 @@ type Config struct {
CollectDiskMetrics bool
LogLevel zerolog.Level
Logger *zerolog.Logger
BufferCapacity int // Number of reports to buffer when offline (default: 60)
}
var allowedContainerStates = map[string]string{
@@ -238,10 +237,7 @@ func New(cfg Config) (*Agent, error) {
agentVersion = Version
}
bufferCapacity := cfg.BufferCapacity
if bufferCapacity <= 0 {
bufferCapacity = 60
}
const bufferCapacity = 60
agent := &Agent{
cfg: cfg,

View File

@@ -35,7 +35,6 @@ type Config struct {
RunOnce bool
LogLevel zerolog.Level
Logger *zerolog.Logger
BufferCapacity int // Number of reports to buffer when offline (default: 60)
}
// Agent is responsible for collecting host metrics and shipping them to Pulse.
@@ -167,10 +166,7 @@ func New(cfg Config) (*Agent, error) {
agentVersion = Version
}
bufferCapacity := cfg.BufferCapacity
if bufferCapacity <= 0 {
bufferCapacity = 60
}
const bufferCapacity = 60
return &Agent{
cfg: cfg,
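
Both agents now hard-code the 60-report capacity instead of reading it from Config. The send/flush loop itself sits outside these hunks; as a rough sketch of the pattern the integration test below exercises (the function name and signature here are illustrative, not taken from the repository), buffering on failure and draining after the next success looks roughly like:

// shipWithBuffer is an illustrative sketch: try to send, buffer the report on
// failure, and drain the backlog once a send succeeds again. It relies only on
// the Queue API from the first file in this commit.
func shipWithBuffer[T any](q *Queue[T], report T, send func(T) error) {
    if err := send(report); err != nil {
        q.Push(report) // offline: keep it; once full, the oldest buffered report is dropped
        return
    }
    // Back online: flush whatever accumulated during the outage.
    for {
        buffered, ok := q.Pop()
        if !ok {
            return
        }
        if err := send(buffered); err != nil {
            q.Push(buffered) // still failing: re-queue (at the tail) and retry on the next cycle
            return
        }
    }
}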

View File

@@ -13,66 +13,56 @@ import (
"github.com/rs/zerolog"
)
type testWriter struct {
t *testing.T
}
func (w *testWriter) Write(p []byte) (n int, err error) {
w.t.Log(string(p))
return len(p), nil
}
func TestAgentBuffering(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
// 1. Setup Mock Server
var (
mu sync.Mutex
receivedReports []host.Report
shouldFail bool
failedAttempts int
)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
fail := shouldFail
mu.Unlock()
t.Logf("Server received request: %s %s", r.Method, r.URL.Path)
if shouldFail {
t.Log("Server simulating failure (500)")
w.WriteHeader(http.StatusInternalServerError)
if r.URL.Path != "/api/agents/host/report" {
w.WriteHeader(http.StatusNotFound)
return
}
if r.URL.Path != "/api/agents/host/report" {
t.Logf("Server 404 for path: %s", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
if fail {
mu.Lock()
failedAttempts++
mu.Unlock()
w.WriteHeader(http.StatusInternalServerError)
return
}
var report host.Report
if err := json.NewDecoder(r.Body).Decode(&report); err != nil {
t.Logf("Server failed to decode body: %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}
t.Logf("Server accepted report from %s", report.Host.Hostname)
mu.Lock()
receivedReports = append(receivedReports, report)
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
// 2. Configure Agent
// Use testWriter to capture logs in test output
logger := zerolog.New(zerolog.ConsoleWriter{Out: &testWriter{t}}).Level(zerolog.DebugLevel)
logger := zerolog.New(zerolog.NewConsoleWriter()).Level(zerolog.WarnLevel)
cfg := Config{
PulseURL: server.URL,
APIToken: "test-token",
Interval: 250 * time.Millisecond,
Interval: 50 * time.Millisecond,
HostnameOverride: "test-host",
Logger: &logger,
}
@@ -82,75 +72,104 @@ func TestAgentBuffering(t *testing.T) {
t.Fatalf("Failed to create agent: %v", err)
}
// 3. Run Agent in background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
t.Log("Starting agent...")
if err := agent.Run(ctx); err != nil && err != context.Canceled {
t.Errorf("Agent run failed: %v", err)
}
t.Log("Agent stopped")
_ = agent.Run(ctx)
}()
// 4. Wait for initial successful report
t.Log("Waiting for initial report...")
time.Sleep(2 * time.Second)
// Wait for condition with polling
waitFor := func(cond func() bool, timeout time.Duration) bool {
deadline := time.After(timeout)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
if cond() {
return true
}
select {
case <-ticker.C:
case <-deadline:
return false
}
}
}
// Wait for first successful report
if !waitFor(func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedReports) >= 1
}, 10*time.Second) {
t.Fatal("Timed out waiting for initial report")
}
mu.Lock()
initialCount := len(receivedReports)
mu.Unlock()
t.Logf("Initial reports: %d", initialCount)
t.Logf("Initial reports received: %d", initialCount)
if initialCount == 0 {
t.Fatal("Expected at least one initial report")
}
// 5. Simulate Outage
t.Log("Simulating outage...")
// Simulate outage
mu.Lock()
shouldFail = true
mu.Unlock()
// Wait for a few cycles (should buffer)
time.Sleep(3 * time.Second)
// Wait for at least 3 failed attempts (these should be buffered)
if !waitFor(func() bool {
mu.Lock()
defer mu.Unlock()
return failedAttempts >= 3
}, 10*time.Second) {
mu.Lock()
got := failedAttempts
mu.Unlock()
t.Fatalf("Timed out waiting for failed attempts: got %d", got)
}
mu.Lock()
// Should not have received any new reports during outage
if len(receivedReports) > initialCount {
t.Errorf("Received reports during outage! Expected %d, got %d", initialCount, len(receivedReports))
}
countDuringOutage := len(receivedReports)
mu.Unlock()
// 6. Recover
t.Log("Recovering server...")
if countDuringOutage != initialCount {
t.Errorf("Reports received during outage: expected %d, got %d", initialCount, countDuringOutage)
}
// Check buffer has items
bufferLen := agent.reportBuffer.Len()
if bufferLen == 0 {
t.Fatal("Buffer should have items after failed sends")
}
t.Logf("Buffer has %d items after outage", bufferLen)
// Recover
mu.Lock()
shouldFail = false
mu.Unlock()
// Wait for flush (flush happens after next successful report)
time.Sleep(3 * time.Second)
// Stop agent
cancel()
wg.Wait()
// Wait for buffer to empty (flush complete)
if !waitFor(func() bool {
return agent.reportBuffer.IsEmpty()
}, 10*time.Second) {
t.Fatalf("Timed out waiting for buffer to empty, still has %d items", agent.reportBuffer.Len())
}
// Get final count before stopping
mu.Lock()
finalCount := len(receivedReports)
mu.Unlock()
t.Logf("Final reports received: %d", finalCount)
cancel()
wg.Wait()
// We expect: initial + (buffered during outage) + (new ones after recovery)
// If collection is slow (e.g. 1s), we might get 1-2 buffered.
// We just want to ensure we got MORE than just the initial ones.
// And ideally more than just initial + 1 (which would be just the next report).
if finalCount <= initialCount+1 {
t.Errorf("Buffered reports were not flushed? Initial: %d, Final: %d", initialCount, finalCount)
// Verify we received the buffered reports
if finalCount < initialCount+bufferLen {
t.Errorf("Expected at least %d reports (initial %d + buffered %d), got %d",
initialCount+bufferLen, initialCount, bufferLen, finalCount)
}
t.Logf("Test passed. Initial: %d, Final: %d", initialCount, finalCount)
t.Logf("Initial: %d, Buffered: %d, Final: %d", initialCount, bufferLen, finalCount)
}
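
The rewritten test still honours -short (it calls t.Skip near the top), so it is excluded from quick runs. To exercise it directly, something like the following works; the package path is left as ./... because it does not appear on this page:

go test -v -run TestAgentBuffering ./...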