Add unit tests for validation utility functions (pulse-sensor-proxy)

Add 42 test cases for security-critical validation utility functions:

- TestStripNodeDelimiters (10 cases): IPv6 bracket handling, edge cases
- TestParseNodeIP (10 cases): IPv4/IPv6 parsing with bracket support
- TestNormalizeAllowlistEntry (11 cases): case normalization, whitespace
  handling, IPv6 full form compression
- TestIPAllowed (11 cases): CIDR matching, hosts map lookup, nil handling

These functions are used for node allowlist validation to prevent SSRF
attacks in the sensor proxy.
This commit is contained in:
rcourtman
2025-11-30 17:04:43 +00:00
parent 8b851bc80b
commit 97672b4701

View File

@@ -192,3 +192,329 @@ func TestNodeValidatorStrictNoSources(t *testing.T) {
t.Fatalf("expected strict mode without sources to reject nodes")
}
}
func TestStripNodeDelimiters(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "ipv6 with brackets",
input: "[2001:db8::1]",
expected: "2001:db8::1",
},
{
name: "ipv6 loopback bracketed",
input: "[::1]",
expected: "::1",
},
{
name: "ipv4 no brackets",
input: "192.168.1.1",
expected: "192.168.1.1",
},
{
name: "hostname no brackets",
input: "node-1.example.com",
expected: "node-1.example.com",
},
{
name: "empty string",
input: "",
expected: "",
},
{
name: "only opening bracket",
input: "[2001:db8::1",
expected: "[2001:db8::1",
},
{
name: "only closing bracket",
input: "2001:db8::1]",
expected: "2001:db8::1]",
},
{
name: "brackets with single char",
input: "[a]",
expected: "a",
},
{
name: "empty brackets",
input: "[]",
expected: "[]",
},
{
name: "single bracket char",
input: "[",
expected: "[",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
result := stripNodeDelimiters(tc.input)
if result != tc.expected {
t.Errorf("stripNodeDelimiters(%q) = %q, want %q", tc.input, result, tc.expected)
}
})
}
}
func TestParseNodeIP(t *testing.T) {
tests := []struct {
name string
input string
expectNil bool
expectedIP string
}{
{
name: "ipv4",
input: "192.168.1.1",
expectNil: false,
expectedIP: "192.168.1.1",
},
{
name: "ipv4 with whitespace",
input: " 10.0.0.1 ",
expectNil: false,
expectedIP: "10.0.0.1",
},
{
name: "ipv6",
input: "2001:db8::1",
expectNil: false,
expectedIP: "2001:db8::1",
},
{
name: "ipv6 bracketed",
input: "[2001:db8::1]",
expectNil: false,
expectedIP: "2001:db8::1",
},
{
name: "ipv6 loopback",
input: "::1",
expectNil: false,
expectedIP: "::1",
},
{
name: "ipv6 loopback bracketed",
input: "[::1]",
expectNil: false,
expectedIP: "::1",
},
{
name: "hostname",
input: "node-1.example.com",
expectNil: true,
},
{
name: "empty string",
input: "",
expectNil: true,
},
{
name: "invalid ip",
input: "999.999.999.999",
expectNil: true,
},
{
name: "partial ipv4",
input: "192.168.1",
expectNil: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
result := parseNodeIP(tc.input)
if tc.expectNil {
if result != nil {
t.Errorf("parseNodeIP(%q) = %v, want nil", tc.input, result)
}
} else {
if result == nil {
t.Errorf("parseNodeIP(%q) = nil, want %s", tc.input, tc.expectedIP)
} else if result.String() != tc.expectedIP {
t.Errorf("parseNodeIP(%q) = %v, want %s", tc.input, result, tc.expectedIP)
}
}
})
}
}
func TestNormalizeAllowlistEntry(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "ipv4",
input: "192.168.1.1",
expected: "192.168.1.1",
},
{
name: "ipv4 with whitespace",
input: " 10.0.0.1 ",
expected: "10.0.0.1",
},
{
name: "ipv6",
input: "2001:db8::1",
expected: "2001:db8::1",
},
{
name: "ipv6 bracketed",
input: "[2001:db8::1]",
expected: "2001:db8::1",
},
{
name: "hostname lowercase",
input: "node-1.example.com",
expected: "node-1.example.com",
},
{
name: "hostname uppercase normalized",
input: "NODE-1.EXAMPLE.COM",
expected: "node-1.example.com",
},
{
name: "hostname mixed case",
input: "Node-1.Example.Com",
expected: "node-1.example.com",
},
{
name: "empty string",
input: "",
expected: "",
},
{
name: "whitespace only",
input: " ",
expected: "",
},
{
name: "ipv6 loopback",
input: "::1",
expected: "::1",
},
{
name: "ipv6 full form normalized",
input: "2001:0db8:0000:0000:0000:0000:0000:0001",
expected: "2001:db8::1",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
result := normalizeAllowlistEntry(tc.input)
if result != tc.expected {
t.Errorf("normalizeAllowlistEntry(%q) = %q, want %q", tc.input, result, tc.expected)
}
})
}
}
func TestIPAllowed(t *testing.T) {
// Setup test CIDRs
_, cidr10, _ := net.ParseCIDR("10.0.0.0/8")
_, cidr192, _ := net.ParseCIDR("192.168.1.0/24")
_, cidr172, _ := net.ParseCIDR("172.16.0.0/12")
tests := []struct {
name string
ip net.IP
hosts map[string]struct{}
cidrs []*net.IPNet
expected bool
}{
{
name: "nil ip",
ip: nil,
hosts: map[string]struct{}{"10.0.0.1": {}},
cidrs: nil,
expected: false,
},
{
name: "ip in hosts map",
ip: net.ParseIP("192.168.1.100"),
hosts: map[string]struct{}{"192.168.1.100": {}},
cidrs: nil,
expected: true,
},
{
name: "ip not in hosts map",
ip: net.ParseIP("192.168.1.100"),
hosts: map[string]struct{}{"192.168.1.200": {}},
cidrs: nil,
expected: false,
},
{
name: "ip in cidr range",
ip: net.ParseIP("10.1.2.3"),
hosts: nil,
cidrs: []*net.IPNet{cidr10},
expected: true,
},
{
name: "ip not in cidr range",
ip: net.ParseIP("11.0.0.1"),
hosts: nil,
cidrs: []*net.IPNet{cidr10},
expected: false,
},
{
name: "ip in second cidr",
ip: net.ParseIP("192.168.1.50"),
hosts: nil,
cidrs: []*net.IPNet{cidr10, cidr192},
expected: true,
},
{
name: "ip matches hosts not cidrs",
ip: net.ParseIP("8.8.8.8"),
hosts: map[string]struct{}{"8.8.8.8": {}},
cidrs: []*net.IPNet{cidr10},
expected: true,
},
{
name: "ip matches cidrs not hosts",
ip: net.ParseIP("172.20.0.1"),
hosts: map[string]struct{}{"8.8.8.8": {}},
cidrs: []*net.IPNet{cidr172},
expected: true,
},
{
name: "empty hosts and cidrs",
ip: net.ParseIP("192.168.1.1"),
hosts: nil,
cidrs: nil,
expected: false,
},
{
name: "ipv6 in hosts",
ip: net.ParseIP("2001:db8::1"),
hosts: map[string]struct{}{"2001:db8::1": {}},
cidrs: nil,
expected: true,
},
{
name: "nil hosts map with ip match in cidr",
ip: net.ParseIP("10.255.255.255"),
hosts: nil,
cidrs: []*net.IPNet{cidr10},
expected: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
result := ipAllowed(tc.ip, tc.hosts, tc.cidrs)
if result != tc.expected {
t.Errorf("ipAllowed(%v, hosts, cidrs) = %v, want %v", tc.ip, result, tc.expected)
}
})
}
}