fix(replication): fetch status from per-node endpoint

The /cluster/replication endpoint only returns job configuration (guest,
schedule, source, target), not status data (last_sync, next_sync,
duration, fail_count, state).

This fix enriches each replication job with status data from the per-node
endpoint /nodes/{node}/replication/{id}/status, supplying the timing and
state information the UI needs.

Added integration tests to verify:
- Status endpoint is called and data is merged correctly
- Graceful handling when status endpoint fails
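
A minimal sketch of how a caller consumes the enriched jobs. The import
path, host, and token values are illustrative, not part of this change;
the config fields and job fields match those used in the tests below.

  package main

  import (
  	"context"
  	"fmt"
  	"log"
  	"time"

  	"example.com/yourmodule/proxmox" // hypothetical import path
  )

  func main() {
  	client, err := proxmox.NewClient(proxmox.ClientConfig{
  		Host:       "https://pve1:8006", // illustrative host
  		TokenName:  "monitor@pve!token",
  		TokenValue: "secret",
  		VerifySSL:  false,
  		Timeout:    10 * time.Second,
  	})
  	if err != nil {
  		log.Fatalf("new client: %v", err)
  	}
  	jobs, err := client.GetReplicationStatus(context.Background())
  	if err != nil {
  		log.Fatalf("replication status: %v", err)
  	}
  	for _, job := range jobs {
  		// Config fields come from /cluster/replication; status fields were
  		// merged from /nodes/{node}/replication/{id}/status and stay
  		// zero-valued if that per-node call failed.
  		fmt.Printf("%s: state=%s fail_count=%d next_sync=%d\n",
  			job.ID, job.State, job.FailCount, job.NextSyncUnix)
  	}
  }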

Fixes #992
rcourtman
2025-12-31 23:52:37 +00:00
parent 724362504e
commit 567a4ad147
2 changed files with 286 additions and 0 deletions

@@ -42,6 +42,9 @@ type ReplicationJob struct {
}
// GetReplicationStatus returns the replication jobs configured on a PVE instance.
// It fetches job configuration from /cluster/replication and then enriches each
// job with status data (last_sync, next_sync, duration, fail_count, state) from
// /nodes/{node}/replication/{id}/status.
func (c *Client) GetReplicationStatus(ctx context.Context) ([]ReplicationJob, error) {
resp, err := c.get(ctx, "/cluster/replication")
if err != nil {
@@ -62,9 +65,84 @@ func (c *Client) GetReplicationStatus(ctx context.Context) ([]ReplicationJob, er
jobs = append(jobs, parseReplicationJob(entry))
}
// Enrich jobs with status data from the per-node status endpoint
// The /cluster/replication endpoint only returns config, not status
for i := range jobs {
c.enrichReplicationJobStatus(ctx, &jobs[i])
}
return jobs, nil
}
// enrichReplicationJobStatus fetches status data for a replication job from
// /nodes/{node}/replication/{id}/status and merges it into the job struct.
func (c *Client) enrichReplicationJobStatus(ctx context.Context, job *ReplicationJob) {
// Status is stored on the source node
sourceNode := job.Source
if sourceNode == "" {
return
}
jobID := job.ID
if jobID == "" {
return
}
endpoint := fmt.Sprintf("/nodes/%s/replication/%s/status", sourceNode, jobID)
resp, err := c.get(ctx, endpoint)
if err != nil {
// Silently ignore - status endpoint may not be available or job may be new
return
}
defer resp.Body.Close()
var statusResp struct {
Data []map[string]json.RawMessage `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&statusResp); err != nil {
return
}
// The status endpoint returns an array, usually with one entry
if len(statusResp.Data) == 0 {
return
}
status := statusResp.Data[0]
// Parse and merge status fields
if t, unix := parseReplicationTime(decodeRaw(status["last_sync"])); t != nil {
job.LastSyncTime = t
job.LastSyncUnix = unix
}
if t, unix := parseReplicationTime(decodeRaw(status["next_sync"])); t != nil {
job.NextSyncTime = t
job.NextSyncUnix = unix
}
if failCount, ok := intFromAny(decodeRaw(status["fail_count"])); ok {
job.FailCount = failCount
}
if duration, _ := parseDurationSeconds(decodeRaw(status["duration"])); duration > 0 {
job.DurationSeconds = duration
job.LastSyncDurationSeconds = duration
}
if state := stringFromAny(decodeRaw(status["state"])); state != "" {
job.State = state
if job.Status == "" {
job.Status = state
}
}
if errMsg := stringFromAny(decodeRaw(status["error"])); errMsg != "" {
job.Error = errMsg
}
}
func parseReplicationJob(entry map[string]json.RawMessage) ReplicationJob {
job := ReplicationJob{
Enabled: true,

@@ -1,8 +1,12 @@
package proxmox
import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"net/http/httptest"
"testing"
"time"
)
@@ -749,3 +753,207 @@ func TestParseReplicationJob_NextSyncFallback(t *testing.T) {
t.Errorf("NextSyncUnix = %d, want %d (from next-sync fallback)", job.NextSyncUnix, 1736936600)
}
}
// TestGetReplicationStatus_EnrichesWithStatusData tests that GetReplicationStatus
// fetches job config from /cluster/replication AND enriches with status data from
// /nodes/{node}/replication/{id}/status. This tests the fix for issue #992.
func TestGetReplicationStatus_EnrichesWithStatusData(t *testing.T) {
// Track which endpoints were called
var calledClusterReplication bool
var calledStatusEndpoint bool
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch r.URL.Path {
case "/api2/json/cluster/replication":
calledClusterReplication = true
// Return job config (this is what /cluster/replication returns - NO status data)
fmt.Fprint(w, `{
"data": [
{
"id": "100-0",
"guest": 100,
"source": "pve1",
"target": "pve2",
"schedule": "*/15",
"type": "local"
}
]
}`)
case "/api2/json/nodes/pve1/replication/100-0/status":
calledStatusEndpoint = true
// Return status data (this is what the per-node endpoint returns)
fmt.Fprint(w, `{
"data": [
{
"last_sync": 1735689600,
"next_sync": 1735690500,
"duration": 120,
"fail_count": 0,
"state": "ok"
}
]
}`)
default:
t.Logf("Unexpected request to: %s", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
cfg := ClientConfig{
Host: server.URL,
TokenName: "test@pve!token",
TokenValue: "secret",
VerifySSL: false,
Timeout: 2 * time.Second,
}
client, err := NewClient(cfg)
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
ctx := context.Background()
jobs, err := client.GetReplicationStatus(ctx)
if err != nil {
t.Fatalf("GetReplicationStatus failed: %v", err)
}
// Verify both endpoints were called
if !calledClusterReplication {
t.Error("Expected /cluster/replication to be called")
}
if !calledStatusEndpoint {
t.Error("Expected /nodes/pve1/replication/100-0/status to be called")
}
// Verify we got one job
if len(jobs) != 1 {
t.Fatalf("Expected 1 job, got %d", len(jobs))
}
job := jobs[0]
// Verify config fields came from /cluster/replication
if job.ID != "100-0" {
t.Errorf("ID = %q, want %q", job.ID, "100-0")
}
if job.GuestID != 100 {
t.Errorf("GuestID = %d, want %d", job.GuestID, 100)
}
if job.Source != "pve1" {
t.Errorf("Source = %q, want %q", job.Source, "pve1")
}
if job.Target != "pve2" {
t.Errorf("Target = %q, want %q", job.Target, "pve2")
}
if job.Schedule != "*/15" {
t.Errorf("Schedule = %q, want %q", job.Schedule, "*/15")
}
// Verify status fields came from /nodes/{node}/replication/{id}/status
if job.LastSyncUnix != 1735689600 {
t.Errorf("LastSyncUnix = %d, want %d", job.LastSyncUnix, 1735689600)
}
if job.LastSyncTime == nil {
t.Error("LastSyncTime should not be nil")
}
if job.NextSyncUnix != 1735690500 {
t.Errorf("NextSyncUnix = %d, want %d", job.NextSyncUnix, 1735690500)
}
if job.NextSyncTime == nil {
t.Error("NextSyncTime should not be nil")
}
if job.DurationSeconds != 120 {
t.Errorf("DurationSeconds = %d, want %d", job.DurationSeconds, 120)
}
if job.LastSyncDurationSeconds != 120 {
t.Errorf("LastSyncDurationSeconds = %d, want %d", job.LastSyncDurationSeconds, 120)
}
if job.FailCount != 0 {
t.Errorf("FailCount = %d, want %d", job.FailCount, 0)
}
if job.State != "ok" {
t.Errorf("State = %q, want %q", job.State, "ok")
}
}
// TestGetReplicationStatus_StatusEndpointFails tests that GetReplicationStatus
// still returns job config even if the per-node status endpoint fails.
func TestGetReplicationStatus_StatusEndpointFails(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch r.URL.Path {
case "/api2/json/cluster/replication":
// Return job config
fmt.Fprint(w, `{
"data": [
{
"id": "100-0",
"guest": 100,
"source": "pve1",
"target": "pve2",
"schedule": "*/15",
"type": "local"
}
]
}`)
case "/api2/json/nodes/pve1/replication/100-0/status":
// Status endpoint fails (404)
w.WriteHeader(http.StatusNotFound)
fmt.Fprint(w, `{"errors": "not found"}`)
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
cfg := ClientConfig{
Host: server.URL,
TokenName: "test@pve!token",
TokenValue: "secret",
VerifySSL: false,
Timeout: 2 * time.Second,
}
client, err := NewClient(cfg)
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
ctx := context.Background()
jobs, err := client.GetReplicationStatus(ctx)
if err != nil {
t.Fatalf("GetReplicationStatus failed: %v", err)
}
// Should still get the job config even though status failed
if len(jobs) != 1 {
t.Fatalf("Expected 1 job, got %d", len(jobs))
}
job := jobs[0]
// Config fields should be populated
if job.ID != "100-0" {
t.Errorf("ID = %q, want %q", job.ID, "100-0")
}
if job.GuestID != 100 {
t.Errorf("GuestID = %d, want %d", job.GuestID, 100)
}
// Status fields should be empty/zero (status endpoint failed)
if job.LastSyncUnix != 0 {
t.Errorf("LastSyncUnix = %d, want 0 (status endpoint failed)", job.LastSyncUnix)
}
if job.LastSyncTime != nil {
t.Error("LastSyncTime should be nil (status endpoint failed)")
}
}
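
To run just these tests (package path assumed; `./...` walks the whole
module, so adjust it to wherever the proxmox package lives):

  go test -run 'TestGetReplicationStatus' ./...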