diff --git a/internal/cloudcp/registry/registry.go b/internal/cloudcp/registry/registry.go index bfd0ae241..7c849553b 100644 --- a/internal/cloudcp/registry/registry.go +++ b/internal/cloudcp/registry/registry.go @@ -28,6 +28,7 @@ func NewTenantRegistry(dir string) (*TenantRegistry, error) { dsn := dbPath + "?" + url.Values{ "_pragma": []string{ "busy_timeout(30000)", + "foreign_keys(ON)", "journal_mode(WAL)", "synchronous(NORMAL)", }, @@ -71,6 +72,8 @@ func (r *TenantRegistry) initSchema() error { ); CREATE INDEX IF NOT EXISTS idx_tenants_state ON tenants(state); CREATE INDEX IF NOT EXISTS idx_tenants_stripe_customer_id ON tenants(stripe_customer_id); + CREATE INDEX IF NOT EXISTS idx_tenants_state_created_at ON tenants(state, created_at DESC); + CREATE INDEX IF NOT EXISTS idx_tenants_account_id_created_at ON tenants(account_id, created_at DESC); CREATE TABLE IF NOT EXISTS accounts ( id TEXT PRIMARY KEY, @@ -119,6 +122,8 @@ func (r *TenantRegistry) initSchema() error { FOREIGN KEY (user_id) REFERENCES users(id) ); CREATE INDEX IF NOT EXISTS idx_memberships_user_id ON account_memberships(user_id); + CREATE INDEX IF NOT EXISTS idx_memberships_user_id_created_at ON account_memberships(user_id, created_at DESC); + CREATE INDEX IF NOT EXISTS idx_memberships_account_id_created_at ON account_memberships(account_id, created_at DESC); ` if _, err := r.db.Exec(schema); err != nil { return fmt.Errorf("init tenant registry schema: %w", err) diff --git a/internal/cloudcp/registry/registry_test.go b/internal/cloudcp/registry/registry_test.go index 5bffde473..ace672a39 100644 --- a/internal/cloudcp/registry/registry_test.go +++ b/internal/cloudcp/registry/registry_test.go @@ -400,6 +400,28 @@ func TestMembershipCRUD(t *testing.T) { } } +func TestCreateMembership_RequiresExistingAccountAndUser(t *testing.T) { + reg := newTestRegistry(t) + + accountID, err := GenerateAccountID() + if err != nil { + t.Fatal(err) + } + userID, err := GenerateUserID() + if err != nil { + t.Fatal(err) + } + + err = reg.CreateMembership(&AccountMembership{ + AccountID: accountID, + UserID: userID, + Role: MemberRoleOwner, + }) + if err == nil { + t.Fatal("expected CreateMembership to fail when account/user do not exist") + } +} + func TestList(t *testing.T) { reg := newTestRegistry(t) @@ -668,6 +690,20 @@ func TestStripeAccountCRUD(t *testing.T) { } } +func TestCreateStripeAccount_RequiresExistingAccount(t *testing.T) { + reg := newTestRegistry(t) + + sa := &StripeAccount{ + AccountID: "a_missing123", + StripeCustomerID: "cus_missing_123", + PlanVersion: "msp_hosted_v1", + SubscriptionState: "trial", + } + if err := reg.CreateStripeAccount(sa); err == nil { + t.Fatal("expected CreateStripeAccount to fail when account does not exist") + } +} + func TestStripeEventIdempotency(t *testing.T) { reg := newTestRegistry(t) diff --git a/internal/license/conversion/store.go b/internal/license/conversion/store.go index 3cefc07bf..eed46b4b5 100644 --- a/internal/license/conversion/store.go +++ b/internal/license/conversion/store.go @@ -101,6 +101,7 @@ func (s *ConversionStore) initSchema() error { ); CREATE INDEX IF NOT EXISTS idx_conversion_events_org_time ON conversion_events(org_id, created_at); CREATE INDEX IF NOT EXISTS idx_conversion_events_type ON conversion_events(event_type, created_at); + CREATE INDEX IF NOT EXISTS idx_conversion_events_time ON conversion_events(created_at); ` if _, err := s.db.Exec(schema); err != nil { diff --git a/internal/license/conversion/store_test.go b/internal/license/conversion/store_test.go index da888a4d4..dc2b3010c 100644 --- a/internal/license/conversion/store_test.go +++ b/internal/license/conversion/store_test.go @@ -87,6 +87,28 @@ func TestConversionStoreIdempotency(t *testing.T) { } } +func TestConversionStoreSchemaHasCreatedAtIndex(t *testing.T) { + tmp := t.TempDir() + store, err := NewConversionStore(filepath.Join(tmp, "conversion.db")) + if err != nil { + t.Fatalf("NewConversionStore() error = %v", err) + } + defer store.Close() + + var count int + err = store.db.QueryRow( + `SELECT COUNT(1) + FROM sqlite_master + WHERE type = 'index' AND name = 'idx_conversion_events_time'`, + ).Scan(&count) + if err != nil { + t.Fatalf("failed to query sqlite_master: %v", err) + } + if count != 1 { + t.Fatalf("idx_conversion_events_time missing, count = %d", count) + } +} + func TestConversionStoreFunnelSummaryAggregation(t *testing.T) { tmp := t.TempDir() store, err := NewConversionStore(filepath.Join(tmp, "conversion.db")) diff --git a/internal/notifications/queue.go b/internal/notifications/queue.go index abe1b8538..12a6d7eb0 100644 --- a/internal/notifications/queue.go +++ b/internal/notifications/queue.go @@ -148,6 +148,7 @@ func (nq *NotificationQueue) initSchema() error { CREATE INDEX IF NOT EXISTS idx_status ON notification_queue(status); CREATE INDEX IF NOT EXISTS idx_next_retry ON notification_queue(next_retry_at) WHERE status = 'pending'; CREATE INDEX IF NOT EXISTS idx_created_at ON notification_queue(created_at); + CREATE INDEX IF NOT EXISTS idx_status_completed_at ON notification_queue(status, completed_at); CREATE TABLE IF NOT EXISTS notification_audit ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -673,39 +674,97 @@ func (nq *NotificationQueue) performCleanup() { // Keep completed/failed for 7 days, DLQ for 30 days completedCutoff := time.Now().Add(-7 * 24 * time.Hour).Unix() dlqCutoff := time.Now().Add(-30 * 24 * time.Hour).Unix() + auditCutoff := time.Now().Add(-30 * 24 * time.Hour).Unix() + + tx, err := nq.db.Begin() + if err != nil { + log.Error().Err(err).Msg("Failed to start cleanup transaction") + return + } + + var queueCompletedDeleted int64 + var queueDLQDeleted int64 + var auditDeleted int64 + + // With foreign keys enabled, delete child audit rows before parent queue rows. + query := `DELETE FROM notification_audit WHERE notification_id IN ( + SELECT id FROM notification_queue + WHERE status IN ('sent', 'failed', 'cancelled') AND completed_at < ? + )` + result, err := tx.Exec(query, completedCutoff) + if err != nil { + _ = tx.Rollback() + log.Error().Err(err).Msg("Failed to cleanup audit rows for completed notifications") + return + } + if rows, rowsErr := result.RowsAffected(); rowsErr == nil { + auditDeleted += rows + } // Clean completed/sent/failed/cancelled - query := `DELETE FROM notification_queue WHERE status IN ('sent', 'failed', 'cancelled') AND completed_at < ?` - result, err := nq.db.Exec(query, completedCutoff) + query = `DELETE FROM notification_queue WHERE status IN ('sent', 'failed', 'cancelled') AND completed_at < ?` + result, err = tx.Exec(query, completedCutoff) if err != nil { - log.Error().Err(err).Msg("Failed to cleanup old notifications") - } else { - if rows, _ := result.RowsAffected(); rows > 0 { - log.Info().Int64("count", rows).Msg("Cleaned up old completed notifications") - } + _ = tx.Rollback() + log.Error().Err(err).Msg("Failed to cleanup old completed notifications") + return + } + if rows, rowsErr := result.RowsAffected(); rowsErr == nil { + queueCompletedDeleted = rows + } + + // Delete audit rows for expired DLQ entries before deleting DLQ queue rows. + query = `DELETE FROM notification_audit WHERE notification_id IN ( + SELECT id FROM notification_queue + WHERE status = 'dlq' AND completed_at < ? + )` + result, err = tx.Exec(query, dlqCutoff) + if err != nil { + _ = tx.Rollback() + log.Error().Err(err).Msg("Failed to cleanup audit rows for old DLQ notifications") + return + } + if rows, rowsErr := result.RowsAffected(); rowsErr == nil { + auditDeleted += rows } // Clean old DLQ entries query = `DELETE FROM notification_queue WHERE status = 'dlq' AND completed_at < ?` - result, err = nq.db.Exec(query, dlqCutoff) + result, err = tx.Exec(query, dlqCutoff) if err != nil { + _ = tx.Rollback() log.Error().Err(err).Msg("Failed to cleanup old DLQ entries") - } else { - if rows, _ := result.RowsAffected(); rows > 0 { - log.Info().Int64("count", rows).Msg("Cleaned up old DLQ entries") - } + return + } + if rows, rowsErr := result.RowsAffected(); rowsErr == nil { + queueDLQDeleted = rows } // Clean old audit logs (keep 30 days) - auditCutoff := time.Now().Add(-30 * 24 * time.Hour).Unix() query = `DELETE FROM notification_audit WHERE timestamp < ?` - result, err = nq.db.Exec(query, auditCutoff) + result, err = tx.Exec(query, auditCutoff) if err != nil { + _ = tx.Rollback() log.Error().Err(err).Msg("Failed to cleanup old audit logs") - } else { - if rows, _ := result.RowsAffected(); rows > 0 { - log.Debug().Int64("count", rows).Msg("Cleaned up old audit logs") - } + return + } + if rows, rowsErr := result.RowsAffected(); rowsErr == nil { + auditDeleted += rows + } + + if err := tx.Commit(); err != nil { + log.Error().Err(err).Msg("Failed to commit cleanup transaction") + return + } + + if queueCompletedDeleted > 0 { + log.Info().Int64("count", queueCompletedDeleted).Msg("Cleaned up old completed notifications") + } + if queueDLQDeleted > 0 { + log.Info().Int64("count", queueDLQDeleted).Msg("Cleaned up old DLQ entries") + } + if auditDeleted > 0 { + log.Debug().Int64("count", auditDeleted).Msg("Cleaned up old audit logs") } } diff --git a/internal/notifications/queue_test.go b/internal/notifications/queue_test.go index e2435c575..936dd8f77 100644 --- a/internal/notifications/queue_test.go +++ b/internal/notifications/queue_test.go @@ -318,6 +318,15 @@ func TestCancelByAlertIDs_MatchingNotificationCancelled(t *testing.T) { if stats["cancelled"] != 1 { t.Errorf("Expected 1 cancelled notification, got %d (stats: %v)", stats["cancelled"], stats) } + + var completedAt sql.NullInt64 + err = nq.db.QueryRow(`SELECT completed_at FROM notification_queue WHERE id = ?`, "notif-1").Scan(&completedAt) + if err != nil { + t.Fatalf("Failed to query completed_at: %v", err) + } + if !completedAt.Valid || completedAt.Int64 <= 0 { + t.Errorf("Expected completed_at to be set for cancelled notification, got %+v", completedAt) + } } func TestCancelByAlertIDs_MultipleAlertsPartialMatch(t *testing.T) { @@ -779,6 +788,54 @@ func TestPerformCleanup(t *testing.T) { } }) + t.Run("cleanup removes old completed entries with audit rows", func(t *testing.T) { + tempDir := t.TempDir() + nq, err := NewNotificationQueue(tempDir) + if err != nil { + t.Fatalf("Failed to create notification queue: %v", err) + } + defer nq.Stop() + + oldTime := time.Now().Add(-10 * 24 * time.Hour).Unix() // 10 days ago + recentAuditTime := time.Now().Add(-1 * time.Hour).Unix() + + _, err = nq.db.Exec(` + INSERT INTO notification_queue + (id, type, status, config, alerts, attempts, max_attempts, created_at, completed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + "old-sent-with-audit", "email", "sent", "{}", "[]", 1, 3, oldTime, oldTime) + if err != nil { + t.Fatalf("Failed to insert old notification: %v", err) + } + + _, err = nq.db.Exec(` + INSERT INTO notification_audit (notification_id, type, status, timestamp) + VALUES (?, ?, ?, ?)`, + "old-sent-with-audit", "email", "sent", recentAuditTime) + if err != nil { + t.Fatalf("Failed to insert audit row: %v", err) + } + + nq.performCleanup() + + var count int + err = nq.db.QueryRow(`SELECT COUNT(*) FROM notification_queue WHERE id = ?`, "old-sent-with-audit").Scan(&count) + if err != nil { + t.Fatalf("Failed to query queue row: %v", err) + } + if count != 0 { + t.Error("old completed notification with audit row should have been cleaned up") + } + + err = nq.db.QueryRow(`SELECT COUNT(*) FROM notification_audit WHERE notification_id = ?`, "old-sent-with-audit").Scan(&count) + if err != nil { + t.Fatalf("Failed to query audit rows: %v", err) + } + if count != 0 { + t.Error("audit rows for deleted notification should have been cleaned up") + } + }) + t.Run("cleanup removes old DLQ entries", func(t *testing.T) { tempDir := t.TempDir() nq, err := NewNotificationQueue(tempDir) diff --git a/pkg/audit/sqlite_logger.go b/pkg/audit/sqlite_logger.go index 85e44fb00..6ab0c9375 100644 --- a/pkg/audit/sqlite_logger.go +++ b/pkg/audit/sqlite_logger.go @@ -259,6 +259,10 @@ func (l *SQLiteLogger) Query(filter QueryFilter) ([]Event, error) { args = append(args, filter.Limit) } if filter.Offset > 0 { + // SQLite requires LIMIT when OFFSET is present. + if filter.Limit <= 0 { + query += " LIMIT -1" + } query += " OFFSET ?" args = append(args, filter.Offset) } diff --git a/pkg/audit/sqlite_logger_test.go b/pkg/audit/sqlite_logger_test.go index 02ce93707..c79fa436c 100644 --- a/pkg/audit/sqlite_logger_test.go +++ b/pkg/audit/sqlite_logger_test.go @@ -187,6 +187,17 @@ func TestSQLiteLoggerQuery(t *testing.T) { t.Errorf("Expected 2 events with limit, got %d", len(results)) } }) + + // SQLite requires LIMIT when using OFFSET; OFFSET-only should still work. + t.Run("OffsetWithoutLimit", func(t *testing.T) { + results, err := logger.Query(QueryFilter{Offset: 1}) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != 3 { + t.Errorf("Expected 3 events with offset-only query, got %d", len(results)) + } + }) } func TestSQLiteLoggerCount(t *testing.T) {