Files
smart-search-back/tests/concurrent_test.go
vallyenfail 1dbdb820cd
All checks were successful
Deploy Smart Search Backend Test / deploy (push) Successful in 1m25s
add service
2026-01-18 00:42:01 +03:00

1106 lines
28 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tests
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"git.techease.ru/Smart-search/smart-search-back/internal/repository"
"git.techease.ru/Smart-search/smart-search-back/pkg/crypto"
authpb "git.techease.ru/Smart-search/smart-search-back/pkg/pb/auth"
invitepb "git.techease.ru/Smart-search/smart-search-back/pkg/pb/invite"
"github.com/jackc/pgx/v5"
)
// TestConcurrent_GenerateInvites_ExceedsLimit issues 20 concurrent Generate
// requests on behalf of a user created with an invite limit of 5. It asserts
// that the number of successful calls does not exceed the limit and that the
// stored invites_issued counter does not exceed it either.
func (s *IntegrationSuite) TestConcurrent_GenerateInvites_ExceedsLimit() {
	const workers = 20

	var (
		email    = "invite_limit_test@example.com"
		password = "testpassword"
		limit    = 5
	)
	s.createTestUserWithLimit(email, password, limit)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	// Resolve the numeric id the invite service expects.
	var ownerID int64
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	var (
		pending   sync.WaitGroup
		succeeded int32
		failed    int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			_, genErr := s.inviteClient.Generate(s.ctx, &invitepb.GenerateRequest{
				UserId:  ownerID,
				MaxUses: 1,
				TtlDays: 7,
			})
			if genErr != nil {
				atomic.AddInt32(&failed, 1)
				return
			}
			atomic.AddInt32(&succeeded, 1)
		}()
	}
	pending.Wait()

	s.T().Logf("Success: %d, Errors: %d", succeeded, failed)
	s.LessOrEqual(int(succeeded), limit,
		"Количество успешных генераций (%d) не должно превышать лимит (%d)", succeeded, limit)

	// The persisted counter must respect the limit as well.
	var issued int
	readErr := s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE email_hash = $1",
		hash,
	).Scan(&issued)
	s.Require().NoError(readErr)
	s.LessOrEqual(issued, limit,
		"invites_issued (%d) не должен превышать invites_limit (%d)", issued, limit)
}
// TestConcurrent_UpdateBalance_GoesNegative starts 20 goroutines that each try
// to debit 50 from a balance of 100 with a single conditional UPDATE whose
// WHERE clause rejects a debit that would take the balance below zero. It
// asserts that the final balance is non-negative and that exactly
// initial/debit attempts affected a row.
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_GoesNegative() {
	const workers = 20

	email, password := "balance_test@example.com", "testpassword"
	startBalance, debit := 100.0, 50.0
	s.createTestUserWithBalance(email, password, startBalance)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	// tryDebit reports whether the guarded UPDATE ran without error and changed a row.
	tryDebit := func() bool {
		tag, execErr := s.pool.Exec(s.ctx,
			"UPDATE users SET balance = balance - $1 WHERE email_hash = $2 AND balance - $1 >= 0",
			debit, hash,
		)
		return execErr == nil && tag.RowsAffected() > 0
	}

	var (
		pending  sync.WaitGroup
		applied  int32
		rejected int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			if tryDebit() {
				atomic.AddInt32(&applied, 1)
				return
			}
			atomic.AddInt32(&rejected, 1)
		}()
	}
	pending.Wait()
	s.T().Logf("Success: %d, Errors: %d", applied, rejected)

	var remaining float64
	readErr := s.pool.QueryRow(s.ctx,
		"SELECT balance FROM users WHERE email_hash = $1",
		hash,
	).Scan(&remaining)
	s.Require().NoError(readErr)
	s.T().Logf("Final balance: %.2f", remaining)

	s.GreaterOrEqual(remaining, 0.0,
		"Баланс (%f) не должен быть отрицательным", remaining)

	wantDebits := int(startBalance / debit)
	s.Equal(wantDebits, int(applied),
		"Должно быть ровно %d успешных списаний при балансе %.0f и списании %.0f",
		wantDebits, startBalance, debit)
}
// TestConcurrent_UpdateBalance_ViaRepository runs 20 concurrent debits of 200
// against a balance of 1000, addressed by user id. Despite the name, the debits
// are issued as raw SQL through the pool (adding a negative amount with a
// "balance + $1 >= 0" guard), not through a repository type. The test asserts a
// non-negative final balance, exactly initial/debit successful debits, and a
// final balance equal to initial - successes*debit.
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_ViaRepository() {
	const workers = 20

	email, password := "balance_repo_test@example.com", "testpassword"
	startBalance, debit := 1000.0, 200.0
	s.createTestUserWithBalance(email, password, startBalance)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	// tryDebit reports whether the guarded UPDATE ran without error and changed a row.
	tryDebit := func() bool {
		tag, execErr := s.pool.Exec(s.ctx,
			"UPDATE users SET balance = balance + $1 WHERE id = $2 AND balance + $1 >= 0",
			-debit, ownerID,
		)
		return execErr == nil && tag.RowsAffected() > 0
	}

	var (
		pending  sync.WaitGroup
		applied  int32
		rejected int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			if tryDebit() {
				atomic.AddInt32(&applied, 1)
				return
			}
			atomic.AddInt32(&rejected, 1)
		}()
	}
	pending.Wait()
	s.T().Logf("Success: %d, Errors: %d", applied, rejected)

	var remaining float64
	readErr := s.pool.QueryRow(s.ctx,
		"SELECT balance FROM users WHERE id = $1",
		ownerID,
	).Scan(&remaining)
	s.Require().NoError(readErr)
	s.T().Logf("Final balance: %.2f", remaining)

	s.GreaterOrEqual(remaining, 0.0,
		"Баланс (%f) не должен быть отрицательным", remaining)

	wantDebits := int(startBalance / debit)
	s.Equal(wantDebits, int(applied),
		"Должно быть ровно %d успешных списаний при балансе %.0f и списании %.0f",
		wantDebits, startBalance, debit)

	// Every successful debit must be reflected in the stored balance.
	wantBalance := startBalance - float64(applied)*debit
	s.Equal(wantBalance, remaining,
		"Финальный баланс должен быть %.2f", wantBalance)
}
// TestConcurrent_IncrementInvitesIssued has 20 goroutines race to bump
// invites_issued with a conditional UPDATE that only applies while
// invites_issued < invites_limit. With a limit of 5 it expects both the stored
// counter and the number of updates that affected a row to equal the limit.
func (s *IntegrationSuite) TestConcurrent_IncrementInvitesIssued() {
	const workers = 20

	var (
		email    = "increment_test@example.com"
		password = "testpassword"
		limit    = 5
	)
	s.createTestUserWithLimit(email, password, limit)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	var (
		pending sync.WaitGroup
		applied int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			tag, execErr := s.pool.Exec(s.ctx,
				"UPDATE users SET invites_issued = invites_issued + 1 WHERE id = $1 AND invites_issued < invites_limit",
				ownerID,
			)
			// Errors and zero-row updates are both treated as "not applied".
			if execErr != nil || tag.RowsAffected() <= 0 {
				return
			}
			atomic.AddInt32(&applied, 1)
		}()
	}
	pending.Wait()
	s.T().Logf("Success increments: %d", applied)

	var issued int
	readErr := s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE id = $1",
		ownerID,
	).Scan(&issued)
	s.Require().NoError(readErr)

	s.Equal(limit, issued,
		"invites_issued должен быть равен лимиту %d", limit)
	s.Equal(limit, int(applied),
		"Количество успешных инкрементов должно быть равно лимиту %d", limit)
}
// createTestUserWithLimit upserts a test user keyed by the hash of email.
//
// A new row is inserted with encrypted email, phone ("+1234567890") and user
// name ("Test User"), company "Test Company", balance 1000.0, payment status
// "active", invites_issued 0 and invites_limit set to inviteLimit. If a row with
// the same email_hash already exists, only invites_issued (reset to 0) and
// invites_limit are updated. Any encryption or database error fails the test
// through s.Require().
func (s *IntegrationSuite) createTestUserWithLimit(email, password string, inviteLimit int) {
	const upsertUser = `
INSERT INTO users (email, email_hash, password_hash, phone, user_name, company_name, balance, payment_status, invites_issued, invites_limit)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (email_hash) DO UPDATE SET invites_issued = 0, invites_limit = $10
`
	enc := crypto.NewCrypto(testCryptoSecret)

	emailCipher, err := enc.Encrypt(email)
	s.Require().NoError(err)

	phoneCipher, err := enc.Encrypt("+1234567890")
	s.Require().NoError(err)

	nameCipher, err := enc.Encrypt("Test User")
	s.Require().NoError(err)

	// Positional arguments, in the order of the $1..$10 placeholders.
	args := []interface{}{
		emailCipher,
		enc.EmailHash(email),
		crypto.PasswordHash(password),
		phoneCipher,
		nameCipher,
		"Test Company",
		1000.0,
		"active",
		0,
		inviteLimit,
	}
	_, err = s.pool.Exec(s.ctx, upsertUser, args...)
	s.Require().NoError(err)
}
// createTestUserWithBalance upserts a test user keyed by the hash of email.
//
// A new row is inserted with encrypted email, phone ("+1234567890") and user
// name ("Test User"), company "Test Company", the given balance, payment status
// "active", invites_issued 0 and invites_limit 10. If a row with the same
// email_hash already exists, only its balance is overwritten. Any encryption or
// database error fails the test through s.Require().
func (s *IntegrationSuite) createTestUserWithBalance(email, password string, balance float64) {
	const upsertUser = `
INSERT INTO users (email, email_hash, password_hash, phone, user_name, company_name, balance, payment_status, invites_issued, invites_limit)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (email_hash) DO UPDATE SET balance = $7
`
	enc := crypto.NewCrypto(testCryptoSecret)

	emailCipher, err := enc.Encrypt(email)
	s.Require().NoError(err)

	phoneCipher, err := enc.Encrypt("+1234567890")
	s.Require().NoError(err)

	nameCipher, err := enc.Encrypt("Test User")
	s.Require().NoError(err)

	// Positional arguments, in the order of the $1..$10 placeholders.
	args := []interface{}{
		emailCipher,
		enc.EmailHash(email),
		crypto.PasswordHash(password),
		phoneCipher,
		nameCipher,
		"Test Company",
		balance,
		"active",
		0,
		10,
	}
	_, err = s.pool.Exec(s.ctx, upsertUser, args...)
	s.Require().NoError(err)
}
// TestConcurrent_UpdateBalance_ViaUserRepository calls
// UserRepository.UpdateBalance with -200 from 20 goroutines against a balance
// of 1000. It asserts that the balance reported by GetBalance is non-negative
// and that exactly initial/debit calls returned without error.
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_ViaUserRepository() {
	const workers = 20

	email, password := "balance_repo_real_test@example.com", "testpassword"
	startBalance, debit := 1000.0, 200.0
	s.createTestUserWithBalance(email, password, startBalance)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	users := repository.NewUserRepository(s.pool, testCryptoSecret)

	var (
		pending  sync.WaitGroup
		applied  int32
		rejected int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			if debitErr := users.UpdateBalance(s.ctx, ownerID, -debit); debitErr != nil {
				atomic.AddInt32(&rejected, 1)
				return
			}
			atomic.AddInt32(&applied, 1)
		}()
	}
	pending.Wait()
	s.T().Logf("Success: %d, Errors: %d", applied, rejected)

	remaining, err := users.GetBalance(s.ctx, ownerID)
	s.Require().NoError(err)
	s.T().Logf("Final balance: %.2f", remaining)

	s.GreaterOrEqual(remaining, 0.0,
		"Баланс (%f) не должен быть отрицательным", remaining)

	wantDebits := int(startBalance / debit)
	s.Equal(wantDebits, int(applied),
		"Должно быть ровно %d успешных списаний", wantDebits)
}
// TestConcurrent_RefreshToken_SameSession logs in once and then sends 10
// concurrent Refresh requests that all carry the same refresh token. It only
// requires that at least one of them succeeds; how the remaining requests are
// answered is logged but not asserted.
func (s *IntegrationSuite) TestConcurrent_RefreshToken_SameSession() {
	const workers = 10

	email, password := "refresh_concurrent@example.com", "testpassword"
	s.createTestUser(email, password)

	session, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
		Email:     email,
		Password:  password,
		Ip:        "127.0.0.1",
		UserAgent: "test-agent",
	})
	s.Require().NoError(err)

	var (
		pending   sync.WaitGroup
		succeeded int32
		failed    int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			_, refreshErr := s.authClient.Refresh(s.ctx, &authpb.RefreshRequest{
				RefreshToken: session.RefreshToken,
			})
			if refreshErr != nil {
				atomic.AddInt32(&failed, 1)
				return
			}
			atomic.AddInt32(&succeeded, 1)
		}()
	}
	pending.Wait()

	s.T().Logf("Refresh success: %d, errors: %d", succeeded, failed)
	s.GreaterOrEqual(int(succeeded), 1, "Как минимум один refresh должен быть успешным")
}
// TestConcurrent_Login_SameUser performs 20 concurrent Login calls with the
// same credentials. Every call must succeed, and the sessions table must then
// hold exactly one row per login for that user.
func (s *IntegrationSuite) TestConcurrent_Login_SameUser() {
	const workers = 20

	email, password := "login_concurrent@example.com", "testpassword"
	s.createTestUser(email, password)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var (
		pending   sync.WaitGroup
		succeeded int32
		failed    int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			_, loginErr := s.authClient.Login(s.ctx, &authpb.LoginRequest{
				Email:     email,
				Password:  password,
				Ip:        "127.0.0.1",
				UserAgent: "test-agent",
			})
			if loginErr != nil {
				atomic.AddInt32(&failed, 1)
				return
			}
			atomic.AddInt32(&succeeded, 1)
		}()
	}
	pending.Wait()

	s.T().Logf("Login success: %d, errors: %d", succeeded, failed)
	s.Equal(int32(workers), succeeded,
		"Все %d login запросов должны быть успешными", workers)

	// Count the sessions that now exist for this user.
	var sessions int
	countErr := s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM sessions WHERE user_id = (SELECT id FROM users WHERE email_hash = $1)",
		hash,
	).Scan(&sessions)
	s.Require().NoError(countErr)
	s.Equal(workers, sessions,
		"Должно быть создано %d сессий", workers)
}
// TestConcurrent_Logout_SameSession logs in once and then sends 10 concurrent
// Logout requests for the same access token. It requires that at least one of
// them succeeds; failed logouts are not counted.
func (s *IntegrationSuite) TestConcurrent_Logout_SameSession() {
	const workers = 10

	email, password := "logout_concurrent@example.com", "testpassword"
	s.createTestUser(email, password)

	session, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
		Email:     email,
		Password:  password,
		Ip:        "127.0.0.1",
		UserAgent: "test-agent",
	})
	s.Require().NoError(err)

	var (
		pending   sync.WaitGroup
		succeeded int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			_, logoutErr := s.authClient.Logout(s.ctx, &authpb.LogoutRequest{
				AccessToken: session.AccessToken,
			})
			if logoutErr != nil {
				return
			}
			atomic.AddInt32(&succeeded, 1)
		}()
	}
	pending.Wait()

	s.T().Logf("Logout success: %d", succeeded)
	s.GreaterOrEqual(int(succeeded), 1, "Как минимум один logout должен быть успешным")
}
// TestConcurrent_Validate_AfterLogout races one Logout against 19 Validate
// calls that all use the same access token. The logout goroutine is started
// first and waits 5ms before calling Logout. Validate outcomes are only logged;
// the single assertion is that the Logout call succeeded.
func (s *IntegrationSuite) TestConcurrent_Validate_AfterLogout() {
	email, password := "validate_concurrent@example.com", "testpassword"
	s.createTestUser(email, password)

	session, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
		Email:     email,
		Password:  password,
		Ip:        "127.0.0.1",
		UserAgent: "test-agent",
	})
	s.Require().NoError(err)

	// total counts the logout goroutine plus all validators.
	total := 20
	var (
		pending     sync.WaitGroup
		validOK     int32
		validFailed int32
		loggedOut   int32
	)
	pending.Add(total)

	// The one goroutine that revokes the session, slightly delayed.
	go func() {
		defer pending.Done()
		time.Sleep(5 * time.Millisecond)
		_, logoutErr := s.authClient.Logout(s.ctx, &authpb.LogoutRequest{
			AccessToken: session.AccessToken,
		})
		if logoutErr == nil {
			atomic.AddInt32(&loggedOut, 1)
		}
	}()

	// The remaining goroutines validate the same token concurrently.
	for n := 1; n < total; n++ {
		go func() {
			defer pending.Done()
			_, validateErr := s.authClient.Validate(s.ctx, &authpb.ValidateRequest{
				AccessToken: session.AccessToken,
			})
			if validateErr != nil {
				atomic.AddInt32(&validFailed, 1)
				return
			}
			atomic.AddInt32(&validOK, 1)
		}()
	}
	pending.Wait()

	s.T().Logf("Validate success: %d, failed: %d, logout done: %d", validOK, validFailed, loggedOut)
	s.Equal(int32(1), loggedOut, "Logout должен быть выполнен")
}
// TestConcurrent_IncrementInvites_ViaRepository calls
// UserRepository.IncrementInvitesIssued from 20 goroutines for a user whose
// invite limit is 5. It expects both the stored invites_issued value and the
// number of calls that returned no error to equal the limit.
func (s *IntegrationSuite) TestConcurrent_IncrementInvites_ViaRepository() {
	const workers = 20

	var (
		email    = "increment_repo_test@example.com"
		password = "testpassword"
		limit    = 5
	)
	s.createTestUserWithLimit(email, password, limit)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	users := repository.NewUserRepository(s.pool, testCryptoSecret)

	var (
		pending  sync.WaitGroup
		applied  int32
		rejected int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			if incErr := users.IncrementInvitesIssued(s.ctx, ownerID); incErr != nil {
				atomic.AddInt32(&rejected, 1)
				return
			}
			atomic.AddInt32(&applied, 1)
		}()
	}
	pending.Wait()
	s.T().Logf("Success: %d, Errors: %d", applied, rejected)

	var issued int
	readErr := s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE id = $1",
		ownerID,
	).Scan(&issued)
	s.Require().NoError(readErr)

	s.Equal(limit, issued,
		"invites_issued должен быть равен лимиту %d", limit)
	s.Equal(limit, int(applied),
		"Количество успешных инкрементов должно быть равно лимиту %d", limit)
}
// TestConcurrent_WithTransaction_InviteGenerate sends 50 concurrent Generate
// requests for a user with an invite limit of 3. It asserts that the number of
// successful calls stays within the limit and that the invites_issued counter
// matches the number of rows in invite_codes for that user, i.e. the counter and
// the created codes did not drift apart.
func (s *IntegrationSuite) TestConcurrent_WithTransaction_InviteGenerate() {
	const workers = 50

	var (
		email    = "tx_invite_test@example.com"
		password = "testpassword"
		limit    = 3
	)
	s.createTestUserWithLimit(email, password, limit)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int64
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	var (
		pending   sync.WaitGroup
		succeeded int32
		failed    int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			_, genErr := s.inviteClient.Generate(s.ctx, &invitepb.GenerateRequest{
				UserId:  ownerID,
				MaxUses: 1,
				TtlDays: 7,
			})
			if genErr != nil {
				atomic.AddInt32(&failed, 1)
				return
			}
			atomic.AddInt32(&succeeded, 1)
		}()
	}
	pending.Wait()

	s.T().Logf("Transaction test - Success: %d, Errors: %d", succeeded, failed)
	s.LessOrEqual(int(succeeded), limit,
		"Количество успешных генераций (%d) не должно превышать лимит (%d)", succeeded, limit)

	// Compare the counter on the user row with the codes actually stored.
	var issued, storedCodes int
	issuedErr := s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE email_hash = $1",
		hash,
	).Scan(&issued)
	s.Require().NoError(issuedErr)

	codesErr := s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1",
		ownerID,
	).Scan(&storedCodes)
	s.Require().NoError(codesErr)

	s.Equal(issued, storedCodes,
		"invites_issued (%d) должен соответствовать количеству кодов (%d)", issued, storedCodes)
}
// TestConcurrent_CheckInviteLimit_WithForUpdate opens 30 concurrent
// transactions that each read invites_issued and invites_limit under
// SELECT ... FOR UPDATE and then commit without modifying the row.
//
// The user is created with invites_issued = 0 and invites_limit = 2, and
// nothing in this test changes those columns, so every transaction that
// completes must observe issued < limit. The test therefore requires that no
// transaction fails (begin, locked read or commit) and that all of them pass
// the limit check. Previously the test asserted nothing and silently dropped
// every error, so it could never fail.
func (s *IntegrationSuite) TestConcurrent_CheckInviteLimit_WithForUpdate() {
	email := "check_limit_test@example.com"
	password := "testpassword"
	inviteLimit := 2
	s.createTestUserWithLimit(email, password, inviteLimit)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 30

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			tx, err := s.pool.Begin(s.ctx)
			if err != nil {
				atomic.AddInt32(&errorCount, 1)
				return
			}
			// Releases the row lock on early return. After a successful Commit
			// the transaction is already closed and Rollback only reports that,
			// so its error is deliberately ignored.
			defer func() { _ = tx.Rollback(s.ctx) }()

			var issued, limit int
			err = tx.QueryRow(s.ctx,
				"SELECT invites_issued, invites_limit FROM users WHERE id = $1 FOR UPDATE",
				userID,
			).Scan(&issued, &limit)
			if err != nil {
				atomic.AddInt32(&errorCount, 1)
				return
			}

			if err := tx.Commit(s.ctx); err != nil {
				atomic.AddInt32(&errorCount, 1)
				return
			}

			if issued < limit {
				atomic.AddInt32(&successCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("CheckInviteLimit success count: %d", successCount)
	s.Equal(int32(0), errorCount,
		"Все %d транзакций должны завершиться без ошибок", goroutines)
	s.Equal(goroutines, int(successCount),
		"Все %d транзакций должны увидеть invites_issued < invites_limit", goroutines)
}
// TestConcurrent_CreateTZ_BalanceDeduction inserts 10 rows into
// requests_for_suppliers concurrently for one user and checks that every insert
// succeeds and that exactly 10 rows exist for that user afterwards. Despite the
// name, the test body never touches the balance beyond creating the user with
// an initial value of 100.
func (s *IntegrationSuite) TestConcurrent_CreateTZ_BalanceDeduction() {
	const (
		workers       = 10
		insertRequest = `
INSERT INTO requests_for_suppliers (user_id, request_txt)
VALUES ($1, $2)
`
	)

	email, password := "create_tz_concurrent@example.com", "testpassword"
	startBalance := 100.0
	s.createTestUserWithBalance(email, password, startBalance)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	var (
		pending   sync.WaitGroup
		succeeded int32
		failed    int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			if _, insertErr := s.pool.Exec(s.ctx, insertRequest, ownerID, "Test request"); insertErr != nil {
				atomic.AddInt32(&failed, 1)
				return
			}
			atomic.AddInt32(&succeeded, 1)
		}()
	}
	pending.Wait()

	s.T().Logf("CreateTZ success: %d, errors: %d", succeeded, failed)
	s.Equal(int32(workers), succeeded,
		"Все %d запросов должны быть успешными", workers)

	var stored int
	countErr := s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM requests_for_suppliers WHERE user_id = $1",
		ownerID,
	).Scan(&stored)
	s.Require().NoError(countErr)
	s.Equal(workers, stored,
		"Должно быть создано %d запросов", workers)
}
// TestConcurrent_BalanceDeduction_WithTransaction runs 20 concurrent
// deductions of 10 against a balance of 50. Each deduction calls
// UserRepository.UpdateBalanceTx inside TxManager.WithTx. The test asserts that
// the final balance is non-negative and that exactly initial/cost operations
// returned without error.
func (s *IntegrationSuite) TestConcurrent_BalanceDeduction_WithTransaction() {
	const workers = 20

	email, password := "balance_tx_concurrent@example.com", "testpassword"
	startBalance, cost := 50.0, 10.0
	s.createTestUserWithBalance(email, password, startBalance)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	users := repository.NewUserRepository(s.pool, testCryptoSecret)
	transactions := repository.NewTxManager(s.pool)

	// deduct performs one transactional debit of cost from the user's balance.
	deduct := func(tx pgx.Tx) error {
		return users.UpdateBalanceTx(s.ctx, tx, ownerID, -cost)
	}

	var (
		pending  sync.WaitGroup
		applied  int32
		rejected int32
	)
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			if txErr := transactions.WithTx(s.ctx, deduct); txErr != nil {
				atomic.AddInt32(&rejected, 1)
				return
			}
			atomic.AddInt32(&applied, 1)
		}()
	}
	pending.Wait()
	s.T().Logf("Transaction balance deduction - Success: %d, Errors: %d", applied, rejected)

	remaining, err := users.GetBalance(s.ctx, ownerID)
	s.Require().NoError(err)
	s.T().Logf("Final balance: %.2f", remaining)

	s.GreaterOrEqual(remaining, 0.0,
		"Баланс не должен быть отрицательным")

	wantOperations := int(startBalance / cost)
	s.Equal(wantOperations, int(applied),
		"Должно быть ровно %d успешных операций", wantOperations)
}
// TestConcurrent_SessionCleanup_WithExpiredSessions seeds one user with 10
// sessions that expired 24h ago and 5 sessions that expire in 24h, then runs the
// cleanup DELETE from 5 goroutines at once.
//
// The DELETE is not scoped to the user: it removes every session in the table
// that is expired or was revoked more than 30 days ago. That is why the total
// number of deleted rows is only required to be at least 10, while the number
// of sessions left for this user must be exactly 5.
func (s *IntegrationSuite) TestConcurrent_SessionCleanup_WithExpiredSessions() {
	const (
		cleaners      = 5
		insertSession = `
INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at)
VALUES ($1, $2, $3, $4, $5, $6)
`
		purgeSessions = `
DELETE FROM sessions
WHERE expires_at < now()
OR (revoked_at IS NOT NULL AND revoked_at < now() - interval '30 days')
`
	)

	email, password := "session_cleanup@example.com", "testpassword"
	s.createTestUser(email, password)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	// seedSessions inserts count sessions for the user that all expire at
	// expiresAt. Token values come from the two format strings applied to the
	// loop index.
	seedSessions := func(count int, accessFmt, refreshFmt string, expiresAt time.Time) {
		for n := 0; n < count; n++ {
			_, insertErr := s.pool.Exec(s.ctx, insertSession,
				ownerID, fmt.Sprintf(accessFmt, n), fmt.Sprintf(refreshFmt, n),
				"127.0.0.1", "test-agent", expiresAt)
			s.Require().NoError(insertErr)
		}
	}
	seedSessions(10, "expired_access_%d", "expired_refresh_%d", time.Now().Add(-24*time.Hour))
	seedSessions(5, "valid_access_%d", "valid_refresh_%d", time.Now().Add(24*time.Hour))

	var (
		pending sync.WaitGroup
		removed int32
	)
	pending.Add(cleaners)
	for n := 0; n < cleaners; n++ {
		go func() {
			defer pending.Done()
			tag, purgeErr := s.pool.Exec(s.ctx, purgeSessions)
			if purgeErr != nil {
				return
			}
			// Sum the rows each cleaner reports as deleted.
			atomic.AddInt32(&removed, int32(tag.RowsAffected()))
		}()
	}
	pending.Wait()

	s.T().Logf("Total cleaned up: %d", removed)
	s.GreaterOrEqual(int(removed), 10,
		"Должно быть удалено как минимум 10 истекших сессий")

	var left int
	countErr := s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM sessions WHERE user_id = $1",
		ownerID,
	).Scan(&left)
	s.Require().NoError(countErr)
	s.Equal(5, left,
		"Должно остаться 5 валидных сессий")
}
// TestConcurrent_InviteCleanup_WithExpiredInvites seeds one user with 10 active
// invite codes that expired 24h ago (codes 10000000..10000009) and 5 active
// codes that expire in 24h (codes 20000000..20000004), then runs the
// deactivation UPDATE from 5 goroutines at once.
//
// The UPDATE is not scoped to the user: it deactivates every expired active
// code in the table. That is why the total number of deactivated rows is only
// required to be at least 10, while the number of codes still active for this
// user must be exactly 5.
func (s *IntegrationSuite) TestConcurrent_InviteCleanup_WithExpiredInvites() {
	const (
		cleaners     = 5
		insertInvite = `
INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active)
VALUES ($1, $2, $3, $4, $5)
`
		deactivateExpired = `
UPDATE invite_codes
SET is_active = false
WHERE expires_at < now() AND is_active = true
`
	)

	var (
		email    = "invite_cleanup@example.com"
		password = "testpassword"
		limit    = 20
	)
	s.createTestUserWithLimit(email, password, limit)

	hasher := crypto.NewCrypto(testCryptoSecret)
	hash := hasher.EmailHash(email)

	var ownerID int
	lookupErr := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		hash,
	).Scan(&ownerID)
	s.Require().NoError(lookupErr)

	// seedCodes inserts count active invite codes numbered consecutively from
	// firstCode, all expiring at expiresAt.
	seedCodes := func(firstCode int64, count int, expiresAt time.Time) {
		for n := 0; n < count; n++ {
			code := firstCode + int64(n)
			_, insertErr := s.pool.Exec(s.ctx, insertInvite,
				ownerID, code, 5, expiresAt, true)
			s.Require().NoError(insertErr)
		}
	}
	seedCodes(10000000, 10, time.Now().Add(-24*time.Hour))
	seedCodes(20000000, 5, time.Now().Add(24*time.Hour))

	var (
		pending     sync.WaitGroup
		deactivated int32
	)
	pending.Add(cleaners)
	for n := 0; n < cleaners; n++ {
		go func() {
			defer pending.Done()
			tag, updateErr := s.pool.Exec(s.ctx, deactivateExpired)
			if updateErr != nil {
				return
			}
			// Sum the rows each cleaner reports as updated.
			atomic.AddInt32(&deactivated, int32(tag.RowsAffected()))
		}()
	}
	pending.Wait()

	s.T().Logf("Total deactivated: %d", deactivated)
	s.GreaterOrEqual(int(deactivated), 10,
		"Должно быть деактивировано как минимум 10 истекших инвайтов")

	var stillActive int
	countErr := s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND is_active = true",
		ownerID,
	).Scan(&stillActive)
	s.Require().NoError(countErr)
	s.Equal(5, stillActive,
		"Должно остаться 5 активных инвайтов")
}
// TestConcurrent_TransactionTimeout_Simulation releases 10 goroutines at the
// same moment (via a closed channel). Each one, under its own 100ms deadline,
// begins a transaction, locks the user's row with SELECT ... FOR UPDATE,
// debits 10 from the balance and commits. Goroutines with an even index hold
// the lock for an extra 50ms, so some of the others are expected to run out of
// time while waiting.
//
// A goroutine counts as "blocked" if any step (begin, locked read, update,
// commit) returns an error. The test requires at least one successful
// transaction and a final balance equal to initial - 10 * successes.
func (s *IntegrationSuite) TestConcurrent_TransactionTimeout_Simulation() {
	email := "tx_timeout@example.com"
	password := "testpassword"
	initialBalance := 1000.0
	s.createTestUserWithBalance(email, password, initialBalance)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var blockedCount int32
	goroutines := 10
	startBarrier := make(chan struct{})

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			<-startBarrier

			ctx, cancel := context.WithTimeout(s.ctx, 100*time.Millisecond)
			defer cancel()

			tx, err := s.pool.Begin(ctx)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}
			// Roll back with the suite context, not the per-goroutine one: by
			// the time this runs the 100ms deadline may already have passed,
			// and a rollback issued on an expired context cannot be sent. The
			// error is ignored because after a successful Commit the
			// transaction is already closed.
			defer func() { _ = tx.Rollback(s.ctx) }()

			var balance float64
			err = tx.QueryRow(ctx,
				"SELECT balance FROM users WHERE id = $1 FOR UPDATE",
				userID,
			).Scan(&balance)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}

			// Even-indexed goroutines keep the row lock longer to create contention.
			if idx%2 == 0 {
				time.Sleep(50 * time.Millisecond)
			}

			_, err = tx.Exec(ctx,
				"UPDATE users SET balance = balance - 10 WHERE id = $1",
				userID,
			)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}

			err = tx.Commit(ctx)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}
			atomic.AddInt32(&successCount, 1)
		}(i)
	}

	// Release all goroutines at once.
	close(startBarrier)
	wg.Wait()

	s.T().Logf("Transaction timeout test - Success: %d, Blocked/Timeout: %d", successCount, blockedCount)
	s.Greater(int(successCount), 0, "Должна быть хотя бы одна успешная транзакция")

	var finalBalance float64
	err = s.pool.QueryRow(s.ctx,
		"SELECT balance FROM users WHERE id = $1",
		userID,
	).Scan(&finalBalance)
	s.Require().NoError(err)

	// 10 is the amount hard-coded in the UPDATE statement above.
	expectedBalance := initialBalance - float64(successCount)*10
	s.Equal(expectedBalance, finalBalance,
		"Баланс должен соответствовать количеству успешных транзакций")
}