feat(sensor-proxy): Phase 2 - atomic config management with CLI

Implements bullet-proof configuration management to completely eliminate
allowed_nodes corruption by design. This builds on Phase 1 (file-only mode)
by replacing all shell/Python config manipulation with proper Go tooling.

**New Features:**
- `pulse-sensor-proxy config validate` - parse and validate config files
- `pulse-sensor-proxy config set-allowed-nodes` - atomic node list updates
- File locking via flock prevents concurrent write races
- Atomic writes (temp file + rename) ensure consistency
- systemd ExecStartPre validation prevents startup with bad config

**Architectural Changes:**
1. Installer now calls config CLI instead of embedded Python/shell scripts
2. All config mutations go through single authoritative writer
3. Deduplication and normalization handled in Go (reuses existing logic)
4. Sanitizer kept as noisy failsafe (warns if corruption still occurs)

**Implementation Details:**
- New cmd/pulse-sensor-proxy/config_cmd.go with cobra commands
- withLockedFile() wrapper ensures exclusive access
- atomicWriteFile() uses temp + rename pattern
- Installer update_allowed_nodes() simplified to CLI calls
- Both systemd service modes include ExecStartPre validation

**Why This Works:**
- Single code path for all writes (no shell/Python divergence)
- File locking serializes self-heal timer + manual installer runs
- Validation gate prevents proxy from starting with corrupt config
- CLI uses same YAML parser as the daemon (guaranteed compatibility)

**Phase 2 Benefits:**
- Corruption impossible by design (not just detected and fixed)
- No more Python dependency for config management
- Atomic operations prevent partial writes
- Clear error messages on validation failures

The defensive sanitizer remains active but now logs loudly if triggered,
allowing us to confirm Phase 2 eliminates corruption in production before
removing the safety net entirely.

This completes the fix for the recurring temperature monitoring outages.

Related to Phase 1 commit 53dec6010
This commit is contained in:
rcourtman
2025-11-19 09:37:49 +00:00
parent 53dec6010c
commit 3dc073a285
3 changed files with 357 additions and 56 deletions

View File

@@ -552,15 +552,17 @@ func sanitizeDuplicateAllowedNodesBlocks(path string, data []byte) (bool, []byte
Str("config_file", path).
Msg("Failed to rewrite sanitized configuration; using in-memory copy only")
} else {
// Phase 2: This sanitizer should never fire with the new config CLI
// If it does, it indicates a bug or manual file editing
log.Warn().
Str("config_file", path).
Int("removed_duplicate_blocks", len(matches)-1).
Msg("Detected duplicate allowed_nodes blocks and sanitized configuration automatically")
Msg("CONFIG SANITIZED duplicate allowed_nodes blocks removed; this should not happen with Phase 2 config CLI please report if you see this")
}
} else {
log.Warn().
Int("removed_duplicate_blocks", len(matches)-1).
Msg("Detected duplicate allowed_nodes blocks and sanitized configuration (in-memory only)")
Msg("CONFIG SANITIZED duplicate allowed_nodes blocks detected in-memory this should not happen with Phase 2 config CLI")
}
return true, cleaned

View File

@@ -0,0 +1,331 @@
package main
import (
"fmt"
"os"
"path/filepath"
"github.com/spf13/cobra"
"golang.org/x/sys/unix"
"gopkg.in/yaml.v3"
)
var (
// Config command flags
// These package-level variables receive the parsed flag values registered in init().
// configPathFlag is bound to `validate --config`; when empty the command
// falls back to defaultConfigPath.
configPathFlag string
// allowedNodesPathFlag is bound to `--allowed-nodes` on BOTH the validate and
// the set-allowed-nodes commands; each command applies its own default when
// the value is empty.
allowedNodesPathFlag string
// mergeNodesFlag collects the values passed via `set-allowed-nodes --merge`.
mergeNodesFlag []string
// replaceMode is bound to `set-allowed-nodes --replace`.
replaceMode bool
)
// configCmd is the parent `config` command. It defines no Run function of its
// own; the validate and set-allowed-nodes subcommands are attached to it, and
// it is attached to rootCmd, in init().
var configCmd = &cobra.Command{
Use: "config",
Short: "Manage sensor proxy configuration",
Long: `Atomic configuration management for pulse-sensor-proxy`,
}
// validateCmd implements `config validate`.
//
// It validates the main config file (--config, defaulting to
// defaultConfigPath) and then, if an allowed_nodes file is present
// (--allowed-nodes, defaulting to allowed_nodes.yaml in the config file's
// directory), validates that as well. Every failure is written to stderr and
// also returned to cobra. On success it prints "Configuration valid".
var validateCmd = &cobra.Command{
	Use:   "validate",
	Short: "Validate configuration files",
	Long:  `Parse and validate config.yaml and allowed_nodes.yaml files`,
	RunE: func(cmd *cobra.Command, args []string) error {
		cfgPath := configPathFlag
		if cfgPath == "" {
			cfgPath = defaultConfigPath
		}

		allowedNodesPath := allowedNodesPathFlag
		if allowedNodesPath == "" {
			allowedNodesPath = filepath.Join(filepath.Dir(cfgPath), "allowed_nodes.yaml")
		}

		if err := validateConfigFile(cfgPath); err != nil {
			fmt.Fprintf(os.Stderr, "Config validation failed: %v\n", err)
			return err
		}

		// The allowed_nodes file is optional, but only a genuine "does not
		// exist" may skip validation. Any other stat failure (permissions,
		// I/O error, a non-directory path component) means we could not check
		// the file at all, and reporting "Configuration valid" would be wrong.
		_, statErr := os.Stat(allowedNodesPath)
		switch {
		case statErr == nil:
			if err := validateAllowedNodesFile(allowedNodesPath); err != nil {
				fmt.Fprintf(os.Stderr, "Allowed nodes validation failed: %v\n", err)
				return err
			}
		case !os.IsNotExist(statErr):
			err := fmt.Errorf("failed to stat allowed_nodes file: %w", statErr)
			fmt.Fprintf(os.Stderr, "Allowed nodes validation failed: %v\n", err)
			return err
		}

		fmt.Println("Configuration valid")
		return nil
	},
}
// setAllowedNodesCmd implements `config set-allowed-nodes`.
//
// The target file comes from --allowed-nodes (with a fixed default under
// /etc/pulse-sensor-proxy). At least one --merge value is required. With
// --replace the given nodes become the whole list; otherwise they are merged
// into what is already there. The actual work is done by setAllowedNodes.
// Note that the count in the success message is the number of --merge values
// supplied, not the size of the resulting list.
var setAllowedNodesCmd = &cobra.Command{
	Use:   "set-allowed-nodes",
	Short: "Atomically update allowed_nodes.yaml",
	Long: `Merge or replace allowed nodes with atomic writes and file locking.
Examples:
# Merge new nodes into existing list
pulse-sensor-proxy config set-allowed-nodes --merge 192.168.0.1 --merge node1.local
# Replace entire list
pulse-sensor-proxy config set-allowed-nodes --replace --merge 192.168.0.1 --merge 192.168.0.2
`,
	RunE: func(cmd *cobra.Command, args []string) error {
		target := allowedNodesPathFlag
		if target == "" {
			// No explicit path given: fall back to the system-wide location.
			target = "/etc/pulse-sensor-proxy/allowed_nodes.yaml"
		}

		supplied := len(mergeNodesFlag)
		if supplied == 0 {
			return fmt.Errorf("no nodes specified (use --merge flag)")
		}

		updateErr := setAllowedNodes(target, mergeNodesFlag, replaceMode)
		if updateErr != nil {
			fmt.Fprintf(os.Stderr, "Failed to update allowed nodes: %v\n", updateErr)
			return updateErr
		}

		if replaceMode {
			fmt.Printf("Replaced allowed nodes with %d entries\n", supplied)
			return nil
		}
		fmt.Printf("Merged %d nodes into allowed nodes list\n", supplied)
		return nil
	},
}
func init() {
// Add subcommands to config command
configCmd.AddCommand(validateCmd)
configCmd.AddCommand(setAllowedNodesCmd)
// Validate command flags
validateCmd.Flags().StringVar(&configPathFlag, "config", "", "Path to config.yaml (default: /etc/pulse-sensor-proxy/config.yaml)")
validateCmd.Flags().StringVar(&allowedNodesPathFlag, "allowed-nodes", "", "Path to allowed_nodes.yaml (default: same dir as config)")
// Set-allowed-nodes command flags
setAllowedNodesCmd.Flags().StringVar(&allowedNodesPathFlag, "allowed-nodes", "", "Path to allowed_nodes.yaml (default: /etc/pulse-sensor-proxy/allowed_nodes.yaml)")
setAllowedNodesCmd.Flags().StringSliceVar(&mergeNodesFlag, "merge", []string{}, "Node to merge (can be specified multiple times)")
setAllowedNodesCmd.Flags().BoolVar(&replaceMode, "replace", false, "Replace entire list instead of merging")
// Add config command to root
rootCmd.AddCommand(configCmd)
}
// validateConfigFile reads the main config file at path and returns the first
// problem found, or nil when the file passes every check:
//
//  1. the file must be readable;
//  2. it must not contain duplicate allowed_nodes blocks (as reported by
//     sanitizeDuplicateAllowedNodesBlocks);
//  3. it must parse as YAML into Config;
//  4. read_timeout and write_timeout must both be positive.
//
// NOTE(review): the Config decoded here starts from its zero value, so a file
// that simply omits read_timeout or write_timeout is rejected. If the daemon
// applies defaults for those fields when loading, this check is stricter than
// the daemon itself — confirm against the daemon's config loader.
func validateConfigFile(path string) error {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return fmt.Errorf("failed to read config file: %w", readErr)
	}

	// The sanitizer is used purely as a detector here: an empty path is passed
	// and only its verdict and cleaned bytes are consumed.
	// (Presumably the empty path keeps it from rewriting anything on disk —
	// TODO confirm in sanitizeDuplicateAllowedNodesBlocks.)
	hadDuplicates, cleaned := sanitizeDuplicateAllowedNodesBlocks("", raw)
	if hadDuplicates {
		return fmt.Errorf("config contains duplicate allowed_nodes blocks (would auto-fix on service start)")
	}

	var cfg Config
	if parseErr := yaml.Unmarshal(cleaned, &cfg); parseErr != nil {
		return fmt.Errorf("failed to parse config YAML: %w", parseErr)
	}

	// Required fields, checked in a fixed order so the first offender is
	// the one reported.
	switch {
	case cfg.ReadTimeout <= 0:
		return fmt.Errorf("read_timeout must be positive")
	case cfg.WriteTimeout <= 0:
		return fmt.Errorf("write_timeout must be positive")
	}
	return nil
}
// validateAllowedNodesFile checks that the allowed_nodes file at path is
// readable, is syntactically valid YAML, and yields at least one node.
//
// What counts as a node is decided by extractNodesFromYAML — the same helper
// setAllowedNodes uses when merging — so the validator and the writer cannot
// drift apart. The accepted layouts are a mapping with an allowed_nodes list,
// or a bare top-level list; only non-empty string entries are counted.
//
// Returns a wrapped error for read or parse failures, and
// "allowed_nodes file is empty or invalid" when no node could be extracted.
func validateAllowedNodesFile(path string) error {
	data, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("failed to read allowed_nodes file: %w", err)
	}

	// Parse once here solely to surface syntax errors with their detail:
	// extractNodesFromYAML swallows parse failures and just returns no nodes.
	// The file is tiny, so the second parse inside the helper is immaterial.
	var parsed interface{}
	if err := yaml.Unmarshal(data, &parsed); err != nil {
		return fmt.Errorf("failed to parse allowed_nodes YAML: %w", err)
	}

	if len(extractNodesFromYAML(data)) == 0 {
		return fmt.Errorf("allowed_nodes file is empty or invalid")
	}
	return nil
}
// setAllowedNodes rewrites the allowed_nodes file at path so that it lists
// newNodes — merged with the entries already on disk, or on their own when
// replace is true. The combined list is passed through normalizeNodes, written
// as a YAML mapping with a single allowed_nodes key under a "managed by"
// header, and installed with atomicWriteFile (mode 0644). The parent directory
// is created if necessary.
//
// Concurrent invocations are serialized with an exclusive flock on the sidecar
// file path+".lock".
//
// Returns an error if the directory cannot be created, the lock cannot be
// taken, an existing file cannot be read (other than because it does not
// exist), the resulting list is empty, or marshalling/writing fails.
func setAllowedNodes(path string, newNodes []string, replace bool) error {
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("failed to create config directory: %w", err)
	}

	// Lock a sidecar file, NOT path itself. flock attaches to the open file
	// (inode), and atomicWriteFile replaces path with a brand-new inode via
	// rename. If path were the lock, a writer that opened it before a rename
	// and one that opened it after would hold locks on two different inodes
	// and run their read-modify-write cycles concurrently. The sidecar is
	// never renamed, so every writer contends on the same inode.
	lockPath := path + ".lock"

	return withLockedFile(lockPath, func(_ *os.File) error {
		var existing []string

		if !replace {
			data, err := os.ReadFile(path)
			switch {
			case err == nil:
				existing = extractNodesFromYAML(data)
			case !os.IsNotExist(err):
				// Do not carry on with an empty "existing" list: that would
				// silently overwrite the file and drop every current node.
				return fmt.Errorf("failed to read existing allowed_nodes file: %w", err)
			}
			// A missing file simply means there is nothing to merge with.
		}

		merged := normalizeNodes(append(existing, newNodes...))

		// validateAllowedNodesFile rejects a file with no nodes, so writing
		// one would produce a config that fails its own validation.
		if len(merged) == 0 {
			return fmt.Errorf("refusing to write an empty allowed_nodes list")
		}

		output := map[string]interface{}{
			"allowed_nodes": merged,
		}
		data, err := yaml.Marshal(output)
		if err != nil {
			return fmt.Errorf("failed to marshal YAML: %w", err)
		}

		header := "# Managed by pulse-sensor-proxy config CLI\n# Do not edit manually while service is running\n"
		finalData := []byte(header + string(data))

		return atomicWriteFile(path, finalData, 0644)
	})
}
// withLockedFile opens path read-write (creating it with mode 0644 if it does
// not exist), takes an exclusive flock on it, and invokes fn with the open
// file. The lock is released and the file closed when fn returns; fn's error
// is returned unchanged. Open and lock failures are returned wrapped.
//
// The flock call blocks until the lock is available. The lock belongs to the
// file that was opened, so it only serializes callers that open that same
// file; it offers no protection if path is replaced by a different file while
// callers are waiting.
func withLockedFile(path string, fn func(f *os.File) error) error {
	handle, openErr := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
	if openErr != nil {
		return fmt.Errorf("failed to open file: %w", openErr)
	}
	defer handle.Close()

	fd := int(handle.Fd())
	if lockErr := unix.Flock(fd, unix.LOCK_EX); lockErr != nil {
		return fmt.Errorf("failed to acquire file lock: %w", lockErr)
	}
	// Deferred calls run last-in first-out, so the unlock happens before the
	// Close registered above.
	defer unix.Flock(fd, unix.LOCK_UN) //nolint:errcheck

	return fn(handle)
}
// atomicWriteFile installs data at path without ever exposing a partially
// written file: the bytes go to a temporary file in the same directory, are
// synced, given mode perm, and then renamed over path. On any failure before
// the rename the temporary file is removed and a wrapped error is returned.
// After a successful rename the containing directory is synced on a
// best-effort basis; failures there are ignored.
func atomicWriteFile(path string, data []byte, perm os.FileMode) error {
	parent := filepath.Dir(path)

	// The staging file must live in the target directory so that the final
	// rename stays on one filesystem.
	staging, err := os.CreateTemp(parent, ".tmp-*")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	stagingPath := staging.Name()

	// Until the rename has happened, the staging file is garbage on any exit.
	renamed := false
	defer func() {
		if !renamed {
			os.Remove(stagingPath)
		}
	}()

	// flush writes the payload and forces it to stable storage.
	flush := func() error {
		if _, writeErr := staging.Write(data); writeErr != nil {
			return fmt.Errorf("failed to write temp file: %w", writeErr)
		}
		if syncErr := staging.Sync(); syncErr != nil {
			return fmt.Errorf("failed to sync temp file: %w", syncErr)
		}
		return nil
	}
	if flushErr := flush(); flushErr != nil {
		staging.Close()
		return flushErr
	}

	if closeErr := staging.Close(); closeErr != nil {
		return fmt.Errorf("failed to close temp file: %w", closeErr)
	}

	// CreateTemp picks its own restrictive mode; apply the requested one
	// before the file becomes visible under its final name.
	if chmodErr := os.Chmod(stagingPath, perm); chmodErr != nil {
		return fmt.Errorf("failed to set permissions: %w", chmodErr)
	}

	if renameErr := os.Rename(stagingPath, path); renameErr != nil {
		return fmt.Errorf("failed to rename temp file: %w", renameErr)
	}
	renamed = true

	// Best effort: sync the directory so the rename itself is persisted.
	if dirHandle, openErr := os.Open(parent); openErr == nil {
		dirHandle.Sync() //nolint:errcheck
		dirHandle.Close()
	}
	return nil
}
// extractNodesFromYAML returns the node names found in data.
//
// Two document layouts are understood: a mapping whose allowed_nodes key
// holds a list, and a bare top-level list. Only entries that are non-empty
// strings are returned; anything else in the list is skipped. The result is
// nil when data is not valid YAML, has another layout, or contains no usable
// entries — callers cannot tell these cases apart.
func extractNodesFromYAML(data []byte) []string {
	var parsed interface{}
	if yaml.Unmarshal(data, &parsed) != nil {
		return nil
	}

	// Step 1: locate the raw list, wherever the layout puts it.
	var candidates []interface{}
	switch doc := parsed.(type) {
	case map[string]interface{}:
		// A missing key or a non-list value leaves candidates nil.
		candidates, _ = doc["allowed_nodes"].([]interface{})
	case []interface{}:
		candidates = doc
	}

	// Step 2: keep only the non-empty strings.
	var nodes []string
	for _, entry := range candidates {
		name, isString := entry.(string)
		if !isString || name == "" {
			continue
		}
		nodes = append(nodes, name)
	}
	return nodes
}

View File

@@ -650,66 +650,30 @@ update_allowed_nodes() {
local nodes=("$@")
# File mode is now required - inline mode has been removed
# Phase 2: Use config CLI instead of shell/Python manipulation
ensure_allowed_nodes_file_reference
remove_allowed_nodes_block
if ! command -v python3 >/dev/null 2>&1; then
print_warn "python3 is required to manage allowed_nodes; skipping update"
# Build --merge flags for the CLI
local merge_args=()
for node in "${nodes[@]}"; do
if [[ -n "$node" ]]; then
merge_args+=(--merge "$node")
fi
done
if [[ ${#merge_args[@]} -eq 0 ]]; then
return
fi
python3 - "$ALLOWED_NODES_FILE" "$comment_line" "${nodes[@]}" <<'PY'
import sys
from pathlib import Path
import yaml
path = Path(sys.argv[1])
comment = sys.argv[2]
new_nodes = [n.strip() for n in sys.argv[3:] if n.strip()]
existing = []
if path.exists():
text = path.read_text()
try:
data = yaml.safe_load(text)
except yaml.YAMLError:
data = None
if isinstance(data, dict):
arr = data.get('allowed_nodes')
if isinstance(arr, list):
existing = [str(x).strip() for x in arr if str(x).strip()]
elif isinstance(data, list):
existing = [str(x).strip() for x in data if str(x).strip()]
else:
for line in text.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
if line.startswith('-'):
line = line[1:].strip()
if line:
existing.append(line)
seen = set()
merged = []
for entry in existing + new_nodes:
entry = entry.strip()
if not entry:
continue
key = entry.lower()
if key in seen:
continue
seen.add(key)
merged.append(entry)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open('w') as fh:
fh.write("# Managed by install-sensor-proxy.sh\n")
fh.write(f"# {comment}\n")
yaml.safe_dump({'allowed_nodes': merged}, fh, default_flow_style=False, sort_keys=False)
PY
chmod 0644 "$ALLOWED_NODES_FILE" 2>/dev/null || true
chown pulse-sensor-proxy:pulse-sensor-proxy "$ALLOWED_NODES_FILE" 2>/dev/null || true
# Use the config CLI for atomic, locked updates
if "$BINARY_PATH" config set-allowed-nodes --allowed-nodes "$ALLOWED_NODES_FILE" "${merge_args[@]}"; then
chmod 0644 "$ALLOWED_NODES_FILE" 2>/dev/null || true
chown pulse-sensor-proxy:pulse-sensor-proxy "$ALLOWED_NODES_FILE" 2>/dev/null || true
else
print_error "Failed to update allowed_nodes using config CLI"
return 1
fi
}
@@ -1958,6 +1922,8 @@ Type=simple
User=pulse-sensor-proxy
Group=pulse-sensor-proxy
WorkingDirectory=/var/lib/pulse-sensor-proxy
# Validate config before starting (Phase 2: prevent corruption from starting service)
ExecStartPre=${BINARY_PATH} config validate --config /etc/pulse-sensor-proxy/config.yaml
ExecStart=${BINARY_PATH} --config /etc/pulse-sensor-proxy/config.yaml
Restart=on-failure
RestartSec=5s
@@ -2017,6 +1983,8 @@ User=pulse-sensor-proxy
Group=pulse-sensor-proxy
SupplementaryGroups=www-data
WorkingDirectory=/var/lib/pulse-sensor-proxy
# Validate config before starting (Phase 2: prevent corruption from starting service)
ExecStartPre=${BINARY_PATH} config validate --config /etc/pulse-sensor-proxy/config.yaml
ExecStart=${BINARY_PATH}
Restart=on-failure
RestartSec=5s