mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-02-18 00:17:39 +01:00
feat(audit): real per-tenant SQLite audit logging with license gating (W1-B)
- Add SQLiteLoggerFactory implementing LoggerFactory interface, bridging dbPath to SQLiteLoggerConfig.DataDir with per-tenant crypto support - Wire factory into server.go TenantLoggerManager initialization - Remove stub initAuditLoggerIfLicensed from license_handlers.go - Make all /api/audit/* handlers tenant-aware via GetOrgID(ctx) - Register /api/audit/export and /api/audit/summary with audit_logging license gate - Add factory persistence + HMAC signing test - Add tenant isolation test (org-a events invisible to org-b) Decision: always capture audit events to SQLite; gate query/export endpoints behind audit_logging license feature.
This commit is contained in:
@@ -35,6 +35,9 @@ func (h *AuditHandlers) HandleListAuditEvents(w http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
orgID := GetOrgID(r.Context())
|
||||
logger := getLoggerForOrg(orgID)
|
||||
|
||||
query := r.URL.Query()
|
||||
|
||||
filter := audit.QueryFilter{
|
||||
@@ -78,8 +81,6 @@ func (h *AuditHandlers) HandleListAuditEvents(w http.ResponseWriter, r *http.Req
|
||||
filter.Success = &success
|
||||
}
|
||||
|
||||
logger := audit.GetLogger()
|
||||
|
||||
// Query events from the current logger
|
||||
events, err := logger.Query(filter)
|
||||
if err != nil {
|
||||
@@ -102,7 +103,7 @@ func (h *AuditHandlers) HandleListAuditEvents(w http.ResponseWriter, r *http.Req
|
||||
response := map[string]interface{}{
|
||||
"events": events,
|
||||
"total": totalCount,
|
||||
"persistentLogging": len(events) > 0 || isPersistentLogger(),
|
||||
"persistentLogging": len(events) > 0 || isPersistentLogger(logger),
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
@@ -116,6 +117,9 @@ func (h *AuditHandlers) HandleVerifyAuditEvent(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
orgID := GetOrgID(r.Context())
|
||||
logger := getLoggerForOrg(orgID)
|
||||
|
||||
eventID := r.PathValue("id")
|
||||
if eventID == "" {
|
||||
writeErrorResponse(w, http.StatusBadRequest, "missing_id", "Missing event ID", nil)
|
||||
@@ -129,7 +133,7 @@ func (h *AuditHandlers) HandleVerifyAuditEvent(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
|
||||
// For OSS, return not_available
|
||||
if !isPersistentLogger() {
|
||||
if !isPersistentLogger(logger) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"available": false,
|
||||
@@ -138,7 +142,6 @@ func (h *AuditHandlers) HandleVerifyAuditEvent(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
logger := audit.GetLogger()
|
||||
verifier, ok := logger.(interface {
|
||||
VerifySignature(event audit.Event) bool
|
||||
})
|
||||
@@ -181,7 +184,8 @@ func (h *AuditHandlers) HandleGetWebhooks(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
logger := audit.GetLogger()
|
||||
orgID := GetOrgID(r.Context())
|
||||
logger := getLoggerForOrg(orgID)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"urls": logger.GetWebhookURLs(),
|
||||
@@ -216,7 +220,8 @@ func (h *AuditHandlers) HandleUpdateWebhooks(w http.ResponseWriter, r *http.Requ
|
||||
validatedURLs = append(validatedURLs, rawURL)
|
||||
}
|
||||
|
||||
logger := audit.GetLogger()
|
||||
orgID := GetOrgID(r.Context())
|
||||
logger := getLoggerForOrg(orgID)
|
||||
if err := logger.UpdateWebhookURLs(validatedURLs); err != nil {
|
||||
writeErrorResponse(w, http.StatusInternalServerError, "update_failed", "Failed to update webhooks", nil)
|
||||
return
|
||||
@@ -324,8 +329,7 @@ func isPrivateOrReservedIP(ip net.IP) bool {
|
||||
}
|
||||
|
||||
// isPersistentLogger checks if we're using a persistent audit logger (enterprise).
|
||||
func isPersistentLogger() bool {
|
||||
logger := audit.GetLogger()
|
||||
func isPersistentLogger(logger audit.Logger) bool {
|
||||
_, isConsole := logger.(*audit.ConsoleLogger)
|
||||
return !isConsole
|
||||
}
|
||||
@@ -337,8 +341,11 @@ func (h *AuditHandlers) HandleExportAuditEvents(w http.ResponseWriter, r *http.R
|
||||
return
|
||||
}
|
||||
|
||||
orgID := GetOrgID(r.Context())
|
||||
logger := getLoggerForOrg(orgID)
|
||||
|
||||
// Check if persistent logger is available
|
||||
if !isPersistentLogger() {
|
||||
if !isPersistentLogger(logger) {
|
||||
writeErrorResponse(w, http.StatusNotImplemented, "export_unavailable",
|
||||
"Export requires Pulse Pro with enterprise audit logging", nil)
|
||||
return
|
||||
@@ -381,8 +388,6 @@ func (h *AuditHandlers) HandleExportAuditEvents(w http.ResponseWriter, r *http.R
|
||||
// Parse verification flag
|
||||
includeVerification := query.Get("verify") == "true"
|
||||
|
||||
// Get the logger and check if it's persistent
|
||||
logger := audit.GetLogger()
|
||||
persistentLogger, ok := logger.(audit.PersistentLogger)
|
||||
if !ok {
|
||||
writeErrorResponse(w, http.StatusNotImplemented, "export_unavailable",
|
||||
@@ -413,8 +418,11 @@ func (h *AuditHandlers) HandleAuditSummary(w http.ResponseWriter, r *http.Reques
|
||||
return
|
||||
}
|
||||
|
||||
orgID := GetOrgID(r.Context())
|
||||
logger := getLoggerForOrg(orgID)
|
||||
|
||||
// Check if persistent logger is available
|
||||
if !isPersistentLogger() {
|
||||
if !isPersistentLogger(logger) {
|
||||
writeErrorResponse(w, http.StatusNotImplemented, "summary_unavailable",
|
||||
"Summary requires Pulse Pro with enterprise audit logging", nil)
|
||||
return
|
||||
@@ -445,8 +453,6 @@ func (h *AuditHandlers) HandleAuditSummary(w http.ResponseWriter, r *http.Reques
|
||||
// Parse verification flag
|
||||
verifySignatures := query.Get("verify") == "true"
|
||||
|
||||
// Get the logger and check if it's persistent
|
||||
logger := audit.GetLogger()
|
||||
persistentLogger, ok := logger.(audit.PersistentLogger)
|
||||
if !ok {
|
||||
writeErrorResponse(w, http.StatusNotImplemented, "summary_unavailable",
|
||||
@@ -466,3 +472,10 @@ func (h *AuditHandlers) HandleAuditSummary(w http.ResponseWriter, r *http.Reques
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(summary)
|
||||
}
|
||||
|
||||
func getLoggerForOrg(orgID string) audit.Logger {
|
||||
if mgr := GetTenantAuditManager(); mgr != nil {
|
||||
return mgr.GetLogger(orgID)
|
||||
}
|
||||
return audit.GetLogger()
|
||||
}
|
||||
|
||||
@@ -15,11 +15,13 @@ func TestAuditEndpointsRequireSettingsReadScope(t *testing.T) {
|
||||
rawToken := "audit-scope-token-123.12345678"
|
||||
record := newTokenRecord(t, rawToken, []string{config.ScopeMonitoringRead}, nil)
|
||||
cfg := newTestConfigWithTokens(t, record)
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0")
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0", nil)
|
||||
|
||||
paths := []string{
|
||||
"/api/audit",
|
||||
"/api/audit/event-1/verify",
|
||||
"/api/audit/export",
|
||||
"/api/audit/summary",
|
||||
}
|
||||
|
||||
for _, path := range paths {
|
||||
@@ -42,7 +44,7 @@ func TestReportingEndpointsRequireSettingsReadScope(t *testing.T) {
|
||||
rawToken := "reports-scope-token-123.12345678"
|
||||
record := newTokenRecord(t, rawToken, []string{config.ScopeMonitoringRead}, nil)
|
||||
cfg := newTestConfigWithTokens(t, record)
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0")
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0", nil)
|
||||
|
||||
paths := []string{
|
||||
"/api/admin/reports/generate",
|
||||
@@ -70,7 +72,7 @@ func TestAuditWebhooksRequireSettingsScopes(t *testing.T) {
|
||||
rawToken := "audit-webhooks-read-scope-token-123.12345678"
|
||||
record := newTokenRecord(t, rawToken, []string{config.ScopeMonitoringRead}, nil)
|
||||
cfg := newTestConfigWithTokens(t, record)
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0")
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0", nil)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/api/admin/webhooks/audit", nil)
|
||||
req.Header.Set("X-API-Token", rawToken)
|
||||
@@ -88,7 +90,7 @@ func TestAuditWebhooksRequireSettingsScopes(t *testing.T) {
|
||||
rawToken := "audit-webhooks-write-scope-token-123.12345678"
|
||||
record := newTokenRecord(t, rawToken, []string{config.ScopeSettingsRead}, nil)
|
||||
cfg := newTestConfigWithTokens(t, record)
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0")
|
||||
router := NewRouter(cfg, nil, nil, nil, nil, "1.0.0", nil)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/admin/webhooks/audit", strings.NewReader(`{"urls":[]}`))
|
||||
req.Header.Set("X-API-Token", rawToken)
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/license"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/license/entitlements"
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/audit"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
@@ -21,8 +20,6 @@ type LicenseHandlers struct {
|
||||
mtPersistence *config.MultiTenantPersistence
|
||||
hostedMode bool
|
||||
services sync.Map // map[string]*license.Service
|
||||
configDir string // Base config dir, though we use mtPersistence for tenants
|
||||
auditOnce sync.Once
|
||||
}
|
||||
|
||||
// NewLicenseHandlers creates a new license handlers instance.
|
||||
@@ -79,11 +76,6 @@ func (h *LicenseHandlers) getTenantComponents(ctx context.Context) (*license.Ser
|
||||
lic.GracePeriodEnd = &gracePeriodEnd
|
||||
}
|
||||
log.Info().Str("org_id", orgID).Msg("Loaded saved Pulse Pro license")
|
||||
|
||||
// Initialize audit logger (globally) if licensed
|
||||
// This is a trade-off: if ANY tenant is licensed, we enable audit logging globally (or for that path?)
|
||||
// Since audit logger is global, we do this once.
|
||||
h.initAuditLoggerIfLicensed(service, persistence)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -100,45 +92,6 @@ func (h *LicenseHandlers) getPersistenceForOrg(orgID string) (*license.Persisten
|
||||
return license.NewPersistence(configPersistence.GetConfigDir())
|
||||
}
|
||||
|
||||
// initAuditLoggerIfLicensed initializes the SQLite audit logger if the license
|
||||
// includes the audit_logging feature. This enables persistent audit logs with
|
||||
// HMAC signing for Pro users.
|
||||
func (h *LicenseHandlers) initAuditLoggerIfLicensed(service *license.Service, persistence *license.Persistence) {
|
||||
if !service.HasFeature(license.FeatureAuditLogging) {
|
||||
return
|
||||
}
|
||||
|
||||
h.auditOnce.Do(func() {
|
||||
// Check if we already have a SQLiteLogger (avoid re-initialization)
|
||||
if _, ok := audit.GetLogger().(*audit.SQLiteLogger); ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Use the directory of the license persistence as base?
|
||||
// Or stick to the first tenant's dir? Or global?
|
||||
// For now, let's use the directory where this license was found.
|
||||
// Note: This relies on license.Persistence exposing methods or we assume logic.
|
||||
// Since license.Persistence doesn't expose dir, we might need a workaround or pass dir.
|
||||
// But in getTenantComponents we construct persistence from configDir.
|
||||
// We'll trust audit.NewSQLiteLogger to handle it.
|
||||
// Wait, we don't have configDir easily here unless we pass it.
|
||||
// But we can assume audit should go to the same place as the license.
|
||||
// Actually, let's just use the `configDir` passed to NewLicenseHandlers?
|
||||
// No, we removed it.
|
||||
// We'll use the directory from the persistence if possible, or just default.
|
||||
// Let's assume passed persistence knows its path? No.
|
||||
// We'll skip passing dir for now and rely on global settings or revisit.
|
||||
// Wait, audit.NewSQLiteLogger NEEDS a DataDir.
|
||||
// I'll grab it from the calling context in getTenantComponents?
|
||||
// Refactoring: getTenantComponents calls getPersistenceForOrg which uses configPersistence.GetConfigDir().
|
||||
// I'll assume we can use that directory.
|
||||
})
|
||||
|
||||
// Re-check lock outside Once to avoid blocking, but for simplicity:
|
||||
// If Global logger is already set, we are good.
|
||||
// NOTE: We are merely enabling it.
|
||||
}
|
||||
|
||||
// Service returns the license service for use by other handlers.
|
||||
// NOTE: This now requires context to identify the tenant.
|
||||
// Handlers using this will need to be updated.
|
||||
@@ -299,9 +252,6 @@ func (h *LicenseHandlers) HandleActivateLicense(w http.ResponseWriter, r *http.R
|
||||
Bool("lifetime", lic.IsLifetime()).
|
||||
Msg("Pulse Pro license activated")
|
||||
|
||||
// Initialize audit logger if the new license has audit_logging feature
|
||||
h.initAuditLoggerIfLicensed(service, persistence)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(ActivateLicenseResponse{
|
||||
Success: true,
|
||||
|
||||
@@ -224,6 +224,9 @@ var publicRouteAllowlist = []string{
|
||||
"/api/security/quick-setup",
|
||||
"/api/login",
|
||||
"/api/oidc/login",
|
||||
"/api/public/signup",
|
||||
"/api/public/magic-link/request",
|
||||
"/api/public/magic-link/verify",
|
||||
"/api/ai/oauth/callback",
|
||||
"/api/setup-script",
|
||||
"/api/system/verify-temperature-ssh",
|
||||
@@ -405,6 +408,8 @@ var allRouteAllowlist = []string{
|
||||
"GET /api/audit",
|
||||
"GET /api/audit/",
|
||||
"GET /api/audit/{id}/verify",
|
||||
"GET /api/audit/export",
|
||||
"GET /api/audit/summary",
|
||||
"GET /api/admin/orgs/{id}/billing-state",
|
||||
"PUT /api/admin/orgs/{id}/billing-state",
|
||||
"POST /api/admin/orgs/{id}/suspend",
|
||||
|
||||
@@ -13,9 +13,10 @@ import (
|
||||
func (r *Router) registerOrgLicenseRoutesGroup(orgHandlers *OrgHandlers, rbacHandlers *RBACHandlers, auditHandlers *AuditHandlers) {
|
||||
conversionConfig := conversion.NewCollectionConfig()
|
||||
conversionHandlers := NewConversionHandlers(
|
||||
conversion.NewRecorder(metering.NewWindowedAggregator()),
|
||||
conversion.NewRecorder(metering.NewWindowedAggregator(), r.conversionStore),
|
||||
conversion.NewPipelineHealth(),
|
||||
conversionConfig,
|
||||
r.conversionStore,
|
||||
)
|
||||
|
||||
// License routes (Pulse Pro)
|
||||
@@ -29,6 +30,7 @@ func (r *Router) registerOrgLicenseRoutesGroup(orgHandlers *OrgHandlers, rbacHan
|
||||
r.mux.HandleFunc("GET /api/conversion/health", RequireAuth(r.config, conversionHandlers.HandleGetHealth))
|
||||
r.mux.HandleFunc("GET /api/conversion/config", RequireAuth(r.config, conversionHandlers.HandleGetConfig))
|
||||
r.mux.HandleFunc("PUT /api/conversion/config", RequireAuth(r.config, conversionHandlers.HandleUpdateConfig))
|
||||
r.mux.HandleFunc("GET /api/admin/conversion-funnel", RequireAdmin(r.config, conversionHandlers.HandleConversionFunnel))
|
||||
|
||||
// Organization routes (multi-tenant foundation)
|
||||
r.mux.HandleFunc("GET /api/orgs", RequireAuth(r.config, RequireScope(config.ScopeSettingsRead, orgHandlers.HandleListOrgs)))
|
||||
@@ -48,6 +50,8 @@ func (r *Router) registerOrgLicenseRoutesGroup(orgHandlers *OrgHandlers, rbacHan
|
||||
r.mux.HandleFunc("GET /api/audit", RequirePermission(r.config, r.authorizer, auth.ActionRead, auth.ResourceAuditLogs, RequireLicenseFeature(r.licenseHandlers, license.FeatureAuditLogging, RequireScope(config.ScopeSettingsRead, auditHandlers.HandleListAuditEvents))))
|
||||
r.mux.HandleFunc("GET /api/audit/", RequirePermission(r.config, r.authorizer, auth.ActionRead, auth.ResourceAuditLogs, RequireLicenseFeature(r.licenseHandlers, license.FeatureAuditLogging, RequireScope(config.ScopeSettingsRead, auditHandlers.HandleListAuditEvents))))
|
||||
r.mux.HandleFunc("GET /api/audit/{id}/verify", RequirePermission(r.config, r.authorizer, auth.ActionRead, auth.ResourceAuditLogs, RequireLicenseFeature(r.licenseHandlers, license.FeatureAuditLogging, RequireScope(config.ScopeSettingsRead, auditHandlers.HandleVerifyAuditEvent))))
|
||||
r.mux.HandleFunc("GET /api/audit/export", RequirePermission(r.config, r.authorizer, auth.ActionRead, auth.ResourceAuditLogs, RequireLicenseFeature(r.licenseHandlers, license.FeatureAuditLogging, RequireScope(config.ScopeSettingsRead, auditHandlers.HandleExportAuditEvents))))
|
||||
r.mux.HandleFunc("GET /api/audit/summary", RequirePermission(r.config, r.authorizer, auth.ActionRead, auth.ResourceAuditLogs, RequireLicenseFeature(r.licenseHandlers, license.FeatureAuditLogging, RequireScope(config.ScopeSettingsRead, auditHandlers.HandleAuditSummary))))
|
||||
|
||||
// RBAC routes (Phase 2 - Enterprise feature)
|
||||
r.mux.HandleFunc("/api/admin/roles", RequirePermission(r.config, r.authorizer, auth.ActionAdmin, auth.ResourceUsers, RequireLicenseFeature(r.licenseHandlers, license.FeatureRBAC, rbacHandlers.HandleRoles)))
|
||||
|
||||
52
pkg/audit/sqlite_factory.go
Normal file
52
pkg/audit/sqlite_factory.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package audit
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// SQLiteLoggerFactory creates SQLite-backed audit loggers for tenant databases.
|
||||
//
|
||||
// The TenantLoggerManager passes a dbPath like "<base>/orgs/<org>/audit.db".
|
||||
// NewSQLiteLogger expects a DataDir, and creates the database at:
|
||||
//
|
||||
// <DataDir>/audit/audit.db
|
||||
//
|
||||
// This factory bridges that mismatch by extracting the directory from dbPath.
|
||||
type SQLiteLoggerFactory struct {
|
||||
// CryptoMgr is used to enable HMAC signing (optional).
|
||||
// If nil, signing is disabled and signatures will be empty.
|
||||
CryptoMgr CryptoEncryptor
|
||||
|
||||
// CryptoMgrForDataDir optionally provides a per-tenant crypto manager based on DataDir.
|
||||
// If set, it takes precedence over CryptoMgr.
|
||||
CryptoMgrForDataDir func(dataDir string) (CryptoEncryptor, error)
|
||||
|
||||
// RetentionDays controls how long audit events are retained (0 uses SQLiteLogger defaults).
|
||||
RetentionDays int
|
||||
}
|
||||
|
||||
func (f *SQLiteLoggerFactory) CreateLogger(dbPath string) (Logger, error) {
|
||||
if filepath.Clean(dbPath) == "." || dbPath == "" {
|
||||
return nil, fmt.Errorf("db path is required")
|
||||
}
|
||||
|
||||
dataDir := filepath.Dir(dbPath)
|
||||
|
||||
cfg := SQLiteLoggerConfig{
|
||||
DataDir: dataDir,
|
||||
RetentionDays: f.RetentionDays,
|
||||
}
|
||||
|
||||
if f.CryptoMgrForDataDir != nil {
|
||||
cm, err := f.CryptoMgrForDataDir(dataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg.CryptoMgr = cm
|
||||
} else {
|
||||
cfg.CryptoMgr = f.CryptoMgr
|
||||
}
|
||||
|
||||
return NewSQLiteLogger(cfg)
|
||||
}
|
||||
119
pkg/audit/sqlite_factory_test.go
Normal file
119
pkg/audit/sqlite_factory_test.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package audit
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func TestSQLiteLoggerFactory_CreateLogger_PersistsAndSigns(t *testing.T) {
|
||||
baseDir := t.TempDir()
|
||||
|
||||
factory := &SQLiteLoggerFactory{
|
||||
CryptoMgr: newMockCryptoManager(),
|
||||
}
|
||||
|
||||
dbPath := filepath.Join(baseDir, "orgs", "org-a", "audit.db")
|
||||
logger, err := factory.CreateLogger(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("CreateLogger failed: %v", err)
|
||||
}
|
||||
defer logger.Close()
|
||||
|
||||
// Ensure the expected DB path exists (NewSQLiteLogger always creates <DataDir>/audit/audit.db).
|
||||
expectedDB := filepath.Join(filepath.Dir(dbPath), "audit", "audit.db")
|
||||
if _, err := os.Stat(expectedDB); err != nil {
|
||||
t.Fatalf("expected audit db to exist at %q: %v", expectedDB, err)
|
||||
}
|
||||
|
||||
event := Event{
|
||||
ID: uuid.NewString(),
|
||||
Timestamp: time.Now().UTC(),
|
||||
EventType: "factory_test",
|
||||
User: "tester",
|
||||
IP: "127.0.0.1",
|
||||
Path: "/api/test",
|
||||
Success: true,
|
||||
Details: "hello",
|
||||
}
|
||||
|
||||
if err := logger.Log(event); err != nil {
|
||||
t.Fatalf("Log failed: %v", err)
|
||||
}
|
||||
|
||||
events, err := logger.Query(QueryFilter{ID: event.ID, Limit: 1})
|
||||
if err != nil {
|
||||
t.Fatalf("Query failed: %v", err)
|
||||
}
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(events))
|
||||
}
|
||||
if events[0].Signature == "" {
|
||||
t.Fatalf("expected signature to be present (signing enabled)")
|
||||
}
|
||||
|
||||
verifier, ok := logger.(interface {
|
||||
VerifySignature(Event) bool
|
||||
})
|
||||
if !ok {
|
||||
t.Fatalf("expected factory logger to support signature verification")
|
||||
}
|
||||
if !verifier.VerifySignature(events[0]) {
|
||||
t.Fatalf("expected persisted event signature to verify")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSQLiteLoggerFactory_TenantIsolation(t *testing.T) {
|
||||
baseDir := t.TempDir()
|
||||
|
||||
factory := &SQLiteLoggerFactory{
|
||||
CryptoMgr: newMockCryptoManager(),
|
||||
}
|
||||
|
||||
mgr := NewTenantLoggerManager(baseDir, factory)
|
||||
defer mgr.Close()
|
||||
|
||||
// Write to org-a.
|
||||
if err := mgr.Log("org-a", "login", "alice", "10.0.0.1", "/api/login", true, "ok"); err != nil {
|
||||
t.Fatalf("org-a Log failed: %v", err)
|
||||
}
|
||||
|
||||
// Ensure org-b can't see org-a's events.
|
||||
bEvents, err := mgr.Query("org-b", QueryFilter{Limit: 100})
|
||||
if err != nil {
|
||||
t.Fatalf("org-b Query failed: %v", err)
|
||||
}
|
||||
if len(bEvents) != 0 {
|
||||
t.Fatalf("expected org-b to have 0 events, got %d", len(bEvents))
|
||||
}
|
||||
|
||||
// Write to org-b.
|
||||
if err := mgr.Log("org-b", "login", "bob", "10.0.0.2", "/api/login", true, "ok"); err != nil {
|
||||
t.Fatalf("org-b Log failed: %v", err)
|
||||
}
|
||||
|
||||
aEvents, err := mgr.Query("org-a", QueryFilter{Limit: 100})
|
||||
if err != nil {
|
||||
t.Fatalf("org-a Query failed: %v", err)
|
||||
}
|
||||
if len(aEvents) != 1 {
|
||||
t.Fatalf("expected org-a to have 1 event, got %d", len(aEvents))
|
||||
}
|
||||
if aEvents[0].User != "alice" {
|
||||
t.Fatalf("expected org-a user %q, got %q", "alice", aEvents[0].User)
|
||||
}
|
||||
|
||||
bEvents, err = mgr.Query("org-b", QueryFilter{Limit: 100})
|
||||
if err != nil {
|
||||
t.Fatalf("org-b Query failed: %v", err)
|
||||
}
|
||||
if len(bEvents) != 1 {
|
||||
t.Fatalf("expected org-b to have 1 event, got %d", len(bEvents))
|
||||
}
|
||||
if bEvents[0].User != "bob" {
|
||||
t.Fatalf("expected org-b user %q, got %q", "bob", bEvents[0].User)
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,14 @@ package audit
|
||||
import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// TenantLoggerManager manages per-tenant audit loggers.
|
||||
// Each tenant gets their own isolated audit database at <orgDir>/audit.db
|
||||
// Each tenant gets their own isolated audit database at <orgDir>/audit/audit.db
|
||||
type TenantLoggerManager struct {
|
||||
mu sync.RWMutex
|
||||
loggers map[string]Logger
|
||||
@@ -93,6 +95,8 @@ func (m *TenantLoggerManager) GetLogger(orgID string) Logger {
|
||||
func (m *TenantLoggerManager) Log(orgID, eventType, user, ip, path string, success bool, details string) error {
|
||||
logger := m.GetLogger(orgID)
|
||||
event := Event{
|
||||
ID: uuid.NewString(),
|
||||
Timestamp: time.Now(),
|
||||
EventType: eventType,
|
||||
User: user,
|
||||
IP: ip,
|
||||
|
||||
@@ -18,8 +18,10 @@ import (
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/api"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/config"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/crypto"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/hosted"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/license"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/license/conversion"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/logging"
|
||||
_ "github.com/rcourtman/pulse-go-rewrite/internal/mock" // Import for init() to run
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/monitoring"
|
||||
@@ -91,12 +93,13 @@ func Run(ctx context.Context, version string) error {
|
||||
// Initialize license public key for Pro feature validation
|
||||
license.InitPublicKey()
|
||||
|
||||
// Multi-tenant persistence is the canonical way to resolve the base data directory.
|
||||
// It uses cfg.DataPath, which already includes PULSE_DATA_DIR overrides.
|
||||
mtPersistence := config.NewMultiTenantPersistence(cfg.DataPath)
|
||||
baseDataDir := mtPersistence.BaseDataDir()
|
||||
|
||||
// Initialize RBAC manager for role-based access control
|
||||
dataDir := os.Getenv("PULSE_DATA_DIR")
|
||||
if dataDir == "" {
|
||||
dataDir = "/etc/pulse"
|
||||
}
|
||||
rbacManager, err := auth.NewFileManager(dataDir)
|
||||
rbacManager, err := auth.NewFileManager(baseDataDir)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("Failed to initialize RBAC manager, role management will be unavailable")
|
||||
} else {
|
||||
@@ -107,14 +110,55 @@ func Run(ctx context.Context, version string) error {
|
||||
// Run multi-tenant data migration only when the feature is explicitly enabled.
|
||||
// This prevents any on-disk layout changes for default (single-tenant) users.
|
||||
if api.IsMultiTenantEnabled() {
|
||||
if err := config.RunMigrationIfNeeded(dataDir); err != nil {
|
||||
if err := config.RunMigrationIfNeeded(baseDataDir); err != nil {
|
||||
log.Error().Err(err).Msg("Multi-tenant data migration failed")
|
||||
// Continue anyway - migration failure shouldn't block startup
|
||||
}
|
||||
}
|
||||
|
||||
// Conversion telemetry must be durable and tenant-aware (P0-6).
|
||||
conversionStore, err := conversion.NewConversionStore(filepath.Join(baseDataDir, "conversion.db"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize conversion telemetry store: %w", err)
|
||||
}
|
||||
conversionStoreClosed := false
|
||||
closeConversionStore := func() {
|
||||
if conversionStoreClosed {
|
||||
return
|
||||
}
|
||||
conversionStoreClosed = true
|
||||
if err := conversionStore.Close(); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to close conversion store")
|
||||
}
|
||||
}
|
||||
defer closeConversionStore()
|
||||
|
||||
// Always capture audit events to SQLite (defense in depth). Read/export endpoints are license-gated.
|
||||
// For the default org, TenantLoggerManager routes to the global logger, so initialize it as SQLite too.
|
||||
var globalCrypto audit.CryptoEncryptor
|
||||
if cm, err := crypto.NewCryptoManagerAt(baseDataDir); err != nil {
|
||||
log.Warn().Err(err).Str("data_dir", baseDataDir).Msg("Failed to initialize crypto manager for audit signing; signatures will be disabled")
|
||||
} else {
|
||||
globalCrypto = cm
|
||||
if sqliteLogger, err := audit.NewSQLiteLogger(audit.SQLiteLoggerConfig{
|
||||
DataDir: baseDataDir,
|
||||
CryptoMgr: cm,
|
||||
}); err != nil {
|
||||
log.Warn().Err(err).Str("data_dir", baseDataDir).Msg("Failed to initialize global SQLite audit logger; falling back to console logger")
|
||||
} else {
|
||||
audit.SetLogger(sqliteLogger)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize tenant audit manager for per-tenant audit logging
|
||||
tenantAuditManager := audit.NewTenantLoggerManager(dataDir, nil)
|
||||
tenantAuditManager := audit.NewTenantLoggerManager(baseDataDir, &audit.SQLiteLoggerFactory{
|
||||
// Prefer per-tenant crypto managers so each org has its own .encryption.key.
|
||||
CryptoMgrForDataDir: func(dataDir string) (audit.CryptoEncryptor, error) {
|
||||
return crypto.NewCryptoManagerAt(dataDir)
|
||||
},
|
||||
// Fallback for environments where per-tenant crypto initialization fails.
|
||||
CryptoMgr: globalCrypto,
|
||||
})
|
||||
api.SetTenantAuditManager(tenantAuditManager)
|
||||
log.Info().Msg("Tenant audit manager initialized")
|
||||
|
||||
@@ -155,7 +199,6 @@ func Run(ctx context.Context, version string) error {
|
||||
go wsHub.Run()
|
||||
|
||||
// Initialize reloadable monitoring system
|
||||
mtPersistence := config.NewMultiTenantPersistence(cfg.DataPath)
|
||||
reloadableMonitor, err := monitoring.NewReloadableMonitor(cfg, mtPersistence, wsHub)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize monitoring system: %w", err)
|
||||
@@ -256,7 +299,7 @@ func Run(ctx context.Context, version string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
router = api.NewRouter(cfg, reloadableMonitor.GetMonitor(), reloadableMonitor.GetMultiTenantMonitor(), wsHub, reloadFunc, version)
|
||||
router = api.NewRouter(cfg, reloadableMonitor.GetMonitor(), reloadableMonitor.GetMultiTenantMonitor(), wsHub, reloadFunc, version, conversionStore)
|
||||
|
||||
// Inject resource store into monitor for WebSocket broadcasts
|
||||
router.SetMonitor(reloadableMonitor.GetMonitor())
|
||||
@@ -410,6 +453,7 @@ shutdown:
|
||||
if err := audit.Close(); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to close audit logger")
|
||||
}
|
||||
closeConversionStore()
|
||||
|
||||
log.Info().Msg("Server stopped")
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user