From c998d2566ed491ea6680d07002b6f2e14c31c34e Mon Sep 17 00:00:00 2001 From: Josiah Glosson Date: Sat, 15 Mar 2025 09:28:20 -0500 Subject: [PATCH] Allow multiple labrinth instances (#3360) * Move a lot of scheduled tasks to be runnable from the command-line * Use pubsub to handle sockets connected to multiple Labrinths * Clippy fix * Fix build and merge some stuff * Fix build fmt : --------- Signed-off-by: Jai Agrawal <18202329+Geometrically@users.noreply.github.com> Co-authored-by: Jai A Co-authored-by: Jai Agrawal <18202329+Geometrically@users.noreply.github.com> --- Cargo.lock | 1 + ...179cc9c02b1c0364319e76454dff713abdd45.json | 14 + ...bd168aaa2f791305f30adbf3b002ba39da7fa.json | 14 + ...a6e45856841256d812ce9ae3c07f903c5cc62.json | 14 - ...2800a0a43dfef4a37a5725403d33ccb20d908.json | 15 - ...f9e8193eb240501d30d5ffb4129e2103efd3d.json | 14 - ...95ff0b6eb1d3c4350d8e025d39d927d4547fc.json | 15 + apps/labrinth/Cargo.toml | 2 + apps/labrinth/src/background_task.rs | 278 +++++++++ apps/labrinth/src/database/redis.rs | 31 +- apps/labrinth/src/lib.rs | 219 ++++--- apps/labrinth/src/main.rs | 43 +- apps/labrinth/src/routes/internal/billing.rs | 552 +++++++++--------- apps/labrinth/src/routes/internal/statuses.rs | 162 +++-- apps/labrinth/src/routes/mod.rs | 6 +- apps/labrinth/src/routes/v3/friends.rs | 26 +- apps/labrinth/src/scheduler.rs | 177 ------ apps/labrinth/src/sync/friends.rs | 87 +++ apps/labrinth/src/sync/mod.rs | 2 + apps/labrinth/src/sync/status.rs | 71 +++ apps/labrinth/tests/common/mod.rs | 5 + 21 files changed, 1056 insertions(+), 692 deletions(-) create mode 100644 apps/labrinth/.sqlx/query-41ec8301348dc912d0e5a16def1179cc9c02b1c0364319e76454dff713abdd45.json create mode 100644 apps/labrinth/.sqlx/query-4ce906f3bec42a2d4b9ed8b8481bd168aaa2f791305f30adbf3b002ba39da7fa.json delete mode 100644 apps/labrinth/.sqlx/query-b971cecafab7046c5952447fd78a6e45856841256d812ce9ae3c07f903c5cc62.json delete mode 100644 apps/labrinth/.sqlx/query-bd26a27ce80ca796ae19bc709c92800a0a43dfef4a37a5725403d33ccb20d908.json delete mode 100644 apps/labrinth/.sqlx/query-c8a27a122160a0896914c786deef9e8193eb240501d30d5ffb4129e2103efd3d.json create mode 100644 apps/labrinth/.sqlx/query-f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc.json create mode 100644 apps/labrinth/src/background_task.rs create mode 100644 apps/labrinth/src/sync/friends.rs create mode 100644 apps/labrinth/src/sync/mod.rs create mode 100644 apps/labrinth/src/sync/status.rs diff --git a/Cargo.lock b/Cargo.lock index 55f13a32d..3396996df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4522,6 +4522,7 @@ dependencies = [ "bytes 1.7.2", "censor", "chrono", + "clap", "clickhouse", "color-thief", "console-subscriber", diff --git a/apps/labrinth/.sqlx/query-41ec8301348dc912d0e5a16def1179cc9c02b1c0364319e76454dff713abdd45.json b/apps/labrinth/.sqlx/query-41ec8301348dc912d0e5a16def1179cc9c02b1c0364319e76454dff713abdd45.json new file mode 100644 index 000000000..117ee6ed2 --- /dev/null +++ b/apps/labrinth/.sqlx/query-41ec8301348dc912d0e5a16def1179cc9c02b1c0364319e76454dff713abdd45.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE versions\n SET status = requested_status\n WHERE status = $1 AND date_published < CURRENT_DATE AND requested_status IS NOT NULL\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "41ec8301348dc912d0e5a16def1179cc9c02b1c0364319e76454dff713abdd45" +} diff --git a/apps/labrinth/.sqlx/query-4ce906f3bec42a2d4b9ed8b8481bd168aaa2f791305f30adbf3b002ba39da7fa.json b/apps/labrinth/.sqlx/query-4ce906f3bec42a2d4b9ed8b8481bd168aaa2f791305f30adbf3b002ba39da7fa.json new file mode 100644 index 000000000..ec8e1a23b --- /dev/null +++ b/apps/labrinth/.sqlx/query-4ce906f3bec42a2d4b9ed8b8481bd168aaa2f791305f30adbf3b002ba39da7fa.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE mods\n SET status = requested_status\n WHERE status = $1 AND approved < CURRENT_DATE AND requested_status IS NOT NULL\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "4ce906f3bec42a2d4b9ed8b8481bd168aaa2f791305f30adbf3b002ba39da7fa" +} diff --git a/apps/labrinth/.sqlx/query-b971cecafab7046c5952447fd78a6e45856841256d812ce9ae3c07f903c5cc62.json b/apps/labrinth/.sqlx/query-b971cecafab7046c5952447fd78a6e45856841256d812ce9ae3c07f903c5cc62.json deleted file mode 100644 index be3795083..000000000 --- a/apps/labrinth/.sqlx/query-b971cecafab7046c5952447fd78a6e45856841256d812ce9ae3c07f903c5cc62.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE mods\n SET status = requested_status\n WHERE status = $1 AND approved < CURRENT_DATE AND requested_status IS NOT NULL\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [] - }, - "hash": "b971cecafab7046c5952447fd78a6e45856841256d812ce9ae3c07f903c5cc62" -} diff --git a/apps/labrinth/.sqlx/query-bd26a27ce80ca796ae19bc709c92800a0a43dfef4a37a5725403d33ccb20d908.json b/apps/labrinth/.sqlx/query-bd26a27ce80ca796ae19bc709c92800a0a43dfef4a37a5725403d33ccb20d908.json deleted file mode 100644 index c0c2cbe97..000000000 --- a/apps/labrinth/.sqlx/query-bd26a27ce80ca796ae19bc709c92800a0a43dfef4a37a5725403d33ccb20d908.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE users\n SET badges = $1\n WHERE (id = $2)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "bd26a27ce80ca796ae19bc709c92800a0a43dfef4a37a5725403d33ccb20d908" -} diff --git a/apps/labrinth/.sqlx/query-c8a27a122160a0896914c786deef9e8193eb240501d30d5ffb4129e2103efd3d.json b/apps/labrinth/.sqlx/query-c8a27a122160a0896914c786deef9e8193eb240501d30d5ffb4129e2103efd3d.json deleted file mode 100644 index bebb6425a..000000000 --- a/apps/labrinth/.sqlx/query-c8a27a122160a0896914c786deef9e8193eb240501d30d5ffb4129e2103efd3d.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE versions\n SET status = requested_status\n WHERE status = $1 AND date_published < CURRENT_DATE AND requested_status IS NOT NULL\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [] - }, - "hash": "c8a27a122160a0896914c786deef9e8193eb240501d30d5ffb4129e2103efd3d" -} diff --git a/apps/labrinth/.sqlx/query-f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc.json b/apps/labrinth/.sqlx/query-f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc.json new file mode 100644 index 000000000..7b1913392 --- /dev/null +++ b/apps/labrinth/.sqlx/query-f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE users\n SET badges = $1\n WHERE (id = $2)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc" +} diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml index 584b8b935..07bb1fbfe 100644 --- a/apps/labrinth/Cargo.toml +++ b/apps/labrinth/Cargo.toml @@ -131,6 +131,8 @@ json-patch = "*" ariadne = { path = "../../packages/ariadne" } +clap = { version = "4.5", features = ["derive"] } + [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.6.0", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] } diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs new file mode 100644 index 000000000..c534a83a4 --- /dev/null +++ b/apps/labrinth/src/background_task.rs @@ -0,0 +1,278 @@ +use crate::database::redis::RedisPool; +use crate::queue::payouts::process_payout; +use crate::search; +use crate::search::indexing::index_projects; +use clap::ValueEnum; +use sqlx::Postgres; +use tracing::{info, warn}; + +#[derive(ValueEnum, Debug, Copy, Clone, PartialEq, Eq)] +#[clap(rename_all = "kebab_case")] +pub enum BackgroundTask { + IndexSearch, + ReleaseScheduled, + UpdateVersions, + Payouts, + IndexBilling, + IndexSubscriptions, +} + +impl BackgroundTask { + pub async fn run( + self, + pool: sqlx::Pool, + redis_pool: RedisPool, + search_config: search::SearchConfig, + clickhouse: clickhouse::Client, + stripe_client: stripe::Client, + ) { + use BackgroundTask::*; + match self { + IndexSearch => index_search(pool, redis_pool, search_config).await, + ReleaseScheduled => release_scheduled(pool).await, + UpdateVersions => update_versions(pool, redis_pool).await, + Payouts => payouts(pool, clickhouse).await, + IndexBilling => { + crate::routes::internal::billing::index_billing( + stripe_client, + pool, + redis_pool, + ) + .await + } + IndexSubscriptions => { + crate::routes::internal::billing::index_subscriptions( + pool, redis_pool, + ) + .await + } + } + } +} + +pub async fn index_search( + pool: sqlx::Pool, + redis_pool: RedisPool, + search_config: search::SearchConfig, +) { + info!("Indexing local database"); + let result = index_projects(pool, redis_pool, &search_config).await; + if let Err(e) = result { + warn!("Local project indexing failed: {:?}", e); + } + info!("Done indexing local database"); +} + +pub async fn release_scheduled(pool: sqlx::Pool) { + info!("Releasing scheduled versions/projects!"); + + let projects_results = sqlx::query!( + " + UPDATE mods + SET status = requested_status + WHERE status = $1 AND approved < CURRENT_DATE AND requested_status IS NOT NULL + ", + crate::models::projects::ProjectStatus::Scheduled.as_str(), + ) + .execute(&pool) + .await; + + if let Err(e) = projects_results { + warn!("Syncing scheduled releases for projects failed: {:?}", e); + } + + let versions_results = sqlx::query!( + " + UPDATE versions + SET status = requested_status + WHERE status = $1 AND date_published < CURRENT_DATE AND requested_status IS NOT NULL + ", + crate::models::projects::VersionStatus::Scheduled.as_str(), + ) + .execute(&pool) + .await; + + if let Err(e) = versions_results { + warn!("Syncing scheduled releases for versions failed: {:?}", e); + } + + info!("Finished releasing scheduled versions/projects"); +} + +pub async fn update_versions( + pool: sqlx::Pool, + redis_pool: RedisPool, +) { + info!("Indexing game versions list from Mojang"); + let result = version_updater::update_versions(&pool, &redis_pool).await; + if let Err(e) = result { + warn!("Version update failed: {}", e); + } + info!("Done indexing game versions"); +} + +pub async fn payouts( + pool: sqlx::Pool, + clickhouse: clickhouse::Client, +) { + info!("Started running payouts"); + let result = process_payout(&pool, &clickhouse).await; + if let Err(e) = result { + warn!("Payouts run failed: {:?}", e); + } + info!("Done running payouts"); +} + +mod version_updater { + use crate::database::models::legacy_loader_fields::MinecraftGameVersion; + use crate::database::redis::RedisPool; + use chrono::{DateTime, Utc}; + use serde::Deserialize; + use sqlx::Postgres; + use thiserror::Error; + use tracing::warn; + + #[derive(Deserialize)] + struct InputFormat<'a> { + // latest: LatestFormat, + versions: Vec>, + } + + #[derive(Deserialize)] + struct VersionFormat<'a> { + id: String, + #[serde(rename = "type")] + type_: std::borrow::Cow<'a, str>, + #[serde(rename = "releaseTime")] + release_time: DateTime, + } + + #[derive(Error, Debug)] + pub enum VersionIndexingError { + #[error("Network error while updating game versions list: {0}")] + NetworkError(#[from] reqwest::Error), + #[error("Database error while updating game versions list: {0}")] + DatabaseError(#[from] crate::database::models::DatabaseError), + } + + pub async fn update_versions( + pool: &sqlx::Pool, + redis: &RedisPool, + ) -> Result<(), VersionIndexingError> { + let input = reqwest::get( + "https://piston-meta.mojang.com/mc/game/version_manifest_v2.json", + ) + .await? + .json::() + .await?; + + let mut skipped_versions_count = 0u32; + + // A list of version names that contains spaces. + // Generated using the command + // ```sh + // curl https://launchermeta.mojang.com/mc/game/version_manifest.json \ + // | jq '[.versions[].id | select(contains(" "))]' + // ``` + const HALL_OF_SHAME: [(&str, &str); 12] = [ + ("1.14.2 Pre-Release 4", "1.14.2-pre4"), + ("1.14.2 Pre-Release 3", "1.14.2-pre3"), + ("1.14.2 Pre-Release 2", "1.14.2-pre2"), + ("1.14.2 Pre-Release 1", "1.14.2-pre1"), + ("1.14.1 Pre-Release 2", "1.14.1-pre2"), + ("1.14.1 Pre-Release 1", "1.14.1-pre1"), + ("1.14 Pre-Release 5", "1.14-pre5"), + ("1.14 Pre-Release 4", "1.14-pre4"), + ("1.14 Pre-Release 3", "1.14-pre3"), + ("1.14 Pre-Release 2", "1.14-pre2"), + ("1.14 Pre-Release 1", "1.14-pre1"), + ("3D Shareware v1.34", "3D-Shareware-v1.34"), + ]; + + lazy_static::lazy_static! { + /// Mojank for some reason has versions released at the same DateTime. This hardcodes them to fix this, + /// as most of our ordering logic is with DateTime + static ref HALL_OF_SHAME_2: [(&'static str, DateTime); 4] = [ + ( + "1.4.5", + chrono::DateTime::parse_from_rfc3339("2012-12-19T22:00:00+00:00") + .unwrap() + .into(), + ), + ( + "1.4.6", + chrono::DateTime::parse_from_rfc3339("2012-12-19T22:00:01+00:00") + .unwrap() + .into(), + ), + ( + "1.6.3", + chrono::DateTime::parse_from_rfc3339("2013-09-13T10:54:41+00:00") + .unwrap() + .into(), + ), + ( + "13w37b", + chrono::DateTime::parse_from_rfc3339("2013-09-13T10:54:42+00:00") + .unwrap() + .into(), + ), + ]; + } + + for version in input.versions.into_iter() { + let mut name = version.id; + if !name + .chars() + .all(|c| c.is_ascii_alphanumeric() || "-_.".contains(c)) + { + if let Some((_, alternate)) = + HALL_OF_SHAME.iter().find(|(version, _)| name == *version) + { + name = String::from(*alternate); + } else { + // We'll deal with these manually + skipped_versions_count += 1; + continue; + } + } + + let type_ = match &*version.type_ { + "release" => "release", + "snapshot" => "snapshot", + "old_alpha" => "alpha", + "old_beta" => "beta", + _ => "other", + }; + + MinecraftGameVersion::builder() + .version(&name)? + .version_type(type_)? + .created( + if let Some((_, alternate)) = HALL_OF_SHAME_2 + .iter() + .find(|(version, _)| name == *version) + { + alternate + } else { + &version.release_time + }, + ) + .insert(pool, redis) + .await?; + } + + if skipped_versions_count > 0 { + // This will currently always trigger due to 1.14 pre releases + // and the shareware april fools update. We could set a threshold + // that accounts for those versions and update it whenever we + // manually fix another version. + warn!( + "Skipped {} game versions; check for new versions and add them manually", + skipped_versions_count + ); + } + + Ok(()) + } +} diff --git a/apps/labrinth/src/database/redis.rs b/apps/labrinth/src/database/redis.rs index 3fe92a53d..9a0409071 100644 --- a/apps/labrinth/src/database/redis.rs +++ b/apps/labrinth/src/database/redis.rs @@ -19,6 +19,7 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes #[derive(Clone)] pub struct RedisPool { + pub url: String, pub pool: deadpool_redis::Pool, meta_namespace: String, } @@ -33,23 +34,23 @@ impl RedisPool { // testing pool uses a hashmap to mimic redis behaviour for very small data sizes (ie: tests) // PANICS: production pool will panic if redis url is not set pub fn new(meta_namespace: Option) -> Self { - let redis_pool = Config::from_url( - dotenvy::var("REDIS_URL").expect("Redis URL not set"), - ) - .builder() - .expect("Error building Redis pool") - .max_size( - dotenvy::var("REDIS_MAX_CONNECTIONS") - .ok() - .and_then(|x| x.parse().ok()) - .unwrap_or(10000), - ) - .runtime(Runtime::Tokio1) - .build() - .expect("Redis connection failed"); + let url = dotenvy::var("REDIS_URL").expect("Redis URL not set"); + let pool = Config::from_url(url.clone()) + .builder() + .expect("Error building Redis pool") + .max_size( + dotenvy::var("REDIS_MAX_CONNECTIONS") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(10000), + ) + .runtime(Runtime::Tokio1) + .build() + .expect("Redis connection failed"); RedisPool { - pool: redis_pool, + url, + pool, meta_namespace: meta_namespace.unwrap_or("".to_string()), } } diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index bfaf95d81..97052fb78 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -17,15 +17,14 @@ use governor::middleware::StateInformationMiddleware; use governor::{Quota, RateLimiter}; use util::cors::default_cors; +use crate::background_task::update_versions; use crate::queue::moderation::AutomatedModerationQueue; +use crate::util::env::{parse_strings_from_var, parse_var}; use crate::util::ratelimit::KeyedRateLimiter; -use crate::{ - queue::payouts::process_payout, - search::indexing::index_projects, - util::env::{parse_strings_from_var, parse_var}, -}; +use sync::friends::handle_pubsub; pub mod auth; +pub mod background_task; pub mod clickhouse; pub mod database; pub mod file_hosting; @@ -34,6 +33,7 @@ pub mod queue; pub mod routes; pub mod scheduler; pub mod search; +pub mod sync; pub mod util; pub mod validate; @@ -61,6 +61,7 @@ pub struct LabrinthConfig { pub stripe_client: stripe::Client, } +#[allow(clippy::too_many_arguments)] pub fn app_setup( pool: sqlx::Pool, redis_pool: RedisPool, @@ -68,6 +69,8 @@ pub fn app_setup( clickhouse: &mut Client, file_host: Arc, maxmind: Arc, + stripe_client: stripe::Client, + enable_background_tasks: bool, ) -> LabrinthConfig { info!( "Starting Labrinth on {}", @@ -109,88 +112,97 @@ pub fn app_setup( async move {} }); - // The interval in seconds at which the local database is indexed - // for searching. Defaults to 1 hour if unset. - let local_index_interval = std::time::Duration::from_secs( - parse_var("LOCAL_INDEX_INTERVAL").unwrap_or(3600), - ); - - let pool_ref = pool.clone(); - let search_config_ref = search_config.clone(); - let redis_pool_ref = redis_pool.clone(); - scheduler.run(local_index_interval, move || { - let pool_ref = pool_ref.clone(); - let redis_pool_ref = redis_pool_ref.clone(); - let search_config_ref = search_config_ref.clone(); - async move { - info!("Indexing local database"); - let result = index_projects( - pool_ref, - redis_pool_ref.clone(), - &search_config_ref, - ) - .await; - if let Err(e) = result { - warn!("Local project indexing failed: {:?}", e); + if enable_background_tasks { + // The interval in seconds at which the local database is indexed + // for searching. Defaults to 1 hour if unset. + let local_index_interval = Duration::from_secs( + parse_var("LOCAL_INDEX_INTERVAL").unwrap_or(3600), + ); + let pool_ref = pool.clone(); + let search_config_ref = search_config.clone(); + let redis_pool_ref = redis_pool.clone(); + scheduler.run(local_index_interval, move || { + let pool_ref = pool_ref.clone(); + let redis_pool_ref = redis_pool_ref.clone(); + let search_config_ref = search_config_ref.clone(); + async move { + background_task::index_search( + pool_ref, + redis_pool_ref, + search_config_ref, + ) + .await; } - info!("Done indexing local database"); - } - }); + }); - // Changes statuses of scheduled projects/versions - let pool_ref = pool.clone(); - // TODO: Clear cache when these are run - scheduler.run(std::time::Duration::from_secs(60 * 5), move || { - let pool_ref = pool_ref.clone(); - info!("Releasing scheduled versions/projects!"); - - async move { - let projects_results = sqlx::query!( - " - UPDATE mods - SET status = requested_status - WHERE status = $1 AND approved < CURRENT_DATE AND requested_status IS NOT NULL - ", - crate::models::projects::ProjectStatus::Scheduled.as_str(), - ) - .execute(&pool_ref) - .await; - - if let Err(e) = projects_results { - warn!("Syncing scheduled releases for projects failed: {:?}", e); + // Changes statuses of scheduled projects/versions + let pool_ref = pool.clone(); + // TODO: Clear cache when these are run + scheduler.run(Duration::from_secs(60 * 5), move || { + let pool_ref = pool_ref.clone(); + async move { + background_task::release_scheduled(pool_ref).await; } + }); - let versions_results = sqlx::query!( - " - UPDATE versions - SET status = requested_status - WHERE status = $1 AND date_published < CURRENT_DATE AND requested_status IS NOT NULL - ", - crate::models::projects::VersionStatus::Scheduled.as_str(), - ) - .execute(&pool_ref) - .await; - - if let Err(e) = versions_results { - warn!("Syncing scheduled releases for versions failed: {:?}", e); + let version_index_interval = Duration::from_secs( + parse_var("VERSION_INDEX_INTERVAL").unwrap_or(1800), + ); + let pool_ref = pool.clone(); + let redis_pool_ref = redis_pool.clone(); + scheduler.run(version_index_interval, move || { + let pool_ref = pool_ref.clone(); + let redis = redis_pool_ref.clone(); + async move { + update_versions(pool_ref, redis).await; } + }); - info!("Finished releasing scheduled versions/projects"); - } - }); + let pool_ref = pool.clone(); + let client_ref = clickhouse.clone(); + scheduler.run(Duration::from_secs(60 * 60 * 6), move || { + let pool_ref = pool_ref.clone(); + let client_ref = client_ref.clone(); + async move { + background_task::payouts(pool_ref, client_ref).await; + } + }); - scheduler::schedule_versions( - &mut scheduler, - pool.clone(), - redis_pool.clone(), - ); + let pool_ref = pool.clone(); + let redis_ref = redis_pool.clone(); + let stripe_client_ref = stripe_client.clone(); + actix_rt::spawn(async move { + loop { + routes::internal::billing::index_billing( + stripe_client_ref.clone(), + pool_ref.clone(), + redis_ref.clone(), + ) + .await; + tokio::time::sleep(Duration::from_secs(60 * 5)).await; + } + }); + + let pool_ref = pool.clone(); + let redis_ref = redis_pool.clone(); + actix_rt::spawn(async move { + loop { + routes::internal::billing::index_subscriptions( + pool_ref.clone(), + redis_ref.clone(), + ) + .await; + tokio::time::sleep(Duration::from_secs(60 * 5)).await; + } + }); + } let session_queue = web::Data::new(AuthQueue::new()); let pool_ref = pool.clone(); let redis_ref = redis_pool.clone(); let session_queue_ref = session_queue.clone(); - scheduler.run(std::time::Duration::from_secs(60 * 30), move || { + scheduler.run(Duration::from_secs(60 * 30), move || { let pool_ref = pool_ref.clone(); let redis_ref = redis_ref.clone(); let session_queue_ref = session_queue_ref.clone(); @@ -208,7 +220,7 @@ pub fn app_setup( let reader = maxmind.clone(); { let reader_ref = reader; - scheduler.run(std::time::Duration::from_secs(60 * 60 * 24), move || { + scheduler.run(Duration::from_secs(60 * 60 * 24), move || { let reader_ref = reader_ref.clone(); async move { @@ -232,7 +244,7 @@ pub fn app_setup( let analytics_queue_ref = analytics_queue.clone(); let pool_ref = pool.clone(); let redis_ref = redis_pool.clone(); - scheduler.run(std::time::Duration::from_secs(15), move || { + scheduler.run(Duration::from_secs(15), move || { let client_ref = client_ref.clone(); let analytics_queue_ref = analytics_queue_ref.clone(); let pool_ref = pool_ref.clone(); @@ -251,51 +263,6 @@ pub fn app_setup( }); } - { - let pool_ref = pool.clone(); - let client_ref = clickhouse.clone(); - scheduler.run(std::time::Duration::from_secs(60 * 60 * 6), move || { - let pool_ref = pool_ref.clone(); - let client_ref = client_ref.clone(); - - async move { - info!("Started running payouts"); - let result = process_payout(&pool_ref, &client_ref).await; - if let Err(e) = result { - warn!("Payouts run failed: {:?}", e); - } - info!("Done running payouts"); - } - }); - } - - let stripe_client = - stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap()); - { - let pool_ref = pool.clone(); - let redis_ref = redis_pool.clone(); - let stripe_client_ref = stripe_client.clone(); - - actix_rt::spawn(async move { - routes::internal::billing::task( - stripe_client_ref, - pool_ref, - redis_ref, - ) - .await; - }); - } - - { - let pool_ref = pool.clone(); - let redis_ref = redis_pool.clone(); - - actix_rt::spawn(async move { - routes::internal::billing::subscription_task(pool_ref, redis_ref) - .await; - }); - } - let ip_salt = Pepper { pepper: ariadne::ids::Base62Id(ariadne::ids::random_base62(11)) .to_string(), @@ -304,6 +271,16 @@ pub fn app_setup( let payouts_queue = web::Data::new(PayoutsQueue::new()); let active_sockets = web::Data::new(ActiveSockets::default()); + { + let pool = pool.clone(); + let redis_client = redis::Client::open(redis_pool.url.clone()).unwrap(); + let sockets = active_sockets.clone(); + actix_rt::spawn(async move { + let pubsub = redis_client.get_async_pubsub().await.unwrap(); + handle_pubsub(pubsub, pool, sockets).await; + }); + } + LabrinthConfig { pool, redis_pool, diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index e6058eed6..f45b3609c 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -1,5 +1,7 @@ use actix_web::{App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; +use clap::Parser; +use labrinth::background_task::BackgroundTask; use labrinth::database::redis::RedisPool; use labrinth::file_hosting::S3Host; use labrinth::search; @@ -23,8 +25,23 @@ pub struct Pepper { pub pepper: String, } +#[derive(Parser)] +#[command(version)] +struct Args { + /// Don't run regularly scheduled background tasks. This means the tasks should be run + /// manually with --run-background-task. + #[arg(long)] + no_background_tasks: bool, + + /// Run a single background task and then exit. Perfect for cron jobs. + #[arg(long, value_enum, id = "task")] + run_background_task: Option, +} + #[actix_rt::main] async fn main() -> std::io::Result<()> { + let args = Args::parse(); + dotenvy::dotenv().ok(); console_subscriber::init(); @@ -44,10 +61,12 @@ async fn main() -> std::io::Result<()> { std::env::set_var("RUST_BACKTRACE", "1"); } - info!( - "Starting Labrinth on {}", - dotenvy::var("BIND_ADDR").unwrap() - ); + if args.run_background_task.is_none() { + info!( + "Starting Labrinth on {}", + dotenvy::var("BIND_ADDR").unwrap() + ); + } database::check_for_migrations() .await @@ -91,6 +110,18 @@ async fn main() -> std::io::Result<()> { info!("Initializing clickhouse connection"); let mut clickhouse = clickhouse::init_client().await.unwrap(); + let search_config = search::SearchConfig::new(None); + + let stripe_client = + stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap()); + + if let Some(task) = args.run_background_task { + info!("Running task {task:?} and exiting"); + task.run(pool, redis_pool, search_config, clickhouse, stripe_client) + .await; + return Ok(()); + } + let maxmind_reader = Arc::new(queue::maxmind::MaxMindIndexer::new().await.unwrap()); @@ -115,8 +146,6 @@ async fn main() -> std::io::Result<()> { labrinth::routes::debug::jemalloc_mmeory_stats(&prometheus.registry) .expect("Failed to register jemalloc metrics"); - let search_config = search::SearchConfig::new(None); - let labrinth_config = labrinth::app_setup( pool.clone(), redis_pool.clone(), @@ -124,6 +153,8 @@ async fn main() -> std::io::Result<()> { &mut clickhouse, file_host.clone(), maxmind_reader.clone(), + stripe_client, + !args.no_background_tasks, ); info!("Starting Actix HTTP server!"); diff --git a/apps/labrinth/src/routes/internal/billing.rs b/apps/labrinth/src/routes/internal/billing.rs index 036c3c3db..469555946 100644 --- a/apps/labrinth/src/routes/internal/billing.rs +++ b/apps/labrinth/src/routes/internal/billing.rs @@ -2091,323 +2091,331 @@ async fn get_or_create_customer( } } -pub async fn subscription_task(pool: PgPool, redis: RedisPool) { - loop { - info!("Indexing subscriptions"); +pub async fn index_subscriptions(pool: PgPool, redis: RedisPool) { + info!("Indexing subscriptions"); - let res = async { - let mut transaction = pool.begin().await?; - let mut clear_cache_users = Vec::new(); + let res = async { + let mut transaction = pool.begin().await?; + let mut clear_cache_users = Vec::new(); - // If an active subscription has a canceled charge OR a failed charge more than two days ago, it should be cancelled - let all_charges = ChargeItem::get_unprovision(&pool).await?; + // If an active subscription has a canceled charge OR a failed charge more than two days ago, it should be cancelled + let all_charges = ChargeItem::get_unprovision(&pool).await?; - let mut all_subscriptions = - user_subscription_item::UserSubscriptionItem::get_many( - &all_charges - .iter() - .filter_map(|x| x.subscription_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - ) - .await?; - let subscription_prices = product_item::ProductPriceItem::get_many( - &all_subscriptions + let mut all_subscriptions = + user_subscription_item::UserSubscriptionItem::get_many( + &all_charges .iter() - .map(|x| x.price_id) + .filter_map(|x| x.subscription_id) .collect::>() .into_iter() .collect::>(), &pool, ) .await?; - let subscription_products = product_item::ProductItem::get_many( - &subscription_prices - .iter() - .map(|x| x.product_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - ) - .await?; - let users = crate::database::models::User::get_many_ids( - &all_subscriptions - .iter() - .map(|x| x.user_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - &redis, - ) - .await?; + let subscription_prices = product_item::ProductPriceItem::get_many( + &all_subscriptions + .iter() + .map(|x| x.price_id) + .collect::>() + .into_iter() + .collect::>(), + &pool, + ) + .await?; + let subscription_products = product_item::ProductItem::get_many( + &subscription_prices + .iter() + .map(|x| x.product_id) + .collect::>() + .into_iter() + .collect::>(), + &pool, + ) + .await?; + let users = crate::database::models::User::get_many_ids( + &all_subscriptions + .iter() + .map(|x| x.user_id) + .collect::>() + .into_iter() + .collect::>(), + &pool, + &redis, + ) + .await?; - for charge in all_charges { - let subscription = if let Some(subscription) = all_subscriptions - .iter_mut() - .find(|x| Some(x.id) == charge.subscription_id) - { - subscription - } else { - continue; - }; + for charge in all_charges { + let subscription = if let Some(subscription) = all_subscriptions + .iter_mut() + .find(|x| Some(x.id) == charge.subscription_id) + { + subscription + } else { + continue; + }; - if subscription.status == SubscriptionStatus::Unprovisioned { - continue; + if subscription.status == SubscriptionStatus::Unprovisioned { + continue; + } + + let product_price = if let Some(product_price) = subscription_prices + .iter() + .find(|x| x.id == subscription.price_id) + { + product_price + } else { + continue; + }; + + let product = if let Some(product) = subscription_products + .iter() + .find(|x| x.id == product_price.product_id) + { + product + } else { + continue; + }; + + let user = if let Some(user) = + users.iter().find(|x| x.id == subscription.user_id) + { + user + } else { + continue; + }; + + let unprovisioned = match product.metadata { + ProductMetadata::Midas => { + let badges = user.badges - Badges::MIDAS; + + sqlx::query!( + " + UPDATE users + SET badges = $1 + WHERE (id = $2) + ", + badges.bits() as i64, + user.id as crate::database::models::ids::UserId, + ) + .execute(&mut *transaction) + .await?; + + true } + ProductMetadata::Pyro { .. } => { + if let Some(SubscriptionMetadata::Pyro { id }) = + &subscription.metadata + { + let res = reqwest::Client::new() + .post(format!( + "{}/modrinth/v0/servers/{}/suspend", + dotenvy::var("ARCHON_URL")?, + id + )) + .header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?) + .json(&serde_json::json!({ + "reason": if charge.status == ChargeStatus::Cancelled { + "cancelled" + } else { + "paymentfailed" + } + })) + .send() + .await; - let product_price = if let Some(product_price) = - subscription_prices - .iter() - .find(|x| x.id == subscription.price_id) - { - product_price - } else { - continue; - }; - - let product = if let Some(product) = subscription_products - .iter() - .find(|x| x.id == product_price.product_id) - { - product - } else { - continue; - }; - - let user = if let Some(user) = - users.iter().find(|x| x.id == subscription.user_id) - { - user - } else { - continue; - }; - - let unprovisioned = match product.metadata { - ProductMetadata::Midas => { - let badges = user.badges - Badges::MIDAS; - - sqlx::query!( - " - UPDATE users - SET badges = $1 - WHERE (id = $2) - ", - badges.bits() as i64, - user.id as crate::database::models::ids::UserId, - ) - .execute(&mut *transaction) - .await?; - - true - } - ProductMetadata::Pyro { .. } => { - if let Some(SubscriptionMetadata::Pyro { id }) = - &subscription.metadata - { - let res = reqwest::Client::new() - .post(format!( - "{}/modrinth/v0/servers/{}/suspend", - dotenvy::var("ARCHON_URL")?, - id - )) - .header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?) - .json(&serde_json::json!({ - "reason": if charge.status == ChargeStatus::Cancelled { - "cancelled" - } else { - "paymentfailed" - } - })) - .send() - .await; - - if let Err(e) = res { - warn!("Error suspending pyro server: {:?}", e); - false - } else { - true - } + if let Err(e) = res { + warn!("Error suspending pyro server: {:?}", e); + false } else { true } + } else { + true } - }; - - if unprovisioned { - subscription.status = SubscriptionStatus::Unprovisioned; - subscription.upsert(&mut transaction).await?; } + }; - clear_cache_users.push(user.id); + if unprovisioned { + subscription.status = SubscriptionStatus::Unprovisioned; + subscription.upsert(&mut transaction).await?; } - crate::database::models::User::clear_caches( - &clear_cache_users - .into_iter() - .map(|x| (x, None)) - .collect::>(), - &redis, - ) - .await?; - transaction.commit().await?; - - Ok::<(), ApiError>(()) - }; - - if let Err(e) = res.await { - warn!("Error indexing billing queue: {:?}", e); + clear_cache_users.push(user.id); } - info!("Done indexing billing queue"); + crate::database::models::User::clear_caches( + &clear_cache_users + .into_iter() + .map(|x| (x, None)) + .collect::>(), + &redis, + ) + .await?; + transaction.commit().await?; - tokio::time::sleep(std::time::Duration::from_secs(60 * 5)).await; + Ok::<(), ApiError>(()) + }; + + if let Err(e) = res.await { + warn!("Error indexing subscriptions: {:?}", e); } + + info!("Done indexing subscriptions"); } -pub async fn task( +pub async fn index_billing( stripe_client: stripe::Client, pool: PgPool, redis: RedisPool, ) { - loop { - info!("Indexing billing queue"); - let res = async { - // If a charge is open and due or has been attempted more than two days ago, it should be processed - let charges_to_do = - crate::database::models::charge_item::ChargeItem::get_chargeable(&pool).await?; - - let prices = product_item::ProductPriceItem::get_many( - &charges_to_do - .iter() - .map(|x| x.price_id) - .collect::>() - .into_iter() - .collect::>(), + info!("Indexing billing queue"); + let res = async { + // If a charge is open and due or has been attempted more than two days ago, it should be processed + let charges_to_do = + crate::database::models::charge_item::ChargeItem::get_chargeable( &pool, ) .await?; - let users = crate::database::models::User::get_many_ids( - &charges_to_do - .iter() - .map(|x| x.user_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - &redis, - ) - .await?; + let prices = product_item::ProductPriceItem::get_many( + &charges_to_do + .iter() + .map(|x| x.price_id) + .collect::>() + .into_iter() + .collect::>(), + &pool, + ) + .await?; - let mut transaction = pool.begin().await?; + let users = crate::database::models::User::get_many_ids( + &charges_to_do + .iter() + .map(|x| x.user_id) + .collect::>() + .into_iter() + .collect::>(), + &pool, + &redis, + ) + .await?; - for mut charge in charges_to_do { - let product_price = - if let Some(price) = prices.iter().find(|x| x.id == charge.price_id) { - price + let mut transaction = pool.begin().await?; + + for mut charge in charges_to_do { + let product_price = if let Some(price) = + prices.iter().find(|x| x.id == charge.price_id) + { + price + } else { + continue; + }; + + let user = if let Some(user) = + users.iter().find(|x| x.id == charge.user_id) + { + user + } else { + continue; + }; + + let price = match &product_price.prices { + Price::OneTime { price } => Some(price), + Price::Recurring { intervals } => { + if let Some(ref interval) = charge.subscription_interval { + intervals.get(interval) } else { + warn!( + "Could not find subscription for charge {:?}", + charge.id + ); continue; - }; - - let user = if let Some(user) = users.iter().find(|x| x.id == charge.user_id) { - user - } else { - continue; - }; - - let price = match &product_price.prices { - Price::OneTime { price } => Some(price), - Price::Recurring { intervals } => { - if let Some(ref interval) = charge.subscription_interval { - intervals.get(interval) - } else { - warn!("Could not find subscription for charge {:?}", charge.id); - continue; - } } - }; - - if let Some(price) = price { - let customer_id = get_or_create_customer( - user.id.into(), - user.stripe_customer_id.as_deref(), - user.email.as_deref(), - &stripe_client, - &pool, - &redis, - ) - .await?; - - let customer = - stripe::Customer::retrieve(&stripe_client, &customer_id, &[]).await?; - - let currency = - match Currency::from_str(&product_price.currency_code.to_lowercase()) { - Ok(x) => x, - Err(_) => { - warn!( - "Could not find currency for {}", - product_price.currency_code - ); - continue; - } - }; - - let mut intent = CreatePaymentIntent::new(*price as i64, currency); - - let mut metadata = HashMap::new(); - metadata.insert( - "modrinth_user_id".to_string(), - to_base62(charge.user_id.0 as u64), - ); - metadata.insert( - "modrinth_charge_id".to_string(), - to_base62(charge.id.0 as u64), - ); - metadata.insert( - "modrinth_charge_type".to_string(), - charge.type_.as_str().to_string(), - ); - - intent.metadata = Some(metadata); - intent.customer = Some(customer.id); - - if let Some(payment_method) = customer - .invoice_settings - .and_then(|x| x.default_payment_method.map(|x| x.id())) - { - intent.payment_method = Some(payment_method); - intent.confirm = Some(true); - intent.off_session = Some(PaymentIntentOffSession::Exists(true)); - - charge.status = ChargeStatus::Processing; - - stripe::PaymentIntent::create(&stripe_client, intent).await?; - } else { - charge.status = ChargeStatus::Failed; - charge.last_attempt = Some(Utc::now()); - } - - charge.upsert(&mut transaction).await?; } + }; + + if let Some(price) = price { + let customer_id = get_or_create_customer( + user.id.into(), + user.stripe_customer_id.as_deref(), + user.email.as_deref(), + &stripe_client, + &pool, + &redis, + ) + .await?; + + let customer = stripe::Customer::retrieve( + &stripe_client, + &customer_id, + &[], + ) + .await?; + + let currency = match Currency::from_str( + &product_price.currency_code.to_lowercase(), + ) { + Ok(x) => x, + Err(_) => { + warn!( + "Could not find currency for {}", + product_price.currency_code + ); + continue; + } + }; + + let mut intent = + CreatePaymentIntent::new(*price as i64, currency); + + let mut metadata = HashMap::new(); + metadata.insert( + "modrinth_user_id".to_string(), + to_base62(charge.user_id.0 as u64), + ); + metadata.insert( + "modrinth_charge_id".to_string(), + to_base62(charge.id.0 as u64), + ); + metadata.insert( + "modrinth_charge_type".to_string(), + charge.type_.as_str().to_string(), + ); + + intent.metadata = Some(metadata); + intent.customer = Some(customer.id); + + if let Some(payment_method) = customer + .invoice_settings + .and_then(|x| x.default_payment_method.map(|x| x.id())) + { + intent.payment_method = Some(payment_method); + intent.confirm = Some(true); + intent.off_session = + Some(PaymentIntentOffSession::Exists(true)); + + charge.status = ChargeStatus::Processing; + + stripe::PaymentIntent::create(&stripe_client, intent) + .await?; + } else { + charge.status = ChargeStatus::Failed; + charge.last_attempt = Some(Utc::now()); + } + + charge.upsert(&mut transaction).await?; } - - transaction.commit().await?; - - Ok::<(), ApiError>(()) - } - .await; - - if let Err(e) = res { - warn!("Error indexing billing queue: {:?}", e); } - info!("Done indexing billing queue"); + transaction.commit().await?; - tokio::time::sleep(std::time::Duration::from_secs(60 * 5)).await; + Ok::<(), ApiError>(()) } + .await; + + if let Err(e) = res { + warn!("Error indexing billing queue: {:?}", e); + } + + info!("Done indexing billing queue"); } diff --git a/apps/labrinth/src/routes/internal/statuses.rs b/apps/labrinth/src/routes/internal/statuses.rs index 0c5c08ad6..ac5721088 100644 --- a/apps/labrinth/src/routes/internal/statuses.rs +++ b/apps/labrinth/src/routes/internal/statuses.rs @@ -9,6 +9,10 @@ use crate::queue::socket::{ ActiveSocket, ActiveSockets, SocketId, TunnelSocketType, }; use crate::routes::ApiError; +use crate::sync::friends::{RedisFriendsMessage, FRIENDS_CHANNEL_NAME}; +use crate::sync::status::{ + get_user_status, push_back_user_expiry, replace_user_status, +}; use actix_web::web::{Data, Payload}; use actix_web::{get, web, HttpRequest, HttpResponse}; use actix_ws::Message; @@ -19,10 +23,15 @@ use ariadne::networking::message::{ use ariadne::users::UserStatus; use chrono::Utc; use either::Either; +use futures_util::future::select; use futures_util::{StreamExt, TryStreamExt}; +use redis::AsyncCommands; use serde::Deserialize; use sqlx::PgPool; +use std::pin::pin; use std::sync::atomic::Ordering; +use tokio::sync::oneshot::error::TryRecvError; +use tokio::time::{sleep, Duration}; pub fn config(cfg: &mut web::ServiceConfig) { cfg.service(ws_init); @@ -62,6 +71,7 @@ pub async fn ws_init( } let user = User::from_full(db_user); + let user_id = user.id; let (res, mut session, msg_stream) = match actix_ws::handle(&req, body) { Ok(x) => x, @@ -79,19 +89,32 @@ pub async fn ws_init( .await?; let friend_statuses = if !friends.is_empty() { - friends - .iter() - .filter_map(|x| { - db.get_status( - if x.user_id == user.id.into() { - x.friend_id - } else { - x.user_id + let db = db.clone(); + let redis = redis.clone(); + tokio_stream::iter(friends.iter()) + .map(|x| { + let db = db.clone(); + let redis = redis.clone(); + async move { + async move { + get_user_status( + if x.user_id == user_id.into() { + x.friend_id + } else { + x.user_id + } + .into(), + &db, + &redis, + ) + .await } - .into(), - ) + } }) + .buffer_unordered(16) + .filter_map(|x| x) .collect::>() + .await } else { Vec::new() }; @@ -116,20 +139,42 @@ pub async fn ws_init( #[cfg(debug_assertions)] tracing::info!("Connection {socket_id} opened by {}", user.id); - broadcast_friends( - user.id, - ServerToClientMessage::StatusUpdate { status }, - &pool, - &db, - Some(friends), + replace_user_status(None, Some(&status), &redis).await?; + broadcast_friends_message( + &redis, + RedisFriendsMessage::StatusUpdate { status }, ) .await?; + let (shutdown_sender, mut shutdown_receiver) = + tokio::sync::oneshot::channel::<()>(); + + { + let db = db.clone(); + let redis = redis.clone(); + actix_web::rt::spawn(async move { + while shutdown_receiver.try_recv() == Err(TryRecvError::Empty) { + sleep(Duration::from_secs(30)).await; + if let Some(socket) = db.sockets.get(&socket_id) { + let _ = socket.socket.clone().ping(&[]).await; + } + let _ = push_back_user_expiry(user_id, &redis).await; + } + }); + } + let mut stream = msg_stream.into_stream(); actix_web::rt::spawn(async move { - // receive messages from websocket - while let Some(msg) = stream.next().await { + loop { + let next = pin!(stream.next()); + let timeout = pin!(sleep(Duration::from_secs(30))); + let futures_util::future::Either::Left((Some(msg), _)) = + select(next, timeout).await + else { + break; + }; + let message = match msg { Ok(Message::Text(text)) => { ClientToServerMessage::deserialize(Either::Left(&text)) @@ -139,10 +184,7 @@ pub async fn ws_init( ClientToServerMessage::deserialize(Either::Right(&bytes)) } - Ok(Message::Close(_)) => { - let _ = close_socket(socket_id, &pool, &db).await; - continue; - } + Ok(Message::Close(_)) => break, Ok(Message::Ping(msg)) => { if let Some(socket) = db.sockets.get(&socket_id) { @@ -162,8 +204,7 @@ pub async fn ws_init( #[cfg(debug_assertions)] if !message.is_binary() { tracing::info!( - "Received message from {socket_id}: {:?}", - message + "Received message from {socket_id}: {message:?}" ); } @@ -172,6 +213,8 @@ pub async fn ws_init( if let Some(mut pair) = db.sockets.get_mut(&socket_id) { let ActiveSocket { status, .. } = pair.value_mut(); + let old_status = status.clone(); + if status .profile_name .as_ref() @@ -188,14 +231,17 @@ pub async fn ws_init( // We drop the pair to avoid holding the lock for too long drop(pair); - let _ = broadcast_friends( - user.id, - ServerToClientMessage::StatusUpdate { + let _ = replace_user_status( + Some(&old_status), + Some(&user_status), + &redis, + ) + .await; + let _ = broadcast_friends_message( + &redis, + RedisFriendsMessage::StatusUpdate { status: user_status, }, - &pool, - &db, - None, ) .await; } @@ -247,12 +293,11 @@ pub async fn ws_init( }; match tunnel_socket.socket_type { TunnelSocketType::Listening => { - let _ = broadcast_friends( + let _ = broadcast_to_local_friends( user.id, ServerToClientMessage::FriendSocketStoppedListening { user: user.id }, &pool, &db, - None, ) .await; } @@ -308,25 +353,48 @@ pub async fn ws_init( } } - let _ = close_socket(socket_id, &pool, &db).await; + let _ = shutdown_sender.send(()); + let _ = close_socket(socket_id, &pool, &db, &redis).await; }); Ok(res) } -pub async fn broadcast_friends( +pub async fn broadcast_friends_message( + redis: &RedisPool, + message: RedisFriendsMessage, +) -> Result<(), crate::database::models::DatabaseError> { + let _: () = redis + .pool + .get() + .await? + .publish(FRIENDS_CHANNEL_NAME, message) + .await?; + Ok(()) +} + +pub async fn broadcast_to_local_friends( user_id: UserId, message: ServerToClientMessage, pool: &PgPool, sockets: &ActiveSockets, - friends: Option>, +) -> Result<(), crate::database::models::DatabaseError> { + broadcast_to_known_local_friends( + user_id, + message, + sockets, + FriendItem::get_user_friends(user_id.into(), Some(true), pool).await?, + ) + .await +} + +async fn broadcast_to_known_local_friends( + user_id: UserId, + message: ServerToClientMessage, + sockets: &ActiveSockets, + friends: Vec, ) -> Result<(), crate::database::models::DatabaseError> { // FIXME Probably shouldn't be using database errors for this. Maybe ApiError? - let friends = if let Some(friends) = friends { - friends - } else { - FriendItem::get_user_friends(user_id.into(), Some(true), pool).await? - }; for friend in friends { let friend_id = if friend.user_id == user_id.into() { @@ -387,6 +455,7 @@ pub async fn close_socket( id: SocketId, pool: &PgPool, db: &ActiveSockets, + redis: &RedisPool, ) -> Result<(), crate::database::models::DatabaseError> { if let Some((_, socket)) = db.sockets.remove(&id) { let user_id = socket.status.user_id; @@ -397,12 +466,10 @@ pub async fn close_socket( let _ = socket.socket.close(None).await; - broadcast_friends( - user_id, - ServerToClientMessage::UserOffline { id: user_id }, - pool, - db, - None, + replace_user_status(Some(&socket.status), None, redis).await?; + broadcast_friends_message( + redis, + RedisFriendsMessage::UserOffline { user: user_id }, ) .await?; @@ -414,14 +481,13 @@ pub async fn close_socket( }; match tunnel_socket.socket_type { TunnelSocketType::Listening => { - let _ = broadcast_friends( + let _ = broadcast_to_local_friends( user_id, ServerToClientMessage::SocketClosed { socket: owned_socket, }, pool, db, - None, ) .await; } diff --git a/apps/labrinth/src/routes/mod.rs b/apps/labrinth/src/routes/mod.rs index 39337d5f3..131bddabc 100644 --- a/apps/labrinth/src/routes/mod.rs +++ b/apps/labrinth/src/routes/mod.rs @@ -95,6 +95,8 @@ pub enum ApiError { Database(#[from] crate::database::models::DatabaseError), #[error("Database Error: {0}")] SqlxDatabase(#[from] sqlx::Error), + #[error("Database Error: {0}")] + RedisDatabase(#[from] redis::RedisError), #[error("Clickhouse Error: {0}")] Clickhouse(#[from] clickhouse::error::Error), #[error("Internal server error: {0}")] @@ -148,8 +150,9 @@ impl ApiError { crate::models::error::ApiError { error: match self { ApiError::Env(..) => "environment_error", - ApiError::SqlxDatabase(..) => "database_error", ApiError::Database(..) => "database_error", + ApiError::SqlxDatabase(..) => "database_error", + ApiError::RedisDatabase(..) => "database_error", ApiError::Authentication(..) => "unauthorized", ApiError::CustomAuthentication(..) => "unauthorized", ApiError::Xml(..) => "xml_error", @@ -186,6 +189,7 @@ impl actix_web::ResponseError for ApiError { ApiError::Env(..) => StatusCode::INTERNAL_SERVER_ERROR, ApiError::Database(..) => StatusCode::INTERNAL_SERVER_ERROR, ApiError::SqlxDatabase(..) => StatusCode::INTERNAL_SERVER_ERROR, + ApiError::RedisDatabase(..) => StatusCode::INTERNAL_SERVER_ERROR, ApiError::Clickhouse(..) => StatusCode::INTERNAL_SERVER_ERROR, ApiError::Authentication(..) => StatusCode::UNAUTHORIZED, ApiError::CustomAuthentication(..) => StatusCode::UNAUTHORIZED, diff --git a/apps/labrinth/src/routes/v3/friends.rs b/apps/labrinth/src/routes/v3/friends.rs index 1f7453236..56d70d568 100644 --- a/apps/labrinth/src/routes/v3/friends.rs +++ b/apps/labrinth/src/routes/v3/friends.rs @@ -5,8 +5,12 @@ use crate::models::pats::Scopes; use crate::models::users::UserFriend; use crate::queue::session::AuthQueue; use crate::queue::socket::ActiveSockets; -use crate::routes::internal::statuses::send_message_to_user; +use crate::routes::internal::statuses::{ + broadcast_friends_message, send_message_to_user, +}; use crate::routes::ApiError; +use crate::sync::friends::RedisFriendsMessage; +use crate::sync::status::get_user_status; use actix_web::{delete, get, post, web, HttpRequest, HttpResponse}; use ariadne::networking::message::ServerToClientMessage; use chrono::Utc; @@ -76,14 +80,16 @@ pub async fn add_friend( user_id: UserId, friend_id: UserId, sockets: &ActiveSockets, + redis: &RedisPool, ) -> Result<(), ApiError> { - if let Some(friend_status) = sockets.get_status(user_id.into()) + if let Some(friend_status) = + get_user_status(user_id.into(), sockets, redis).await { - send_message_to_user( - sockets, - friend_id.into(), - &ServerToClientMessage::StatusUpdate { - status: friend_status.clone(), + broadcast_friends_message( + redis, + RedisFriendsMessage::DirectStatusUpdate { + to_user: friend_id.into(), + status: friend_status, }, ) .await?; @@ -92,8 +98,10 @@ pub async fn add_friend( Ok(()) } - send_friend_status(friend.user_id, friend.friend_id, &db).await?; - send_friend_status(friend.friend_id, friend.user_id, &db).await?; + send_friend_status(friend.user_id, friend.friend_id, &db, &redis) + .await?; + send_friend_status(friend.friend_id, friend.user_id, &db, &redis) + .await?; } else { if friend.id == user.id.into() { return Err(ApiError::InvalidInput( diff --git a/apps/labrinth/src/scheduler.rs b/apps/labrinth/src/scheduler.rs index f94540392..82213effd 100644 --- a/apps/labrinth/src/scheduler.rs +++ b/apps/labrinth/src/scheduler.rs @@ -36,181 +36,4 @@ impl Drop for Scheduler { } } -use tracing::{info, warn}; - -pub fn schedule_versions( - scheduler: &mut Scheduler, - pool: sqlx::Pool, - redis: RedisPool, -) { - let version_index_interval = std::time::Duration::from_secs( - parse_var("VERSION_INDEX_INTERVAL").unwrap_or(1800), - ); - - scheduler.run(version_index_interval, move || { - let pool_ref = pool.clone(); - let redis = redis.clone(); - async move { - info!("Indexing game versions list from Mojang"); - let result = update_versions(&pool_ref, &redis).await; - if let Err(e) = result { - warn!("Version update failed: {}", e); - } - info!("Done indexing game versions"); - } - }); -} - -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum VersionIndexingError { - #[error("Network error while updating game versions list: {0}")] - NetworkError(#[from] reqwest::Error), - #[error("Database error while updating game versions list: {0}")] - DatabaseError(#[from] crate::database::models::DatabaseError), -} - -use crate::{ - database::{ - models::legacy_loader_fields::MinecraftGameVersion, redis::RedisPool, - }, - util::env::parse_var, -}; -use chrono::{DateTime, Utc}; -use serde::Deserialize; use tokio_stream::wrappers::IntervalStream; - -#[derive(Deserialize)] -struct InputFormat<'a> { - // latest: LatestFormat, - versions: Vec>, -} -#[derive(Deserialize)] -struct VersionFormat<'a> { - id: String, - #[serde(rename = "type")] - type_: std::borrow::Cow<'a, str>, - #[serde(rename = "releaseTime")] - release_time: DateTime, -} - -async fn update_versions( - pool: &sqlx::Pool, - redis: &RedisPool, -) -> Result<(), VersionIndexingError> { - let input = reqwest::get( - "https://piston-meta.mojang.com/mc/game/version_manifest_v2.json", - ) - .await? - .json::() - .await?; - - let mut skipped_versions_count = 0u32; - - // A list of version names that contains spaces. - // Generated using the command - // ```sh - // curl https://launchermeta.mojang.com/mc/game/version_manifest.json \ - // | jq '[.versions[].id | select(contains(" "))]' - // ``` - const HALL_OF_SHAME: [(&str, &str); 12] = [ - ("1.14.2 Pre-Release 4", "1.14.2-pre4"), - ("1.14.2 Pre-Release 3", "1.14.2-pre3"), - ("1.14.2 Pre-Release 2", "1.14.2-pre2"), - ("1.14.2 Pre-Release 1", "1.14.2-pre1"), - ("1.14.1 Pre-Release 2", "1.14.1-pre2"), - ("1.14.1 Pre-Release 1", "1.14.1-pre1"), - ("1.14 Pre-Release 5", "1.14-pre5"), - ("1.14 Pre-Release 4", "1.14-pre4"), - ("1.14 Pre-Release 3", "1.14-pre3"), - ("1.14 Pre-Release 2", "1.14-pre2"), - ("1.14 Pre-Release 1", "1.14-pre1"), - ("3D Shareware v1.34", "3D-Shareware-v1.34"), - ]; - - lazy_static::lazy_static! { - /// Mojank for some reason has versions released at the same DateTime. This hardcodes them to fix this, - /// as most of our ordering logic is with DateTime - static ref HALL_OF_SHAME_2: [(&'static str, chrono::DateTime); 4] = [ - ( - "1.4.5", - chrono::DateTime::parse_from_rfc3339("2012-12-19T22:00:00+00:00") - .unwrap() - .into(), - ), - ( - "1.4.6", - chrono::DateTime::parse_from_rfc3339("2012-12-19T22:00:01+00:00") - .unwrap() - .into(), - ), - ( - "1.6.3", - chrono::DateTime::parse_from_rfc3339("2013-09-13T10:54:41+00:00") - .unwrap() - .into(), - ), - ( - "13w37b", - chrono::DateTime::parse_from_rfc3339("2013-09-13T10:54:42+00:00") - .unwrap() - .into(), - ), - ]; - } - - for version in input.versions.into_iter() { - let mut name = version.id; - if !name - .chars() - .all(|c| c.is_ascii_alphanumeric() || "-_.".contains(c)) - { - if let Some((_, alternate)) = - HALL_OF_SHAME.iter().find(|(version, _)| name == *version) - { - name = String::from(*alternate); - } else { - // We'll deal with these manually - skipped_versions_count += 1; - continue; - } - } - - let type_ = match &*version.type_ { - "release" => "release", - "snapshot" => "snapshot", - "old_alpha" => "alpha", - "old_beta" => "beta", - _ => "other", - }; - - MinecraftGameVersion::builder() - .version(&name)? - .version_type(type_)? - .created( - if let Some((_, alternate)) = - HALL_OF_SHAME_2.iter().find(|(version, _)| name == *version) - { - alternate - } else { - &version.release_time - }, - ) - .insert(pool, redis) - .await?; - } - - if skipped_versions_count > 0 { - // This will currently always trigger due to 1.14 pre releases - // and the shareware april fools update. We could set a threshold - // that accounts for those versions and update it whenever we - // manually fix another version. - warn!( - "Skipped {} game versions; check for new versions and add them manually", - skipped_versions_count - ); - } - - Ok(()) -} diff --git a/apps/labrinth/src/sync/friends.rs b/apps/labrinth/src/sync/friends.rs new file mode 100644 index 000000000..77f407da3 --- /dev/null +++ b/apps/labrinth/src/sync/friends.rs @@ -0,0 +1,87 @@ +use crate::queue::socket::ActiveSockets; +use crate::routes::internal::statuses::{ + broadcast_to_local_friends, send_message_to_user, +}; +use actix_web::web::Data; +use ariadne::ids::UserId; +use ariadne::networking::message::ServerToClientMessage; +use ariadne::users::UserStatus; +use redis::aio::PubSub; +use redis::{RedisWrite, ToRedisArgs}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use tokio_stream::StreamExt; + +pub const FRIENDS_CHANNEL_NAME: &str = "friends"; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum RedisFriendsMessage { + StatusUpdate { status: UserStatus }, + UserOffline { user: UserId }, + DirectStatusUpdate { to_user: UserId, status: UserStatus }, +} + +impl ToRedisArgs for RedisFriendsMessage { + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + out.write_arg(&serde_json::to_vec(&self).unwrap()) + } +} + +pub async fn handle_pubsub( + mut pubsub: PubSub, + pool: PgPool, + sockets: Data, +) { + pubsub.subscribe(FRIENDS_CHANNEL_NAME).await.unwrap(); + let mut stream = pubsub.into_on_message(); + while let Some(message) = stream.next().await { + if message.get_channel_name() != FRIENDS_CHANNEL_NAME { + continue; + } + let payload = serde_json::from_slice(message.get_payload_bytes()); + + let pool = pool.clone(); + let sockets = sockets.clone(); + actix_rt::spawn(async move { + match payload { + Ok(RedisFriendsMessage::StatusUpdate { status }) => { + let _ = broadcast_to_local_friends( + status.user_id, + ServerToClientMessage::StatusUpdate { status }, + &pool, + &sockets, + ) + .await; + } + + Ok(RedisFriendsMessage::UserOffline { user }) => { + let _ = broadcast_to_local_friends( + user, + ServerToClientMessage::UserOffline { id: user }, + &pool, + &sockets, + ) + .await; + } + + Ok(RedisFriendsMessage::DirectStatusUpdate { + to_user, + status, + }) => { + let _ = send_message_to_user( + &sockets, + to_user, + &ServerToClientMessage::StatusUpdate { status }, + ) + .await; + } + + Err(_) => {} + } + }); + } +} diff --git a/apps/labrinth/src/sync/mod.rs b/apps/labrinth/src/sync/mod.rs new file mode 100644 index 000000000..e7a3cc811 --- /dev/null +++ b/apps/labrinth/src/sync/mod.rs @@ -0,0 +1,2 @@ +pub mod friends; +pub mod status; diff --git a/apps/labrinth/src/sync/status.rs b/apps/labrinth/src/sync/status.rs new file mode 100644 index 000000000..b475e28b4 --- /dev/null +++ b/apps/labrinth/src/sync/status.rs @@ -0,0 +1,71 @@ +use crate::database::redis::RedisPool; +use crate::queue::socket::ActiveSockets; +use ariadne::ids::UserId; +use ariadne::users::UserStatus; +use redis::AsyncCommands; + +const EXPIRY_TIME_SECONDS: i64 = 60; + +pub async fn get_user_status( + user: UserId, + local_sockets: &ActiveSockets, + redis: &RedisPool, +) -> Option { + if let Some(friend_status) = local_sockets.get_status(user) { + return Some(friend_status); + } + + if let Ok(mut conn) = redis.pool.get().await { + if let Ok(mut statuses) = + conn.sscan::<_, String>(get_field_name(user)).await + { + if let Some(status_json) = statuses.next_item().await { + return serde_json::from_str::(&status_json).ok(); + } + } + } + + None +} + +pub async fn replace_user_status( + old_status: Option<&UserStatus>, + new_status: Option<&UserStatus>, + redis: &RedisPool, +) -> Result<(), redis::RedisError> { + let Some(user) = new_status.or(old_status).map(|x| x.user_id) else { + return Ok(()); + }; + + if let Ok(mut conn) = redis.pool.get().await { + let field_name = get_field_name(user); + let mut pipe = redis::pipe(); + pipe.atomic(); + if let Some(status) = old_status { + pipe.srem(&field_name, serde_json::to_string(&status).unwrap()) + .ignore(); + } + if let Some(status) = new_status { + pipe.sadd(&field_name, serde_json::to_string(&status).unwrap()) + .ignore(); + pipe.expire(&field_name, EXPIRY_TIME_SECONDS).ignore(); + } + return pipe.query_async(&mut conn).await; + } + + Ok(()) +} + +pub async fn push_back_user_expiry( + user: UserId, + redis: &RedisPool, +) -> Result<(), redis::RedisError> { + if let Ok(mut conn) = redis.pool.get().await { + return conn.expire(get_field_name(user), EXPIRY_TIME_SECONDS).await; + } + Ok(()) +} + +fn get_field_name(user: UserId) -> String { + format!("user_status:{}", user) +} diff --git a/apps/labrinth/tests/common/mod.rs b/apps/labrinth/tests/common/mod.rs index 840bad667..75d0ea7d7 100644 --- a/apps/labrinth/tests/common/mod.rs +++ b/apps/labrinth/tests/common/mod.rs @@ -35,6 +35,9 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { let maxmind_reader = Arc::new(queue::maxmind::MaxMindIndexer::new().await.unwrap()); + let stripe_client = + stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap()); + labrinth::app_setup( pool.clone(), redis_pool.clone(), @@ -42,6 +45,8 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { &mut clickhouse, file_host.clone(), maxmind_reader, + stripe_client, + false, ) }