Compare commits

...

18 Commits

Author SHA1 Message Date
fetch
8546efd572
Add tags field to Archon /create request 2025-08-07 16:57:01 -04:00
fetch
2796d770f7
Additional comments 2025-08-07 00:46:51 -04:00
fetch
6d39122ca9
Patch expiring charge on promotion, comments 2025-08-07 00:44:42 -04:00
fetch
09e89724de
Promote to full subscription, fmt + clippy 2025-08-07 00:16:36 -04:00
fetch
7a39f5853f
Merge branch 'main' into fetch/offer-redemption-preview-subscriptions 2025-08-06 20:20:36 -04:00
fetch
e8639510aa
Use a queue 2025-08-06 20:19:45 -04:00
fetch
d6ee0c42c8
Query cache 2025-08-06 04:10:09 -04:00
fetch
c1fc072efe
Consider due expiring charge as unprovisionable 2025-08-06 04:09:26 -04:00
fetch
3f36a67bc8
Unprovision Medal subscriptions 2025-08-06 04:05:16 -04:00
fetch
b0443dc49d
Fix race condition 2025-08-06 03:41:03 -04:00
fetch
4981151cea
Query cache 2025-08-06 03:32:00 -04:00
fetch
360d24f2e0
Create server on redeem 2025-08-06 03:31:39 -04:00
fetch
84cfd21920
5 days subscription interval, metadata 2025-08-04 21:42:53 -04:00
fetch
158f5171fc
Add partner subscription type 2025-08-04 21:07:55 -04:00
fetch
1fd21e99c3
Query cache 2025-08-04 20:32:23 -04:00
fetch
da0fed3e21
Add public column to products prices, only expose public prices 2025-08-04 20:31:49 -04:00
fetch
b65a16adff
Add guard to /redeem 2025-08-04 19:49:34 -04:00
fetch
6909f4a678
Initial db migration/impl, guarded partner routes 2025-08-04 19:48:28 -04:00
26 changed files with 1406 additions and 147 deletions

View File

@ -0,0 +1,34 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT id, metadata, unitary\n FROM products\n WHERE metadata ->> 'type' = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "metadata",
"type_info": "Jsonb"
},
{
"ordinal": 2,
"name": "unitary",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "139ba392ab53d975e63e2c328abad04831b4bed925bded054bb8a35d0680bed8"
}

View File

@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE users_redeemals\n SET status = $1\n WHERE\n status = $2\n AND NOW() - last_attempt > INTERVAL '5 minutes'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar",
"Text"
]
},
"nullable": []
},
"hash": "1dea22589b0440cfeaf98b6869bdaad852d58c61cf2a1affb01acc4984d42341"
}

View File

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n id, user_id, price_id, amount, currency_code, status, due, last_attempt,\n charge_type, subscription_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n subscription_interval AS \"subscription_interval?\",\n payment_platform,\n payment_platform_id AS \"payment_platform_id?\",\n parent_charge_id AS \"parent_charge_id?\",\n net AS \"net?\"\n FROM charges\n WHERE subscription_id = $1 AND (status = 'open' OR status = 'cancelled' OR status = 'failed')",
"query": "\n SELECT\n id, user_id, price_id, amount, currency_code, status, due, last_attempt,\n charge_type, subscription_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n subscription_interval AS \"subscription_interval?\",\n payment_platform,\n payment_platform_id AS \"payment_platform_id?\",\n parent_charge_id AS \"parent_charge_id?\",\n net AS \"net?\"\n FROM charges\n WHERE subscription_id = $1 AND (status = 'open' OR status = 'expiring' OR status = 'cancelled' OR status = 'failed')",
"describe": {
"columns": [
{
@ -102,5 +102,5 @@
true
]
},
"hash": "cf020daa52a1316e5f60d197196b880b72c0b2a576e470d9fd7182558103d055"
"hash": "2e18682890f7ec5a618991c2a4c77ca9546970f314f902a5197eb2d189cf81f7"
}

View File

@ -0,0 +1,59 @@
{
"db_name": "PostgreSQL",
"query": "SELECT * FROM users_redeemals WHERE status = $1 LIMIT $2",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "user_id",
"type_info": "Int8"
},
{
"ordinal": 2,
"name": "offer",
"type_info": "Varchar"
},
{
"ordinal": 3,
"name": "redeemed",
"type_info": "Timestamptz"
},
{
"ordinal": 4,
"name": "status",
"type_info": "Varchar"
},
{
"ordinal": 5,
"name": "last_attempt",
"type_info": "Timestamptz"
},
{
"ordinal": 6,
"name": "n_attempts",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text",
"Int8"
]
},
"nullable": [
false,
false,
false,
false,
false,
true,
false
]
},
"hash": "58ccda393820a272d72a3e41eccc5db30ab6ad0bb346caf781efdb5aab524286"
}

View File

@ -0,0 +1,27 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO users_redeemals\n (user_id, offer, redeemed, status, last_attempt, n_attempts)\n VALUES ($1, $2, $3, $4, $5, $6)\n RETURNING id",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Int8",
"Varchar",
"Timestamptz",
"Varchar",
"Timestamptz",
"Int4"
]
},
"nullable": [
false
]
},
"hash": "7adff98b270adc4a48e2c8a89a32ca1b83104102190597f4cda05e6f1c1e8f26"
}

View File

@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE users_redeemals\n SET\n offer = $2,\n status = $3,\n redeemed = $4,\n last_attempt = $5,\n n_attempts = $6\n WHERE id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Varchar",
"Varchar",
"Timestamptz",
"Timestamptz",
"Int4"
]
},
"nullable": []
},
"hash": "8d61d1ecc5321e2ac8932ef99de0f77e49cced9c7726ea746392a5fcbe75f2f5"
}

View File

@ -0,0 +1,29 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n users.id,\n users_redeemals.status AS \"status: Option<String>\"\n FROM\n users\n LEFT JOIN\n users_redeemals ON users_redeemals.user_id = users.id\n AND users_redeemals.offer = $2\n WHERE\n users.username = $1\n ORDER BY\n users_redeemals.redeemed DESC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "status: Option<String>",
"type_info": "Varchar"
}
],
"parameters": {
"Left": [
"Text",
"Text"
]
},
"nullable": [
false,
false
]
},
"hash": "949da1b1e3c772f79dd1248f99774fa39f140d3943f975067799f46f2cb48a0f"
}

View File

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n EXISTS (\n SELECT\n 1\n FROM\n users_redeemals\n WHERE\n user_id = $1\n AND offer = $2\n ) AS \"exists!\"\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists!",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Int8",
"Text"
]
},
"nullable": [
null
]
},
"hash": "9898e9962ba497ef8482ffa57d6590f7933e9f2465e9458fab005fe33d96ec7a"
}

View File

@ -0,0 +1,41 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT id, product_id, prices, currency_code\n FROM products_prices\n WHERE product_id = ANY($1::bigint[])\n AND public = $2",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "product_id",
"type_info": "Int8"
},
{
"ordinal": 2,
"name": "prices",
"type_info": "Jsonb"
},
{
"ordinal": 3,
"name": "currency_code",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Int8Array",
"Bool"
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "c37fc91df7619ac5c10fd04fdc2556aa98b80ccbfc53813659464a0e5e09fae8"
}

View File

@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE users_redeemals\n SET\n status = $3,\n last_attempt = $4,\n n_attempts = $5\n WHERE id = $1 AND status = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4",
"Text",
"Varchar",
"Timestamptz",
"Int4"
]
},
"nullable": []
},
"hash": "e3f6fa7e5ec6dee4fcdff904b3e692dccd55372d9cc827a1d68361fd036bc183"
}

View File

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n id, user_id, price_id, amount, currency_code, status, due, last_attempt,\n charge_type, subscription_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n subscription_interval AS \"subscription_interval?\",\n payment_platform,\n payment_platform_id AS \"payment_platform_id?\",\n parent_charge_id AS \"parent_charge_id?\",\n net AS \"net?\"\n FROM charges\n \n WHERE\n charge_type = $1 AND\n (\n (status = 'cancelled' AND due < NOW()) OR\n (status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')\n )\n ",
"query": "\n SELECT\n id, user_id, price_id, amount, currency_code, status, due, last_attempt,\n charge_type, subscription_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n subscription_interval AS \"subscription_interval?\",\n payment_platform,\n payment_platform_id AS \"payment_platform_id?\",\n parent_charge_id AS \"parent_charge_id?\",\n net AS \"net?\"\n FROM charges\n \n WHERE\n charge_type = $1 AND\n (\n (status = 'cancelled' AND due < NOW()) OR\n (status = 'expiring' AND due < NOW()) OR\n (status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')\n )\n ",
"describe": {
"columns": [
{
@ -102,5 +102,5 @@
true
]
},
"hash": "bfcbcadda1e323d56b6a21fc060c56bff2f38a54cf65dd1cc21f209240c7091b"
"hash": "fda7d5659efb2b6940a3247043945503c85e3f167216e0e2403e08095a3e32c9"
}

View File

@ -0,0 +1,11 @@
-- Add migration script here
CREATE TABLE IF NOT EXISTS users_redeemals (
id SERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id),
offer VARCHAR NOT NULL,
redeemed TIMESTAMP WITH TIME ZONE NOT NULL,
status VARCHAR NOT NULL,
last_attempt TIMESTAMP WITH TIME ZONE,
n_attempts INTEGER NOT NULL
);

View File

@ -0,0 +1,6 @@
-- Add migration script here
ALTER TABLE
products_prices
ADD COLUMN
public BOOLEAN NOT NULL DEFAULT true;

View File

@ -197,7 +197,7 @@ impl DBCharge {
) -> Result<Option<DBCharge>, DatabaseError> {
let user_subscription_id = user_subscription_id.0;
let res = select_charges_with_predicate!(
"WHERE subscription_id = $1 AND (status = 'open' OR status = 'cancelled' OR status = 'failed')",
"WHERE subscription_id = $1 AND (status = 'open' OR status = 'expiring' OR status = 'cancelled' OR status = 'failed')",
user_subscription_id
)
.fetch_optional(exec)
@ -240,6 +240,7 @@ impl DBCharge {
charge_type = $1 AND
(
(status = 'cancelled' AND due < NOW()) OR
(status = 'expiring' AND due < NOW()) OR
(status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')
)
"#,

View File

@ -25,6 +25,7 @@ pub mod team_item;
pub mod thread_item;
pub mod user_item;
pub mod user_subscription_item;
pub mod users_redeemals;
pub mod version_item;
pub use collection_item::DBCollection;

View File

@ -57,6 +57,26 @@ impl DBProduct {
Ok(Self::get_many(&[id], exec).await?.into_iter().next())
}
pub async fn get_by_type<'a, E>(
exec: E,
r#type: &str,
) -> Result<Vec<Self>, DatabaseError>
where
E: sqlx::PgExecutor<'a>,
{
let maybe_row = select_products_with_predicate!(
"WHERE metadata ->> 'type' = $1",
r#type
)
.fetch_all(exec)
.await?;
maybe_row
.into_iter()
.map(|r| r.try_into().map_err(Into::into))
.collect()
}
pub async fn get_many(
ids: &[DBProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
@ -100,10 +120,11 @@ pub struct QueryProductWithPrices {
}
impl QueryProductWithPrices {
pub async fn list<'a, E>(
/// Lists products with at least one public price.
pub async fn list_purchaseable<'a, E>(
exec: E,
redis: &RedisPool,
) -> Result<Vec<QueryProductWithPrices>, DatabaseError>
) -> Result<Vec<Self>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
@ -118,7 +139,51 @@ impl QueryProductWithPrices {
}
let all_products = product_item::DBProduct::get_all(exec).await?;
let prices = product_item::DBProductPrice::get_all_products_prices(
let prices =
product_item::DBProductPrice::get_all_public_products_prices(
&all_products.iter().map(|x| x.id).collect::<Vec<_>>(),
exec,
)
.await?;
let products = all_products
.into_iter()
.filter_map(|x| {
Some(QueryProductWithPrices {
id: x.id,
metadata: x.metadata,
prices: prices
.remove(&x.id)
.map(|x| x.1)?
.into_iter()
.map(|x| DBProductPrice {
id: x.id,
product_id: x.product_id,
prices: x.prices,
currency_code: x.currency_code,
})
.collect(),
unitary: x.unitary,
})
})
.collect::<Vec<_>>();
redis
.set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None)
.await?;
Ok(products)
}
pub async fn list_by_product_type<'a, E>(
exec: E,
r#type: &str,
) -> Result<Vec<Self>, DatabaseError>
where
E: sqlx::PgExecutor<'a> + Copy,
{
let all_products = DBProduct::get_by_type(exec, r#type).await?;
let prices = DBProductPrice::get_all_products_prices(
&all_products.iter().map(|x| x.id).collect::<Vec<_>>(),
exec,
)
@ -126,29 +191,26 @@ impl QueryProductWithPrices {
let products = all_products
.into_iter()
.map(|x| QueryProductWithPrices {
id: x.id,
metadata: x.metadata,
prices: prices
.remove(&x.id)
.map(|x| x.1)
.unwrap_or_default()
.into_iter()
.map(|x| DBProductPrice {
id: x.id,
product_id: x.product_id,
prices: x.prices,
currency_code: x.currency_code,
})
.collect(),
unitary: x.unitary,
.filter_map(|x| {
Some(QueryProductWithPrices {
id: x.id,
metadata: x.metadata,
prices: prices
.remove(&x.id)
.map(|x| x.1)?
.into_iter()
.map(|x| DBProductPrice {
id: x.id,
product_id: x.product_id,
prices: x.prices,
currency_code: x.currency_code,
})
.collect(),
unitary: x.unitary,
})
})
.collect::<Vec<_>>();
redis
.set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None)
.await?;
Ok(products)
}
}
@ -169,7 +231,11 @@ struct ProductPriceQueryResult {
}
macro_rules! select_prices_with_predicate {
($predicate:tt, $param:ident) => {
($predicate:tt, $param1:ident) => {
select_prices_with_predicate!($predicate, $param1, )
};
($predicate:tt, $($param:ident,)+) => {
sqlx::query_as!(
ProductPriceQueryResult,
r#"
@ -177,7 +243,7 @@ macro_rules! select_prices_with_predicate {
FROM products_prices
"#
+ $predicate,
$param
$($param),+
)
};
}
@ -231,33 +297,82 @@ impl DBProductPrice {
Ok(res.remove(&product_id).map(|x| x.1).unwrap_or_default())
}
pub async fn get_all_public_product_prices(
product_id: DBProductId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<DBProductPrice>, DatabaseError> {
let res =
Self::get_all_public_products_prices(&[product_id], exec).await?;
Ok(res.remove(&product_id).map(|x| x.1).unwrap_or_default())
}
/// Gets all public prices for the given products. If a product has no public price,
/// it won't be included in the resulting map.
pub async fn get_all_public_products_prices(
product_ids: &[DBProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
Self::get_all_products_prices_with_visibility(
product_ids,
Some(true),
exec,
)
.await
}
pub async fn get_all_products_prices(
product_ids: &[DBProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
Self::get_all_products_prices_with_visibility(product_ids, None, exec)
.await
}
async fn get_all_products_prices_with_visibility(
product_ids: &[DBProductId],
public_filter: Option<bool>,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
let ids = product_ids.iter().map(|id| id.0).collect_vec();
let ids_ref: &[i64] = &ids;
use futures_util::TryStreamExt;
let prices = select_prices_with_predicate!(
"WHERE product_id = ANY($1::bigint[])",
ids_ref
)
.fetch(exec)
.try_fold(
DashMap::new(),
|acc: DashMap<DBProductId, Vec<DBProductPrice>>, x| {
if let Ok(item) = <ProductPriceQueryResult as TryInto<
DBProductPrice,
>>::try_into(x)
{
acc.entry(item.product_id).or_default().push(item);
}
async move { Ok(acc) }
},
)
.await?;
let predicate = |acc: DashMap<DBProductId, Vec<DBProductPrice>>, x| {
if let Ok(item) = <ProductPriceQueryResult as TryInto<
DBProductPrice,
>>::try_into(x)
{
acc.entry(item.product_id).or_default().push(item);
}
async move { Ok(acc) }
};
let prices = match public_filter {
None => {
select_prices_with_predicate!(
"WHERE product_id = ANY($1::bigint[])",
ids_ref,
)
.fetch(exec)
.try_fold(DashMap::new(), predicate)
.await?
}
Some(public) => {
select_prices_with_predicate!(
"WHERE product_id = ANY($1::bigint[])
AND public = $2",
ids_ref,
public,
)
.fetch(exec)
.try_fold(DashMap::new(), predicate)
.await?
}
};
Ok(prices)
}

View File

@ -0,0 +1,301 @@
use crate::database::models::DBUserId;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{query, query_scalar};
use std::fmt;
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum Offer {
#[default]
Medal,
}
impl Offer {
pub fn as_str(&self) -> &'static str {
match self {
Offer::Medal => "medal",
}
}
pub fn from_str_or_default(s: &str) -> Self {
match s {
"medal" => Offer::Medal,
_ => Offer::Medal,
}
}
}
impl fmt::Display for Offer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum Status {
#[default]
Pending,
Processing,
Processed,
}
impl Status {
pub fn as_str(&self) -> &'static str {
match self {
Status::Pending => "pending",
Status::Processing => "processing",
Status::Processed => "processed",
}
}
pub fn from_str_or_default(s: &str) -> Self {
match s {
"pending" => Status::Pending,
"processing" => Status::Processing,
"processed" => Status::Processed,
_ => Status::default(),
}
}
}
impl fmt::Display for Status {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
#[derive(Debug)]
pub struct UserRedeemal {
pub id: i32,
pub user_id: DBUserId,
pub offer: Offer,
pub redeemed: DateTime<Utc>,
pub last_attempt: Option<DateTime<Utc>>,
pub n_attempts: i32,
pub status: Status,
}
impl UserRedeemal {
pub async fn get_pending<'a, E>(
exec: E,
limit: i64,
) -> sqlx::Result<Vec<UserRedeemal>>
where
E: sqlx::PgExecutor<'a>,
{
let redeemals = query!(
r#"SELECT * FROM users_redeemals WHERE status = $1 LIMIT $2"#,
Status::Pending.as_str(),
limit
)
.fetch_all(exec)
.await?
.into_iter()
.map(|row| UserRedeemal {
id: row.id,
user_id: DBUserId(row.user_id),
offer: Offer::from_str_or_default(&row.offer),
redeemed: row.redeemed,
last_attempt: row.last_attempt,
n_attempts: row.n_attempts,
status: Status::from_str_or_default(&row.status),
})
.collect();
Ok(redeemals)
}
pub async fn update_stuck_5_minutes<'a, E>(exec: E) -> sqlx::Result<()>
where
E: sqlx::PgExecutor<'a>,
{
query!(
r#"
UPDATE users_redeemals
SET status = $1
WHERE
status = $2
AND NOW() - last_attempt > INTERVAL '5 minutes'
"#,
Status::Pending.as_str(),
Status::Processing.as_str(),
)
.execute(exec)
.await?;
Ok(())
}
pub async fn exists_by_user_and_offer<'a, E>(
exec: E,
user_id: DBUserId,
offer: Offer,
) -> sqlx::Result<bool>
where
E: sqlx::PgExecutor<'a>,
{
query_scalar!(
r#"SELECT
EXISTS (
SELECT
1
FROM
users_redeemals
WHERE
user_id = $1
AND offer = $2
) AS "exists!"
"#,
user_id.0,
offer.as_str(),
)
.fetch_one(exec)
.await
}
pub async fn insert<'a, E>(&mut self, exec: E) -> sqlx::Result<()>
where
E: sqlx::PgExecutor<'a>,
{
let query = query_scalar!(
r#"
INSERT INTO users_redeemals
(user_id, offer, redeemed, status, last_attempt, n_attempts)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id"#,
self.user_id.0,
self.offer.as_str(),
self.redeemed,
self.status.as_str(),
self.last_attempt,
self.n_attempts,
);
let id = query.fetch_one(exec).await?;
self.id = id;
Ok(())
}
/// Updates `status`, `last_attempt`, and `n_attempts` only if `status` is currently pending.
/// Returns `true` if the status was updated, `false` otherwise.
pub async fn update_status_if_pending<'a, E>(
&self,
exec: E,
) -> sqlx::Result<bool>
where
E: sqlx::PgExecutor<'a>,
{
let query = query!(
r#"
UPDATE users_redeemals
SET
status = $3,
last_attempt = $4,
n_attempts = $5
WHERE id = $1 AND status = $2
"#,
self.id,
Status::Pending.as_str(),
self.status.as_str(),
self.last_attempt,
self.n_attempts,
);
let query_result = query.execute(exec).await?;
Ok(query_result.rows_affected() > 0)
}
pub async fn update<'a, E>(&self, exec: E) -> sqlx::Result<()>
where
E: sqlx::PgExecutor<'a>,
{
let query = query!(
r#"
UPDATE users_redeemals
SET
offer = $2,
status = $3,
redeemed = $4,
last_attempt = $5,
n_attempts = $6
WHERE id = $1
"#,
self.id,
self.offer.as_str(),
self.status.as_str(),
self.redeemed,
self.last_attempt,
self.n_attempts,
);
query.execute(exec).await?;
Ok(())
}
}
#[derive(Debug)]
pub struct RedeemalLookupFields {
pub user_id: DBUserId,
pub redeemal_status: Option<Status>,
}
impl RedeemalLookupFields {
/// Returns the redeemal status of a user for an offer, while looking up the user
/// itself. **This expects a single redeemal per user/offer pair**.
///
/// If the returned value is `Ok(None)`, the user doesn't exist.
///
/// If the returned value is `Ok(Some(fields))`, but `redeemal_status` is `None`,
/// the user exists and has not redeemed the offer.
pub async fn redeemal_status_by_username_and_offer<'a, E>(
exec: E,
user_username: &str,
offer: Offer,
) -> sqlx::Result<Option<RedeemalLookupFields>>
where
E: sqlx::PgExecutor<'a>,
{
let maybe_row = query!(
r#"
SELECT
users.id,
users_redeemals.status AS "status: Option<String>"
FROM
users
LEFT JOIN
users_redeemals ON users_redeemals.user_id = users.id
AND users_redeemals.offer = $2
WHERE
users.username = $1
ORDER BY
users_redeemals.redeemed DESC
LIMIT 1
"#,
user_username,
offer.as_str(),
)
.fetch_optional(exec)
.await?;
// If no row was returned, the user doesn't exist.
// If a row NULL status was returned, the user exists but has no redeemed the offer.
Ok(maybe_row.map(|row| RedeemalLookupFields {
user_id: DBUserId(row.id),
redeemal_status: row
.status
.as_deref()
.map(Status::from_str_or_default),
}))
}
}

View File

@ -24,6 +24,13 @@ pub enum ProductMetadata {
swap: u32,
storage: u32,
},
Medal {
cpu: u32,
ram: u32,
swap: u32,
storage: u32,
region: String,
},
}
#[derive(Serialize, Deserialize)]
@ -48,6 +55,7 @@ pub enum Price {
#[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Debug, Copy, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum PriceDuration {
FiveDays,
Monthly,
Quarterly,
Yearly,
@ -56,6 +64,7 @@ pub enum PriceDuration {
impl PriceDuration {
pub fn duration(&self) -> chrono::Duration {
match self {
PriceDuration::FiveDays => chrono::Duration::days(5),
PriceDuration::Monthly => chrono::Duration::days(30),
PriceDuration::Quarterly => chrono::Duration::days(90),
PriceDuration::Yearly => chrono::Duration::days(365),
@ -64,6 +73,7 @@ impl PriceDuration {
pub fn from_string(string: &str) -> PriceDuration {
match string {
"five-days" => PriceDuration::FiveDays,
"monthly" => PriceDuration::Monthly,
"quarterly" => PriceDuration::Quarterly,
"yearly" => PriceDuration::Yearly,
@ -76,6 +86,7 @@ impl PriceDuration {
PriceDuration::Monthly => "monthly",
PriceDuration::Quarterly => "quarterly",
PriceDuration::Yearly => "yearly",
PriceDuration::FiveDays => "five-days",
}
}
@ -84,6 +95,7 @@ impl PriceDuration {
PriceDuration::Monthly,
PriceDuration::Quarterly,
PriceDuration::Yearly,
PriceDuration::FiveDays,
]
.into_iter()
}
@ -146,6 +158,7 @@ impl SubscriptionStatus {
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum SubscriptionMetadata {
Pyro { id: String, region: Option<String> },
Medal { id: String },
}
#[derive(Serialize, Deserialize)]
@ -207,6 +220,10 @@ pub enum ChargeStatus {
Succeeded,
Failed,
Cancelled,
// Expiring charges are charges that aren't expected to be processed
// but can be promoted to a full charge, like for trials/freebies. When
// due, the underlying subscription is unprovisioned.
Expiring,
}
impl ChargeStatus {
@ -217,6 +234,7 @@ impl ChargeStatus {
"failed" => ChargeStatus::Failed,
"open" => ChargeStatus::Open,
"cancelled" => ChargeStatus::Cancelled,
"expiring" => ChargeStatus::Expiring,
_ => ChargeStatus::Failed,
}
}
@ -228,6 +246,7 @@ impl ChargeStatus {
ChargeStatus::Failed => "failed",
ChargeStatus::Open => "open",
ChargeStatus::Cancelled => "cancelled",
ChargeStatus::Expiring => "expiring",
}
}
}
@ -235,12 +254,14 @@ impl ChargeStatus {
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentPlatform {
Stripe,
None,
}
impl PaymentPlatform {
pub fn from_string(string: &str) -> PaymentPlatform {
match string {
"stripe" => PaymentPlatform::Stripe,
"none" => PaymentPlatform::None,
_ => PaymentPlatform::Stripe,
}
}
@ -248,6 +269,7 @@ impl PaymentPlatform {
pub fn as_str(&self) -> &'static str {
match self {
PaymentPlatform::Stripe => "stripe",
PaymentPlatform::None => "none",
}
}
}

View File

View File

@ -1,5 +1,8 @@
use crate::auth::{get_user_from_headers, send_email};
use crate::database::models::charge_item::DBCharge;
use crate::database::models::user_item::DBUser;
use crate::database::models::user_subscription_item::DBUserSubscription;
use crate::database::models::users_redeemals::{self, UserRedeemal};
use crate::database::models::{
generate_charge_id, generate_user_subscription_id, product_item,
user_subscription_item,
@ -14,6 +17,7 @@ use crate::models::pats::Scopes;
use crate::models::users::Badges;
use crate::queue::session::AuthQueue;
use crate::routes::ApiError;
use crate::util::archon::{ArchonClient, CreateServerRequest, Specs};
use actix_web::{HttpRequest, HttpResponse, delete, get, patch, post, web};
use ariadne::ids::base62_impl::{parse_base62, to_base62};
use chrono::{Duration, Utc};
@ -59,8 +63,10 @@ pub async fn products(
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
) -> Result<HttpResponse, ApiError> {
let products =
product_item::QueryProductWithPrices::list(&**pool, &redis).await?;
let products = product_item::QueryProductWithPrices::list_purchaseable(
&**pool, &redis,
)
.await?;
let products = products
.into_iter()
@ -182,7 +188,9 @@ pub async fn refund_charge(
ChargeStatus::Open
| ChargeStatus::Processing
| ChargeStatus::Succeeded => Some(x.amount),
ChargeStatus::Failed | ChargeStatus::Cancelled => None,
ChargeStatus::Failed
| ChargeStatus::Cancelled
| ChargeStatus::Expiring => None,
})
.sum::<i64>();
@ -256,6 +264,12 @@ pub async fn refund_charge(
));
}
}
PaymentPlatform::None => {
return Err(ApiError::InvalidInput(
"This charge was not processed via a payment platform."
.to_owned(),
));
}
}
};
@ -370,6 +384,7 @@ pub async fn edit_subscription(
})?;
if let Some(cancelled) = &edit_subscription.cancelled {
// Notably, cannot cancel/uncancel expiring charges.
if !matches!(
open_charge.status,
ChargeStatus::Open
@ -394,14 +409,17 @@ pub async fn edit_subscription(
if let Some(interval) = &edit_subscription.interval {
if let Price::Recurring { intervals } = &current_price.prices {
if let Some(price) = intervals.get(interval) {
open_charge.subscription_interval = Some(*interval);
open_charge.amount = *price as i64;
} else {
return Err(ApiError::InvalidInput(
"Interval is not valid for this subscription!"
.to_string(),
));
// For expiring charges, the interval is handled in the Product branch.
if open_charge.status != ChargeStatus::Expiring {
if let Some(price) = intervals.get(interval) {
open_charge.subscription_interval = Some(*interval);
open_charge.amount = *price as i64;
} else {
return Err(ApiError::InvalidInput(
"Interval is not valid for this subscription!"
.to_string(),
));
}
}
}
}
@ -429,48 +447,14 @@ pub async fn edit_subscription(
));
}
let interval = open_charge.due - Utc::now();
let duration = PriceDuration::Monthly;
// If the charge is an expiring charge, we need to create a payment
// intent as if the user was subscribing to the product, as opposed
// to a proration.
if open_charge.status == ChargeStatus::Expiring {
// We need a new interval when promoting the charge.
let interval = edit_subscription.interval
.ok_or_else(|| ApiError::InvalidInput("You need to specify an interval when promoting an expiring charge.".to_owned()))?;
let current_amount = match &current_price.prices {
Price::OneTime { price } => *price,
Price::Recurring { intervals } => *intervals.get(&duration).ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for the user's duration".to_string(),
)
})?,
};
let amount = match &product_price.prices {
Price::OneTime { price } => *price,
Price::Recurring { intervals } => *intervals.get(&duration).ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for the user's duration".to_string(),
)
})?,
};
let complete = Decimal::from(interval.num_seconds())
/ Decimal::from(duration.duration().num_seconds());
let proration = (Decimal::from(amount - current_amount) * complete)
.floor()
.to_i32()
.ok_or_else(|| {
ApiError::InvalidInput(
"Could not convert proration to i32".to_string(),
)
})?;
// First branch: Plan downgrade, update future charge
// Second branch: For small transactions (under 30 cents), we make a loss on the
// proration due to fees. In these situations, just give it to them for free, because
// their next charge will be in a day or two anyway.
if current_amount > amount || proration < 30 {
open_charge.price_id = product_price.id;
open_charge.amount = amount as i64;
None
} else {
let charge_id = generate_charge_id(&mut transaction).await?;
let customer_id = get_or_create_customer(
@ -483,6 +467,15 @@ pub async fn edit_subscription(
)
.await?;
let new_price_value = match product_price.prices {
Price::OneTime { ref price } => *price,
Price::Recurring { ref intervals } => {
*intervals
.get(&interval)
.ok_or_else(|| ApiError::InvalidInput("Could not find a valid price for the specified duration".to_owned()))?
}
};
let currency = Currency::from_str(
&current_price.currency_code.to_lowercase(),
)
@ -491,7 +484,7 @@ pub async fn edit_subscription(
})?;
let mut intent =
CreatePaymentIntent::new(proration as i64, currency);
CreatePaymentIntent::new(new_price_value as i64, currency);
let mut metadata = HashMap::new();
metadata.insert(
@ -512,15 +505,11 @@ pub async fn edit_subscription(
);
metadata.insert(
"modrinth_subscription_interval".to_string(),
open_charge
.subscription_interval
.unwrap_or(PriceDuration::Monthly)
.as_str()
.to_string(),
interval.as_str().to_string(),
);
metadata.insert(
"modrinth_charge_type".to_string(),
ChargeType::Proration.as_str().to_string(),
ChargeType::Subscription.as_str().to_string(),
);
intent.customer = Some(customer_id);
@ -545,7 +534,139 @@ pub async fn edit_subscription(
stripe::PaymentIntent::create(&stripe_client, intent)
.await?;
Some((proration, 0, intent))
// We do NOT update the open charge here. It will be patched to be the next
// charge of the subscription in the stripe webhook.
//
// We also shouldn't delete it, because if the payment fails, the expiring
// charge will be gone and the preview subscription will never be unprovisioned.
Some((new_price_value, 0, intent))
} else {
// The charge is not an expiring charge, need to prorate.
let interval = open_charge.due - Utc::now();
let duration = PriceDuration::Monthly;
let current_amount = match &current_price.prices {
Price::OneTime { price } => *price,
Price::Recurring { intervals } => *intervals.get(&duration).ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for the user's duration".to_string(),
)
})?,
};
let amount = match &product_price.prices {
Price::OneTime { price } => *price,
Price::Recurring { intervals } => *intervals.get(&duration).ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for the user's duration".to_string(),
)
})?,
};
let complete = Decimal::from(interval.num_seconds())
/ Decimal::from(duration.duration().num_seconds());
let proration = (Decimal::from(amount - current_amount)
* complete)
.floor()
.to_i32()
.ok_or_else(|| {
ApiError::InvalidInput(
"Could not convert proration to i32".to_string(),
)
})?;
// First condition: Plan downgrade, update future charge
// Second condition: For small transactions (under 30 cents), we make a loss on the
// proration due to fees. In these situations, just give it to them for free, because
// their next charge will be in a day or two anyway.
if current_amount > amount || proration < 30 {
open_charge.price_id = product_price.id;
open_charge.amount = amount as i64;
None
} else {
let charge_id =
generate_charge_id(&mut transaction).await?;
let customer_id = get_or_create_customer(
user.id,
user.stripe_customer_id.as_deref(),
user.email.as_deref(),
&stripe_client,
&pool,
&redis,
)
.await?;
let currency = Currency::from_str(
&current_price.currency_code.to_lowercase(),
)
.map_err(|_| {
ApiError::InvalidInput(
"Invalid currency code".to_string(),
)
})?;
let mut intent =
CreatePaymentIntent::new(proration as i64, currency);
let mut metadata = HashMap::new();
metadata.insert(
"modrinth_user_id".to_string(),
to_base62(user.id.0),
);
metadata.insert(
"modrinth_charge_id".to_string(),
to_base62(charge_id.0 as u64),
);
metadata.insert(
"modrinth_subscription_id".to_string(),
to_base62(subscription.id.0 as u64),
);
metadata.insert(
"modrinth_price_id".to_string(),
to_base62(product_price.id.0 as u64),
);
metadata.insert(
"modrinth_subscription_interval".to_string(),
open_charge
.subscription_interval
.unwrap_or(PriceDuration::Monthly)
.as_str()
.to_string(),
);
metadata.insert(
"modrinth_charge_type".to_string(),
ChargeType::Proration.as_str().to_string(),
);
intent.customer = Some(customer_id);
intent.metadata = Some(metadata);
intent.receipt_email = user.email.as_deref();
intent.setup_future_usage =
Some(PaymentIntentSetupFutureUsage::OffSession);
if let Some(payment_method) =
&edit_subscription.payment_method
{
let Ok(payment_method_id) =
PaymentMethodId::from_str(payment_method)
else {
return Err(ApiError::InvalidInput(
"Invalid payment method id".to_string(),
));
};
intent.payment_method = Some(payment_method_id);
}
let intent =
stripe::PaymentIntent::create(&stripe_client, intent)
.await?;
Some((proration, 0, intent))
}
}
} else {
None
@ -948,14 +1069,17 @@ pub async fn active_servers(
let server_ids = servers
.into_iter()
.filter_map(|x| {
x.metadata.as_ref().map(|metadata| match metadata {
SubscriptionMetadata::Pyro { id, region } => ActiveServer {
user_id: x.user_id.into(),
server_id: id.clone(),
price_id: x.price_id.into(),
interval: x.interval,
region: region.clone(),
},
x.metadata.as_ref().and_then(|metadata| match metadata {
SubscriptionMetadata::Pyro { id, region } => {
Some(ActiveServer {
user_id: x.user_id.into(),
server_id: id.clone(),
price_id: x.price_id.into(),
interval: x.interval,
region: region.clone(),
})
}
SubscriptionMetadata::Medal { .. } => None,
})
})
.collect::<Vec<ActiveServer>>();
@ -1187,7 +1311,7 @@ pub async fn initiate_payment(
})?;
let mut product_prices =
product_item::DBProductPrice::get_all_product_prices(
product_item::DBProductPrice::get_all_public_product_prices(
product.id, &**pool,
)
.await?;
@ -1704,6 +1828,13 @@ pub async fn stripe_webhook(
// Provision subscription
match metadata.product_item.metadata {
// A payment shouldn't be processed for Medal subscriptions.
ProductMetadata::Medal { .. } => {
warn!(
"A payment processed for a free subscription"
);
}
ProductMetadata::Midas => {
let badges =
metadata.user_item.badges | Badges::MIDAS;
@ -1833,6 +1964,7 @@ pub async fn stripe_webhook(
"region": server_region,
"source": source,
"payment_interval": metadata.charge_item.subscription_interval.map(|x| match x {
PriceDuration::FiveDays => 1,
PriceDuration::Monthly => 1,
PriceDuration::Quarterly => 3,
PriceDuration::Yearly => 12,
@ -1879,10 +2011,32 @@ pub async fn stripe_webhook(
}
};
// If the next open charge is actually an expiring charge,
// this means the subscription was promoted from a temporary
// free subscription to a paid subscription.
//
// In this case, we need to modify this expiring charge to be the
// next charge of the subscription, turn it into a normal open charge.
//
// Otherwise, if there *is* an open charge, the subscription was upgraded
// and the just-processed payment was the proration charge. In this case,
// the existing open charge must be updated to reflect the new product's price.
//
// If there are no open charges, the just-processed payment was a recurring
// or initial subscription charge, and we need to create the next charge.
if let Some(mut charge) = open_charge {
charge.price_id = metadata.product_price_item.id;
charge.amount = new_price as i64;
if charge.status == ChargeStatus::Expiring {
charge.status = ChargeStatus::Open;
charge.due = Utc::now()
+ subscription.interval.duration();
charge.payment_platform =
PaymentPlatform::Stripe;
charge.last_attempt = None;
} else {
charge.price_id =
metadata.product_price_item.id;
charge.amount = new_price as i64;
}
charge.upsert(&mut transaction).await?;
} else if metadata.charge_item.status
!= ChargeStatus::Cancelled
@ -2105,7 +2259,11 @@ pub async fn index_subscriptions(pool: PgPool, redis: RedisPool) {
let mut transaction = pool.begin().await?;
let mut clear_cache_users = Vec::new();
// If an active subscription has a canceled charge OR a failed charge more than two days ago, it should be cancelled
// If an active subscription has:
// - A canceled charge due now
// - An expiring charge due now
// - A failed charge more than two days ago
// It should be unprovisioned.
let all_charges = DBCharge::get_unprovision(&pool).await?;
let mut all_subscriptions =
@ -2201,33 +2359,37 @@ pub async fn index_subscriptions(pool: PgPool, redis: RedisPool) {
true
}
ProductMetadata::Pyro { .. } => {
if let Some(SubscriptionMetadata::Pyro { id, region: _ }) =
&subscription.metadata
{
let res = reqwest::Client::new()
.post(format!(
"{}/modrinth/v0/servers/{}/suspend",
dotenvy::var("ARCHON_URL")?,
id
))
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.json(&serde_json::json!({
"reason": if charge.status == ChargeStatus::Cancelled {
"cancelled"
} else {
"paymentfailed"
}
}))
.send()
.await;
if let Err(e) = res {
warn!("Error suspending pyro server: {:?}", e);
false
} else {
true
ProductMetadata::Pyro { .. }
| ProductMetadata::Medal { .. } => 'server: {
let server_id = match &subscription.metadata {
Some(SubscriptionMetadata::Pyro { id, region: _ }) => {
id
}
Some(SubscriptionMetadata::Medal { id }) => id,
_ => break 'server true,
};
let res = reqwest::Client::new()
.post(format!(
"{}/modrinth/v0/servers/{}/suspend",
dotenvy::var("ARCHON_URL")?,
server_id
))
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.json(&serde_json::json!({
"reason": if charge.status == ChargeStatus::Cancelled || charge.status == ChargeStatus::Expiring {
"cancelled"
} else {
"paymentfailed"
}
}))
.send()
.await;
if let Err(e) = res {
warn!("Error suspending pyro server: {:?}", e);
false
} else {
true
}
@ -2252,6 +2414,20 @@ pub async fn index_subscriptions(pool: PgPool, redis: RedisPool) {
.await?;
transaction.commit().await?;
// If an offer redeemal has been processing for over 5 minutes, it should be set pending.
UserRedeemal::update_stuck_5_minutes(&pool).await?;
// If an offer redeemal is pending, try processing it.
// Try processing it.
let pending_redeemals = UserRedeemal::get_pending(&pool, 100).await?;
for redeemal in pending_redeemals {
if let Err(error) =
try_process_user_redeemal(&pool, &redis, redeemal).await
{
warn!(%error, "Failed to process a redeemal.")
}
}
Ok::<(), ApiError>(())
};
@ -2262,6 +2438,161 @@ pub async fn index_subscriptions(pool: PgPool, redis: RedisPool) {
info!("Done indexing subscriptions");
}
/// Attempts to process a user redeemal.
///
/// Returns `Ok` if the entry has been succesfully processed, or will not be processed.
pub async fn try_process_user_redeemal(
pool: &PgPool,
redis: &RedisPool,
mut user_redeemal: UserRedeemal,
) -> Result<(), ApiError> {
// Immediately update redeemal row
user_redeemal.last_attempt = Some(Utc::now());
user_redeemal.n_attempts += 1;
user_redeemal.status = users_redeemals::Status::Processing;
let updated = user_redeemal.update_status_if_pending(pool).await?;
if !updated {
return Ok(());
}
let user_id = user_redeemal.user_id;
// Find the Medal product's price & metadata
let mut medal_products =
product_item::QueryProductWithPrices::list_by_product_type(
pool, "medal",
)
.await?;
let Some(product_item::QueryProductWithPrices {
id: _product_id,
metadata,
mut prices,
unitary: _,
}) = medal_products.pop()
else {
return Err(ApiError::Conflict(
"Missing Medal subscription product".to_owned(),
));
};
let ProductMetadata::Medal {
cpu,
ram,
swap,
storage,
region,
} = metadata
else {
return Err(ApiError::Conflict(
"Missing or incorrect metadata for Medal subscription".to_owned(),
));
};
let Some(medal_price) = prices.pop() else {
return Err(ApiError::Conflict(
"Missing price for Medal subscription".to_owned(),
));
};
let (price_duration, price_amount) = match medal_price.prices {
Price::OneTime { price: _ } => {
return Err(ApiError::Conflict(
"Unexpected metadata for Medal subscription price".to_owned(),
));
}
Price::Recurring { intervals } => {
let Some((price_duration, price_amount)) =
intervals.into_iter().next()
else {
return Err(ApiError::Conflict(
"Missing price interval for Medal subscription".to_owned(),
));
};
(price_duration, price_amount)
}
};
let price_id = medal_price.id;
// Get the user's username
let user = DBUser::get_id(user_id, pool, redis)
.await?
.ok_or(ApiError::NotFound)?;
// Send the provision request to Archon. On failure, the redeemal will be "stuck" processing,
// and moved back to pending by `index_subscriptions`.
let archon_client = ArchonClient::from_env()?;
let server_id = archon_client
.create_server(&CreateServerRequest {
user_id: to_base62(user_id.0 as u64),
name: format!("{}'s Medal server", user.username),
specs: Specs {
memory_mb: ram,
cpu,
swap_mb: swap,
storage_mb: storage,
},
source: crate::util::archon::Empty::default(),
region,
tags: vec!["medal".to_owned()],
})
.await?;
let mut txn = pool.begin().await?;
// Build a subscription using this price ID.
let subscription = DBUserSubscription {
id: generate_user_subscription_id(&mut txn).await?,
user_id,
price_id,
interval: PriceDuration::FiveDays,
created: Utc::now(),
status: SubscriptionStatus::Provisioned,
metadata: Some(SubscriptionMetadata::Medal {
id: server_id.to_string(),
}),
};
subscription.upsert(&mut txn).await?;
// Insert an expiring charge, `index_subscriptions` will unprovision the
// subscription when expired.
DBCharge {
id: generate_charge_id(&mut txn).await?,
user_id,
price_id,
amount: price_amount.into(),
currency_code: medal_price.currency_code,
status: ChargeStatus::Expiring,
due: Utc::now() + price_duration.duration(),
last_attempt: None,
type_: ChargeType::Subscription,
subscription_id: Some(subscription.id),
subscription_interval: Some(subscription.interval),
payment_platform: PaymentPlatform::None,
payment_platform_id: None,
parent_charge_id: None,
net: None,
}
.upsert(&mut txn)
.await?;
// Update `users_redeemal`, mark subscription as redeemed.
user_redeemal.status = users_redeemals::Status::Processed;
user_redeemal.update(&mut *txn).await?;
txn.commit().await?;
Ok(())
}
pub async fn index_billing(
stripe_client: stripe::Client,
pool: PgPool,

View File

@ -0,0 +1,109 @@
use actix_web::{HttpResponse, post, web};
use ariadne::ids::UserId;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tracing::warn;
use crate::database::models::users_redeemals::{
Offer, RedeemalLookupFields, Status, UserRedeemal,
};
use crate::database::redis::RedisPool;
use crate::routes::ApiError;
use crate::routes::internal::billing::try_process_user_redeemal;
use crate::util::guards::medal_key_guard;
pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(web::scope("medal").service(verify).service(redeem));
}
#[derive(Deserialize)]
struct MedalQuery {
username: String,
}
#[post("verify", guard = "medal_key_guard")]
pub async fn verify(
pool: web::Data<PgPool>,
web::Query(MedalQuery { username }): web::Query<MedalQuery>,
) -> Result<HttpResponse, ApiError> {
let maybe_fields =
RedeemalLookupFields::redeemal_status_by_username_and_offer(
&**pool,
&username,
Offer::Medal,
)
.await?;
#[derive(Serialize)]
struct VerifyResponse {
user_id: UserId,
redeemed: bool,
}
match maybe_fields {
None => Err(ApiError::NotFound),
Some(fields) => Ok(HttpResponse::Ok().json(VerifyResponse {
user_id: fields.user_id.into(),
redeemed: fields.redeemal_status.is_some(),
})),
}
}
#[post("redeem", guard = "medal_key_guard")]
pub async fn redeem(
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
web::Query(MedalQuery { username }): web::Query<MedalQuery>,
) -> Result<HttpResponse, ApiError> {
// Check the offer hasn't been redeemed yet, then insert into the table.
// In a transaction to avoid double inserts.
let mut txn = pool.begin().await?;
let maybe_fields =
RedeemalLookupFields::redeemal_status_by_username_and_offer(
&mut *txn,
&username,
Offer::Medal,
)
.await?;
let user_id = match maybe_fields {
None => return Err(ApiError::NotFound),
Some(fields) => {
if fields.redeemal_status.is_some() {
return Err(ApiError::Conflict(
"User already redeemed this offer".to_string(),
));
}
fields.user_id
}
};
// Link user to offer redeemal.
let mut redeemal = UserRedeemal {
id: 0,
user_id,
offer: Offer::Medal,
redeemed: Utc::now(),
status: Status::Pending,
last_attempt: None,
n_attempts: 0,
};
redeemal.insert(&mut *txn).await?;
txn.commit().await?;
// Immediately try to process the redeemal
if let Err(error) = try_process_user_redeemal(&pool, &redis, redeemal).await
{
warn!(%error, "Medal redeemal processing failed");
Ok(HttpResponse::Accepted().finish())
} else {
Ok(HttpResponse::Created().finish())
}
}

View File

@ -2,6 +2,7 @@ pub(crate) mod admin;
pub mod billing;
pub mod flows;
pub mod gdpr;
pub mod medal;
pub mod moderation;
pub mod pats;
pub mod session;
@ -24,6 +25,7 @@ pub fn config(cfg: &mut actix_web::web::ServiceConfig) {
.configure(moderation::config)
.configure(billing::config)
.configure(gdpr::config)
.configure(statuses::config),
.configure(statuses::config)
.configure(medal::config),
);
}

View File

@ -137,6 +137,8 @@ pub enum ApiError {
Io(#[from] std::io::Error),
#[error("Resource not found")]
NotFound,
#[error("Conflict: {0}")]
Conflict(String),
#[error(
"You are being rate-limited. Please wait {0} milliseconds. 0/{1} remaining."
)]
@ -172,6 +174,7 @@ impl ApiError {
ApiError::Clickhouse(..) => "clickhouse_error",
ApiError::Reroute(..) => "reroute_error",
ApiError::NotFound => "not_found",
ApiError::Conflict(..) => "conflict",
ApiError::Zip(..) => "zip_error",
ApiError::Io(..) => "io_error",
ApiError::RateLimitError(..) => "ratelimit_error",
@ -208,6 +211,7 @@ impl actix_web::ResponseError for ApiError {
ApiError::Mail(..) => StatusCode::INTERNAL_SERVER_ERROR,
ApiError::Reroute(..) => StatusCode::INTERNAL_SERVER_ERROR,
ApiError::NotFound => StatusCode::NOT_FOUND,
ApiError::Conflict(..) => StatusCode::CONFLICT,
ApiError::Zip(..) => StatusCode::BAD_REQUEST,
ApiError::Io(..) => StatusCode::BAD_REQUEST,
ApiError::RateLimitError(..) => StatusCode::TOO_MANY_REQUESTS,

View File

@ -0,0 +1,75 @@
use reqwest::header::HeaderName;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::routes::ApiError;
const X_MASTER_KEY: HeaderName = HeaderName::from_static("x-master-key");
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Empty {}
#[derive(Debug, Serialize, Deserialize)]
pub struct Specs {
pub memory_mb: u32,
pub cpu: u32,
pub swap_mb: u32,
pub storage_mb: u32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateServerRequest {
pub user_id: String,
pub name: String,
pub specs: Specs,
// Must be included because archon doesn't accept null values, only
// an empty struct, as a source.
pub source: Empty,
pub region: String,
pub tags: Vec<String>,
}
#[derive(Clone)]
pub struct ArchonClient {
client: reqwest::Client,
base_url: String,
pyro_api_key: String,
}
impl ArchonClient {
/// Builds an Archon client from environment variables. Returns `None` if the
/// required environment variables are not set.
pub fn from_env() -> Result<Self, ApiError> {
let client = reqwest::Client::new();
let base_url =
dotenvy::var("ARCHON_URL")?.trim_end_matches('/').to_owned();
Ok(Self {
client,
base_url,
pyro_api_key: dotenvy::var("PYRO_API_KEY")?,
})
}
pub async fn create_server(
&self,
request: &CreateServerRequest,
) -> Result<Uuid, reqwest::Error> {
#[derive(Deserialize)]
struct CreateServerResponse {
uuid: Uuid,
}
let response = self
.client
.post(format!("{}/modrinth/v0/servers/create", self.base_url))
.header(X_MASTER_KEY, &self.pyro_api_key)
.json(request)
.send()
.await?
.error_for_status()?;
Ok(response.json::<CreateServerResponse>().await?.uuid)
}
}

View File

@ -1,6 +1,8 @@
use actix_web::guard::GuardContext;
pub const ADMIN_KEY_HEADER: &str = "Modrinth-Admin";
pub const MEDAL_KEY_HEADER: &str = "X-Medal-Access-Key";
pub fn admin_key_guard(ctx: &GuardContext) -> bool {
let admin_key = std::env::var("LABRINTH_ADMIN_KEY").expect(
"No admin key provided, this should have been caught by check_env_vars",
@ -10,3 +12,16 @@ pub fn admin_key_guard(ctx: &GuardContext) -> bool {
.get(ADMIN_KEY_HEADER)
.is_some_and(|it| it.as_bytes() == admin_key.as_bytes())
}
pub fn medal_key_guard(ctx: &GuardContext) -> bool {
let maybe_medal_key = dotenvy::var("LABRINTH_MEDAL_KEY").ok();
match maybe_medal_key {
None => false,
Some(medal_key) => ctx
.head()
.headers()
.get(MEDAL_KEY_HEADER)
.is_some_and(|it| it.as_bytes() == medal_key.as_bytes()),
}
}

View File

@ -1,4 +1,5 @@
pub mod actix;
pub mod archon;
pub mod bitflag;
pub mod captcha;
pub mod cors;