182 lines
4.9 KiB
Go
182 lines
4.9 KiB
Go
package tests
|
||
|
||
import (
|
||
"fmt"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
)
|
||
|
||
func (s *IntegrationSuite) TestWorkerConcurrent_SessionCleanup_MassExpired() {
|
||
_, _, userID := s.createUniqueTestUser("session_cleanup", 100.0)
|
||
|
||
expiredTime := time.Now().Add(-24 * time.Hour)
|
||
validTime := time.Now().Add(24 * time.Hour)
|
||
|
||
expiredCount := 100
|
||
validCount := 50
|
||
|
||
for i := 0; i < expiredCount; i++ {
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at)
|
||
VALUES ($1, $2, $3, '127.0.0.1', 'test-agent', $4)
|
||
`, userID,
|
||
fmt.Sprintf("expired_access_%d_%s", i, uuid.New().String()),
|
||
fmt.Sprintf("expired_refresh_%d_%s", i, uuid.New().String()),
|
||
expiredTime,
|
||
)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
for i := 0; i < validCount; i++ {
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at)
|
||
VALUES ($1, $2, $3, '127.0.0.1', 'test-agent', $4)
|
||
`, userID,
|
||
fmt.Sprintf("valid_access_%d_%s", i, uuid.New().String()),
|
||
fmt.Sprintf("valid_refresh_%d_%s", i, uuid.New().String()),
|
||
validTime,
|
||
)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
var totalBefore int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM sessions WHERE user_id = $1", userID,
|
||
).Scan(&totalBefore)
|
||
s.Require().NoError(err)
|
||
s.T().Logf("Sessions before cleanup: %d", totalBefore)
|
||
|
||
var wg sync.WaitGroup
|
||
var totalDeleted int32
|
||
goroutines := 10
|
||
|
||
startBarrier := make(chan struct{})
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
<-startBarrier
|
||
|
||
result, err := s.pool.Exec(s.ctx, `
|
||
DELETE FROM sessions
|
||
WHERE expires_at < now()
|
||
OR (revoked_at IS NOT NULL AND revoked_at < now() - interval '30 days')
|
||
`)
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&totalDeleted, int32(result.RowsAffected()))
|
||
}
|
||
}()
|
||
}
|
||
|
||
close(startBarrier)
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Total deleted by concurrent cleanup: %d", totalDeleted)
|
||
|
||
var validRemaining int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM sessions WHERE user_id = $1 AND expires_at > now()", userID,
|
||
).Scan(&validRemaining)
|
||
s.NoError(err)
|
||
|
||
s.T().Logf("Valid sessions remaining: %d (expected: %d)", validRemaining, validCount)
|
||
|
||
s.Equal(validCount, validRemaining,
|
||
"Все валидные сессии должны остаться после cleanup")
|
||
|
||
s.GreaterOrEqual(int(totalDeleted), expiredCount,
|
||
"Все истекшие сессии должны быть удалены")
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestWorkerConcurrent_InviteCleanup_MassExpired() {
|
||
_, _, userID := s.createUniqueTestUser("invite_cleanup", 100.0)
|
||
|
||
_, err := s.pool.Exec(s.ctx, "UPDATE users SET invites_limit = 200 WHERE id = $1", userID)
|
||
s.Require().NoError(err)
|
||
|
||
expiredTime := time.Now().Add(-24 * time.Hour)
|
||
validTime := time.Now().Add(24 * time.Hour)
|
||
|
||
expiredCount := 100
|
||
validCount := 50
|
||
|
||
for i := 0; i < expiredCount; i++ {
|
||
code := int64(30000000 + i)
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active)
|
||
VALUES ($1, $2, 5, $3, true)
|
||
`, userID, code, expiredTime)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
for i := 0; i < validCount; i++ {
|
||
code := int64(40000000 + i)
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active)
|
||
VALUES ($1, $2, 5, $3, true)
|
||
`, userID, code, validTime)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
var activeBefore int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND is_active = true", userID,
|
||
).Scan(&activeBefore)
|
||
s.Require().NoError(err)
|
||
s.T().Logf("Active invites before cleanup: %d", activeBefore)
|
||
|
||
var wg sync.WaitGroup
|
||
var totalDeactivated int32
|
||
goroutines := 10
|
||
|
||
startBarrier := make(chan struct{})
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
<-startBarrier
|
||
|
||
result, err := s.pool.Exec(s.ctx, `
|
||
UPDATE invite_codes
|
||
SET is_active = false
|
||
WHERE expires_at < now() AND is_active = true
|
||
`)
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&totalDeactivated, int32(result.RowsAffected()))
|
||
}
|
||
}()
|
||
}
|
||
|
||
close(startBarrier)
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Total deactivated by concurrent cleanup: %d", totalDeactivated)
|
||
|
||
var activeRemaining int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND is_active = true", userID,
|
||
).Scan(&activeRemaining)
|
||
s.NoError(err)
|
||
|
||
s.T().Logf("Active invites remaining: %d (expected: %d)", activeRemaining, validCount)
|
||
|
||
s.Equal(validCount, activeRemaining,
|
||
"Все валидные инвайты должны остаться активными после cleanup")
|
||
|
||
var expiredStillActive int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND expires_at < now() AND is_active = true", userID,
|
||
).Scan(&expiredStillActive)
|
||
s.NoError(err)
|
||
|
||
s.Equal(0, expiredStillActive,
|
||
"Не должно остаться активных истекших инвайтов")
|
||
}
|