mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-02-18 00:17:39 +01:00
feat(sensor-proxy): Phase 2 - atomic config management with CLI
Implements bullet-proof configuration management to completely eliminate
allowed_nodes corruption by design. This builds on Phase 1 (file-only mode)
by replacing all shell/Python config manipulation with proper Go tooling.
**New Features:**
- `pulse-sensor-proxy config validate` - parse and validate config files
- `pulse-sensor-proxy config set-allowed-nodes` - atomic node list updates
- File locking via flock prevents concurrent write races
- Atomic writes (temp file + rename) ensure consistency
- systemd ExecStartPre validation prevents startup with bad config
**Architectural Changes:**
1. Installer now calls config CLI instead of embedded Python/shell scripts
2. All config mutations go through single authoritative writer
3. Deduplication and normalization handled in Go (reuses existing logic)
4. Sanitizer kept as noisy failsafe (warns if corruption still occurs)
**Implementation Details:**
- New cmd/pulse-sensor-proxy/config_cmd.go with cobra commands
- withLockedFile() wrapper ensures exclusive access
- atomicWriteFile() uses temp + rename pattern
- Installer update_allowed_nodes() simplified to CLI calls
- Both systemd service modes include ExecStartPre validation
**Why This Works:**
- Single code path for all writes (no shell/Python divergence)
- File locking serializes self-heal timer + manual installer runs
- Validation gate prevents proxy from starting with corrupt config
- CLI uses same YAML parser as the daemon (guaranteed compatibility)
**Phase 2 Benefits:**
- Corruption impossible by design (not just detected and fixed)
- No more Python dependency for config management
- Atomic operations prevent partial writes
- Clear error messages on validation failures
The defensive sanitizer remains active but now logs loudly if triggered,
allowing us to confirm Phase 2 eliminates corruption in production before
removing the safety net entirely.
This completes the fix for the recurring temperature monitoring outages.
Related to Phase 1 commit 53dec6010
This commit is contained in:
@@ -552,15 +552,17 @@ func sanitizeDuplicateAllowedNodesBlocks(path string, data []byte) (bool, []byte
|
||||
Str("config_file", path).
|
||||
Msg("Failed to rewrite sanitized configuration; using in-memory copy only")
|
||||
} else {
|
||||
// Phase 2: This sanitizer should never fire with the new config CLI
|
||||
// If it does, it indicates a bug or manual file editing
|
||||
log.Warn().
|
||||
Str("config_file", path).
|
||||
Int("removed_duplicate_blocks", len(matches)-1).
|
||||
Msg("Detected duplicate allowed_nodes blocks and sanitized configuration automatically")
|
||||
Msg("CONFIG SANITIZED – duplicate allowed_nodes blocks removed; this should not happen with Phase 2 config CLI – please report if you see this")
|
||||
}
|
||||
} else {
|
||||
log.Warn().
|
||||
Int("removed_duplicate_blocks", len(matches)-1).
|
||||
Msg("Detected duplicate allowed_nodes blocks and sanitized configuration (in-memory only)")
|
||||
Msg("CONFIG SANITIZED – duplicate allowed_nodes blocks detected in-memory – this should not happen with Phase 2 config CLI")
|
||||
}
|
||||
|
||||
return true, cleaned
|
||||
|
||||
331
cmd/pulse-sensor-proxy/config_cmd.go
Normal file
331
cmd/pulse-sensor-proxy/config_cmd.go
Normal file
@@ -0,0 +1,331 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/sys/unix"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Package-level flag storage for the `config` subcommands, bound in init().
var (
	// Config command flags
	configPathFlag       string   // --config: path to config.yaml (empty means use defaultConfigPath)
	allowedNodesPathFlag string   // --allowed-nodes: shared by validate and set-allowed-nodes (one command runs per invocation)
	mergeNodesFlag       []string // --merge: node names/addresses to add; repeatable
	replaceMode          bool     // --replace: overwrite the allowed-nodes list instead of merging
)
|
||||
|
||||
// configCmd is the parent "config" command; it only groups the
// validate and set-allowed-nodes subcommands attached in init().
var configCmd = &cobra.Command{
	Use:   "config",
	Short: "Manage sensor proxy configuration",
	Long:  `Atomic configuration management for pulse-sensor-proxy`,
}
|
||||
|
||||
var validateCmd = &cobra.Command{
|
||||
Use: "validate",
|
||||
Short: "Validate configuration files",
|
||||
Long: `Parse and validate config.yaml and allowed_nodes.yaml files`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfgPath := configPathFlag
|
||||
if cfgPath == "" {
|
||||
cfgPath = defaultConfigPath
|
||||
}
|
||||
|
||||
allowedNodesPath := allowedNodesPathFlag
|
||||
if allowedNodesPath == "" {
|
||||
allowedNodesPath = filepath.Join(filepath.Dir(cfgPath), "allowed_nodes.yaml")
|
||||
}
|
||||
|
||||
if err := validateConfigFile(cfgPath); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Config validation failed: %v\n", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if allowed_nodes.yaml exists and validate it
|
||||
if _, err := os.Stat(allowedNodesPath); err == nil {
|
||||
if err := validateAllowedNodesFile(allowedNodesPath); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Allowed nodes validation failed: %v\n", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("Configuration valid")
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var setAllowedNodesCmd = &cobra.Command{
|
||||
Use: "set-allowed-nodes",
|
||||
Short: "Atomically update allowed_nodes.yaml",
|
||||
Long: `Merge or replace allowed nodes with atomic writes and file locking.
|
||||
|
||||
Examples:
|
||||
# Merge new nodes into existing list
|
||||
pulse-sensor-proxy config set-allowed-nodes --merge 192.168.0.1 --merge node1.local
|
||||
|
||||
# Replace entire list
|
||||
pulse-sensor-proxy config set-allowed-nodes --replace --merge 192.168.0.1 --merge 192.168.0.2
|
||||
`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
allowedNodesPath := allowedNodesPathFlag
|
||||
if allowedNodesPath == "" {
|
||||
// Default to /etc/pulse-sensor-proxy/allowed_nodes.yaml
|
||||
allowedNodesPath = "/etc/pulse-sensor-proxy/allowed_nodes.yaml"
|
||||
}
|
||||
|
||||
if len(mergeNodesFlag) == 0 {
|
||||
return fmt.Errorf("no nodes specified (use --merge flag)")
|
||||
}
|
||||
|
||||
if err := setAllowedNodes(allowedNodesPath, mergeNodesFlag, replaceMode); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Failed to update allowed nodes: %v\n", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if replaceMode {
|
||||
fmt.Printf("Replaced allowed nodes with %d entries\n", len(mergeNodesFlag))
|
||||
} else {
|
||||
fmt.Printf("Merged %d nodes into allowed nodes list\n", len(mergeNodesFlag))
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// init wires the config subcommands and their flags into the CLI tree.
func init() {
	// Add subcommands to config command
	configCmd.AddCommand(validateCmd)
	configCmd.AddCommand(setAllowedNodesCmd)

	// Validate command flags
	validateCmd.Flags().StringVar(&configPathFlag, "config", "", "Path to config.yaml (default: /etc/pulse-sensor-proxy/config.yaml)")
	validateCmd.Flags().StringVar(&allowedNodesPathFlag, "allowed-nodes", "", "Path to allowed_nodes.yaml (default: same dir as config)")

	// Set-allowed-nodes command flags
	// NOTE(review): both commands bind --allowed-nodes to the same package
	// variable; this is safe only because a single command runs per
	// invocation — confirm if commands are ever composed programmatically.
	setAllowedNodesCmd.Flags().StringVar(&allowedNodesPathFlag, "allowed-nodes", "", "Path to allowed_nodes.yaml (default: /etc/pulse-sensor-proxy/allowed_nodes.yaml)")
	setAllowedNodesCmd.Flags().StringSliceVar(&mergeNodesFlag, "merge", []string{}, "Node to merge (can be specified multiple times)")
	setAllowedNodesCmd.Flags().BoolVar(&replaceMode, "replace", false, "Replace entire list instead of merging")

	// Add config command to root
	rootCmd.AddCommand(configCmd)
}
|
||||
|
||||
// validateConfigFile parses and validates the main config file
|
||||
func validateConfigFile(path string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read config file: %w", err)
|
||||
}
|
||||
|
||||
// Check for duplicate allowed_nodes blocks (the issue we're fixing)
|
||||
sanitized, cleanData := sanitizeDuplicateAllowedNodesBlocks("", data)
|
||||
if sanitized {
|
||||
return fmt.Errorf("config contains duplicate allowed_nodes blocks (would auto-fix on service start)")
|
||||
}
|
||||
|
||||
// Parse YAML
|
||||
cfg := &Config{}
|
||||
if err := yaml.Unmarshal(cleanData, cfg); err != nil {
|
||||
return fmt.Errorf("failed to parse config YAML: %w", err)
|
||||
}
|
||||
|
||||
// Validate required fields
|
||||
if cfg.ReadTimeout <= 0 {
|
||||
return fmt.Errorf("read_timeout must be positive")
|
||||
}
|
||||
if cfg.WriteTimeout <= 0 {
|
||||
return fmt.Errorf("write_timeout must be positive")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateAllowedNodesFile parses and validates the allowed_nodes.yaml file
|
||||
func validateAllowedNodesFile(path string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read allowed_nodes file: %w", err)
|
||||
}
|
||||
|
||||
// Parse YAML - can be either a dict with allowed_nodes key or a list
|
||||
var result interface{}
|
||||
if err := yaml.Unmarshal(data, &result); err != nil {
|
||||
return fmt.Errorf("failed to parse allowed_nodes YAML: %w", err)
|
||||
}
|
||||
|
||||
// Extract nodes
|
||||
var nodes []string
|
||||
switch v := result.(type) {
|
||||
case map[string]interface{}:
|
||||
if nodeList, ok := v["allowed_nodes"]; ok {
|
||||
if list, ok := nodeList.([]interface{}); ok {
|
||||
for _, item := range list {
|
||||
if s, ok := item.(string); ok && s != "" {
|
||||
nodes = append(nodes, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
case []interface{}:
|
||||
for _, item := range v {
|
||||
if s, ok := item.(string); ok && s != "" {
|
||||
nodes = append(nodes, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return fmt.Errorf("allowed_nodes file is empty or invalid")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// setAllowedNodes atomically updates the allowed_nodes.yaml file with file locking
|
||||
func setAllowedNodes(path string, newNodes []string, replace bool) error {
|
||||
// Create directory if it doesn't exist
|
||||
dir := filepath.Dir(path)
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create config directory: %w", err)
|
||||
}
|
||||
|
||||
// Use file locking to prevent concurrent writes
|
||||
return withLockedFile(path, func(f *os.File) error {
|
||||
var existing []string
|
||||
|
||||
// Read existing nodes if not in replace mode
|
||||
if !replace {
|
||||
if data, err := os.ReadFile(path); err == nil {
|
||||
existing = extractNodesFromYAML(data)
|
||||
}
|
||||
}
|
||||
|
||||
// Merge and deduplicate
|
||||
merged := normalizeNodes(append(existing, newNodes...))
|
||||
|
||||
// Serialize to YAML
|
||||
output := map[string]interface{}{
|
||||
"allowed_nodes": merged,
|
||||
}
|
||||
data, err := yaml.Marshal(output)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal YAML: %w", err)
|
||||
}
|
||||
|
||||
// Add header comment
|
||||
header := "# Managed by pulse-sensor-proxy config CLI\n# Do not edit manually while service is running\n"
|
||||
finalData := []byte(header + string(data))
|
||||
|
||||
// Write atomically
|
||||
return atomicWriteFile(path, finalData, 0644)
|
||||
})
|
||||
}
|
||||
|
||||
// withLockedFile opens a file with exclusive locking and runs a callback
|
||||
func withLockedFile(path string, fn func(f *os.File) error) error {
|
||||
// Open or create the file
|
||||
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Acquire exclusive lock
|
||||
if err := unix.Flock(int(f.Fd()), unix.LOCK_EX); err != nil {
|
||||
return fmt.Errorf("failed to acquire file lock: %w", err)
|
||||
}
|
||||
defer unix.Flock(int(f.Fd()), unix.LOCK_UN) //nolint:errcheck
|
||||
|
||||
// Run callback
|
||||
return fn(f)
|
||||
}
|
||||
|
||||
// atomicWriteFile writes data to a file atomically using temp file + rename
|
||||
func atomicWriteFile(path string, data []byte, perm os.FileMode) error {
|
||||
dir := filepath.Dir(path)
|
||||
|
||||
// Create temp file in same directory
|
||||
tmp, err := os.CreateTemp(dir, ".tmp-*")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temp file: %w", err)
|
||||
}
|
||||
tmpPath := tmp.Name()
|
||||
|
||||
// Clean up temp file on error
|
||||
defer func() {
|
||||
if tmpPath != "" {
|
||||
os.Remove(tmpPath)
|
||||
}
|
||||
}()
|
||||
|
||||
// Write data
|
||||
if _, err := tmp.Write(data); err != nil {
|
||||
tmp.Close()
|
||||
return fmt.Errorf("failed to write temp file: %w", err)
|
||||
}
|
||||
|
||||
// Sync to disk
|
||||
if err := tmp.Sync(); err != nil {
|
||||
tmp.Close()
|
||||
return fmt.Errorf("failed to sync temp file: %w", err)
|
||||
}
|
||||
|
||||
// Close temp file
|
||||
if err := tmp.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close temp file: %w", err)
|
||||
}
|
||||
|
||||
// Set permissions
|
||||
if err := os.Chmod(tmpPath, perm); err != nil {
|
||||
return fmt.Errorf("failed to set permissions: %w", err)
|
||||
}
|
||||
|
||||
// Atomic rename
|
||||
if err := os.Rename(tmpPath, path); err != nil {
|
||||
return fmt.Errorf("failed to rename temp file: %w", err)
|
||||
}
|
||||
|
||||
// Mark temp file as successfully moved (don't delete in defer)
|
||||
tmpPath = ""
|
||||
|
||||
// Sync directory to ensure rename is persisted
|
||||
dirFile, err := os.Open(dir)
|
||||
if err == nil {
|
||||
dirFile.Sync() //nolint:errcheck
|
||||
dirFile.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractNodesFromYAML extracts node list from YAML data
|
||||
func extractNodesFromYAML(data []byte) []string {
|
||||
var result interface{}
|
||||
if err := yaml.Unmarshal(data, &result); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var nodes []string
|
||||
switch v := result.(type) {
|
||||
case map[string]interface{}:
|
||||
if nodeList, ok := v["allowed_nodes"]; ok {
|
||||
if list, ok := nodeList.([]interface{}); ok {
|
||||
for _, item := range list {
|
||||
if s, ok := item.(string); ok && s != "" {
|
||||
nodes = append(nodes, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
case []interface{}:
|
||||
for _, item := range v {
|
||||
if s, ok := item.(string); ok && s != "" {
|
||||
nodes = append(nodes, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nodes
|
||||
}
|
||||
@@ -650,66 +650,30 @@ update_allowed_nodes() {
|
||||
local nodes=("$@")
|
||||
|
||||
# File mode is now required - inline mode has been removed
|
||||
# Phase 2: Use config CLI instead of shell/Python manipulation
|
||||
ensure_allowed_nodes_file_reference
|
||||
remove_allowed_nodes_block
|
||||
|
||||
if ! command -v python3 >/dev/null 2>&1; then
|
||||
print_warn "python3 is required to manage allowed_nodes; skipping update"
|
||||
# Build --merge flags for the CLI
|
||||
local merge_args=()
|
||||
for node in "${nodes[@]}"; do
|
||||
if [[ -n "$node" ]]; then
|
||||
merge_args+=(--merge "$node")
|
||||
fi
|
||||
done
|
||||
|
||||
if [[ ${#merge_args[@]} -eq 0 ]]; then
|
||||
return
|
||||
fi
|
||||
|
||||
python3 - "$ALLOWED_NODES_FILE" "$comment_line" "${nodes[@]}" <<'PY'
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
|
||||
path = Path(sys.argv[1])
|
||||
comment = sys.argv[2]
|
||||
new_nodes = [n.strip() for n in sys.argv[3:] if n.strip()]
|
||||
existing = []
|
||||
if path.exists():
|
||||
text = path.read_text()
|
||||
try:
|
||||
data = yaml.safe_load(text)
|
||||
except yaml.YAMLError:
|
||||
data = None
|
||||
if isinstance(data, dict):
|
||||
arr = data.get('allowed_nodes')
|
||||
if isinstance(arr, list):
|
||||
existing = [str(x).strip() for x in arr if str(x).strip()]
|
||||
elif isinstance(data, list):
|
||||
existing = [str(x).strip() for x in data if str(x).strip()]
|
||||
else:
|
||||
for line in text.splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith('#'):
|
||||
continue
|
||||
if line.startswith('-'):
|
||||
line = line[1:].strip()
|
||||
if line:
|
||||
existing.append(line)
|
||||
|
||||
seen = set()
|
||||
merged = []
|
||||
for entry in existing + new_nodes:
|
||||
entry = entry.strip()
|
||||
if not entry:
|
||||
continue
|
||||
key = entry.lower()
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
merged.append(entry)
|
||||
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open('w') as fh:
|
||||
fh.write("# Managed by install-sensor-proxy.sh\n")
|
||||
fh.write(f"# {comment}\n")
|
||||
yaml.safe_dump({'allowed_nodes': merged}, fh, default_flow_style=False, sort_keys=False)
|
||||
PY
|
||||
|
||||
chmod 0644 "$ALLOWED_NODES_FILE" 2>/dev/null || true
|
||||
chown pulse-sensor-proxy:pulse-sensor-proxy "$ALLOWED_NODES_FILE" 2>/dev/null || true
|
||||
# Use the config CLI for atomic, locked updates
|
||||
if "$BINARY_PATH" config set-allowed-nodes --allowed-nodes "$ALLOWED_NODES_FILE" "${merge_args[@]}"; then
|
||||
chmod 0644 "$ALLOWED_NODES_FILE" 2>/dev/null || true
|
||||
chown pulse-sensor-proxy:pulse-sensor-proxy "$ALLOWED_NODES_FILE" 2>/dev/null || true
|
||||
else
|
||||
print_error "Failed to update allowed_nodes using config CLI"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
@@ -1958,6 +1922,8 @@ Type=simple
|
||||
User=pulse-sensor-proxy
|
||||
Group=pulse-sensor-proxy
|
||||
WorkingDirectory=/var/lib/pulse-sensor-proxy
|
||||
# Validate config before starting (Phase 2: prevent corruption from starting service)
|
||||
ExecStartPre=${BINARY_PATH} config validate --config /etc/pulse-sensor-proxy/config.yaml
|
||||
ExecStart=${BINARY_PATH} --config /etc/pulse-sensor-proxy/config.yaml
|
||||
Restart=on-failure
|
||||
RestartSec=5s
|
||||
@@ -2017,6 +1983,8 @@ User=pulse-sensor-proxy
|
||||
Group=pulse-sensor-proxy
|
||||
SupplementaryGroups=www-data
|
||||
WorkingDirectory=/var/lib/pulse-sensor-proxy
|
||||
# Validate config before starting (Phase 2: prevent corruption from starting service)
|
||||
ExecStartPre=${BINARY_PATH} config validate --config /etc/pulse-sensor-proxy/config.yaml
|
||||
ExecStart=${BINARY_PATH}
|
||||
Restart=on-failure
|
||||
RestartSec=5s
|
||||
|
||||
Reference in New Issue
Block a user