1106 lines
28 KiB
Go
1106 lines
28 KiB
Go
package tests
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"git.techease.ru/Smart-search/smart-search-back/internal/repository"
|
||
"git.techease.ru/Smart-search/smart-search-back/pkg/crypto"
|
||
authpb "git.techease.ru/Smart-search/smart-search-back/pkg/pb/auth"
|
||
invitepb "git.techease.ru/Smart-search/smart-search-back/pkg/pb/invite"
|
||
"github.com/jackc/pgx/v5"
|
||
)
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_GenerateInvites_ExceedsLimit() {
|
||
email := "invite_limit_test@example.com"
|
||
password := "testpassword"
|
||
inviteLimit := 5
|
||
|
||
s.createTestUserWithLimit(email, password, inviteLimit)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int64
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 20
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
_, err := s.inviteClient.Generate(s.ctx, &invitepb.GenerateRequest{
|
||
UserId: userID,
|
||
MaxUses: 1,
|
||
TtlDays: 7,
|
||
})
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)
|
||
|
||
s.LessOrEqual(int(successCount), inviteLimit,
|
||
"Количество успешных генераций (%d) не должно превышать лимит (%d)", successCount, inviteLimit)
|
||
|
||
var invitesIssued int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT invites_issued FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&invitesIssued)
|
||
s.Require().NoError(err)
|
||
|
||
s.LessOrEqual(invitesIssued, inviteLimit,
|
||
"invites_issued (%d) не должен превышать invites_limit (%d)", invitesIssued, inviteLimit)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_GoesNegative() {
|
||
email := "balance_test@example.com"
|
||
password := "testpassword"
|
||
initialBalance := 100.0
|
||
|
||
s.createTestUserWithBalance(email, password, initialBalance)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 20
|
||
debitAmount := 50.0
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
result, err := s.pool.Exec(s.ctx,
|
||
"UPDATE users SET balance = balance - $1 WHERE email_hash = $2 AND balance - $1 >= 0",
|
||
debitAmount, emailHash,
|
||
)
|
||
|
||
if err == nil && result.RowsAffected() > 0 {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)
|
||
|
||
var finalBalance float64
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT balance FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&finalBalance)
|
||
s.Require().NoError(err)
|
||
|
||
s.T().Logf("Final balance: %.2f", finalBalance)
|
||
|
||
s.GreaterOrEqual(finalBalance, 0.0,
|
||
"Баланс (%f) не должен быть отрицательным", finalBalance)
|
||
|
||
maxPossibleDebits := int(initialBalance / debitAmount)
|
||
s.Equal(maxPossibleDebits, int(successCount),
|
||
"Должно быть ровно %d успешных списаний при балансе %.0f и списании %.0f",
|
||
maxPossibleDebits, initialBalance, debitAmount)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_ViaRepository() {
|
||
email := "balance_repo_test@example.com"
|
||
password := "testpassword"
|
||
initialBalance := 1000.0
|
||
|
||
s.createTestUserWithBalance(email, password, initialBalance)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 20
|
||
debitAmount := 200.0
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
result, err := s.pool.Exec(s.ctx,
|
||
"UPDATE users SET balance = balance + $1 WHERE id = $2 AND balance + $1 >= 0",
|
||
-debitAmount, userID,
|
||
)
|
||
|
||
if err == nil && result.RowsAffected() > 0 {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)
|
||
|
||
var finalBalance float64
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT balance FROM users WHERE id = $1",
|
||
userID,
|
||
).Scan(&finalBalance)
|
||
s.Require().NoError(err)
|
||
|
||
s.T().Logf("Final balance: %.2f", finalBalance)
|
||
|
||
s.GreaterOrEqual(finalBalance, 0.0,
|
||
"Баланс (%f) не должен быть отрицательным", finalBalance)
|
||
|
||
expectedMaxDebits := int(initialBalance / debitAmount)
|
||
s.Equal(expectedMaxDebits, int(successCount),
|
||
"Должно быть ровно %d успешных списаний при балансе %.0f и списании %.0f",
|
||
expectedMaxDebits, initialBalance, debitAmount)
|
||
|
||
expectedFinalBalance := initialBalance - float64(successCount)*debitAmount
|
||
s.Equal(expectedFinalBalance, finalBalance,
|
||
"Финальный баланс должен быть %.2f", expectedFinalBalance)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_IncrementInvitesIssued() {
|
||
email := "increment_test@example.com"
|
||
password := "testpassword"
|
||
inviteLimit := 5
|
||
|
||
s.createTestUserWithLimit(email, password, inviteLimit)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
goroutines := 20
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
result, err := s.pool.Exec(s.ctx,
|
||
"UPDATE users SET invites_issued = invites_issued + 1 WHERE id = $1 AND invites_issued < invites_limit",
|
||
userID,
|
||
)
|
||
|
||
if err == nil && result.RowsAffected() > 0 {
|
||
atomic.AddInt32(&successCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Success increments: %d", successCount)
|
||
|
||
var invitesIssued int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT invites_issued FROM users WHERE id = $1",
|
||
userID,
|
||
).Scan(&invitesIssued)
|
||
s.Require().NoError(err)
|
||
|
||
s.Equal(inviteLimit, invitesIssued,
|
||
"invites_issued должен быть равен лимиту %d", inviteLimit)
|
||
|
||
s.Equal(inviteLimit, int(successCount),
|
||
"Количество успешных инкрементов должно быть равно лимиту %d", inviteLimit)
|
||
}
|
||
|
||
func (s *IntegrationSuite) createTestUserWithLimit(email, password string, inviteLimit int) {
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
|
||
encryptedEmail, err := cryptoHelper.Encrypt(email)
|
||
s.Require().NoError(err)
|
||
|
||
encryptedPhone, err := cryptoHelper.Encrypt("+1234567890")
|
||
s.Require().NoError(err)
|
||
|
||
encryptedUserName, err := cryptoHelper.Encrypt("Test User")
|
||
s.Require().NoError(err)
|
||
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
passwordHash := crypto.PasswordHash(password)
|
||
|
||
query := `
|
||
INSERT INTO users (email, email_hash, password_hash, phone, user_name, company_name, balance, payment_status, invites_issued, invites_limit)
|
||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||
ON CONFLICT (email_hash) DO UPDATE SET invites_issued = 0, invites_limit = $10
|
||
`
|
||
|
||
_, err = s.pool.Exec(s.ctx, query,
|
||
encryptedEmail,
|
||
emailHash,
|
||
passwordHash,
|
||
encryptedPhone,
|
||
encryptedUserName,
|
||
"Test Company",
|
||
1000.0,
|
||
"active",
|
||
0,
|
||
inviteLimit,
|
||
)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
func (s *IntegrationSuite) createTestUserWithBalance(email, password string, balance float64) {
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
|
||
encryptedEmail, err := cryptoHelper.Encrypt(email)
|
||
s.Require().NoError(err)
|
||
|
||
encryptedPhone, err := cryptoHelper.Encrypt("+1234567890")
|
||
s.Require().NoError(err)
|
||
|
||
encryptedUserName, err := cryptoHelper.Encrypt("Test User")
|
||
s.Require().NoError(err)
|
||
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
passwordHash := crypto.PasswordHash(password)
|
||
|
||
query := `
|
||
INSERT INTO users (email, email_hash, password_hash, phone, user_name, company_name, balance, payment_status, invites_issued, invites_limit)
|
||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||
ON CONFLICT (email_hash) DO UPDATE SET balance = $7
|
||
`
|
||
|
||
_, err = s.pool.Exec(s.ctx, query,
|
||
encryptedEmail,
|
||
emailHash,
|
||
passwordHash,
|
||
encryptedPhone,
|
||
encryptedUserName,
|
||
"Test Company",
|
||
balance,
|
||
"active",
|
||
0,
|
||
10,
|
||
)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_ViaUserRepository() {
|
||
email := "balance_repo_real_test@example.com"
|
||
password := "testpassword"
|
||
initialBalance := 1000.0
|
||
|
||
s.createTestUserWithBalance(email, password, initialBalance)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
userRepo := repository.NewUserRepository(s.pool, testCryptoSecret)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 20
|
||
debitAmount := 200.0
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
err := userRepo.UpdateBalance(s.ctx, userID, -debitAmount)
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)
|
||
|
||
finalBalance, err := userRepo.GetBalance(s.ctx, userID)
|
||
s.Require().NoError(err)
|
||
|
||
s.T().Logf("Final balance: %.2f", finalBalance)
|
||
|
||
s.GreaterOrEqual(finalBalance, 0.0,
|
||
"Баланс (%f) не должен быть отрицательным", finalBalance)
|
||
|
||
expectedMaxDebits := int(initialBalance / debitAmount)
|
||
s.Equal(expectedMaxDebits, int(successCount),
|
||
"Должно быть ровно %d успешных списаний", expectedMaxDebits)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_RefreshToken_SameSession() {
|
||
email := "refresh_concurrent@example.com"
|
||
password := "testpassword"
|
||
s.createTestUser(email, password)
|
||
|
||
loginResp, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
|
||
Email: email,
|
||
Password: password,
|
||
Ip: "127.0.0.1",
|
||
UserAgent: "test-agent",
|
||
})
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 10
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
_, err := s.authClient.Refresh(s.ctx, &authpb.RefreshRequest{
|
||
RefreshToken: loginResp.RefreshToken,
|
||
})
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Refresh success: %d, errors: %d", successCount, errorCount)
|
||
|
||
s.GreaterOrEqual(int(successCount), 1, "Как минимум один refresh должен быть успешным")
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_Login_SameUser() {
|
||
email := "login_concurrent@example.com"
|
||
password := "testpassword"
|
||
s.createTestUser(email, password)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 20
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
_, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
|
||
Email: email,
|
||
Password: password,
|
||
Ip: "127.0.0.1",
|
||
UserAgent: "test-agent",
|
||
})
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Login success: %d, errors: %d", successCount, errorCount)
|
||
|
||
s.Equal(int32(goroutines), successCount,
|
||
"Все %d login запросов должны быть успешными", goroutines)
|
||
|
||
var sessionsCount int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM sessions WHERE user_id = (SELECT id FROM users WHERE email_hash = $1)",
|
||
emailHash,
|
||
).Scan(&sessionsCount)
|
||
s.Require().NoError(err)
|
||
|
||
s.Equal(goroutines, sessionsCount,
|
||
"Должно быть создано %d сессий", goroutines)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_Logout_SameSession() {
|
||
email := "logout_concurrent@example.com"
|
||
password := "testpassword"
|
||
s.createTestUser(email, password)
|
||
|
||
loginResp, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
|
||
Email: email,
|
||
Password: password,
|
||
Ip: "127.0.0.1",
|
||
UserAgent: "test-agent",
|
||
})
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
goroutines := 10
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
_, err := s.authClient.Logout(s.ctx, &authpb.LogoutRequest{
|
||
AccessToken: loginResp.AccessToken,
|
||
})
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Logout success: %d", successCount)
|
||
|
||
s.GreaterOrEqual(int(successCount), 1, "Как минимум один logout должен быть успешным")
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_Validate_AfterLogout() {
|
||
email := "validate_concurrent@example.com"
|
||
password := "testpassword"
|
||
s.createTestUser(email, password)
|
||
|
||
loginResp, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
|
||
Email: email,
|
||
Password: password,
|
||
Ip: "127.0.0.1",
|
||
UserAgent: "test-agent",
|
||
})
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var validateSuccess int32
|
||
var validateFailed int32
|
||
var logoutDone int32
|
||
goroutines := 20
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func(idx int) {
|
||
defer wg.Done()
|
||
|
||
if idx == 0 {
|
||
time.Sleep(5 * time.Millisecond)
|
||
_, err := s.authClient.Logout(s.ctx, &authpb.LogoutRequest{
|
||
AccessToken: loginResp.AccessToken,
|
||
})
|
||
if err == nil {
|
||
atomic.AddInt32(&logoutDone, 1)
|
||
}
|
||
} else {
|
||
_, err := s.authClient.Validate(s.ctx, &authpb.ValidateRequest{
|
||
AccessToken: loginResp.AccessToken,
|
||
})
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&validateSuccess, 1)
|
||
} else {
|
||
atomic.AddInt32(&validateFailed, 1)
|
||
}
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Validate success: %d, failed: %d, logout done: %d", validateSuccess, validateFailed, logoutDone)
|
||
|
||
s.Equal(int32(1), logoutDone, "Logout должен быть выполнен")
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_IncrementInvites_ViaRepository() {
|
||
email := "increment_repo_test@example.com"
|
||
password := "testpassword"
|
||
inviteLimit := 5
|
||
|
||
s.createTestUserWithLimit(email, password, inviteLimit)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
userRepo := repository.NewUserRepository(s.pool, testCryptoSecret)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 20
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
err := userRepo.IncrementInvitesIssued(s.ctx, userID)
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)
|
||
|
||
var invitesIssued int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT invites_issued FROM users WHERE id = $1",
|
||
userID,
|
||
).Scan(&invitesIssued)
|
||
s.Require().NoError(err)
|
||
|
||
s.Equal(inviteLimit, invitesIssued,
|
||
"invites_issued должен быть равен лимиту %d", inviteLimit)
|
||
|
||
s.Equal(inviteLimit, int(successCount),
|
||
"Количество успешных инкрементов должно быть равно лимиту %d", inviteLimit)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_WithTransaction_InviteGenerate() {
|
||
email := "tx_invite_test@example.com"
|
||
password := "testpassword"
|
||
inviteLimit := 3
|
||
|
||
s.createTestUserWithLimit(email, password, inviteLimit)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int64
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 50
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
_, err := s.inviteClient.Generate(s.ctx, &invitepb.GenerateRequest{
|
||
UserId: userID,
|
||
MaxUses: 1,
|
||
TtlDays: 7,
|
||
})
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Transaction test - Success: %d, Errors: %d", successCount, errorCount)
|
||
|
||
s.LessOrEqual(int(successCount), inviteLimit,
|
||
"Количество успешных генераций (%d) не должно превышать лимит (%d)", successCount, inviteLimit)
|
||
|
||
var invitesIssued int
|
||
var inviteCodesCount int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT invites_issued FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&invitesIssued)
|
||
s.Require().NoError(err)
|
||
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1",
|
||
userID,
|
||
).Scan(&inviteCodesCount)
|
||
s.Require().NoError(err)
|
||
|
||
s.Equal(invitesIssued, inviteCodesCount,
|
||
"invites_issued (%d) должен соответствовать количеству кодов (%d)", invitesIssued, inviteCodesCount)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_CheckInviteLimit_WithForUpdate() {
|
||
email := "check_limit_test@example.com"
|
||
password := "testpassword"
|
||
inviteLimit := 2
|
||
|
||
s.createTestUserWithLimit(email, password, inviteLimit)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
goroutines := 30
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
tx, err := s.pool.Begin(s.ctx)
|
||
if err != nil {
|
||
return
|
||
}
|
||
defer func() { _ = tx.Rollback(s.ctx) }()
|
||
|
||
var issued, limit int
|
||
err = tx.QueryRow(s.ctx,
|
||
"SELECT invites_issued, invites_limit FROM users WHERE id = $1 FOR UPDATE",
|
||
userID,
|
||
).Scan(&issued, &limit)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
if issued < limit {
|
||
atomic.AddInt32(&successCount, 1)
|
||
}
|
||
|
||
_ = tx.Commit(s.ctx)
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("CheckInviteLimit success count: %d", successCount)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_CreateTZ_BalanceDeduction() {
|
||
email := "create_tz_concurrent@example.com"
|
||
password := "testpassword"
|
||
initialBalance := 100.0
|
||
|
||
s.createTestUserWithBalance(email, password, initialBalance)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 10
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO requests_for_suppliers (user_id, request_txt)
|
||
VALUES ($1, $2)
|
||
`, userID, "Test request")
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("CreateTZ success: %d, errors: %d", successCount, errorCount)
|
||
|
||
s.Equal(int32(goroutines), successCount,
|
||
"Все %d запросов должны быть успешными", goroutines)
|
||
|
||
var requestCount int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM requests_for_suppliers WHERE user_id = $1",
|
||
userID,
|
||
).Scan(&requestCount)
|
||
s.Require().NoError(err)
|
||
|
||
s.Equal(goroutines, requestCount,
|
||
"Должно быть создано %d запросов", goroutines)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_BalanceDeduction_WithTransaction() {
|
||
email := "balance_tx_concurrent@example.com"
|
||
password := "testpassword"
|
||
initialBalance := 50.0
|
||
costPerOperation := 10.0
|
||
|
||
s.createTestUserWithBalance(email, password, initialBalance)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
userRepo := repository.NewUserRepository(s.pool, testCryptoSecret)
|
||
txManager := repository.NewTxManager(s.pool)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var errorCount int32
|
||
goroutines := 20
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
err := txManager.WithTx(s.ctx, func(tx pgx.Tx) error {
|
||
return userRepo.UpdateBalanceTx(s.ctx, tx, userID, -costPerOperation)
|
||
})
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Transaction balance deduction - Success: %d, Errors: %d", successCount, errorCount)
|
||
|
||
finalBalance, err := userRepo.GetBalance(s.ctx, userID)
|
||
s.Require().NoError(err)
|
||
|
||
s.T().Logf("Final balance: %.2f", finalBalance)
|
||
|
||
s.GreaterOrEqual(finalBalance, 0.0,
|
||
"Баланс не должен быть отрицательным")
|
||
|
||
expectedMaxOperations := int(initialBalance / costPerOperation)
|
||
s.Equal(expectedMaxOperations, int(successCount),
|
||
"Должно быть ровно %d успешных операций", expectedMaxOperations)
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_SessionCleanup_WithExpiredSessions() {
|
||
email := "session_cleanup@example.com"
|
||
password := "testpassword"
|
||
s.createTestUser(email, password)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
expiredTime := time.Now().Add(-24 * time.Hour)
|
||
for i := 0; i < 10; i++ {
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at)
|
||
VALUES ($1, $2, $3, $4, $5, $6)
|
||
`, userID, fmt.Sprintf("expired_access_%d", i), fmt.Sprintf("expired_refresh_%d", i),
|
||
"127.0.0.1", "test-agent", expiredTime)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
validTime := time.Now().Add(24 * time.Hour)
|
||
for i := 0; i < 5; i++ {
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at)
|
||
VALUES ($1, $2, $3, $4, $5, $6)
|
||
`, userID, fmt.Sprintf("valid_access_%d", i), fmt.Sprintf("valid_refresh_%d", i),
|
||
"127.0.0.1", "test-agent", validTime)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
var cleanupCount int32
|
||
goroutines := 5
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
result, err := s.pool.Exec(s.ctx, `
|
||
DELETE FROM sessions
|
||
WHERE expires_at < now()
|
||
OR (revoked_at IS NOT NULL AND revoked_at < now() - interval '30 days')
|
||
`)
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&cleanupCount, int32(result.RowsAffected()))
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Total cleaned up: %d", cleanupCount)
|
||
|
||
s.GreaterOrEqual(int(cleanupCount), 10,
|
||
"Должно быть удалено как минимум 10 истекших сессий")
|
||
|
||
var remainingCount int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM sessions WHERE user_id = $1",
|
||
userID,
|
||
).Scan(&remainingCount)
|
||
s.Require().NoError(err)
|
||
|
||
s.Equal(5, remainingCount,
|
||
"Должно остаться 5 валидных сессий")
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_InviteCleanup_WithExpiredInvites() {
|
||
email := "invite_cleanup@example.com"
|
||
password := "testpassword"
|
||
inviteLimit := 20
|
||
|
||
s.createTestUserWithLimit(email, password, inviteLimit)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
expiredTime := time.Now().Add(-24 * time.Hour)
|
||
for i := 0; i < 10; i++ {
|
||
code := int64(10000000 + i)
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active)
|
||
VALUES ($1, $2, $3, $4, $5)
|
||
`, userID, code, 5, expiredTime, true)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
validTime := time.Now().Add(24 * time.Hour)
|
||
for i := 0; i < 5; i++ {
|
||
code := int64(20000000 + i)
|
||
_, err := s.pool.Exec(s.ctx, `
|
||
INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active)
|
||
VALUES ($1, $2, $3, $4, $5)
|
||
`, userID, code, 5, validTime, true)
|
||
s.Require().NoError(err)
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
var deactivatedCount int32
|
||
goroutines := 5
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
|
||
result, err := s.pool.Exec(s.ctx, `
|
||
UPDATE invite_codes
|
||
SET is_active = false
|
||
WHERE expires_at < now() AND is_active = true
|
||
`)
|
||
|
||
if err == nil {
|
||
atomic.AddInt32(&deactivatedCount, int32(result.RowsAffected()))
|
||
}
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Total deactivated: %d", deactivatedCount)
|
||
|
||
s.GreaterOrEqual(int(deactivatedCount), 10,
|
||
"Должно быть деактивировано как минимум 10 истекших инвайтов")
|
||
|
||
var activeCount int
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND is_active = true",
|
||
userID,
|
||
).Scan(&activeCount)
|
||
s.Require().NoError(err)
|
||
|
||
s.Equal(5, activeCount,
|
||
"Должно остаться 5 активных инвайтов")
|
||
}
|
||
|
||
func (s *IntegrationSuite) TestConcurrent_TransactionTimeout_Simulation() {
|
||
email := "tx_timeout@example.com"
|
||
password := "testpassword"
|
||
initialBalance := 1000.0
|
||
|
||
s.createTestUserWithBalance(email, password, initialBalance)
|
||
|
||
cryptoHelper := crypto.NewCrypto(testCryptoSecret)
|
||
emailHash := cryptoHelper.EmailHash(email)
|
||
|
||
var userID int
|
||
err := s.pool.QueryRow(s.ctx,
|
||
"SELECT id FROM users WHERE email_hash = $1",
|
||
emailHash,
|
||
).Scan(&userID)
|
||
s.Require().NoError(err)
|
||
|
||
var wg sync.WaitGroup
|
||
var successCount int32
|
||
var blockedCount int32
|
||
goroutines := 10
|
||
|
||
startBarrier := make(chan struct{})
|
||
|
||
for i := 0; i < goroutines; i++ {
|
||
wg.Add(1)
|
||
go func(idx int) {
|
||
defer wg.Done()
|
||
|
||
<-startBarrier
|
||
|
||
ctx, cancel := context.WithTimeout(s.ctx, 100*time.Millisecond)
|
||
defer cancel()
|
||
|
||
tx, err := s.pool.Begin(ctx)
|
||
if err != nil {
|
||
atomic.AddInt32(&blockedCount, 1)
|
||
return
|
||
}
|
||
defer func() { _ = tx.Rollback(ctx) }()
|
||
|
||
var balance float64
|
||
err = tx.QueryRow(ctx,
|
||
"SELECT balance FROM users WHERE id = $1 FOR UPDATE",
|
||
userID,
|
||
).Scan(&balance)
|
||
|
||
if err != nil {
|
||
atomic.AddInt32(&blockedCount, 1)
|
||
return
|
||
}
|
||
|
||
if idx%2 == 0 {
|
||
time.Sleep(50 * time.Millisecond)
|
||
}
|
||
|
||
_, err = tx.Exec(ctx,
|
||
"UPDATE users SET balance = balance - 10 WHERE id = $1",
|
||
userID,
|
||
)
|
||
|
||
if err != nil {
|
||
atomic.AddInt32(&blockedCount, 1)
|
||
return
|
||
}
|
||
|
||
err = tx.Commit(ctx)
|
||
if err != nil {
|
||
atomic.AddInt32(&blockedCount, 1)
|
||
return
|
||
}
|
||
|
||
atomic.AddInt32(&successCount, 1)
|
||
}(i)
|
||
}
|
||
|
||
close(startBarrier)
|
||
|
||
wg.Wait()
|
||
|
||
s.T().Logf("Transaction timeout test - Success: %d, Blocked/Timeout: %d", successCount, blockedCount)
|
||
|
||
s.Greater(int(successCount), 0, "Должна быть хотя бы одна успешная транзакция")
|
||
|
||
var finalBalance float64
|
||
err = s.pool.QueryRow(s.ctx,
|
||
"SELECT balance FROM users WHERE id = $1",
|
||
userID,
|
||
).Scan(&finalBalance)
|
||
s.Require().NoError(err)
|
||
|
||
expectedBalance := initialBalance - float64(successCount)*10
|
||
s.Equal(expectedBalance, finalBalance,
|
||
"Баланс должен соответствовать количеству успешных транзакций")
|
||
}
|