Use row level locking for payouts (#926)
This commit is contained in:
parent
6bbd8c9b16
commit
beaaed6613
22
.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json
generated
Normal file
22
.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json
generated
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\n SELECT balance FROM users WHERE id = $1 FOR UPDATE\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "balance",
|
||||||
|
"type_info": "Numeric"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Int8"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
false
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c"
|
||||||
|
}
|
||||||
22
.sqlx/query-a911bd1b5d19d305e5dae51941c169cba3afed4b6c7d9b99fc2d6a0db853cc5c.json
generated
Normal file
22
.sqlx/query-a911bd1b5d19d305e5dae51941c169cba3afed4b6c7d9b99fc2d6a0db853cc5c.json
generated
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\n SELECT balance FROM users WHERE id = $1 FOR UPDATE\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "balance",
|
||||||
|
"type_info": "Numeric"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Int8"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
false
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "a911bd1b5d19d305e5dae51941c169cba3afed4b6c7d9b99fc2d6a0db853cc5c"
|
||||||
|
}
|
||||||
@ -16,12 +16,11 @@ use serde_json::Value;
|
|||||||
use sqlx::postgres::PgQueryResult;
|
use sqlx::postgres::PgQueryResult;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use tokio::sync::{Mutex, RwLock};
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
pub struct PayoutsQueue {
|
pub struct PayoutsQueue {
|
||||||
credential: RwLock<Option<PayPalCredentials>>,
|
credential: RwLock<Option<PayPalCredentials>>,
|
||||||
payout_options: RwLock<Option<PayoutMethods>>,
|
payout_options: RwLock<Option<PayoutMethods>>,
|
||||||
pub payouts_locks: Mutex<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@ -48,7 +47,6 @@ impl PayoutsQueue {
|
|||||||
PayoutsQueue {
|
PayoutsQueue {
|
||||||
credential: RwLock::new(None),
|
credential: RwLock::new(None),
|
||||||
payout_options: RwLock::new(None),
|
payout_options: RwLock::new(None),
|
||||||
payouts_locks: Mutex::new(()),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -128,7 +128,14 @@ pub async fn paypal_webhook(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(result) = result {
|
if let Some(result) = result {
|
||||||
let _guard = payouts.payouts_locks.lock().await;
|
sqlx::query!(
|
||||||
|
"
|
||||||
|
SELECT balance FROM users WHERE id = $1 FOR UPDATE
|
||||||
|
",
|
||||||
|
result.user_id
|
||||||
|
)
|
||||||
|
.fetch_optional(&mut *transaction)
|
||||||
|
.await?;
|
||||||
|
|
||||||
sqlx::query!(
|
sqlx::query!(
|
||||||
"
|
"
|
||||||
@ -194,7 +201,6 @@ pub async fn tremendous_webhook(
|
|||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
pool: web::Data<PgPool>,
|
pool: web::Data<PgPool>,
|
||||||
redis: web::Data<RedisPool>,
|
redis: web::Data<RedisPool>,
|
||||||
payouts: web::Data<PayoutsQueue>,
|
|
||||||
body: String,
|
body: String,
|
||||||
) -> Result<HttpResponse, ApiError> {
|
) -> Result<HttpResponse, ApiError> {
|
||||||
let signature = req
|
let signature = req
|
||||||
@ -247,7 +253,14 @@ pub async fn tremendous_webhook(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(result) = result {
|
if let Some(result) = result {
|
||||||
let _guard = payouts.payouts_locks.lock().await;
|
sqlx::query!(
|
||||||
|
"
|
||||||
|
SELECT balance FROM users WHERE id = $1 FOR UPDATE
|
||||||
|
",
|
||||||
|
result.user_id
|
||||||
|
)
|
||||||
|
.fetch_optional(&mut *transaction)
|
||||||
|
.await?;
|
||||||
|
|
||||||
sqlx::query!(
|
sqlx::query!(
|
||||||
"
|
"
|
||||||
@ -367,8 +380,6 @@ pub async fn create_payout(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let _guard = payouts_queue.payouts_locks.lock().await;
|
|
||||||
|
|
||||||
if user.balance < body.amount || body.amount < Decimal::ZERO {
|
if user.balance < body.amount || body.amount < Decimal::ZERO {
|
||||||
return Err(ApiError::InvalidInput(
|
return Err(ApiError::InvalidInput(
|
||||||
"You do not have enough funds to make this payout!".to_string(),
|
"You do not have enough funds to make this payout!".to_string(),
|
||||||
@ -398,6 +409,16 @@ pub async fn create_payout(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut transaction = pool.begin().await?;
|
let mut transaction = pool.begin().await?;
|
||||||
|
|
||||||
|
sqlx::query!(
|
||||||
|
"
|
||||||
|
SELECT balance FROM users WHERE id = $1 FOR UPDATE
|
||||||
|
",
|
||||||
|
user.id.0
|
||||||
|
)
|
||||||
|
.fetch_optional(&mut *transaction)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let payout_id = generate_payout_id(&mut transaction).await?;
|
let payout_id = generate_payout_id(&mut transaction).await?;
|
||||||
|
|
||||||
let payout_item = match body.method {
|
let payout_item = match body.method {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user