mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-02-18 00:17:39 +01:00
merge: parallel-36-database-persistence into v6
This commit is contained in:
@@ -28,6 +28,7 @@ func NewTenantRegistry(dir string) (*TenantRegistry, error) {
|
||||
dsn := dbPath + "?" + url.Values{
|
||||
"_pragma": []string{
|
||||
"busy_timeout(30000)",
|
||||
"foreign_keys(ON)",
|
||||
"journal_mode(WAL)",
|
||||
"synchronous(NORMAL)",
|
||||
},
|
||||
@@ -71,6 +72,8 @@ func (r *TenantRegistry) initSchema() error {
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_tenants_state ON tenants(state);
|
||||
CREATE INDEX IF NOT EXISTS idx_tenants_stripe_customer_id ON tenants(stripe_customer_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_tenants_state_created_at ON tenants(state, created_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_tenants_account_id_created_at ON tenants(account_id, created_at DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS accounts (
|
||||
id TEXT PRIMARY KEY,
|
||||
@@ -119,6 +122,8 @@ func (r *TenantRegistry) initSchema() error {
|
||||
FOREIGN KEY (user_id) REFERENCES users(id)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_memberships_user_id ON account_memberships(user_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_memberships_user_id_created_at ON account_memberships(user_id, created_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_memberships_account_id_created_at ON account_memberships(account_id, created_at DESC);
|
||||
`
|
||||
if _, err := r.db.Exec(schema); err != nil {
|
||||
return fmt.Errorf("init tenant registry schema: %w", err)
|
||||
|
||||
@@ -400,6 +400,28 @@ func TestMembershipCRUD(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateMembership_RequiresExistingAccountAndUser(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
|
||||
accountID, err := GenerateAccountID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
userID, err := GenerateUserID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = reg.CreateMembership(&AccountMembership{
|
||||
AccountID: accountID,
|
||||
UserID: userID,
|
||||
Role: MemberRoleOwner,
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected CreateMembership to fail when account/user do not exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
|
||||
@@ -668,6 +690,20 @@ func TestStripeAccountCRUD(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateStripeAccount_RequiresExistingAccount(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
|
||||
sa := &StripeAccount{
|
||||
AccountID: "a_missing123",
|
||||
StripeCustomerID: "cus_missing_123",
|
||||
PlanVersion: "msp_hosted_v1",
|
||||
SubscriptionState: "trial",
|
||||
}
|
||||
if err := reg.CreateStripeAccount(sa); err == nil {
|
||||
t.Fatal("expected CreateStripeAccount to fail when account does not exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStripeEventIdempotency(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
|
||||
|
||||
@@ -101,6 +101,7 @@ func (s *ConversionStore) initSchema() error {
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_conversion_events_org_time ON conversion_events(org_id, created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_conversion_events_type ON conversion_events(event_type, created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_conversion_events_time ON conversion_events(created_at);
|
||||
`
|
||||
|
||||
if _, err := s.db.Exec(schema); err != nil {
|
||||
|
||||
@@ -87,6 +87,28 @@ func TestConversionStoreIdempotency(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConversionStoreSchemaHasCreatedAtIndex(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
store, err := NewConversionStore(filepath.Join(tmp, "conversion.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("NewConversionStore() error = %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
var count int
|
||||
err = store.db.QueryRow(
|
||||
`SELECT COUNT(1)
|
||||
FROM sqlite_master
|
||||
WHERE type = 'index' AND name = 'idx_conversion_events_time'`,
|
||||
).Scan(&count)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to query sqlite_master: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Fatalf("idx_conversion_events_time missing, count = %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConversionStoreFunnelSummaryAggregation(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
store, err := NewConversionStore(filepath.Join(tmp, "conversion.db"))
|
||||
|
||||
@@ -148,6 +148,7 @@ func (nq *NotificationQueue) initSchema() error {
|
||||
CREATE INDEX IF NOT EXISTS idx_status ON notification_queue(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_next_retry ON notification_queue(next_retry_at) WHERE status = 'pending';
|
||||
CREATE INDEX IF NOT EXISTS idx_created_at ON notification_queue(created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_status_completed_at ON notification_queue(status, completed_at);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS notification_audit (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
@@ -673,39 +674,97 @@ func (nq *NotificationQueue) performCleanup() {
|
||||
// Keep completed/failed for 7 days, DLQ for 30 days
|
||||
completedCutoff := time.Now().Add(-7 * 24 * time.Hour).Unix()
|
||||
dlqCutoff := time.Now().Add(-30 * 24 * time.Hour).Unix()
|
||||
auditCutoff := time.Now().Add(-30 * 24 * time.Hour).Unix()
|
||||
|
||||
tx, err := nq.db.Begin()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to start cleanup transaction")
|
||||
return
|
||||
}
|
||||
|
||||
var queueCompletedDeleted int64
|
||||
var queueDLQDeleted int64
|
||||
var auditDeleted int64
|
||||
|
||||
// With foreign keys enabled, delete child audit rows before parent queue rows.
|
||||
query := `DELETE FROM notification_audit WHERE notification_id IN (
|
||||
SELECT id FROM notification_queue
|
||||
WHERE status IN ('sent', 'failed', 'cancelled') AND completed_at < ?
|
||||
)`
|
||||
result, err := tx.Exec(query, completedCutoff)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Error().Err(err).Msg("Failed to cleanup audit rows for completed notifications")
|
||||
return
|
||||
}
|
||||
if rows, rowsErr := result.RowsAffected(); rowsErr == nil {
|
||||
auditDeleted += rows
|
||||
}
|
||||
|
||||
// Clean completed/sent/failed/cancelled
|
||||
query := `DELETE FROM notification_queue WHERE status IN ('sent', 'failed', 'cancelled') AND completed_at < ?`
|
||||
result, err := nq.db.Exec(query, completedCutoff)
|
||||
query = `DELETE FROM notification_queue WHERE status IN ('sent', 'failed', 'cancelled') AND completed_at < ?`
|
||||
result, err = tx.Exec(query, completedCutoff)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to cleanup old notifications")
|
||||
} else {
|
||||
if rows, _ := result.RowsAffected(); rows > 0 {
|
||||
log.Info().Int64("count", rows).Msg("Cleaned up old completed notifications")
|
||||
}
|
||||
_ = tx.Rollback()
|
||||
log.Error().Err(err).Msg("Failed to cleanup old completed notifications")
|
||||
return
|
||||
}
|
||||
if rows, rowsErr := result.RowsAffected(); rowsErr == nil {
|
||||
queueCompletedDeleted = rows
|
||||
}
|
||||
|
||||
// Delete audit rows for expired DLQ entries before deleting DLQ queue rows.
|
||||
query = `DELETE FROM notification_audit WHERE notification_id IN (
|
||||
SELECT id FROM notification_queue
|
||||
WHERE status = 'dlq' AND completed_at < ?
|
||||
)`
|
||||
result, err = tx.Exec(query, dlqCutoff)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Error().Err(err).Msg("Failed to cleanup audit rows for old DLQ notifications")
|
||||
return
|
||||
}
|
||||
if rows, rowsErr := result.RowsAffected(); rowsErr == nil {
|
||||
auditDeleted += rows
|
||||
}
|
||||
|
||||
// Clean old DLQ entries
|
||||
query = `DELETE FROM notification_queue WHERE status = 'dlq' AND completed_at < ?`
|
||||
result, err = nq.db.Exec(query, dlqCutoff)
|
||||
result, err = tx.Exec(query, dlqCutoff)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Error().Err(err).Msg("Failed to cleanup old DLQ entries")
|
||||
} else {
|
||||
if rows, _ := result.RowsAffected(); rows > 0 {
|
||||
log.Info().Int64("count", rows).Msg("Cleaned up old DLQ entries")
|
||||
}
|
||||
return
|
||||
}
|
||||
if rows, rowsErr := result.RowsAffected(); rowsErr == nil {
|
||||
queueDLQDeleted = rows
|
||||
}
|
||||
|
||||
// Clean old audit logs (keep 30 days)
|
||||
auditCutoff := time.Now().Add(-30 * 24 * time.Hour).Unix()
|
||||
query = `DELETE FROM notification_audit WHERE timestamp < ?`
|
||||
result, err = nq.db.Exec(query, auditCutoff)
|
||||
result, err = tx.Exec(query, auditCutoff)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Error().Err(err).Msg("Failed to cleanup old audit logs")
|
||||
} else {
|
||||
if rows, _ := result.RowsAffected(); rows > 0 {
|
||||
log.Debug().Int64("count", rows).Msg("Cleaned up old audit logs")
|
||||
}
|
||||
return
|
||||
}
|
||||
if rows, rowsErr := result.RowsAffected(); rowsErr == nil {
|
||||
auditDeleted += rows
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to commit cleanup transaction")
|
||||
return
|
||||
}
|
||||
|
||||
if queueCompletedDeleted > 0 {
|
||||
log.Info().Int64("count", queueCompletedDeleted).Msg("Cleaned up old completed notifications")
|
||||
}
|
||||
if queueDLQDeleted > 0 {
|
||||
log.Info().Int64("count", queueDLQDeleted).Msg("Cleaned up old DLQ entries")
|
||||
}
|
||||
if auditDeleted > 0 {
|
||||
log.Debug().Int64("count", auditDeleted).Msg("Cleaned up old audit logs")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -318,6 +318,15 @@ func TestCancelByAlertIDs_MatchingNotificationCancelled(t *testing.T) {
|
||||
if stats["cancelled"] != 1 {
|
||||
t.Errorf("Expected 1 cancelled notification, got %d (stats: %v)", stats["cancelled"], stats)
|
||||
}
|
||||
|
||||
var completedAt sql.NullInt64
|
||||
err = nq.db.QueryRow(`SELECT completed_at FROM notification_queue WHERE id = ?`, "notif-1").Scan(&completedAt)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query completed_at: %v", err)
|
||||
}
|
||||
if !completedAt.Valid || completedAt.Int64 <= 0 {
|
||||
t.Errorf("Expected completed_at to be set for cancelled notification, got %+v", completedAt)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelByAlertIDs_MultipleAlertsPartialMatch(t *testing.T) {
|
||||
@@ -779,6 +788,54 @@ func TestPerformCleanup(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("cleanup removes old completed entries with audit rows", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
nq, err := NewNotificationQueue(tempDir)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create notification queue: %v", err)
|
||||
}
|
||||
defer nq.Stop()
|
||||
|
||||
oldTime := time.Now().Add(-10 * 24 * time.Hour).Unix() // 10 days ago
|
||||
recentAuditTime := time.Now().Add(-1 * time.Hour).Unix()
|
||||
|
||||
_, err = nq.db.Exec(`
|
||||
INSERT INTO notification_queue
|
||||
(id, type, status, config, alerts, attempts, max_attempts, created_at, completed_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
"old-sent-with-audit", "email", "sent", "{}", "[]", 1, 3, oldTime, oldTime)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to insert old notification: %v", err)
|
||||
}
|
||||
|
||||
_, err = nq.db.Exec(`
|
||||
INSERT INTO notification_audit (notification_id, type, status, timestamp)
|
||||
VALUES (?, ?, ?, ?)`,
|
||||
"old-sent-with-audit", "email", "sent", recentAuditTime)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to insert audit row: %v", err)
|
||||
}
|
||||
|
||||
nq.performCleanup()
|
||||
|
||||
var count int
|
||||
err = nq.db.QueryRow(`SELECT COUNT(*) FROM notification_queue WHERE id = ?`, "old-sent-with-audit").Scan(&count)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query queue row: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Error("old completed notification with audit row should have been cleaned up")
|
||||
}
|
||||
|
||||
err = nq.db.QueryRow(`SELECT COUNT(*) FROM notification_audit WHERE notification_id = ?`, "old-sent-with-audit").Scan(&count)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query audit rows: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Error("audit rows for deleted notification should have been cleaned up")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("cleanup removes old DLQ entries", func(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
nq, err := NewNotificationQueue(tempDir)
|
||||
|
||||
@@ -259,6 +259,10 @@ func (l *SQLiteLogger) Query(filter QueryFilter) ([]Event, error) {
|
||||
args = append(args, filter.Limit)
|
||||
}
|
||||
if filter.Offset > 0 {
|
||||
// SQLite requires LIMIT when OFFSET is present.
|
||||
if filter.Limit <= 0 {
|
||||
query += " LIMIT -1"
|
||||
}
|
||||
query += " OFFSET ?"
|
||||
args = append(args, filter.Offset)
|
||||
}
|
||||
|
||||
@@ -187,6 +187,17 @@ func TestSQLiteLoggerQuery(t *testing.T) {
|
||||
t.Errorf("Expected 2 events with limit, got %d", len(results))
|
||||
}
|
||||
})
|
||||
|
||||
// SQLite requires LIMIT when using OFFSET; OFFSET-only should still work.
|
||||
t.Run("OffsetWithoutLimit", func(t *testing.T) {
|
||||
results, err := logger.Query(QueryFilter{Offset: 1})
|
||||
if err != nil {
|
||||
t.Fatalf("Query failed: %v", err)
|
||||
}
|
||||
if len(results) != 3 {
|
||||
t.Errorf("Expected 3 events with offset-only query, got %d", len(results))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSQLiteLoggerCount(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user