Files
Pulse/internal/monitoring/task_queue_test.go

803 lines
19 KiB
Go

package monitoring
import (
"testing"
"time"
)
func TestTaskQueue_Snapshot(t *testing.T) {
tests := []struct {
name string
tasks []ScheduledTask
wantDepth int
wantDueWithinSeconds int
wantPerType map[string]int
}{
{
name: "empty queue",
tasks: []ScheduledTask{},
wantDepth: 0,
wantDueWithinSeconds: 0,
wantPerType: map[string]int{},
},
{
name: "single task due soon (within 12 seconds)",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 1,
wantPerType: map[string]int{
"pve": 1,
},
},
{
name: "single task NOT due soon (>12 seconds away)",
tasks: []ScheduledTask{
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(20 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 0,
wantPerType: map[string]int{
"pbs": 1,
},
},
{
name: "multiple tasks of same type",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(15 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 3,
wantDueWithinSeconds: 2, // pve-1 and pve-3 are within 12 seconds
wantPerType: map[string]int{
"pve": 3,
},
},
{
name: "multiple tasks of different types",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(8 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pmg-1",
InstanceType: InstanceTypePMG,
NextRun: time.Now().Add(25 * time.Second),
Interval: 45 * time.Second,
Priority: 1.0,
},
},
wantDepth: 3,
wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
wantPerType: map[string]int{
"pve": 1,
"pbs": 1,
"pmg": 1,
},
},
{
name: "boundary case: task exactly 12 seconds away",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(12 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 1, // <= 12 seconds should be included
wantPerType: map[string]int{
"pve": 1,
},
},
{
name: "mix of due-soon and not-due-soon tasks",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(1 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(50 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(11 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-2",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(100 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pmg-1",
InstanceType: InstanceTypePMG,
NextRun: time.Now().Add(30 * time.Second),
Interval: 45 * time.Second,
Priority: 1.0,
},
},
wantDepth: 5,
wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
wantPerType: map[string]int{
"pve": 2,
"pbs": 2,
"pmg": 1,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queue := NewTaskQueue()
// Add all tasks to the queue
for _, task := range tt.tasks {
queue.Upsert(task)
}
// Get snapshot
snapshot := queue.Snapshot()
// Verify depth
if snapshot.Depth != tt.wantDepth {
t.Errorf("Depth = %d, want %d", snapshot.Depth, tt.wantDepth)
}
// Verify DueWithinSeconds
if snapshot.DueWithinSeconds != tt.wantDueWithinSeconds {
t.Errorf("DueWithinSeconds = %d, want %d", snapshot.DueWithinSeconds, tt.wantDueWithinSeconds)
}
// Verify PerType map
if len(snapshot.PerType) != len(tt.wantPerType) {
t.Errorf("PerType has %d entries, want %d", len(snapshot.PerType), len(tt.wantPerType))
}
for typeStr, wantCount := range tt.wantPerType {
gotCount, ok := snapshot.PerType[typeStr]
if !ok {
t.Errorf("PerType missing entry for %s", typeStr)
continue
}
if gotCount != wantCount {
t.Errorf("PerType[%s] = %d, want %d", typeStr, gotCount, wantCount)
}
}
// Verify no extra keys in PerType
for typeStr := range snapshot.PerType {
if _, ok := tt.wantPerType[typeStr]; !ok {
t.Errorf("PerType has unexpected entry for %s", typeStr)
}
}
})
}
}
func TestTaskQueue_Upsert(t *testing.T) {
t.Run("insert into empty queue", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("upsert existing entry with different NextRun", func(t *testing.T) {
queue := NewTaskQueue()
task1 := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task1)
// Update same task with different NextRun
task2 := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task2)
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1 (not 2)", queue.Size())
}
// Verify the task was updated
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-1")
entry := queue.entries[key]
if !entry.task.NextRun.Equal(task2.NextRun) {
t.Errorf("NextRun not updated: got %v, want %v", entry.task.NextRun, task2.NextRun)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("insert multiple entries - verify heap ordering", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify heap root is earliest NextRun
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-1" {
t.Errorf("heap root = %s, want pve-1 (earliest NextRun)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert changes heap position - earlier NextRun", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Update pve-3 to have earliest NextRun
updatedTask := ScheduledTask{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(updatedTask)
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify pve-3 is now at heap root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-3" {
t.Errorf("heap root = %s, want pve-3 (updated to earliest)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert changes heap position - later NextRun", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Update pve-1 to have latest NextRun
updatedTask := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(40 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(updatedTask)
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify pve-2 is now at heap root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (pve-1 moved down)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert with priority ordering", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
// Same NextRun, different priorities
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 0.5,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Higher priority should be at root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (higher priority)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
}
func TestTaskQueue_Remove(t *testing.T) {
t.Run("remove from empty queue", func(t *testing.T) {
queue := NewTaskQueue()
// Should not panic
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 0 {
t.Errorf("Size() = %d, want 0", queue.Size())
}
})
t.Run("remove non-existent key", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
// Remove different instance
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1 (pve-1 should still exist)", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("remove only entry", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 0 {
t.Errorf("Size() = %d, want 0", queue.Size())
}
queue.mu.Lock()
if len(queue.entries) != 0 {
t.Errorf("entries map has %d items, want 0", len(queue.entries))
}
if len(queue.heap) != 0 {
t.Errorf("heap has %d items, want 0", len(queue.heap))
}
queue.mu.Unlock()
})
t.Run("remove entry from middle of queue", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove middle entry
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 2 {
t.Errorf("Size() = %d, want 2", queue.Size())
}
// Verify pve-2 is not in entries map
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-2")
if _, exists := queue.entries[key]; exists {
t.Errorf("pve-2 still exists in entries map")
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("remove heap root", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove root (pve-1 has earliest NextRun)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 2 {
t.Errorf("Size() = %d, want 2", queue.Size())
}
// Verify pve-2 is now the root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (next earliest)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("remove all entries sequentially", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove each entry and verify invariant after each removal
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 2 {
t.Errorf("Size after removing pve-2 = %d, want 2", queue.Size())
}
verifyHeapInvariant(t, queue)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 1 {
t.Errorf("Size after removing pve-1 = %d, want 1", queue.Size())
}
verifyHeapInvariant(t, queue)
queue.Remove(InstanceTypePVE, "pve-3")
if queue.Size() != 0 {
t.Errorf("Size after removing pve-3 = %d, want 0", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("remove different instance types", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: now.Add(20 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove PBS instance
queue.Remove(InstanceTypePBS, "pbs-1")
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1", queue.Size())
}
// Verify PVE instance still exists
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-1")
if _, exists := queue.entries[key]; !exists {
t.Errorf("pve-1 should still exist in entries map")
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
}
// verifyHeapInvariant checks that the heap maintains its invariants:
// 1. len(entries) matches heap size
// 2. Each entry's index matches its actual position in heap
// 3. Heap property: parent is less than or equal to children
func verifyHeapInvariant(t *testing.T, queue *TaskQueue) {
t.Helper()
queue.mu.Lock()
defer queue.mu.Unlock()
// Check entries count matches heap size
if len(queue.entries) != len(queue.heap) {
t.Errorf("entries count %d != heap size %d", len(queue.entries), len(queue.heap))
}
// Check each entry's index is correct
for _, entry := range queue.heap {
if entry.index < 0 || entry.index >= len(queue.heap) {
t.Errorf("entry %s has invalid index %d (heap size: %d)", entry.key(), entry.index, len(queue.heap))
continue
}
if queue.heap[entry.index] != entry {
t.Errorf("entry %s has index %d but is not at that position in heap", entry.key(), entry.index)
}
}
// Check all entries in map are also in heap
for key, entry := range queue.entries {
found := false
for _, heapEntry := range queue.heap {
if heapEntry == entry {
found = true
break
}
}
if !found {
t.Errorf("entry %s in map but not in heap", key)
}
}
// Check heap property: parent <= children
for i := 0; i < len(queue.heap); i++ {
leftChild := 2*i + 1
rightChild := 2*i + 2
if leftChild < len(queue.heap) {
if queue.heap.Less(leftChild, i) {
t.Errorf("heap violation: child at %d is less than parent at %d", leftChild, i)
}
}
if rightChild < len(queue.heap) {
if queue.heap.Less(rightChild, i) {
t.Errorf("heap violation: child at %d is less than parent at %d", rightChild, i)
}
}
}
}