Files
Pulse/internal/monitoring/task_queue_test.go
rcourtman 42ef819943 test: Add WaitNext and key() tests for TaskQueue
Tests cover context cancellation, immediately due tasks, waiting for
future tasks, empty queue timeout, and multiple task priority ordering.
WaitNext coverage 0%→92.3%, key() 0%→100%. Monitoring package 52.3%→52.9%.
2025-12-02 12:02:13 +00:00

1134 lines
27 KiB
Go

package monitoring
import (
"context"
"testing"
"time"
)
func TestTaskQueue_Snapshot(t *testing.T) {
tests := []struct {
name string
tasks []ScheduledTask
wantDepth int
wantDueWithinSeconds int
wantPerType map[string]int
}{
{
name: "empty queue",
tasks: []ScheduledTask{},
wantDepth: 0,
wantDueWithinSeconds: 0,
wantPerType: map[string]int{},
},
{
name: "single task due soon (within 12 seconds)",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 1,
wantPerType: map[string]int{
"pve": 1,
},
},
{
name: "single task NOT due soon (>12 seconds away)",
tasks: []ScheduledTask{
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(20 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 0,
wantPerType: map[string]int{
"pbs": 1,
},
},
{
name: "multiple tasks of same type",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(15 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 3,
wantDueWithinSeconds: 2, // pve-1 and pve-3 are within 12 seconds
wantPerType: map[string]int{
"pve": 3,
},
},
{
name: "multiple tasks of different types",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(8 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pmg-1",
InstanceType: InstanceTypePMG,
NextRun: time.Now().Add(25 * time.Second),
Interval: 45 * time.Second,
Priority: 1.0,
},
},
wantDepth: 3,
wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
wantPerType: map[string]int{
"pve": 1,
"pbs": 1,
"pmg": 1,
},
},
{
name: "boundary case: task exactly 12 seconds away",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(12 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
},
wantDepth: 1,
wantDueWithinSeconds: 1, // <= 12 seconds should be included
wantPerType: map[string]int{
"pve": 1,
},
},
{
name: "mix of due-soon and not-due-soon tasks",
tasks: []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(1 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(50 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(11 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-2",
InstanceType: InstanceTypePBS,
NextRun: time.Now().Add(100 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pmg-1",
InstanceType: InstanceTypePMG,
NextRun: time.Now().Add(30 * time.Second),
Interval: 45 * time.Second,
Priority: 1.0,
},
},
wantDepth: 5,
wantDueWithinSeconds: 2, // pve-1 and pbs-1 are within 12 seconds
wantPerType: map[string]int{
"pve": 2,
"pbs": 2,
"pmg": 1,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queue := NewTaskQueue()
// Add all tasks to the queue
for _, task := range tt.tasks {
queue.Upsert(task)
}
// Get snapshot
snapshot := queue.Snapshot()
// Verify depth
if snapshot.Depth != tt.wantDepth {
t.Errorf("Depth = %d, want %d", snapshot.Depth, tt.wantDepth)
}
// Verify DueWithinSeconds
if snapshot.DueWithinSeconds != tt.wantDueWithinSeconds {
t.Errorf("DueWithinSeconds = %d, want %d", snapshot.DueWithinSeconds, tt.wantDueWithinSeconds)
}
// Verify PerType map
if len(snapshot.PerType) != len(tt.wantPerType) {
t.Errorf("PerType has %d entries, want %d", len(snapshot.PerType), len(tt.wantPerType))
}
for typeStr, wantCount := range tt.wantPerType {
gotCount, ok := snapshot.PerType[typeStr]
if !ok {
t.Errorf("PerType missing entry for %s", typeStr)
continue
}
if gotCount != wantCount {
t.Errorf("PerType[%s] = %d, want %d", typeStr, gotCount, wantCount)
}
}
// Verify no extra keys in PerType
for typeStr := range snapshot.PerType {
if _, ok := tt.wantPerType[typeStr]; !ok {
t.Errorf("PerType has unexpected entry for %s", typeStr)
}
}
})
}
}
func TestTaskQueue_Upsert(t *testing.T) {
t.Run("insert into empty queue", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("upsert existing entry with different NextRun", func(t *testing.T) {
queue := NewTaskQueue()
task1 := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task1)
// Update same task with different NextRun
task2 := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task2)
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1 (not 2)", queue.Size())
}
// Verify the task was updated
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-1")
entry := queue.entries[key]
if !entry.task.NextRun.Equal(task2.NextRun) {
t.Errorf("NextRun not updated: got %v, want %v", entry.task.NextRun, task2.NextRun)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("insert multiple entries - verify heap ordering", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify heap root is earliest NextRun
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-1" {
t.Errorf("heap root = %s, want pve-1 (earliest NextRun)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert changes heap position - earlier NextRun", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Update pve-3 to have earliest NextRun
updatedTask := ScheduledTask{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(5 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(updatedTask)
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify pve-3 is now at heap root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-3" {
t.Errorf("heap root = %s, want pve-3 (updated to earliest)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert changes heap position - later NextRun", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Update pve-1 to have latest NextRun
updatedTask := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(40 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(updatedTask)
if queue.Size() != 3 {
t.Errorf("Size() = %d, want 3", queue.Size())
}
// Verify pve-2 is now at heap root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (pve-1 moved down)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert with priority ordering", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
// Same NextRun, different priorities
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 0.5,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Higher priority should be at root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (higher priority)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("upsert with same time and priority - uses instance name for tiebreak", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
sameTime := now.Add(10 * time.Second)
// Same NextRun, same priorities - should use InstanceName for tiebreak
tasks := []ScheduledTask{
{
InstanceName: "pve-zebra",
InstanceType: InstanceTypePVE,
NextRun: sameTime,
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-alpha",
InstanceType: InstanceTypePVE,
NextRun: sameTime,
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-middle",
InstanceType: InstanceTypePVE,
NextRun: sameTime,
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// When NextRun and Priority are equal, alphabetically earlier name should be first
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-alpha" {
t.Errorf("heap root = %s, want pve-alpha (alphabetically first when time and priority equal)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
}
func TestTaskQueue_Remove(t *testing.T) {
t.Run("remove from empty queue", func(t *testing.T) {
queue := NewTaskQueue()
// Should not panic
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 0 {
t.Errorf("Size() = %d, want 0", queue.Size())
}
})
t.Run("remove non-existent key", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
// Remove different instance
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1 (pve-1 should still exist)", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("remove only entry", func(t *testing.T) {
queue := NewTaskQueue()
task := ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
}
queue.Upsert(task)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 0 {
t.Errorf("Size() = %d, want 0", queue.Size())
}
queue.mu.Lock()
if len(queue.entries) != 0 {
t.Errorf("entries map has %d items, want 0", len(queue.entries))
}
if len(queue.heap) != 0 {
t.Errorf("heap has %d items, want 0", len(queue.heap))
}
queue.mu.Unlock()
})
t.Run("remove entry from middle of queue", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove middle entry
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 2 {
t.Errorf("Size() = %d, want 2", queue.Size())
}
// Verify pve-2 is not in entries map
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-2")
if _, exists := queue.entries[key]; exists {
t.Errorf("pve-2 still exists in entries map")
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("remove heap root", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove root (pve-1 has earliest NextRun)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 2 {
t.Errorf("Size() = %d, want 2", queue.Size())
}
// Verify pve-2 is now the root
queue.mu.Lock()
root := queue.heap[0]
if root.task.InstanceName != "pve-2" {
t.Errorf("heap root = %s, want pve-2 (next earliest)", root.task.InstanceName)
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
t.Run("remove all entries sequentially", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(20 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(30 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove each entry and verify invariant after each removal
queue.Remove(InstanceTypePVE, "pve-2")
if queue.Size() != 2 {
t.Errorf("Size after removing pve-2 = %d, want 2", queue.Size())
}
verifyHeapInvariant(t, queue)
queue.Remove(InstanceTypePVE, "pve-1")
if queue.Size() != 1 {
t.Errorf("Size after removing pve-1 = %d, want 1", queue.Size())
}
verifyHeapInvariant(t, queue)
queue.Remove(InstanceTypePVE, "pve-3")
if queue.Size() != 0 {
t.Errorf("Size after removing pve-3 = %d, want 0", queue.Size())
}
verifyHeapInvariant(t, queue)
})
t.Run("remove different instance types", func(t *testing.T) {
queue := NewTaskQueue()
now := time.Now()
tasks := []ScheduledTask{
{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(10 * time.Second),
Interval: 30 * time.Second,
Priority: 1.0,
},
{
InstanceName: "pbs-1",
InstanceType: InstanceTypePBS,
NextRun: now.Add(20 * time.Second),
Interval: 60 * time.Second,
Priority: 1.0,
},
}
for _, task := range tasks {
queue.Upsert(task)
}
// Remove PBS instance
queue.Remove(InstanceTypePBS, "pbs-1")
if queue.Size() != 1 {
t.Errorf("Size() = %d, want 1", queue.Size())
}
// Verify PVE instance still exists
queue.mu.Lock()
key := schedulerKey(InstanceTypePVE, "pve-1")
if _, exists := queue.entries[key]; !exists {
t.Errorf("pve-1 should still exist in entries map")
}
queue.mu.Unlock()
verifyHeapInvariant(t, queue)
})
}
func TestTaskHeap_Pop(t *testing.T) {
t.Run("pop from empty heap returns nil", func(t *testing.T) {
h := &taskHeap{}
result := h.Pop()
if result != nil {
t.Errorf("Pop() = %v, want nil", result)
}
})
t.Run("pop from single-element heap returns that element and empties heap", func(t *testing.T) {
entry := &scheduledTaskEntry{
task: ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now(),
Priority: 1.0,
},
index: 0,
}
h := &taskHeap{entry}
result := h.Pop()
if result != entry {
t.Errorf("Pop() returned wrong entry")
}
if entry.index != -1 {
t.Errorf("entry.index = %d, want -1", entry.index)
}
if len(*h) != 0 {
t.Errorf("heap length = %d, want 0", len(*h))
}
})
t.Run("pop from multi-element heap returns last element with index set to -1", func(t *testing.T) {
entry1 := &scheduledTaskEntry{
task: ScheduledTask{
InstanceName: "pve-1",
InstanceType: InstanceTypePVE,
NextRun: time.Now(),
Priority: 1.0,
},
index: 0,
}
entry2 := &scheduledTaskEntry{
task: ScheduledTask{
InstanceName: "pve-2",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(10 * time.Second),
Priority: 1.0,
},
index: 1,
}
entry3 := &scheduledTaskEntry{
task: ScheduledTask{
InstanceName: "pve-3",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(20 * time.Second),
Priority: 1.0,
},
index: 2,
}
h := &taskHeap{entry1, entry2, entry3}
result := h.Pop()
// Pop returns the last element in the slice (entry3)
if result != entry3 {
t.Errorf("Pop() returned wrong entry, got %v want entry3", result)
}
if entry3.index != -1 {
t.Errorf("entry3.index = %d, want -1", entry3.index)
}
if len(*h) != 2 {
t.Errorf("heap length = %d, want 2", len(*h))
}
// entry1 and entry2 should still be in the heap
if (*h)[0] != entry1 || (*h)[1] != entry2 {
t.Errorf("remaining heap entries are wrong")
}
})
}
// verifyHeapInvariant checks that the heap maintains its invariants:
// 1. len(entries) matches heap size
// 2. Each entry's index matches its actual position in heap
// 3. Heap property: parent is less than or equal to children
func verifyHeapInvariant(t *testing.T, queue *TaskQueue) {
t.Helper()
queue.mu.Lock()
defer queue.mu.Unlock()
// Check entries count matches heap size
if len(queue.entries) != len(queue.heap) {
t.Errorf("entries count %d != heap size %d", len(queue.entries), len(queue.heap))
}
// Check each entry's index is correct
for _, entry := range queue.heap {
if entry.index < 0 || entry.index >= len(queue.heap) {
t.Errorf("entry %s has invalid index %d (heap size: %d)", entry.key(), entry.index, len(queue.heap))
continue
}
if queue.heap[entry.index] != entry {
t.Errorf("entry %s has index %d but is not at that position in heap", entry.key(), entry.index)
}
}
// Check all entries in map are also in heap
for key, entry := range queue.entries {
found := false
for _, heapEntry := range queue.heap {
if heapEntry == entry {
found = true
break
}
}
if !found {
t.Errorf("entry %s in map but not in heap", key)
}
}
// Check heap property: parent <= children
for i := 0; i < len(queue.heap); i++ {
leftChild := 2*i + 1
rightChild := 2*i + 2
if leftChild < len(queue.heap) {
if queue.heap.Less(leftChild, i) {
t.Errorf("heap violation: child at %d is less than parent at %d", leftChild, i)
}
}
if rightChild < len(queue.heap) {
if queue.heap.Less(rightChild, i) {
t.Errorf("heap violation: child at %d is less than parent at %d", rightChild, i)
}
}
}
}
// TestScheduledTaskEntry_Key tests the key() method on scheduledTaskEntry
func TestScheduledTaskEntry_Key(t *testing.T) {
t.Parallel()
tests := []struct {
name string
instType InstanceType
instName string
wantKey string
}{
{
name: "PVE instance",
instType: InstanceTypePVE,
instName: "pve1",
wantKey: "pve::pve1",
},
{
name: "PBS instance",
instType: InstanceTypePBS,
instName: "pbs-cluster",
wantKey: "pbs::pbs-cluster",
},
{
name: "PMG instance",
instType: InstanceTypePMG,
instName: "pmg-01",
wantKey: "pmg::pmg-01",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
entry := &scheduledTaskEntry{
task: ScheduledTask{
InstanceType: tt.instType,
InstanceName: tt.instName,
},
}
got := entry.key()
if got != tt.wantKey {
t.Errorf("key() = %q, want %q", got, tt.wantKey)
}
})
}
}
// TestWaitNext_ContextCancelled tests WaitNext returns when context is cancelled
func TestWaitNext_ContextCancelled(t *testing.T) {
t.Parallel()
queue := NewTaskQueue()
// Create a context that's already cancelled
ctx, cancel := context.WithCancel(context.Background())
cancel()
task, ok := queue.WaitNext(ctx)
if ok {
t.Error("WaitNext should return false when context is cancelled")
}
if task.InstanceName != "" {
t.Error("WaitNext should return empty task when context is cancelled")
}
}
// TestWaitNext_ImmediatelyDue tests WaitNext returns immediately for due tasks
func TestWaitNext_ImmediatelyDue(t *testing.T) {
t.Parallel()
queue := NewTaskQueue()
// Add a task that's already due (NextRun in the past)
queue.Upsert(ScheduledTask{
InstanceName: "pve1",
InstanceType: InstanceTypePVE,
NextRun: time.Now().Add(-1 * time.Second),
Interval: 30 * time.Second,
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
task, ok := queue.WaitNext(ctx)
if !ok {
t.Fatal("WaitNext should return true for immediately due task")
}
if task.InstanceName != "pve1" {
t.Errorf("expected task pve1, got %s", task.InstanceName)
}
// Task should be removed from queue
if queue.Size() != 0 {
t.Errorf("queue size should be 0 after WaitNext, got %d", queue.Size())
}
}
// TestWaitNext_WaitsForDue tests WaitNext waits until task is due
func TestWaitNext_WaitsForDue(t *testing.T) {
t.Parallel()
queue := NewTaskQueue()
// Add a task due 200ms in the future
dueTime := time.Now().Add(200 * time.Millisecond)
queue.Upsert(ScheduledTask{
InstanceName: "pve1",
InstanceType: InstanceTypePVE,
NextRun: dueTime,
Interval: 30 * time.Second,
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
start := time.Now()
task, ok := queue.WaitNext(ctx)
elapsed := time.Since(start)
if !ok {
t.Fatal("WaitNext should return true")
}
if task.InstanceName != "pve1" {
t.Errorf("expected task pve1, got %s", task.InstanceName)
}
// Should have waited at least ~150ms (allowing some tolerance)
if elapsed < 150*time.Millisecond {
t.Errorf("WaitNext returned too early: elapsed %v, expected >= 150ms", elapsed)
}
}
// TestWaitNext_EmptyQueueContextCancel tests WaitNext with empty queue and context cancel
func TestWaitNext_EmptyQueueContextCancel(t *testing.T) {
t.Parallel()
queue := NewTaskQueue()
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
start := time.Now()
task, ok := queue.WaitNext(ctx)
elapsed := time.Since(start)
if ok {
t.Error("WaitNext should return false when context times out on empty queue")
}
if task.InstanceName != "" {
t.Error("WaitNext should return empty task")
}
// Should have waited close to the timeout
if elapsed < 150*time.Millisecond {
t.Errorf("WaitNext returned too early: elapsed %v", elapsed)
}
}
// TestWaitNext_MultipleTasks tests WaitNext returns earliest due task
func TestWaitNext_MultipleTasks(t *testing.T) {
t.Parallel()
queue := NewTaskQueue()
now := time.Now()
// Add tasks with different due times
queue.Upsert(ScheduledTask{
InstanceName: "pve3",
InstanceType: InstanceTypePVE,
NextRun: now.Add(300 * time.Millisecond),
})
queue.Upsert(ScheduledTask{
InstanceName: "pve1",
InstanceType: InstanceTypePVE,
NextRun: now.Add(-10 * time.Millisecond), // Already due
})
queue.Upsert(ScheduledTask{
InstanceName: "pve2",
InstanceType: InstanceTypePVE,
NextRun: now.Add(200 * time.Millisecond),
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// Should return pve1 first (earliest due)
task, ok := queue.WaitNext(ctx)
if !ok {
t.Fatal("WaitNext should return true")
}
if task.InstanceName != "pve1" {
t.Errorf("expected pve1 (earliest), got %s", task.InstanceName)
}
// Queue should still have 2 tasks
if queue.Size() != 2 {
t.Errorf("queue size should be 2, got %d", queue.Size())
}
}