Files
Pulse/internal/monitoring/task_queue.go
rcourtman 160adeb3b8 feat: add scheduler health API endpoint (Phase 2 Task 8)
Task 8 of 10 complete. Exposes read-only scheduler health data including:
- Queue depth and distribution by instance type
- Dead-letter queue inspection (top 25 tasks with error details)
- Circuit breaker states (instance-level)
- Staleness scores per instance

New API endpoint:
  GET /api/monitoring/scheduler/health (requires authentication)

New snapshot methods:
- StalenessTracker.Snapshot() - exports all staleness data
- TaskQueue.Snapshot() - queue depth & per-type distribution
- TaskQueue.PeekAll() - dead-letter task inspection
- circuitBreaker.State() - exports state, failures, retryAt
- Monitor.SchedulerHealth() - aggregates all health data

Documentation updated with API spec, field descriptions, and usage examples.
2025-10-20 15:13:38 +00:00

217 lines
4.6 KiB
Go

package monitoring
import (
	"container/heap"
	"context"
	"sort"
	"sync"
	"time"
)
// scheduledTaskEntry is a heap node wrapping one ScheduledTask.
// index is the entry's current position in the taskHeap slice: taskHeap.Push
// and taskHeap.Swap keep it up to date and taskHeap.Pop sets it to -1, so
// TaskQueue.Upsert and TaskQueue.Remove can hand it to heap.Fix / heap.Remove.
type scheduledTaskEntry struct {
task ScheduledTask
index int // position in taskHeap; -1 once popped
}
// key returns the scheduler key built from this entry's instance type and
// instance name; TaskQueue uses the same key for its entries map.
func (e *scheduledTaskEntry) key() string {
	t := &e.task
	return schedulerKey(t.InstanceType, t.InstanceName)
}
// taskHeap implements heap.Interface over scheduled task entries. The entry
// ordered first (see Less) is kept at index 0. Swap and Push maintain each
// entry's index field so callers can use heap.Fix / heap.Remove on it.
type taskHeap []*scheduledTaskEntry

// Len reports the number of entries in the heap.
func (h taskHeap) Len() int { return len(h) }

// Less orders entries by NextRun ascending; ties are broken by higher
// Priority first, then by InstanceName ascending.
func (h taskHeap) Less(i, j int) bool {
	a, b := &h[i].task, &h[j].task
	if !a.NextRun.Equal(b.NextRun) {
		return a.NextRun.Before(b.NextRun)
	}
	if a.Priority != b.Priority {
		return a.Priority > b.Priority
	}
	return a.InstanceName < b.InstanceName
}

// Swap exchanges two entries and updates their stored indices.
func (h taskHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}

// Push appends x (which must be a *scheduledTaskEntry) and records its index.
// It is meant to be called through heap.Push, not directly.
func (h *taskHeap) Push(x interface{}) {
	entry := x.(*scheduledTaskEntry)
	entry.index = len(*h)
	*h = append(*h, entry)
}

// Pop removes and returns the last entry (heap.Pop has already swapped the
// minimum there), marking it with index -1. It returns nil on an empty heap.
func (h *taskHeap) Pop() interface{} {
	old := *h
	n := len(old)
	if n == 0 {
		return nil
	}
	entry := old[n-1]
	// Clear the vacated slot so the backing array no longer references the
	// popped entry; otherwise it stays reachable until the slot is reused.
	old[n-1] = nil
	entry.index = -1
	*h = old[:n-1]
	return entry
}
// TaskQueue is a thread-safe min-heap over scheduled tasks.
// Every method in this file locks mu before touching entries or heap.
// entries maps schedulerKey(InstanceType, InstanceName) to the queued entry,
// so an Upsert for an already-queued instance replaces it rather than adding
// a second one; heap orders the same entries by taskHeap.Less.
// Construct with NewTaskQueue: Upsert writes to entries, which must be non-nil.
type TaskQueue struct {
mu sync.Mutex // guards entries and heap
entries map[string]*scheduledTaskEntry // scheduler key -> heap node
heap taskHeap // the same nodes as entries, in heap order
}
// NewTaskQueue returns an empty, ready-to-use TaskQueue. Use it instead of
// the zero value, whose nil entries map would make Upsert panic.
func NewTaskQueue() *TaskQueue {
	q := new(TaskQueue)
	q.entries = map[string]*scheduledTaskEntry{}
	q.heap = taskHeap{}
	heap.Init(&q.heap) // trivially satisfied for an empty heap
	return q
}
// Upsert queues task, or, if a task for the same instance type and name is
// already queued, replaces it and restores heap order for the new values.
func (q *TaskQueue) Upsert(task ScheduledTask) {
	k := schedulerKey(task.InstanceType, task.InstanceName)

	q.mu.Lock()
	defer q.mu.Unlock()

	existing, found := q.entries[k]
	if !found {
		fresh := &scheduledTaskEntry{task: task}
		heap.Push(&q.heap, fresh)
		q.entries[k] = fresh
		return
	}

	existing.task = task
	heap.Fix(&q.heap, existing.index)
}
// Remove drops the queued task for the given instance type and name.
// It is a no-op when no such task is queued.
func (q *TaskQueue) Remove(instanceType InstanceType, instance string) {
	k := schedulerKey(instanceType, instance)

	q.mu.Lock()
	defer q.mu.Unlock()

	if e, present := q.entries[k]; present {
		heap.Remove(&q.heap, e.index)
		delete(q.entries, k)
	}
}
// WaitNext blocks until the task at the head of the queue is due, then removes
// it from the queue and returns it with ok == true. It returns
// (ScheduledTask{}, false) once ctx is cancelled. Cancellation is checked
// before each look at the queue, so a cancelled ctx wins even if a task is
// already due.
//
// Upsert does not wake a waiting WaitNext, so this method polls. It polls
// every 100ms while the queue is empty. While the head task is not yet due,
// it waits at most 250ms between checks, so a task inserted ahead of the
// current head may be picked up up to ~250ms late.
func (q *TaskQueue) WaitNext(ctx context.Context) (ScheduledTask, bool) {
	const (
		emptyPollInterval = 100 * time.Millisecond
		maxWaitInterval   = 250 * time.Millisecond
	)

	// A single timer is reused across polls instead of allocating a new one
	// (time.After / time.NewTimer) per iteration. The deferred Stop covers
	// every return path, including cancellation while the queue is empty.
	var timer *time.Timer
	defer func() {
		if timer != nil {
			timer.Stop()
		}
	}()

	for {
		if ctx.Err() != nil {
			return ScheduledTask{}, false
		}

		q.mu.Lock()
		wait := emptyPollInterval
		if len(q.heap) > 0 {
			entry := q.heap[0]
			delay := time.Until(entry.task.NextRun)
			if delay <= 0 {
				heap.Pop(&q.heap)
				delete(q.entries, entry.key())
				task := entry.task
				q.mu.Unlock()
				return task, true
			}
			wait = delay
			if wait > maxWaitInterval {
				wait = maxWaitInterval
			}
		}
		q.mu.Unlock()

		// Reset is safe: the loop only comes back here after the previous
		// expiry has been received from timer.C, so the channel is drained.
		if timer == nil {
			timer = time.NewTimer(wait)
		} else {
			timer.Reset(wait)
		}

		select {
		case <-ctx.Done():
			return ScheduledTask{}, false
		case <-timer.C:
		}
	}
}
// Size reports how many tasks are queued at the moment of the call.
func (q *TaskQueue) Size() int {
	q.mu.Lock()
	n := q.heap.Len()
	q.mu.Unlock()
	return n
}
// QueueSnapshot represents the current state of the task queue, as produced
// by TaskQueue.Snapshot for the scheduler health API.
type QueueSnapshot struct {
// Depth is the total number of queued tasks.
Depth int `json:"depth"`
// DueWithinSeconds counts queued tasks whose NextRun is at most 12 seconds
// after the snapshot time (already-overdue tasks included). Despite the name
// it is a task count, not a number of seconds.
DueWithinSeconds int `json:"dueWithinSeconds"`
// PerType maps each instance type (converted to string) to its queued-task count.
PerType map[string]int `json:"perType"`
}
// Snapshot captures, under the queue lock, the queue depth, the number of
// tasks due (or overdue) within the next 12 seconds, and a per-instance-type
// task count. The returned PerType map is always non-nil.
func (q *TaskQueue) Snapshot() QueueSnapshot {
	// dueSoonWindow is the look-ahead used for DueWithinSeconds.
	const dueSoonWindow = 12 * time.Second

	q.mu.Lock()
	defer q.mu.Unlock()

	counts := make(map[string]int)
	dueSoon := 0
	cutoff := time.Now().Add(dueSoonWindow)
	for i := range q.heap {
		t := &q.heap[i].task
		counts[string(t.InstanceType)]++
		if !t.NextRun.After(cutoff) {
			dueSoon++
		}
	}

	return QueueSnapshot{
		Depth:            len(q.heap),
		DueWithinSeconds: dueSoon,
		PerType:          counts,
	}
}
// DeadLetterTask represents a task in the dead-letter queue.
// TaskQueue.PeekAll fills only Instance, Type and NextRun; it leaves
// LastError empty and Failures at 0 (its comment says the Monitor populates
// Failures — presumably LastError as well; confirm at the caller).
type DeadLetterTask struct {
Instance string `json:"instance"` // ScheduledTask.InstanceName
Type string `json:"type"` // ScheduledTask.InstanceType as a string
NextRun time.Time `json:"nextRun"`
LastError string `json:"lastError,omitempty"` // omitted from JSON when empty
Failures int `json:"failures"`
}
// PeekAll returns up to limit queued tasks, soonest first (the same order in
// which WaitNext would hand them out), without removing them. A limit <= 0,
// or one larger than the queue, returns every queued task. The result is
// never nil.
//
// Only Instance, Type and NextRun are filled in. Failures is left at 0 for
// the Monitor to populate, and LastError is left empty.
func (q *TaskQueue) PeekAll(limit int) []DeadLetterTask {
	q.mu.Lock()
	defer q.mu.Unlock()

	n := len(q.heap)
	if limit <= 0 || limit > n {
		limit = n
	}

	// The heap slice is only partially ordered. Index 0 is the minimum, but
	// q.heap[:limit] is not the limit soonest tasks, so sort a copy using the
	// heap's own ordering. sort.Slice is used rather than sort.Sort because
	// taskHeap.Swap rewrites entry.index on the shared entries, which would
	// corrupt the live heap's bookkeeping.
	ordered := make(taskHeap, n)
	copy(ordered, q.heap)
	sort.Slice(ordered, func(i, j int) bool { return ordered.Less(i, j) })

	result := make([]DeadLetterTask, 0, limit)
	for _, entry := range ordered[:limit] {
		result = append(result, DeadLetterTask{
			Instance: entry.task.InstanceName,
			Type:     string(entry.task.InstanceType),
			NextRun:  entry.task.NextRun,
			Failures: 0, // populated by the Monitor
		})
	}
	return result
}