Create server on redeem

This commit is contained in:
fetch
2025-08-06 03:31:39 -04:00
parent 84cfd21920
commit 360d24f2e0
4 changed files with 276 additions and 82 deletions

View File

@@ -57,6 +57,26 @@ impl DBProduct {
Ok(Self::get_many(&[id], exec).await?.into_iter().next())
}
pub async fn get_by_type<'a, E>(
exec: E,
r#type: &str,
) -> Result<Vec<Self>, DatabaseError>
where
E: sqlx::PgExecutor<'a>,
{
let maybe_row = select_products_with_predicate!(
"WHERE metadata ->> 'type' = $1",
r#type
)
.fetch_all(exec)
.await?;
maybe_row
.into_iter()
.map(|r| r.try_into().map_err(Into::into))
.collect()
}
pub async fn get_many(
ids: &[DBProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
@@ -104,7 +124,7 @@ impl QueryProductWithPrices {
pub async fn list_purchaseable<'a, E>(
exec: E,
redis: &RedisPool,
) -> Result<Vec<QueryProductWithPrices>, DatabaseError>
) -> Result<Vec<Self>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
@@ -154,6 +174,45 @@ impl QueryProductWithPrices {
Ok(products)
}
pub async fn list_by_product_type<'a, E>(
exec: E,
r#type: &str,
) -> Result<Vec<Self>, DatabaseError>
where
E: sqlx::PgExecutor<'a> + Copy,
{
let all_products = DBProduct::get_by_type(exec, r#type).await?;
let prices = DBProductPrice::get_all_products_prices(
&all_products.iter().map(|x| x.id).collect::<Vec<_>>(),
exec,
)
.await?;
let products = all_products
.into_iter()
.filter_map(|x| {
Some(QueryProductWithPrices {
id: x.id,
metadata: x.metadata,
prices: prices
.remove(&x.id)
.map(|x| x.1)?
.into_iter()
.map(|x| DBProductPrice {
id: x.id,
product_id: x.product_id,
prices: x.prices,
currency_code: x.currency_code,
})
.collect(),
unitary: x.unitary,
})
})
.collect::<Vec<_>>();
Ok(products)
}
}
#[derive(Deserialize, Serialize)]
@@ -172,7 +231,11 @@ struct ProductPriceQueryResult {
}
macro_rules! select_prices_with_predicate {
($predicate:tt, $param:ident) => {
($predicate:tt, $param1:ident) => {
select_prices_with_predicate!($predicate, $param1, )
};
($predicate:tt, $($param:ident,)+) => {
sqlx::query_as!(
ProductPriceQueryResult,
r#"
@@ -180,7 +243,7 @@ macro_rules! select_prices_with_predicate {
FROM products_prices
"#
+ $predicate,
$param
$($param),+
)
};
}
@@ -240,58 +303,68 @@ impl DBProductPrice {
pub async fn get_all_public_products_prices(
product_ids: &[DBProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
Self::get_all_products_prices_with_visibility(
product_ids,
Some(true),
exec,
)
.await
}
pub async fn get_all_products_prices(
product_ids: &[DBProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
Self::get_all_products_prices_with_visibility(product_ids, None, exec)
.await
}
async fn get_all_products_prices_with_visibility(
product_ids: &[DBProductId],
public_filter: Option<bool>,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
let ids = product_ids.iter().map(|id| id.0).collect_vec();
let ids_ref: &[i64] = &ids;
use futures_util::TryStreamExt;
let prices = select_prices_with_predicate!(
"WHERE product_id = ANY($1::bigint[])
AND public = true",
ids_ref
)
.fetch(exec)
.try_fold(
DashMap::new(),
|acc: DashMap<DBProductId, Vec<DBProductPrice>>, x| {
if let Ok(item) = <ProductPriceQueryResult as TryInto<
DBProductPrice,
>>::try_into(x)
{
acc.entry(item.product_id).or_default().push(item);
}
async move { Ok(acc) }
},
)
.await?;
let predicate = |acc: DashMap<DBProductId, Vec<DBProductPrice>>, x| {
if let Ok(item) = <ProductPriceQueryResult as TryInto<
DBProductPrice,
>>::try_into(x)
{
acc.entry(item.product_id).or_default().push(item);
}
async move { Ok(acc) }
};
let prices = match public_filter {
None => {
select_prices_with_predicate!(
"WHERE product_id = ANY($1::bigint[])",
ids_ref,
)
.fetch(exec)
.try_fold(DashMap::new(), predicate)
.await?
}
Some(public) => {
select_prices_with_predicate!(
"WHERE product_id = ANY($1::bigint[])
AND public = $2",
ids_ref,
public,
)
.fetch(exec)
.try_fold(DashMap::new(), predicate)
.await?
}
};
Ok(prices)
}
}
/// Queries a single price ID for a product by type.
pub async fn unique_price_id_of_product_by_type<'a, E>(
exec: E,
r#type: &str,
) -> Result<Option<DBProductPriceId>, DatabaseError>
where
E: sqlx::PgExecutor<'a>,
{
let maybe_row = sqlx::query!(
"
SELECT
products_prices.id
FROM
products_prices
INNER JOIN
products ON products.metadata ->> 'type' = $1
LIMIT 1
",
r#type,
)
.fetch_optional(exec)
.await?;
Ok(maybe_row.map(|x| DBProductPriceId(x.id)))
}

View File

@@ -29,6 +29,7 @@ pub enum ProductMetadata {
ram: u32,
swap: u32,
storage: u32,
region: String,
},
}
@@ -219,6 +220,10 @@ pub enum ChargeStatus {
Succeeded,
Failed,
Cancelled,
// Expiring charges are charges that aren't expected to be processed
// but can be promoted to a full charge, like for trials/freebies. When
// due, the underlying subscription is unprovisioned.
Expiring,
}
impl ChargeStatus {
@@ -229,6 +234,7 @@ impl ChargeStatus {
"failed" => ChargeStatus::Failed,
"open" => ChargeStatus::Open,
"cancelled" => ChargeStatus::Cancelled,
"expiring" => ChargeStatus::Expiring,
_ => ChargeStatus::Failed,
}
}
@@ -240,6 +246,7 @@ impl ChargeStatus {
ChargeStatus::Failed => "failed",
ChargeStatus::Open => "open",
ChargeStatus::Cancelled => "cancelled",
ChargeStatus::Expiring => "expiring",
}
}
}
@@ -247,12 +254,14 @@ impl ChargeStatus {
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentPlatform {
Stripe,
None,
}
impl PaymentPlatform {
pub fn from_string(string: &str) -> PaymentPlatform {
match string {
"stripe" => PaymentPlatform::Stripe,
"none" => PaymentPlatform::None,
_ => PaymentPlatform::Stripe,
}
}
@@ -260,6 +269,7 @@ impl PaymentPlatform {
pub fn as_str(&self) -> &'static str {
match self {
PaymentPlatform::Stripe => "stripe",
PaymentPlatform::None => "none",
}
}
}

View File

@@ -184,7 +184,9 @@ pub async fn refund_charge(
ChargeStatus::Open
| ChargeStatus::Processing
| ChargeStatus::Succeeded => Some(x.amount),
ChargeStatus::Failed | ChargeStatus::Cancelled => None,
ChargeStatus::Failed
| ChargeStatus::Cancelled
| ChargeStatus::Expiring => None,
})
.sum::<i64>();
@@ -258,6 +260,12 @@ pub async fn refund_charge(
));
}
}
PaymentPlatform::None => {
return Err(ApiError::InvalidInput(
"This charge was not processed via a payment platform."
.to_owned(),
));
}
}
};
@@ -1714,6 +1722,7 @@ pub async fn stripe_webhook(
ram: _,
swap: _,
storage: _,
region: _,
} => {
todo!(
"Promote Medal subscription to Pyro subscription"

View File

@@ -1,16 +1,23 @@
use actix_web::{HttpResponse, post, web};
use ariadne::ids::UserId;
use ariadne::ids::base62_impl::to_base62;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use crate::database::models::generate_user_subscription_id;
use crate::database::models::charge_item::DBCharge;
use crate::database::models::product_item;
use crate::database::models::user_subscription_item::DBUserSubscription;
use crate::database::models::users_redeemals::{
Offer, RedeemalLookupFields, Status, UserRedeemal,
};
use crate::models::v3::billing::{PriceDuration, SubscriptionStatus};
use crate::database::models::{
generate_charge_id, generate_user_subscription_id,
};
use crate::models::v3::billing::{
ChargeStatus, ChargeType, PaymentPlatform, Price, PriceDuration,
ProductMetadata, SubscriptionMetadata, SubscriptionStatus,
};
use crate::routes::ApiError;
use crate::util::guards::medal_key_guard;
@@ -58,17 +65,15 @@ pub async fn redeem(
) -> Result<HttpResponse, ApiError> {
// Check the offer hasn't been redeemed yet, then insert into the table.
let mut txn = pool.begin().await?;
let maybe_fields =
RedeemalLookupFields::redeemal_status_by_username_and_offer(
&mut *txn,
&**pool,
&username,
Offer::Medal,
)
.await?;
let redeemal = match maybe_fields {
let user_id = match maybe_fields {
None => return Err(ApiError::NotFound),
Some(fields) => {
if fields.redeemal_status.is_some() {
@@ -77,51 +82,148 @@ pub async fn redeem(
));
}
let mut redeemal = UserRedeemal {
id: 0,
user_id: fields.user_id,
offer: Offer::Medal,
redeemed: Utc::now(),
status: Status::Pending,
};
redeemal.insert(&mut *txn).await?;
redeemal
fields.user_id
}
};
txn.commit().await?;
// TODO: Provision server (send archon request) THEN add subscription to DB
let mut txn = pool.begin().await?;
let client = reqwest::Client::new();
// Find the Medal product price
let maybe_price_id =
product_item::unique_price_id_of_product_by_type(&mut *txn, "medal")
.await?;
let mut medal_products =
product_item::QueryProductWithPrices::list_by_product_type(
&**pool, "medal",
)
.await?;
let Some(medal_price_id) = maybe_price_id else {
let Some(product_item::QueryProductWithPrices {
id: _product_id,
metadata,
mut prices,
unitary: _,
}) = medal_products.pop()
else {
return Ok(HttpResponse::NotImplemented()
.body("Missing price ID for Medal subscription"));
.body("Missing Medal subscription product"));
};
let ProductMetadata::Medal {
cpu,
ram,
swap,
storage,
region,
} = metadata
else {
return Ok(HttpResponse::NotImplemented()
.body("Missing or incorrect metadata for Medal subscription"));
};
let Some(medal_price) = prices.pop() else {
return Ok(HttpResponse::NotImplemented()
.body("Missing price for Medal subscription"));
};
let (price_duration, price_amount) = match medal_price.prices {
Price::OneTime { price: _ } => {
return Ok(HttpResponse::NotImplemented()
.body("Unexpected metadata for Medal subscription price"));
}
Price::Recurring { intervals } => {
let Some((price_duration, price_amount)) =
intervals.into_iter().next()
else {
return Ok(HttpResponse::NotImplemented()
.body("Missing price interval for Medal subscription"));
};
(price_duration, price_amount)
}
};
let price_id = medal_price.id;
#[derive(Deserialize)]
struct PyroServerResponse {
uuid: uuid::Uuid,
}
// TODO: archon-client module
let pyro_response = client
.post(format!(
"{}/modrinth/v0/servers/create",
dotenvy::var("ARCHON_URL")?,
))
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.json(&serde_json::json!({
"user_id": to_base62(user_id.0 as u64),
"name": format!("{}'s Medal server", username),
"specs": {
"memory_mb": ram,
"cpu": cpu,
"swap_mb": swap,
"storage_mb": storage,
},
"region": region,
"source": {}, // Don't install anything by default (field is ignored on Archon anyways)
"payment_interval": 1, // Doesn't matter, not used on Archon anymore anyways
}))
.send()
.await?
.error_for_status()?
.json::<PyroServerResponse>()
.await?;
let mut txn = pool.begin().await?;
// Build a subscription using this price ID.
let subscription = DBUserSubscription {
id: generate_user_subscription_id(&mut txn).await?,
user_id: redeemal.user_id,
price_id: medal_price_id,
user_id,
price_id,
interval: PriceDuration::FiveDays,
created: Utc::now(),
status: SubscriptionStatus::Unprovisioned,
metadata: None, // TODO: Provision server, then add metadata
status: SubscriptionStatus::Provisioned,
metadata: Some(SubscriptionMetadata::Medal {
id: pyro_response.uuid.to_string(),
}),
};
// TODO: Insert a cancelled charge in 5 days time, `index_subscriptions` will unprovision
// the subscription.
subscription.upsert(&mut txn).await?;
// Insert an expiring charge, `index_subscriptions` will unprovision the
// subscription when expired.
DBCharge {
id: generate_charge_id(&mut txn).await?,
user_id,
price_id,
amount: price_amount.into(),
currency_code: medal_price.currency_code,
status: ChargeStatus::Expiring,
due: Utc::now() + price_duration.duration(),
last_attempt: None,
type_: ChargeType::Subscription,
subscription_id: Some(subscription.id),
subscription_interval: Some(subscription.interval),
payment_platform: PaymentPlatform::None,
payment_platform_id: None,
parent_charge_id: None,
net: None,
}
.upsert(&mut txn)
.await?;
// Link user to offer redeemal.
let mut redeemal = UserRedeemal {
id: 0,
user_id,
offer: Offer::Medal,
redeemed: Utc::now(),
status: Status::Redeemed,
};
redeemal.insert(&mut *txn).await?;
txn.commit().await?;
Ok(HttpResponse::Ok().finish())