package tests

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"git.techease.ru/Smart-search/smart-search-back/internal/repository"
	"git.techease.ru/Smart-search/smart-search-back/pkg/crypto"
	authpb "git.techease.ru/Smart-search/smart-search-back/pkg/pb/auth"
	invitepb "git.techease.ru/Smart-search/smart-search-back/pkg/pb/invite"
	"github.com/jackc/pgx/v5"
)

// TestConcurrent_GenerateInvites_ExceedsLimit fires 20 concurrent Generate
// calls for a user whose invite limit is 5 and checks that neither the number
// of successful calls nor the stored invites_issued counter exceeds the limit.
func (s *IntegrationSuite) TestConcurrent_GenerateInvites_ExceedsLimit() {
	email := "invite_limit_test@example.com"
	password := "testpassword"
	inviteLimit := 5

	s.createTestUserWithLimit(email, password, inviteLimit)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int64
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 20

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := s.inviteClient.Generate(s.ctx, &invitepb.GenerateRequest{
				UserId:  userID,
				MaxUses: 1,
				TtlDays: 7,
			})
			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)

	s.LessOrEqual(int(successCount), inviteLimit,
		"Количество успешных генераций (%d) не должно превышать лимит (%d)",
		successCount, inviteLimit)

	var invitesIssued int
	err = s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&invitesIssued)
	s.Require().NoError(err)

	s.LessOrEqual(invitesIssued, inviteLimit,
		"invites_issued (%d) не должен превышать invites_limit (%d)",
		invitesIssued, inviteLimit)
}

// TestConcurrent_UpdateBalance_GoesNegative runs 20 concurrent guarded debits
// of 50 against a balance of 100 and checks that the balance never drops
// below zero and that exactly initialBalance/debitAmount debits succeed.
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_GoesNegative() {
	email := "balance_test@example.com"
	password := "testpassword"
	initialBalance := 100.0

	s.createTestUserWithBalance(email, password, initialBalance)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 20
	debitAmount := 50.0

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The WHERE guard turns an overdraft into a zero-row update
			// rather than an error, so RowsAffected decides success.
			result, err := s.pool.Exec(s.ctx,
				"UPDATE users SET balance = balance - $1 WHERE email_hash = $2 AND balance - $1 >= 0",
				debitAmount, emailHash,
			)
			if err == nil && result.RowsAffected() > 0 {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)

	var finalBalance float64
	err := s.pool.QueryRow(s.ctx,
		"SELECT balance FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&finalBalance)
	s.Require().NoError(err)

	s.T().Logf("Final balance: %.2f", finalBalance)

	s.GreaterOrEqual(finalBalance, 0.0,
		"Баланс (%f) не должен быть отрицательным", finalBalance)

	maxPossibleDebits := int(initialBalance / debitAmount)
	s.Equal(maxPossibleDebits, int(successCount),
		"Должно быть ровно %d успешных списаний при балансе %.0f и списании %.0f",
		maxPossibleDebits, initialBalance, debitAmount)
}

// TestConcurrent_UpdateBalance_ViaRepository runs 20 concurrent guarded
// updates that add a negative amount (-200) to a balance of 1000, addressed by
// user id. Despite its name, the UPDATE is issued directly through the pool,
// not through a repository. It checks the balance stays non-negative, that
// exactly initialBalance/debitAmount updates succeed, and that the final
// balance matches the number of successes.
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_ViaRepository() {
	email := "balance_repo_test@example.com"
	password := "testpassword"
	initialBalance := 1000.0

	s.createTestUserWithBalance(email, password, initialBalance)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 20
	debitAmount := 200.0

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			result, err := s.pool.Exec(s.ctx,
				"UPDATE users SET balance = balance + $1 WHERE id = $2 AND balance + $1 >= 0",
				-debitAmount, userID,
			)
			if err == nil && result.RowsAffected() > 0 {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)

	var finalBalance float64
	err = s.pool.QueryRow(s.ctx,
		"SELECT balance FROM users WHERE id = $1",
		userID,
	).Scan(&finalBalance)
	s.Require().NoError(err)

	s.T().Logf("Final balance: %.2f", finalBalance)

	s.GreaterOrEqual(finalBalance, 0.0,
		"Баланс (%f) не должен быть отрицательным", finalBalance)

	expectedMaxDebits := int(initialBalance / debitAmount)
	s.Equal(expectedMaxDebits, int(successCount),
		"Должно быть ровно %d успешных списаний при балансе %.0f и списании %.0f",
		expectedMaxDebits, initialBalance, debitAmount)

	expectedFinalBalance := initialBalance - float64(successCount)*debitAmount
	s.Equal(expectedFinalBalance, finalBalance,
		"Финальный баланс должен быть %.2f", expectedFinalBalance)
}

// TestConcurrent_IncrementInvitesIssued runs 20 concurrent guarded increments
// of invites_issued for a user with a limit of 5 and checks that both the
// stored counter and the number of successful updates equal the limit.
func (s *IntegrationSuite) TestConcurrent_IncrementInvitesIssued() {
	email := "increment_test@example.com"
	password := "testpassword"
	inviteLimit := 5

	s.createTestUserWithLimit(email, password, inviteLimit)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	goroutines := 20

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			result, err := s.pool.Exec(s.ctx,
				"UPDATE users SET invites_issued = invites_issued + 1 WHERE id = $1 AND invites_issued < invites_limit",
				userID,
			)
			if err == nil && result.RowsAffected() > 0 {
				atomic.AddInt32(&successCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Success increments: %d", successCount)

	var invitesIssued int
	err = s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE id = $1",
		userID,
	).Scan(&invitesIssued)
	s.Require().NoError(err)

	s.Equal(inviteLimit, invitesIssued,
		"invites_issued должен быть равен лимиту %d", inviteLimit)
	s.Equal(inviteLimit, int(successCount),
		"Количество успешных инкрементов должно быть равно лимиту %d", inviteLimit)
}

// createTestUserWithLimit inserts a user with the given e-mail, password and
// invites_limit (balance 1000, invites_issued 0). If a row with the same
// email_hash already exists, only invites_issued (reset to 0) and
// invites_limit are updated. Any failure aborts the current test.
func (s *IntegrationSuite) createTestUserWithLimit(email, password string, inviteLimit int) {
	cryptoHelper := crypto.NewCrypto(testCryptoSecret)

	encryptedEmail, err := cryptoHelper.Encrypt(email)
	s.Require().NoError(err)

	encryptedPhone, err := cryptoHelper.Encrypt("+1234567890")
	s.Require().NoError(err)

	encryptedUserName, err := cryptoHelper.Encrypt("Test User")
	s.Require().NoError(err)

	emailHash := cryptoHelper.EmailHash(email)
	passwordHash := crypto.PasswordHash(password)

	query := ` INSERT INTO users (email, email_hash, password_hash, phone, user_name, company_name, balance, payment_status, invites_issued, invites_limit) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (email_hash) DO UPDATE SET invites_issued = 0, invites_limit = $10 `

	_, err = s.pool.Exec(s.ctx, query,
		encryptedEmail, emailHash, passwordHash,
		encryptedPhone, encryptedUserName, "Test Company",
		1000.0, "active", 0, inviteLimit,
	)
	s.Require().NoError(err)
}

// createTestUserWithBalance inserts a user with the given e-mail, password and
// balance (invites_issued 0, invites_limit 10). If a row with the same
// email_hash already exists, only its balance is overwritten. Any failure
// aborts the current test.
func (s *IntegrationSuite) createTestUserWithBalance(email, password string, balance float64) {
	cryptoHelper := crypto.NewCrypto(testCryptoSecret)

	encryptedEmail, err := cryptoHelper.Encrypt(email)
	s.Require().NoError(err)

	encryptedPhone, err := cryptoHelper.Encrypt("+1234567890")
	s.Require().NoError(err)

	encryptedUserName, err := cryptoHelper.Encrypt("Test User")
	s.Require().NoError(err)

	emailHash := cryptoHelper.EmailHash(email)
	passwordHash := crypto.PasswordHash(password)

	query := ` INSERT INTO users (email, email_hash, password_hash, phone, user_name, company_name, balance, payment_status, invites_issued, invites_limit) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (email_hash) DO UPDATE SET balance = $7 `

	_, err = s.pool.Exec(s.ctx, query,
		encryptedEmail, emailHash, passwordHash,
		encryptedPhone, encryptedUserName, "Test Company",
		balance, "active", 0, 10,
	)
	s.Require().NoError(err)
}

// TestConcurrent_UpdateBalance_ViaUserRepository calls
// userRepo.UpdateBalance(-200) from 20 goroutines against a balance of 1000
// and checks that the balance reported by userRepo.GetBalance is non-negative
// and that exactly initialBalance/debitAmount calls returned a nil error.
func (s *IntegrationSuite) TestConcurrent_UpdateBalance_ViaUserRepository() {
	email := "balance_repo_real_test@example.com"
	password := "testpassword"
	initialBalance := 1000.0

	s.createTestUserWithBalance(email, password, initialBalance)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	userRepo := repository.NewUserRepository(s.pool, testCryptoSecret)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 20
	debitAmount := 200.0

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := userRepo.UpdateBalance(s.ctx, userID, -debitAmount)
			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)

	finalBalance, err := userRepo.GetBalance(s.ctx, userID)
	s.Require().NoError(err)

	s.T().Logf("Final balance: %.2f", finalBalance)

	s.GreaterOrEqual(finalBalance, 0.0,
		"Баланс (%f) не должен быть отрицательным", finalBalance)

	expectedMaxDebits := int(initialBalance / debitAmount)
	s.Equal(expectedMaxDebits, int(successCount),
		"Должно быть ровно %d успешных списаний", expectedMaxDebits)
}

// TestConcurrent_RefreshToken_SameSession logs in once and then sends 10
// concurrent Refresh requests carrying the same refresh token. It only
// requires that at least one of them succeeds.
func (s *IntegrationSuite) TestConcurrent_RefreshToken_SameSession() {
	email := "refresh_concurrent@example.com"
	password := "testpassword"

	s.createTestUser(email, password)

	loginResp, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
		Email:     email,
		Password:  password,
		Ip:        "127.0.0.1",
		UserAgent: "test-agent",
	})
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 10

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := s.authClient.Refresh(s.ctx, &authpb.RefreshRequest{
				RefreshToken: loginResp.RefreshToken,
			})
			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Refresh success: %d, errors: %d", successCount, errorCount)

	s.GreaterOrEqual(int(successCount), 1,
		"Как минимум один refresh должен быть успешным")
}

// TestConcurrent_Login_SameUser sends 20 concurrent Login requests for the
// same user and expects all of them to succeed and exactly 20 rows to exist in
// sessions for that user afterwards.
func (s *IntegrationSuite) TestConcurrent_Login_SameUser() {
	email := "login_concurrent@example.com"
	password := "testpassword"

	s.createTestUser(email, password)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 20

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
				Email:     email,
				Password:  password,
				Ip:        "127.0.0.1",
				UserAgent: "test-agent",
			})
			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Login success: %d, errors: %d", successCount, errorCount)

	s.Equal(int32(goroutines), successCount,
		"Все %d login запросов должны быть успешными", goroutines)

	var sessionsCount int
	err := s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM sessions WHERE user_id = (SELECT id FROM users WHERE email_hash = $1)",
		emailHash,
	).Scan(&sessionsCount)
	s.Require().NoError(err)

	s.Equal(goroutines, sessionsCount,
		"Должно быть создано %d сессий", goroutines)
}

// TestConcurrent_Logout_SameSession logs in once and then sends 10 concurrent
// Logout requests carrying the same access token. It only requires that at
// least one of them succeeds.
func (s *IntegrationSuite) TestConcurrent_Logout_SameSession() {
	email := "logout_concurrent@example.com"
	password := "testpassword"

	s.createTestUser(email, password)

	loginResp, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
		Email:     email,
		Password:  password,
		Ip:        "127.0.0.1",
		UserAgent: "test-agent",
	})
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	goroutines := 10

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := s.authClient.Logout(s.ctx, &authpb.LogoutRequest{
				AccessToken: loginResp.AccessToken,
			})
			if err == nil {
				atomic.AddInt32(&successCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Logout success: %d", successCount)

	s.GreaterOrEqual(int(successCount), 1,
		"Как минимум один logout должен быть успешным")
}

// TestConcurrent_Validate_AfterLogout races one Logout (goroutine 0, delayed
// by 5ms) against 19 Validate calls on the same access token. Validate
// outcomes are only logged; the single assertion is that the logout succeeded.
func (s *IntegrationSuite) TestConcurrent_Validate_AfterLogout() {
	email := "validate_concurrent@example.com"
	password := "testpassword"

	s.createTestUser(email, password)

	loginResp, err := s.authClient.Login(s.ctx, &authpb.LoginRequest{
		Email:     email,
		Password:  password,
		Ip:        "127.0.0.1",
		UserAgent: "test-agent",
	})
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var validateSuccess int32
	var validateFailed int32
	var logoutDone int32
	goroutines := 20

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()

			if idx == 0 {
				// Give the validators a short head start before logging out.
				time.Sleep(5 * time.Millisecond)
				_, err := s.authClient.Logout(s.ctx, &authpb.LogoutRequest{
					AccessToken: loginResp.AccessToken,
				})
				if err == nil {
					atomic.AddInt32(&logoutDone, 1)
				}
			} else {
				_, err := s.authClient.Validate(s.ctx, &authpb.ValidateRequest{
					AccessToken: loginResp.AccessToken,
				})
				if err == nil {
					atomic.AddInt32(&validateSuccess, 1)
				} else {
					atomic.AddInt32(&validateFailed, 1)
				}
			}
		}(i)
	}
	wg.Wait()

	s.T().Logf("Validate success: %d, failed: %d, logout done: %d",
		validateSuccess, validateFailed, logoutDone)

	s.Equal(int32(1), logoutDone, "Logout должен быть выполнен")
}

// TestConcurrent_IncrementInvites_ViaRepository calls
// userRepo.IncrementInvitesIssued from 20 goroutines for a user with a limit
// of 5 and expects both the stored invites_issued value and the number of
// nil-error calls to equal the limit.
func (s *IntegrationSuite) TestConcurrent_IncrementInvites_ViaRepository() {
	email := "increment_repo_test@example.com"
	password := "testpassword"
	inviteLimit := 5

	s.createTestUserWithLimit(email, password, inviteLimit)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	userRepo := repository.NewUserRepository(s.pool, testCryptoSecret)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 20

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := userRepo.IncrementInvitesIssued(s.ctx, userID)
			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Success: %d, Errors: %d", successCount, errorCount)

	var invitesIssued int
	err = s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE id = $1",
		userID,
	).Scan(&invitesIssued)
	s.Require().NoError(err)

	s.Equal(inviteLimit, invitesIssued,
		"invites_issued должен быть равен лимиту %d", inviteLimit)
	s.Equal(inviteLimit, int(successCount),
		"Количество успешных инкрементов должно быть равно лимиту %d", inviteLimit)
}

// TestConcurrent_WithTransaction_InviteGenerate fires 50 concurrent Generate
// calls for a user with a limit of 3. It checks that successes do not exceed
// the limit and that invites_issued equals the number of invite_codes rows
// stored for the user.
func (s *IntegrationSuite) TestConcurrent_WithTransaction_InviteGenerate() {
	email := "tx_invite_test@example.com"
	password := "testpassword"
	inviteLimit := 3

	s.createTestUserWithLimit(email, password, inviteLimit)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int64
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 50

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := s.inviteClient.Generate(s.ctx, &invitepb.GenerateRequest{
				UserId:  userID,
				MaxUses: 1,
				TtlDays: 7,
			})
			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Transaction test - Success: %d, Errors: %d", successCount, errorCount)

	s.LessOrEqual(int(successCount), inviteLimit,
		"Количество успешных генераций (%d) не должно превышать лимит (%d)",
		successCount, inviteLimit)

	var invitesIssued int
	var inviteCodesCount int

	err = s.pool.QueryRow(s.ctx,
		"SELECT invites_issued FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&invitesIssued)
	s.Require().NoError(err)

	err = s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1",
		userID,
	).Scan(&inviteCodesCount)
	s.Require().NoError(err)

	s.Equal(invitesIssued, inviteCodesCount,
		"invites_issued (%d) должен соответствовать количеству кодов (%d)",
		invitesIssued, inviteCodesCount)
}

// TestConcurrent_CheckInviteLimit_WithForUpdate opens 30 concurrent
// transactions that each read invites_issued/invites_limit with
// SELECT ... FOR UPDATE and commit without modifying the row. Because nothing
// in this test changes invites_issued (0) or invites_limit (2), every
// transaction must complete without error and observe issued < limit.
func (s *IntegrationSuite) TestConcurrent_CheckInviteLimit_WithForUpdate() {
	email := "check_limit_test@example.com"
	password := "testpassword"
	inviteLimit := 2

	s.createTestUserWithLimit(email, password, inviteLimit)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 30

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			tx, err := s.pool.Begin(s.ctx)
			if err != nil {
				atomic.AddInt32(&errorCount, 1)
				return
			}
			// Best-effort cleanup: after a successful Commit the rollback
			// error is expected and carries no information.
			defer func() { _ = tx.Rollback(s.ctx) }()

			var issued, limit int
			err = tx.QueryRow(s.ctx,
				"SELECT invites_issued, invites_limit FROM users WHERE id = $1 FOR UPDATE",
				userID,
			).Scan(&issued, &limit)
			if err != nil {
				atomic.AddInt32(&errorCount, 1)
				return
			}

			// Count the check only once the transaction really completed.
			if err := tx.Commit(s.ctx); err != nil {
				atomic.AddInt32(&errorCount, 1)
				return
			}

			if issued < limit {
				atomic.AddInt32(&successCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("CheckInviteLimit success count: %d", successCount)

	s.Equal(int32(0), errorCount,
		"Ни одна транзакция не должна завершиться ошибкой")
	s.Equal(int32(goroutines), successCount,
		"Все %d транзакций должны увидеть invites_issued < invites_limit", goroutines)
}

// TestConcurrent_CreateTZ_BalanceDeduction inserts 10 rows into
// requests_for_suppliers concurrently for one user and expects every insert to
// succeed and exactly 10 rows to exist for that user. The user's balance is
// set up but never inspected by this test.
func (s *IntegrationSuite) TestConcurrent_CreateTZ_BalanceDeduction() {
	email := "create_tz_concurrent@example.com"
	password := "testpassword"
	initialBalance := 100.0

	s.createTestUserWithBalance(email, password, initialBalance)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 10

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			_, err := s.pool.Exec(s.ctx, ` INSERT INTO requests_for_suppliers (user_id, request_txt) VALUES ($1, $2) `, userID, "Test request")

			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("CreateTZ success: %d, errors: %d", successCount, errorCount)

	s.Equal(int32(goroutines), successCount,
		"Все %d запросов должны быть успешными", goroutines)

	var requestCount int
	err = s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM requests_for_suppliers WHERE user_id = $1",
		userID,
	).Scan(&requestCount)
	s.Require().NoError(err)

	s.Equal(goroutines, requestCount,
		"Должно быть создано %d запросов", goroutines)
}

// TestConcurrent_BalanceDeduction_WithTransaction runs 20 concurrent
// txManager.WithTx calls, each invoking userRepo.UpdateBalanceTx(-10) against
// a balance of 50. It checks that the final balance is non-negative and that
// exactly initialBalance/costPerOperation calls returned a nil error.
func (s *IntegrationSuite) TestConcurrent_BalanceDeduction_WithTransaction() {
	email := "balance_tx_concurrent@example.com"
	password := "testpassword"
	initialBalance := 50.0
	costPerOperation := 10.0

	s.createTestUserWithBalance(email, password, initialBalance)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	userRepo := repository.NewUserRepository(s.pool, testCryptoSecret)
	txManager := repository.NewTxManager(s.pool)

	var wg sync.WaitGroup
	var successCount int32
	var errorCount int32
	goroutines := 20

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			err := txManager.WithTx(s.ctx, func(tx pgx.Tx) error {
				return userRepo.UpdateBalanceTx(s.ctx, tx, userID, -costPerOperation)
			})

			if err == nil {
				atomic.AddInt32(&successCount, 1)
			} else {
				atomic.AddInt32(&errorCount, 1)
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Transaction balance deduction - Success: %d, Errors: %d", successCount, errorCount)

	finalBalance, err := userRepo.GetBalance(s.ctx, userID)
	s.Require().NoError(err)

	s.T().Logf("Final balance: %.2f", finalBalance)

	s.GreaterOrEqual(finalBalance, 0.0,
		"Баланс не должен быть отрицательным")

	expectedMaxOperations := int(initialBalance / costPerOperation)
	s.Equal(expectedMaxOperations, int(successCount),
		"Должно быть ровно %d успешных операций", expectedMaxOperations)
}

// TestConcurrent_SessionCleanup_WithExpiredSessions seeds 10 expired and 5
// valid sessions for one user, then runs the same cleanup DELETE from 5
// goroutines. It expects at least 10 rows deleted in total and exactly 5
// sessions left for the user. The DELETE is not scoped to the user, so it may
// also remove matching sessions belonging to other users.
func (s *IntegrationSuite) TestConcurrent_SessionCleanup_WithExpiredSessions() {
	email := "session_cleanup@example.com"
	password := "testpassword"

	s.createTestUser(email, password)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	expiredTime := time.Now().Add(-24 * time.Hour)
	for i := 0; i < 10; i++ {
		_, err := s.pool.Exec(s.ctx, ` INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at) VALUES ($1, $2, $3, $4, $5, $6) `, userID, fmt.Sprintf("expired_access_%d", i), fmt.Sprintf("expired_refresh_%d", i),
			"127.0.0.1", "test-agent", expiredTime)
		s.Require().NoError(err)
	}

	validTime := time.Now().Add(24 * time.Hour)
	for i := 0; i < 5; i++ {
		_, err := s.pool.Exec(s.ctx, ` INSERT INTO sessions (user_id, access_token, refresh_token, ip, user_agent, expires_at) VALUES ($1, $2, $3, $4, $5, $6) `, userID, fmt.Sprintf("valid_access_%d", i), fmt.Sprintf("valid_refresh_%d", i),
			"127.0.0.1", "test-agent", validTime)
		s.Require().NoError(err)
	}

	var wg sync.WaitGroup
	var cleanupCount int32
	goroutines := 5

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			result, err := s.pool.Exec(s.ctx, ` DELETE FROM sessions WHERE expires_at < now() OR (revoked_at IS NOT NULL AND revoked_at < now() - interval '30 days') `)
			if err == nil {
				atomic.AddInt32(&cleanupCount, int32(result.RowsAffected()))
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Total cleaned up: %d", cleanupCount)

	s.GreaterOrEqual(int(cleanupCount), 10,
		"Должно быть удалено как минимум 10 истекших сессий")

	var remainingCount int
	err = s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM sessions WHERE user_id = $1",
		userID,
	).Scan(&remainingCount)
	s.Require().NoError(err)

	s.Equal(5, remainingCount,
		"Должно остаться 5 валидных сессий")
}

// TestConcurrent_InviteCleanup_WithExpiredInvites seeds 10 expired and 5 valid
// active invite codes for one user, then runs the same deactivation UPDATE
// from 5 goroutines. It expects at least 10 rows deactivated in total and
// exactly 5 active codes left for the user. The UPDATE is not scoped to the
// user, so it may also deactivate expired codes of other users.
func (s *IntegrationSuite) TestConcurrent_InviteCleanup_WithExpiredInvites() {
	email := "invite_cleanup@example.com"
	password := "testpassword"
	inviteLimit := 20

	s.createTestUserWithLimit(email, password, inviteLimit)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	expiredTime := time.Now().Add(-24 * time.Hour)
	for i := 0; i < 10; i++ {
		code := int64(10000000 + i)
		_, err := s.pool.Exec(s.ctx, ` INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active) VALUES ($1, $2, $3, $4, $5) `, userID, code, 5, expiredTime, true)
		s.Require().NoError(err)
	}

	validTime := time.Now().Add(24 * time.Hour)
	for i := 0; i < 5; i++ {
		code := int64(20000000 + i)
		_, err := s.pool.Exec(s.ctx, ` INSERT INTO invite_codes (user_id, code, can_be_used_count, expires_at, is_active) VALUES ($1, $2, $3, $4, $5) `, userID, code, 5, validTime, true)
		s.Require().NoError(err)
	}

	var wg sync.WaitGroup
	var deactivatedCount int32
	goroutines := 5

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			result, err := s.pool.Exec(s.ctx, ` UPDATE invite_codes SET is_active = false WHERE expires_at < now() AND is_active = true `)
			if err == nil {
				atomic.AddInt32(&deactivatedCount, int32(result.RowsAffected()))
			}
		}()
	}
	wg.Wait()

	s.T().Logf("Total deactivated: %d", deactivatedCount)

	s.GreaterOrEqual(int(deactivatedCount), 10,
		"Должно быть деактивировано как минимум 10 истекших инвайтов")

	var activeCount int
	err = s.pool.QueryRow(s.ctx,
		"SELECT COUNT(*) FROM invite_codes WHERE user_id = $1 AND is_active = true",
		userID,
	).Scan(&activeCount)
	s.Require().NoError(err)

	s.Equal(5, activeCount,
		"Должно остаться 5 активных инвайтов")
}

// TestConcurrent_TransactionTimeout_Simulation releases 10 goroutines at once;
// each opens a transaction under a 100ms timeout, locks the user row with
// SELECT ... FOR UPDATE, debits 10 and commits. Even-indexed goroutines hold
// the lock for an extra 50ms. Any failing step counts as blocked. The test
// requires at least one success and a final balance equal to
// initialBalance - 10*successCount.
func (s *IntegrationSuite) TestConcurrent_TransactionTimeout_Simulation() {
	email := "tx_timeout@example.com"
	password := "testpassword"
	initialBalance := 1000.0

	s.createTestUserWithBalance(email, password, initialBalance)

	cryptoHelper := crypto.NewCrypto(testCryptoSecret)
	emailHash := cryptoHelper.EmailHash(email)

	var userID int
	err := s.pool.QueryRow(s.ctx,
		"SELECT id FROM users WHERE email_hash = $1",
		emailHash,
	).Scan(&userID)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	var successCount int32
	var blockedCount int32
	goroutines := 10

	// Closed once all goroutines are spawned so they start together.
	startBarrier := make(chan struct{})

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			<-startBarrier

			ctx, cancel := context.WithTimeout(s.ctx, 100*time.Millisecond)
			defer cancel()

			tx, err := s.pool.Begin(ctx)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}
			// Best-effort cleanup; the error is irrelevant after Commit.
			defer func() { _ = tx.Rollback(ctx) }()

			var balance float64
			err = tx.QueryRow(ctx,
				"SELECT balance FROM users WHERE id = $1 FOR UPDATE",
				userID,
			).Scan(&balance)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}

			// Half of the workers keep the row lock longer to provoke
			// timeouts in the goroutines queued behind them.
			if idx%2 == 0 {
				time.Sleep(50 * time.Millisecond)
			}

			_, err = tx.Exec(ctx,
				"UPDATE users SET balance = balance - 10 WHERE id = $1",
				userID,
			)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}

			err = tx.Commit(ctx)
			if err != nil {
				atomic.AddInt32(&blockedCount, 1)
				return
			}

			atomic.AddInt32(&successCount, 1)
		}(i)
	}

	close(startBarrier)
	wg.Wait()

	s.T().Logf("Transaction timeout test - Success: %d, Blocked/Timeout: %d", successCount, blockedCount)

	s.Greater(int(successCount), 0,
		"Должна быть хотя бы одна успешная транзакция")

	var finalBalance float64
	err = s.pool.QueryRow(s.ctx,
		"SELECT balance FROM users WHERE id = $1",
		userID,
	).Scan(&finalBalance)
	s.Require().NoError(err)

	expectedBalance := initialBalance - float64(successCount)*10
	s.Equal(expectedBalance, finalBalance,
		"Баланс должен соответствовать количеству успешных транзакций")
}