package tests import ( "fmt" "sync" "sync/atomic" "time" "github.com/google/uuid" ) func (s *IntegrationSuite) TestWorkerConcurrent_SessionCleanup_MassExpired() { _, _, userID := s.createUniqueTestUser("session_cleanup", 100.0) expiredTime := time.Now().Add(-24 * time.Hour) validTime := time.Now().Add(24 * time.Hour) expiredCount := 100 validCount := 50 for i := 0; i < expiredCount; i++ { _, err := s.pool.Exec(s.ctx, ` INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at) VALUES ($1, $2, $3, '127.0.0.1', 'test-agent', $4) `, userID, fmt.Sprintf("expired_access_%d_%s", i, uuid.New().String()), fmt.Sprintf("expired_refresh_%d_%s", i, uuid.New().String()), expiredTime, ) s.Require().NoError(err) } for i := 0; i < validCount; i++ { _, err := s.pool.Exec(s.ctx, ` INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at) VALUES ($1, $2, $3, '127.0.0.1', 'test-agent', $4) `, userID, fmt.Sprintf("valid_access_%d_%s", i, uuid.New().String()), fmt.Sprintf("valid_refresh_%d_%s", i, uuid.New().String()), validTime, ) s.Require().NoError(err) } var totalBefore int err := s.pool.QueryRow(s.ctx, "SELECT COUNT(*) FROM sessions WHERE user_id = $1", userID, ).Scan(&totalBefore) s.Require().NoError(err) s.T().Logf("Sessions before cleanup: %d", totalBefore) var wg sync.WaitGroup var totalDeleted int32 goroutines := 10 startBarrier := make(chan struct{}) for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() <-startBarrier result, err := s.pool.Exec(s.ctx, ` DELETE FROM sessions WHERE expires_at < now() OR (revoked_at IS NOT NULL AND revoked_at < now() - interval '30 days') `) if err == nil { atomic.AddInt32(&totalDeleted, int32(result.RowsAffected())) } }() } close(startBarrier) wg.Wait() s.T().Logf("Total deleted by concurrent cleanup: %d", totalDeleted) var validRemaining int err = s.pool.QueryRow(s.ctx, "SELECT COUNT(*) FROM sessions WHERE user_id = $1 AND expires_at > now()", userID, ).Scan(&validRemaining) s.NoError(err) s.T().Logf("Valid sessions remaining: %d (expected: %d)", validRemaining, validCount) s.Equal(validCount, validRemaining, "Все валидные сессии должны остаться после cleanup") s.GreaterOrEqual(int(totalDeleted), expiredCount, "Все истекшие сессии должны быть удалены") } func (s *IntegrationSuite) TestWorkerConcurrent_InviteCleanup_MassExpired() { _, _, userID := s.createUniqueTestUser("invite_cleanup", 100.0) _, err := s.pool.Exec(s.ctx, "UPDATE users SET invites_limit = 200 WHERE id = $1", userID) s.Require().NoError(err) expiredTime := time.Now().Add(-24 * time.Hour) validTime := time.Now().Add(24 * time.Hour) expiredCount := 100 validCount := 50 for i := 0; i < expiredCount; i++ { code := int64(30000000 + i) _, err := s.pool.Exec(s.ctx, ` INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active) VALUES ($1, $2, 5, $3, true) `, userID, code, expiredTime) s.Require().NoError(err) } for i := 0; i < validCount; i++ { code := int64(40000000 + i) _, err := s.pool.Exec(s.ctx, ` INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active) VALUES ($1, $2, 5, $3, true) `, userID, code, validTime) s.Require().NoError(err) } var activeBefore int err = s.pool.QueryRow(s.ctx, "SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND is_active = true", userID, ).Scan(&activeBefore) s.Require().NoError(err) s.T().Logf("Active invites before cleanup: %d", activeBefore) var wg sync.WaitGroup var totalDeactivated int32 goroutines := 10 startBarrier := make(chan struct{}) for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() <-startBarrier result, err := s.pool.Exec(s.ctx, ` UPDATE invite_codes SET is_active = false WHERE expires_at < now() AND is_active = true `) if err == nil { atomic.AddInt32(&totalDeactivated, int32(result.RowsAffected())) } }() } close(startBarrier) wg.Wait() s.T().Logf("Total deactivated by concurrent cleanup: %d", totalDeactivated) var activeRemaining int err = s.pool.QueryRow(s.ctx, "SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND is_active = true", userID, ).Scan(&activeRemaining) s.NoError(err) s.T().Logf("Active invites remaining: %d (expected: %d)", activeRemaining, validCount) s.Equal(validCount, activeRemaining, "Все валидные инвайты должны остаться активными после cleanup") var expiredStillActive int err = s.pool.QueryRow(s.ctx, "SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND expires_at < now() AND is_active = true", userID, ).Scan(&expiredStillActive) s.NoError(err) s.Equal(0, expiredStillActive, "Не должно остаться активных истекших инвайтов") }