Files
Pulse/cmd/pulse-agent/main.go

258 lines
6.9 KiB
Go

package main
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/rcourtman/pulse-go-rewrite/internal/dockeragent"
	"github.com/rcourtman/pulse-go-rewrite/internal/hostagent"
	"github.com/rcourtman/pulse-go-rewrite/internal/utils"
	"github.com/rs/zerolog"
	"golang.org/x/sync/errgroup"
)
// Version identifies the agent build and is included in the startup log line.
// It defaults to "dev"; presumably release builds override it at link time
// (e.g. via -ldflags "-X main.Version=...") — confirm against the build scripts.
var Version = "dev"
// multiValue collects every occurrence of a repeatable command-line flag.
// It implements flag.Value so it can be registered with flag.Var.
type multiValue []string

// String returns the collected values joined by commas.
//
// The flag package documents that it may call String on a zero-valued
// receiver, such as a nil pointer, so a nil receiver yields "" instead of
// panicking on the dereference.
func (m *multiValue) String() string {
	if m == nil {
		return ""
	}
	return strings.Join(*m, ",")
}

// Set appends value to the collection. It never fails and always returns nil.
func (m *multiValue) Set(value string) error {
	*m = append(*m, value)
	return nil
}
// main is the entry point of the unified Pulse agent.
//
// It loads the configuration, configures logging, installs signal-driven
// cancellation, starts each enabled agent module in an errgroup and then
// blocks until every module has returned.
//
// The process exits with status 1 when a module cannot be initialized or
// when the group finishes with an error other than context cancellation.
func main() {
	// 1. Parse configuration.
	cfg := loadConfig()

	// 2. Set up logging.
	zerolog.SetGlobalLevel(cfg.LogLevel)
	logger := zerolog.New(os.Stdout).Level(cfg.LogLevel).With().Timestamp().Logger()
	cfg.Logger = &logger

	// 3. Set up context and signal handling. SIGINT/SIGTERM cancel the root
	// context; the errgroup context derived from it is what the modules
	// receive.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	g, ctx := errgroup.WithContext(ctx)

	logger.Info().
		Str("version", Version).
		Str("pulse_url", cfg.PulseURL).
		Bool("host_agent", cfg.EnableHost).
		Bool("docker_agent", cfg.EnableDocker).
		Msg("Starting Pulse Unified Agent")

	// 4. Start the host agent (if enabled).
	if cfg.EnableHost {
		// AgentID is passed through unchanged and is shared with the Docker
		// module. When it is empty the agents are expected to derive their
		// own identifiers.
		hostCfg := hostagent.Config{
			PulseURL:           cfg.PulseURL,
			APIToken:           cfg.APIToken,
			Interval:           cfg.Interval,
			HostnameOverride:   cfg.HostnameOverride,
			AgentID:            cfg.AgentID,
			Tags:               cfg.Tags,
			InsecureSkipVerify: cfg.InsecureSkipVerify,
			LogLevel:           cfg.LogLevel,
			Logger:             &logger,
		}

		agent, err := hostagent.New(hostCfg)
		if err != nil {
			logger.Fatal().Err(err).Msg("Failed to initialize host agent")
		}

		g.Go(func() error {
			logger.Info().Msg("Host agent module started")
			return agent.Run(ctx)
		})
	}

	// 5. Start the Docker agent (if enabled).
	if cfg.EnableDocker {
		dockerCfg := dockeragent.Config{
			PulseURL:           cfg.PulseURL,
			APIToken:           cfg.APIToken,
			Interval:           cfg.Interval,
			HostnameOverride:   cfg.HostnameOverride,
			AgentID:            cfg.AgentID,
			InsecureSkipVerify: cfg.InsecureSkipVerify,
			DisableAutoUpdate:  true, // Unified agent handles updates (future)
			LogLevel:           cfg.LogLevel,
			Logger:             &logger,

			// Docker specific defaults
			SwarmScope:         "node",
			IncludeContainers:  true,
			IncludeServices:    true,
			IncludeTasks:       true,
			CollectDiskMetrics: true,
		}

		agent, err := dockeragent.New(dockerCfg)
		if err != nil {
			logger.Fatal().Err(err).Msg("Failed to initialize docker agent")
		}

		g.Go(func() error {
			// Release the Docker agent's resources as soon as Run returns,
			// whatever the reason. Any result of Close is deliberately
			// dropped: the module is already shutting down at this point.
			defer agent.Close()

			logger.Info().Msg("Docker agent module started")
			return agent.Run(ctx)
		})
	}

	// 6. Wait for all agents to exit. Cancellation is the normal shutdown
	// path; errors.Is also recognizes it when a module wrapped the error.
	if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		logger.Error().Err(err).Msg("Agent terminated with error")
		os.Exit(1)
	}

	logger.Info().Msg("Pulse Unified Agent stopped")
}
// Config is the unified agent's runtime configuration. loadConfig builds it
// from environment variables and command-line flags, and main copies the
// relevant fields into each module's own configuration.
type Config struct {
	// PulseURL is the base URL of the Pulse server.
	PulseURL string
	// APIToken is the Pulse API token; loadConfig refuses to start without one.
	APIToken string
	// Interval is the reporting interval handed to each module.
	Interval time.Duration
	// HostnameOverride, when non-empty, is passed to the modules as the
	// hostname override.
	HostnameOverride string
	// AgentID, when non-empty, is passed to the modules as the agent identifier.
	AgentID string
	// Tags holds the tags gathered from the environment and -tag flags.
	Tags []string
	// InsecureSkipVerify is set by the -insecure flag ("Skip TLS verification").
	InsecureSkipVerify bool
	// LogLevel is the zerolog level used for the global and per-agent loggers.
	LogLevel zerolog.Level
	// Logger is left nil by loadConfig; main assigns it once logging is set up.
	Logger *zerolog.Logger

	// Module flags

	// EnableHost turns on the host agent module.
	EnableHost bool
	// EnableDocker turns on the Docker agent module.
	EnableDocker bool
}
// loadConfig builds the agent configuration from environment variables and
// command-line flags.
//
// Each environment variable supplies the default for the corresponding flag,
// so an explicit flag takes precedence over the environment, which in turn
// takes precedence over the built-in default. Tags are the exception: values
// from PULSE_TAGS and from repeated -tag flags are combined.
//
// The function parses the process-wide flag set (flag.Parse) and terminates
// the process with exit status 1 when no API token is supplied. An invalid
// PULSE_INTERVAL or log level is reported on stderr and replaced by the
// default (30s and info respectively) rather than being dropped silently.
// The Logger field of the returned Config is left unset.
func loadConfig() Config {
	// Environment variables
	envURL := utils.GetenvTrim("PULSE_URL")
	envToken := utils.GetenvTrim("PULSE_TOKEN")
	envInterval := utils.GetenvTrim("PULSE_INTERVAL")
	envHostname := utils.GetenvTrim("PULSE_HOSTNAME")
	envAgentID := utils.GetenvTrim("PULSE_AGENT_ID")
	envInsecure := utils.GetenvTrim("PULSE_INSECURE_SKIP_VERIFY")
	envTags := utils.GetenvTrim("PULSE_TAGS")
	envLogLevel := utils.GetenvTrim("LOG_LEVEL")
	envEnableHost := utils.GetenvTrim("PULSE_ENABLE_HOST")
	envEnableDocker := utils.GetenvTrim("PULSE_ENABLE_DOCKER")

	// Defaults
	defaultInterval := 30 * time.Second
	if envInterval != "" {
		parsed, err := time.ParseDuration(envInterval)
		if err != nil {
			// Keep the built-in default, but tell the operator the value was ignored.
			fmt.Fprintf(os.Stderr, "warning: ignoring invalid PULSE_INTERVAL %q: %v\n", envInterval, err)
		} else {
			defaultInterval = parsed
		}
	}

	defaultEnableHost := true
	if envEnableHost != "" {
		defaultEnableHost = utils.ParseBool(envEnableHost)
	}

	defaultEnableDocker := false
	if envEnableDocker != "" {
		defaultEnableDocker = utils.ParseBool(envEnableDocker)
	}

	// Flags
	urlFlag := flag.String("url", envURL, "Pulse server URL")
	tokenFlag := flag.String("token", envToken, "Pulse API token")
	intervalFlag := flag.Duration("interval", defaultInterval, "Reporting interval")
	hostnameFlag := flag.String("hostname", envHostname, "Override hostname")
	agentIDFlag := flag.String("agent-id", envAgentID, "Override agent identifier")
	insecureFlag := flag.Bool("insecure", utils.ParseBool(envInsecure), "Skip TLS verification")
	logLevelFlag := flag.String("log-level", defaultLogLevel(envLogLevel), "Log level")
	enableHostFlag := flag.Bool("enable-host", defaultEnableHost, "Enable Host Agent module")
	enableDockerFlag := flag.Bool("enable-docker", defaultEnableDocker, "Enable Docker Agent module")

	var tagFlags multiValue
	flag.Var(&tagFlags, "tag", "Tag to apply (repeatable)")

	flag.Parse()

	// Validation
	pulseURL := strings.TrimSpace(*urlFlag)
	if pulseURL == "" {
		pulseURL = "http://localhost:7655"
	}

	token := strings.TrimSpace(*tokenFlag)
	if token == "" {
		fmt.Fprintln(os.Stderr, "error: Pulse API token is required")
		os.Exit(1)
	}

	logLevel, err := parseLogLevel(*logLevelFlag)
	if err != nil {
		// The logger does not exist yet, so report the fallback on stderr.
		fmt.Fprintf(os.Stderr, "warning: unknown log level %q, falling back to info\n", *logLevelFlag)
		logLevel = zerolog.InfoLevel
	}

	tags := gatherTags(envTags, tagFlags)

	return Config{
		PulseURL:           pulseURL,
		APIToken:           token,
		Interval:           *intervalFlag,
		HostnameOverride:   strings.TrimSpace(*hostnameFlag),
		AgentID:            strings.TrimSpace(*agentIDFlag),
		Tags:               tags,
		InsecureSkipVerify: *insecureFlag,
		LogLevel:           logLevel,
		EnableHost:         *enableHostFlag,
		EnableDocker:       *enableDockerFlag,
	}
}
// Helpers (duplicated from existing agents for now, to be moved to shared pkg later)
// gatherTags merges the tags from a comma-separated environment value with
// the tags given as repeated flags. Environment tags come first, then flag
// tags in the order supplied. Every entry is whitespace-trimmed and entries
// that are empty after trimming are dropped.
//
// The result is never nil: with no usable tags it is an empty slice.
func gatherTags(env string, flags []string) []string {
	// strings.Split returns a fresh slice, so appending flags to it cannot
	// touch the caller's backing array. An empty env yields a single empty
	// entry, which the filter below discards.
	candidates := append(strings.Split(env, ","), flags...)

	result := []string{}
	for _, raw := range candidates {
		if cleaned := strings.TrimSpace(raw); cleaned != "" {
			result = append(result, cleaned)
		}
	}
	return result
}
// parseLogLevel converts a textual log level into a zerolog.Level. The input
// is trimmed and lower-cased first. A blank value yields zerolog.InfoLevel
// with a nil error; anything else is handed to zerolog.ParseLevel and its
// result and error are returned unchanged.
func parseLogLevel(value string) (zerolog.Level, error) {
	switch level := strings.ToLower(strings.TrimSpace(value)); level {
	case "":
		return zerolog.InfoLevel, nil
	default:
		return zerolog.ParseLevel(level)
	}
}
// defaultLogLevel chooses the default for the -log-level flag: envValue
// itself (returned untouched, not trimmed) when it contains anything other
// than whitespace, and "info" otherwise.
func defaultLogLevel(envValue string) string {
	if len(strings.TrimSpace(envValue)) > 0 {
		return envValue
	}
	return "info"
}