run migrate

This commit is contained in:
Jai A 2025-04-03 17:47:30 -07:00
parent 69b70d70a8
commit 13f2961e43
No known key found for this signature in database
GPG Key ID: 9A9F9B7250E9883C
3 changed files with 17 additions and 14 deletions

View File

@ -1,6 +1,6 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "\n SELECT\n id, user_id, price_id, amount, currency_code, status, due, last_attempt,\n charge_type, subscription_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n subscription_interval AS \"subscription_interval?\",\n payment_platform,\n payment_platform_id AS \"payment_platform_id?\",\n parent_charge_id AS \"parent_charge_id?\",\n net AS \"net?\"\n FROM charges\n \n WHERE\n charge_type = $1 AND\n (\n (status = 'open' AND due < NOW()) OR\n (status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')\n )\n FOR UPDATE SKIP LOCKED\n ", "query": "\n SELECT\n id, user_id, price_id, amount, currency_code, status, due, last_attempt,\n charge_type, subscription_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n subscription_interval AS \"subscription_interval?\",\n payment_platform,\n payment_platform_id AS \"payment_platform_id?\",\n parent_charge_id AS \"parent_charge_id?\",\n net AS \"net?\"\n FROM charges\n \n WHERE\n charge_type = $1 AND\n (\n (status = 'open' AND due < NOW()) OR\n (status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')\n )\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@ -102,5 +102,5 @@
true true
] ]
}, },
"hash": "6cd0cfece6d87d039bb22379045509824d2c9eee5e1864a15186e713394f5926" "hash": "8f51f552d1c63fa818cbdba581691e97f693a187ed05224a44adeee68c6809d1"
} }

View File

@ -206,8 +206,8 @@ impl ChargeItem {
Ok(res.and_then(|r| r.try_into().ok())) Ok(res.and_then(|r| r.try_into().ok()))
} }
pub async fn get_chargeable_lock( pub async fn get_chargeable(
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ChargeItem>, DatabaseError> { ) -> Result<Vec<ChargeItem>, DatabaseError> {
let charge_type = ChargeType::Subscription.as_str(); let charge_type = ChargeType::Subscription.as_str();
let res = select_charges_with_predicate!( let res = select_charges_with_predicate!(
@ -218,11 +218,10 @@ impl ChargeItem {
(status = 'open' AND due < NOW()) OR (status = 'open' AND due < NOW()) OR
(status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days') (status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')
) )
-- FOR UPDATE SKIP LOCKED
"#, "#,
charge_type charge_type
) )
.fetch_all(&mut **transaction) .fetch_all(exec)
.await?; .await?;
Ok(res Ok(res

View File

@ -2271,11 +2271,9 @@ pub async fn index_billing(
info!("Indexing billing queue"); info!("Indexing billing queue");
let res = async { let res = async {
// If a charge is open and due or has been attempted more than two days ago, it should be processed // If a charge is open and due or has been attempted more than two days ago, it should be processed
let mut transaction = pool.begin().await?;
let charges_to_do = let charges_to_do =
crate::database::models::charge_item::ChargeItem::get_chargeable_lock( crate::database::models::charge_item::ChargeItem::get_chargeable(
&mut transaction, &pool,
) )
.await?; .await?;
@ -2396,8 +2394,14 @@ pub async fn index_billing(
charge.status = ChargeStatus::Processing; charge.status = ChargeStatus::Processing;
if let Err(e) = stripe::PaymentIntent::create(&stripe_client, intent).await { if let Err(e) =
tracing::error!("Failed to create payment intent: {:?}", e); stripe::PaymentIntent::create(&stripe_client, intent)
.await
{
tracing::error!(
"Failed to create payment intent: {:?}",
e
);
charge.status = ChargeStatus::Failed; charge.status = ChargeStatus::Failed;
charge.last_attempt = Some(Utc::now()); charge.last_attempt = Some(Utc::now());
} }
@ -2406,11 +2410,11 @@ pub async fn index_billing(
charge.last_attempt = Some(Utc::now()); charge.last_attempt = Some(Utc::now());
} }
let mut transaction = pool.begin().await?;
charge.upsert(&mut transaction).await?; charge.upsert(&mut transaction).await?;
}
}
transaction.commit().await?; transaction.commit().await?;
}
}
Ok::<(), ApiError>(()) Ok::<(), ApiError>(())
} }