diff --git a/.github/workflows/theseus-release.yml b/.github/workflows/theseus-release.yml
index 4cc9a98cd..a34f5ae17 100644
--- a/.github/workflows/theseus-release.yml
+++ b/.github/workflows/theseus-release.yml
@@ -6,9 +6,11 @@ on:
tags:
- 'v*'
paths:
- - .github/workflows/app-release.yml
+ - .github/workflows/theseus-release.yml
- 'apps/app/**'
- 'apps/app-frontend/**'
+ - 'apps/labrinth/src/common/**'
+ - 'apps/labrinth/Cargo.toml'
- 'packages/app-lib/**'
- 'packages/app-macros/**'
- 'packages/assets/**'
diff --git a/.idea/code.iml b/.idea/code.iml
index 70105a514..5fda4410f 100644
--- a/.idea/code.iml
+++ b/.idea/code.iml
@@ -10,6 +10,7 @@
+
diff --git a/Cargo.lock b/Cargo.lock
index 02385145a..a3923eb18 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1204,7 +1204,7 @@ checksum = "d38f2da7a0a2c4ccf0065be06397cc26a81f4e528be095826eee9d4adbb8c60f"
dependencies = [
"byteorder",
"fnv",
- "uuid 1.10.0",
+ "uuid 1.12.0",
]
[[package]]
@@ -1291,7 +1291,7 @@ dependencies = [
"time",
"tokio 1.42.0",
"url",
- "uuid 1.10.0",
+ "uuid 1.12.0",
]
[[package]]
@@ -1995,7 +1995,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
dependencies = [
"serde",
- "uuid 1.10.0",
+ "uuid 1.12.0",
]
[[package]]
@@ -2511,7 +2511,7 @@ checksum = "887d93f60543e9a9362ef8a21beedd0a833c5d9610e18c67abe15a5963dcb1a4"
dependencies = [
"bit_field",
"flume",
- "half",
+ "half 2.4.1",
"lebe",
"miniz_oxide 0.7.4",
"rayon-core",
@@ -3242,6 +3242,12 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "half"
+version = "1.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403"
+
[[package]]
name = "half"
version = "2.4.1"
@@ -4245,6 +4251,7 @@ dependencies = [
"deadpool-redis",
"derive-new",
"dotenvy",
+ "either",
"env_logger",
"flate2",
"futures 0.3.30",
@@ -4277,6 +4284,8 @@ dependencies = [
"sentry",
"sentry-actix",
"serde",
+ "serde_bytes",
+ "serde_cbor",
"serde_json",
"serde_with",
"sha1 0.6.1",
@@ -4290,7 +4299,7 @@ dependencies = [
"totp-rs",
"url",
"urlencoding",
- "uuid 1.10.0",
+ "uuid 1.12.0",
"validator",
"webp",
"woothee",
@@ -4657,7 +4666,7 @@ dependencies = [
"serde_json",
"thiserror 1.0.64",
"time",
- "uuid 1.10.0",
+ "uuid 1.12.0",
"wasm-bindgen-futures",
"web-sys",
"yaup",
@@ -6724,7 +6733,7 @@ dependencies = [
"rkyv_derive",
"seahash",
"tinyvec",
- "uuid 1.10.0",
+ "uuid 1.12.0",
]
[[package]]
@@ -7108,7 +7117,7 @@ dependencies = [
"serde",
"serde_json",
"url",
- "uuid 1.10.0",
+ "uuid 1.12.0",
]
[[package]]
@@ -7361,7 +7370,7 @@ dependencies = [
"thiserror 1.0.64",
"time",
"url",
- "uuid 1.10.0",
+ "uuid 1.12.0",
]
[[package]]
@@ -7396,6 +7405,25 @@ dependencies = [
"xml-rs",
]
+[[package]]
+name = "serde_bytes"
+version = "0.11.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "serde_cbor"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
+dependencies = [
+ "half 1.8.3",
+ "serde",
+]
+
[[package]]
name = "serde_derive"
version = "1.0.210"
@@ -8536,7 +8564,7 @@ dependencies = [
"thiserror 2.0.7",
"time",
"url",
- "uuid 1.10.0",
+ "uuid 1.12.0",
"walkdir",
]
@@ -8629,7 +8657,7 @@ dependencies = [
"thiserror 2.0.7",
"toml 0.8.19",
"url",
- "uuid 1.10.0",
+ "uuid 1.12.0",
]
[[package]]
@@ -8810,7 +8838,7 @@ dependencies = [
"toml 0.8.19",
"url",
"urlpattern",
- "uuid 1.10.0",
+ "uuid 1.12.0",
"walkdir",
]
@@ -8884,9 +8912,11 @@ dependencies = [
"dirs 5.0.1",
"discord-rich-presence",
"dunce",
+ "either",
"flate2",
"futures 0.3.30",
"indicatif",
+ "labrinth",
"lazy_static",
"notify",
"notify-debouncer-mini",
@@ -8913,7 +8943,7 @@ dependencies = [
"tracing-subscriber",
"url",
"urlencoding",
- "uuid 1.10.0",
+ "uuid 1.12.0",
"whoami",
"winreg 0.52.0",
"zip 0.6.6",
@@ -8955,7 +8985,7 @@ dependencies = [
"tracing",
"tracing-error",
"url",
- "uuid 1.10.0",
+ "uuid 1.12.0",
"window-shadows",
]
@@ -8974,7 +9004,7 @@ dependencies = [
"tracing-error",
"tracing-subscriber",
"url",
- "uuid 1.10.0",
+ "uuid 1.12.0",
"webbrowser",
]
@@ -9819,9 +9849,9 @@ dependencies = [
[[package]]
name = "uuid"
-version = "1.10.0"
+version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
+checksum = "744018581f9a3454a9e15beb8a33b017183f1e7c0cd170232a2d1453b23a51c4"
dependencies = [
"getrandom 0.2.15",
"rand 0.8.5",
diff --git a/Cargo.toml b/Cargo.toml
index c55a6fc2e..9afcf86c6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,4 +21,4 @@ strip = true # Remove debug symbols
opt-level = 3
[patch.crates-io]
-wry = { git = "https://github.com/modrinth/wry", rev = "51907c6" }
\ No newline at end of file
+wry = { git = "https://github.com/modrinth/wry", rev = "51907c6" }
diff --git a/apps/app-playground/.cargo/config.toml b/apps/app-playground/.cargo/config.toml
new file mode 100644
index 000000000..19f0a3a5a
--- /dev/null
+++ b/apps/app-playground/.cargo/config.toml
@@ -0,0 +1,2 @@
+[env]
+SQLX_OFFLINE = "true"
diff --git a/apps/app-playground/src/main.rs b/apps/app-playground/src/main.rs
index a50002dc7..f8d943938 100644
--- a/apps/app-playground/src/main.rs
+++ b/apps/app-playground/src/main.rs
@@ -3,9 +3,9 @@
windows_subsystem = "windows"
)]
+use std::time::Duration;
use theseus::prelude::*;
-
-use theseus::profile::create::profile_create;
+use tokio::signal::ctrl_c;
// A simple Rust implementation of the authentication run
// 1) call the authenticate_begin_flow() function to get the URL to open (like you would in the frontend)
@@ -41,54 +41,21 @@ async fn main() -> theseus::Result<()> {
// Initialize state
State::init().await?;
- if minecraft_auth::users().await?.is_empty() {
- println!("No users found, authenticating.");
- authenticate_run().await?; // could take credentials from here direct, but also deposited in state users
- }
- //
- // st.settings
- // .write()
- // .await
- // .java_globals
- // .insert(JAVA_8_KEY.to_string(), check_jre(path).await?.unwrap());
- // Clear profiles
- println!("Clearing profiles.");
- {
- let h = profile::list().await?;
- for profile in h.into_iter() {
- profile::remove(&profile.path).await?;
+ loop {
+ if State::get().await?.friends_socket.is_connected().await {
+ break;
}
+ tokio::time::sleep(Duration::from_millis(500)).await;
}
- println!("Creating/adding profile.");
+ tracing::info!("Starting host");
- let name = "Example".to_string();
- let game_version = "1.16.1".to_string();
- let modloader = ModLoader::Forge;
- let loader_version = "stable".to_string();
+ let socket = State::get().await?.friends_socket.open_port(25565).await?;
+ tracing::info!("Running host on socket {}", socket.socket_id());
- let profile_path = profile_create(
- name,
- game_version,
- modloader,
- Some(loader_version),
- None,
- None,
- None,
- )
- .await?;
-
- println!("running");
- // Run a profile, running minecraft and store the RwLock to the process
- let process = profile::run(&profile_path).await?;
-
- println!("Minecraft UUID: {}", process.uuid);
-
- println!("All running process UUID {:?}", process::get_all().await?);
-
- // hold the lock to the process until it ends
- println!("Waiting for process to end...");
- process::wait_for(process.uuid).await?;
+ ctrl_c().await?;
+ tracing::info!("Stopping host");
+ socket.shutdown().await?;
Ok(())
}
diff --git a/apps/app/.cargo/config.toml b/apps/app/.cargo/config.toml
new file mode 100644
index 000000000..19f0a3a5a
--- /dev/null
+++ b/apps/app/.cargo/config.toml
@@ -0,0 +1,2 @@
+[env]
+SQLX_OFFLINE = "true"
diff --git a/apps/labrinth/.sqlx/query-11344e920ea606504c2fdc3c5a3cb1b1e990def66cf260cb5d648cab72cc34f1.json b/apps/labrinth/.sqlx/query-11344e920ea606504c2fdc3c5a3cb1b1e990def66cf260cb5d648cab72cc34f1.json
new file mode 100644
index 000000000..5948a9568
--- /dev/null
+++ b/apps/labrinth/.sqlx/query-11344e920ea606504c2fdc3c5a3cb1b1e990def66cf260cb5d648cab72cc34f1.json
@@ -0,0 +1,15 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n UPDATE team_members\n SET\n is_owner = TRUE,\n accepted = TRUE,\n permissions = $2,\n organization_permissions = NULL,\n role = 'Inherited Owner'\n WHERE (id = $1)\n ",
+ "describe": {
+ "columns": [],
+ "parameters": {
+ "Left": [
+ "Int8",
+ "Int8"
+ ]
+ },
+ "nullable": []
+ },
+ "hash": "11344e920ea606504c2fdc3c5a3cb1b1e990def66cf260cb5d648cab72cc34f1"
+}
diff --git a/apps/labrinth/.sqlx/query-2b097a9a1b24b9648d3558e348c7d8cd467e589504c6e754f1f6836203946590.json b/apps/labrinth/.sqlx/query-2b097a9a1b24b9648d3558e348c7d8cd467e589504c6e754f1f6836203946590.json
deleted file mode 100644
index 1ccb38543..000000000
--- a/apps/labrinth/.sqlx/query-2b097a9a1b24b9648d3558e348c7d8cd467e589504c6e754f1f6836203946590.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "db_name": "PostgreSQL",
- "query": "\n SELECT u.id \n FROM team_members\n INNER JOIN users u ON u.id = team_members.user_id\n WHERE team_id = $1 AND is_owner = TRUE\n ",
- "describe": {
- "columns": [
- {
- "ordinal": 0,
- "name": "id",
- "type_info": "Int8"
- }
- ],
- "parameters": {
- "Left": [
- "Int8"
- ]
- },
- "nullable": [
- false
- ]
- },
- "hash": "2b097a9a1b24b9648d3558e348c7d8cd467e589504c6e754f1f6836203946590"
-}
diff --git a/apps/labrinth/.sqlx/query-527291243eb3684e956d7d49c579857ce857ff462c830dd0cb74574f415d4105.json b/apps/labrinth/.sqlx/query-527291243eb3684e956d7d49c579857ce857ff462c830dd0cb74574f415d4105.json
new file mode 100644
index 000000000..28d1860e5
--- /dev/null
+++ b/apps/labrinth/.sqlx/query-527291243eb3684e956d7d49c579857ce857ff462c830dd0cb74574f415d4105.json
@@ -0,0 +1,15 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n DELETE FROM version_fields\n WHERE version_id = $1\n AND field_id = ANY($2)\n ",
+ "describe": {
+ "columns": [],
+ "parameters": {
+ "Left": [
+ "Int8",
+ "Int4Array"
+ ]
+ },
+ "nullable": []
+ },
+ "hash": "527291243eb3684e956d7d49c579857ce857ff462c830dd0cb74574f415d4105"
+}
diff --git a/apps/labrinth/.sqlx/query-96ebe21d1430779e88dcaf8872a8c939b3889f91df9a0e404d4c63d466869fe5.json b/apps/labrinth/.sqlx/query-96ebe21d1430779e88dcaf8872a8c939b3889f91df9a0e404d4c63d466869fe5.json
new file mode 100644
index 000000000..ac570f7b7
--- /dev/null
+++ b/apps/labrinth/.sqlx/query-96ebe21d1430779e88dcaf8872a8c939b3889f91df9a0e404d4c63d466869fe5.json
@@ -0,0 +1,22 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n SELECT u.id\n FROM team_members\n INNER JOIN users u ON u.id = team_members.user_id\n WHERE team_id = $1 AND is_owner = TRUE\n ",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "id",
+ "type_info": "Int8"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Int8"
+ ]
+ },
+ "nullable": [
+ false
+ ]
+ },
+ "hash": "96ebe21d1430779e88dcaf8872a8c939b3889f91df9a0e404d4c63d466869fe5"
+}
diff --git a/apps/labrinth/.sqlx/query-acd2e72610008d4fe240cdfadc1c70c997443f7319a5c535df967d56d24bd54a.json b/apps/labrinth/.sqlx/query-acd2e72610008d4fe240cdfadc1c70c997443f7319a5c535df967d56d24bd54a.json
deleted file mode 100644
index a80e8a365..000000000
--- a/apps/labrinth/.sqlx/query-acd2e72610008d4fe240cdfadc1c70c997443f7319a5c535df967d56d24bd54a.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "db_name": "PostgreSQL",
- "query": "\n DELETE FROM version_fields \n WHERE version_id = $1\n AND field_id = ANY($2)\n ",
- "describe": {
- "columns": [],
- "parameters": {
- "Left": [
- "Int8",
- "Int4Array"
- ]
- },
- "nullable": []
- },
- "hash": "acd2e72610008d4fe240cdfadc1c70c997443f7319a5c535df967d56d24bd54a"
-}
diff --git a/apps/labrinth/.sqlx/query-f899b378fad8fcfa1ebf527146b565b7c4466205e0bfd84f299123329926fe3f.json b/apps/labrinth/.sqlx/query-bcbcac3c0b2b2b0327577d3095fa744ab42f7f1dcd2b7f3c3dace12b899b3f38.json
similarity index 73%
rename from apps/labrinth/.sqlx/query-f899b378fad8fcfa1ebf527146b565b7c4466205e0bfd84f299123329926fe3f.json
rename to apps/labrinth/.sqlx/query-bcbcac3c0b2b2b0327577d3095fa744ab42f7f1dcd2b7f3c3dace12b899b3f38.json
index 3adb97d48..1de75299c 100644
--- a/apps/labrinth/.sqlx/query-f899b378fad8fcfa1ebf527146b565b7c4466205e0bfd84f299123329926fe3f.json
+++ b/apps/labrinth/.sqlx/query-bcbcac3c0b2b2b0327577d3095fa744ab42f7f1dcd2b7f3c3dace12b899b3f38.json
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
- "query": "\n INSERT INTO mods (\n id, team_id, name, summary, description,\n published, downloads, icon_url, raw_icon_url, status, requested_status,\n license_url, license,\n slug, color, monetization_status, organization_id\n )\n VALUES (\n $1, $2, $3, $4, $5, $6, \n $7, $8, $9, $10, $11,\n $12, $13,\n LOWER($14), $15, $16, $17\n )\n ",
+ "query": "\n INSERT INTO mods (\n id, team_id, name, summary, description,\n published, downloads, icon_url, raw_icon_url, status, requested_status,\n license_url, license,\n slug, color, monetization_status, organization_id\n )\n VALUES (\n $1, $2, $3, $4, $5, $6,\n $7, $8, $9, $10, $11,\n $12, $13,\n LOWER($14), $15, $16, $17\n )\n ",
"describe": {
"columns": [],
"parameters": {
@@ -26,5 +26,5 @@
},
"nullable": []
},
- "hash": "f899b378fad8fcfa1ebf527146b565b7c4466205e0bfd84f299123329926fe3f"
+ "hash": "bcbcac3c0b2b2b0327577d3095fa744ab42f7f1dcd2b7f3c3dace12b899b3f38"
}
diff --git a/apps/labrinth/.sqlx/query-dc64653d72645b76e42a1834124ce3f9225c5b6b8b941812167b3b7002bfdb2a.json b/apps/labrinth/.sqlx/query-dc64653d72645b76e42a1834124ce3f9225c5b6b8b941812167b3b7002bfdb2a.json
deleted file mode 100644
index e1051cee0..000000000
--- a/apps/labrinth/.sqlx/query-dc64653d72645b76e42a1834124ce3f9225c5b6b8b941812167b3b7002bfdb2a.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "db_name": "PostgreSQL",
- "query": "\n UPDATE team_members\n SET \n is_owner = TRUE,\n accepted = TRUE,\n permissions = $2,\n organization_permissions = NULL,\n role = 'Inherited Owner'\n WHERE (id = $1)\n ",
- "describe": {
- "columns": [],
- "parameters": {
- "Left": [
- "Int8",
- "Int8"
- ]
- },
- "nullable": []
- },
- "hash": "dc64653d72645b76e42a1834124ce3f9225c5b6b8b941812167b3b7002bfdb2a"
-}
diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml
index 376045a0a..7ae09784b 100644
--- a/apps/labrinth/Cargo.toml
+++ b/apps/labrinth/Cargo.toml
@@ -36,8 +36,10 @@ reqwest = { version = "0.11.18", features = ["json", "multipart"] }
hyper = { version = "0.14", features = ["full"] }
hyper-tls = "0.5.0"
-serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
+serde_bytes = "0.11"
+serde_json = "1.0"
+serde_cbor = "0.11"
serde_with = "3.0.0"
chrono = { version = "0.4.26", features = ["serde"] }
yaserde = "0.12.0"
@@ -74,6 +76,7 @@ dotenvy = "0.15.7"
log = "0.4.20"
env_logger = "0.10.1"
thiserror = "1.0.56"
+either = "1.13"
sqlx = { version = "0.8.2", features = [
"runtime-tokio-rustls",
diff --git a/apps/labrinth/src/auth/mod.rs b/apps/labrinth/src/auth/mod.rs
index 30eca4d15..9a8a29c06 100644
--- a/apps/labrinth/src/auth/mod.rs
+++ b/apps/labrinth/src/auth/mod.rs
@@ -34,7 +34,7 @@ pub enum AuthenticationError {
#[error("Error uploading user profile picture")]
FileHosting(#[from] FileHostingError),
#[error("Error while decoding PAT: {0}")]
- Decoding(#[from] crate::models::ids::DecodingError),
+ Decoding(#[from] crate::common::ids::DecodingError),
#[error("{0}")]
Mail(#[from] email::MailError),
#[error("Invalid Authentication Credentials")]
diff --git a/apps/labrinth/src/auth/oauth/errors.rs b/apps/labrinth/src/auth/oauth/errors.rs
index dab6ff850..bc2c66cbc 100644
--- a/apps/labrinth/src/auth/oauth/errors.rs
+++ b/apps/labrinth/src/auth/oauth/errors.rs
@@ -1,7 +1,7 @@
use super::ValidatedRedirectUri;
use crate::auth::AuthenticationError;
+use crate::common::ids::DecodingError;
use crate::models::error::ApiError;
-use crate::models::ids::DecodingError;
use actix_web::http::{header::LOCATION, StatusCode};
use actix_web::HttpResponse;
diff --git a/apps/labrinth/src/common/ids.rs b/apps/labrinth/src/common/ids.rs
new file mode 100644
index 000000000..218ee9a8b
--- /dev/null
+++ b/apps/labrinth/src/common/ids.rs
@@ -0,0 +1,218 @@
+pub use super::users::UserId;
+use thiserror::Error;
+
+/// Generates a random 64 bit integer that is exactly `n` characters
+/// long when encoded as base62.
+///
+/// Uses `rand`'s thread rng on every call.
+///
+/// # Panics
+///
+/// This method panics if `n` is 0 or greater than 11, since a `u64`
+/// can only represent up to 11 character base62 strings
+#[inline]
+pub fn random_base62(n: usize) -> u64 {
+ random_base62_rng(&mut rand::thread_rng(), n)
+}
+
+/// Generates a random 64 bit integer that is exactly `n` characters
+/// long when encoded as base62, using the given rng.
+///
+/// # Panics
+///
+/// This method panics if `n` is 0 or greater than 11, since a `u64`
+/// can only represent up to 11 character base62 strings
+pub fn random_base62_rng(rng: &mut R, n: usize) -> u64 {
+ random_base62_rng_range(rng, n, n)
+}
+
+pub fn random_base62_rng_range(
+ rng: &mut R,
+ n_min: usize,
+ n_max: usize,
+) -> u64 {
+ use rand::Rng;
+ assert!(n_min > 0 && n_max <= 11 && n_min <= n_max);
+ // gen_range is [low, high): max value is `MULTIPLES[n] - 1`,
+ // which is n characters long when encoded
+ rng.gen_range(MULTIPLES[n_min - 1]..MULTIPLES[n_max])
+}
+
+const MULTIPLES: [u64; 12] = [
+ 1,
+ 62,
+ 62 * 62,
+ 62 * 62 * 62,
+ 62 * 62 * 62 * 62,
+ 62 * 62 * 62 * 62 * 62,
+ 62 * 62 * 62 * 62 * 62 * 62,
+ 62 * 62 * 62 * 62 * 62 * 62 * 62,
+ 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62,
+ 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62,
+ 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62,
+ u64::MAX,
+];
+
+/// An ID encoded as base62 for use in the API.
+///
+/// All ids should be random and encode to 8-10 character base62 strings,
+/// to avoid enumeration and other attacks.
+#[derive(Copy, Clone, PartialEq, Eq)]
+pub struct Base62Id(pub u64);
+
+/// An error decoding a number from base62.
+#[derive(Error, Debug)]
+pub enum DecodingError {
+ /// Encountered a non-base62 character in a base62 string
+ #[error("Invalid character {0:?} in base62 encoding")]
+ InvalidBase62(char),
+ /// Encountered integer overflow when decoding a base62 id.
+ #[error("Base62 decoding overflowed")]
+ Overflow,
+}
+
+#[macro_export]
+macro_rules! from_base62id {
+ ($($struct:ty, $con:expr;)+) => {
+ $(
+ impl From for $struct {
+ fn from(id: Base62Id) -> $struct {
+ $con(id.0)
+ }
+ }
+ impl From<$struct> for Base62Id {
+ fn from(id: $struct) -> Base62Id {
+ Base62Id(id.0)
+ }
+ }
+ )+
+ };
+}
+
+#[macro_export]
+macro_rules! impl_base62_display {
+ ($struct:ty) => {
+ impl std::fmt::Display for $struct {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(&$crate::common::ids::base62_impl::to_base62(
+ self.0,
+ ))
+ }
+ }
+ };
+}
+impl_base62_display!(Base62Id);
+
+#[macro_export]
+macro_rules! base62_id_impl {
+ ($struct:ty, $cons:expr) => {
+ $crate::common::ids::from_base62id!($struct, $cons;);
+ $crate::common::ids::impl_base62_display!($struct);
+ }
+}
+base62_id_impl!(UserId, UserId);
+
+pub use {base62_id_impl, from_base62id, impl_base62_display};
+
+pub mod base62_impl {
+ use serde::de::{self, Deserializer, Visitor};
+ use serde::ser::Serializer;
+ use serde::{Deserialize, Serialize};
+
+ use super::{Base62Id, DecodingError};
+
+ impl<'de> Deserialize<'de> for Base62Id {
+ fn deserialize(deserializer: D) -> Result
+ where
+ D: Deserializer<'de>,
+ {
+ struct Base62Visitor;
+
+ impl Visitor<'_> for Base62Visitor {
+ type Value = Base62Id;
+
+ fn expecting(
+ &self,
+ formatter: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ formatter.write_str("a base62 string id")
+ }
+
+ fn visit_u64(self, v: u64) -> Result
+ where
+ E: de::Error,
+ {
+ Ok(Base62Id(v))
+ }
+
+ fn visit_str(self, string: &str) -> Result
+ where
+ E: de::Error,
+ {
+ parse_base62(string).map(Base62Id).map_err(E::custom)
+ }
+ }
+
+ if deserializer.is_human_readable() {
+ deserializer.deserialize_str(Base62Visitor)
+ } else {
+ deserializer.deserialize_u64(Base62Visitor)
+ }
+ }
+ }
+
+ impl Serialize for Base62Id {
+ fn serialize(&self, serializer: S) -> Result
+ where
+ S: Serializer,
+ {
+ if serializer.is_human_readable() {
+ serializer.serialize_str(&to_base62(self.0))
+ } else {
+ serializer.serialize_u64(self.0)
+ }
+ }
+ }
+
+ const BASE62_CHARS: [u8; 62] =
+ *b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+ pub fn to_base62(mut num: u64) -> String {
+ let length = (num as f64).log(62.0).ceil() as usize;
+ let mut output = String::with_capacity(length);
+
+ while num > 0 {
+ // Could be done more efficiently, but requires byte
+ // manipulation of strings & Vec -> String conversion
+ output.insert(0, BASE62_CHARS[(num % 62) as usize] as char);
+ num /= 62;
+ }
+ output
+ }
+
+ pub fn parse_base62(string: &str) -> Result {
+ let mut num: u64 = 0;
+ for c in string.chars() {
+ let next_digit;
+ if c.is_ascii_digit() {
+ next_digit = (c as u8 - b'0') as u64;
+ } else if c.is_ascii_uppercase() {
+ next_digit = 10 + (c as u8 - b'A') as u64;
+ } else if c.is_ascii_lowercase() {
+ next_digit = 36 + (c as u8 - b'a') as u64;
+ } else {
+ return Err(DecodingError::InvalidBase62(c));
+ }
+
+ // We don't want this panicking or wrapping on integer overflow
+ if let Some(n) =
+ num.checked_mul(62).and_then(|n| n.checked_add(next_digit))
+ {
+ num = n;
+ } else {
+ return Err(DecodingError::Overflow);
+ }
+ }
+ Ok(num)
+ }
+}
diff --git a/apps/labrinth/src/common/mod.rs b/apps/labrinth/src/common/mod.rs
new file mode 100644
index 000000000..4a4251a3e
--- /dev/null
+++ b/apps/labrinth/src/common/mod.rs
@@ -0,0 +1,3 @@
+pub mod ids;
+pub mod networking;
+pub mod users;
diff --git a/apps/labrinth/src/common/networking/message.rs b/apps/labrinth/src/common/networking/message.rs
new file mode 100644
index 000000000..96bc86a9f
--- /dev/null
+++ b/apps/labrinth/src/common/networking/message.rs
@@ -0,0 +1,65 @@
+use crate::common::ids::UserId;
+use crate::common::users::UserStatus;
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ClientToServerMessage {
+ StatusUpdate {
+ profile_name: Option,
+ },
+
+ SocketListen {
+ socket: Uuid,
+ },
+ SocketClose {
+ socket: Uuid,
+ },
+ SocketSend {
+ socket: Uuid,
+ #[serde(with = "serde_bytes")]
+ data: Vec,
+ },
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ServerToClientMessage {
+ StatusUpdate {
+ status: UserStatus,
+ },
+ UserOffline {
+ id: UserId,
+ },
+ FriendStatuses {
+ statuses: Vec,
+ },
+ FriendRequest {
+ from: UserId,
+ },
+ FriendRequestRejected {
+ from: UserId,
+ },
+
+ FriendSocketListening {
+ user: UserId,
+ socket: Uuid,
+ },
+ FriendSocketStoppedListening {
+ user: UserId,
+ },
+
+ SocketConnected {
+ to_socket: Uuid,
+ new_socket: Uuid,
+ },
+ SocketClosed {
+ socket: Uuid,
+ },
+ SocketData {
+ socket: Uuid,
+ #[serde(with = "serde_bytes")]
+ data: Vec,
+ },
+}
diff --git a/apps/labrinth/src/common/networking/mod.rs b/apps/labrinth/src/common/networking/mod.rs
new file mode 100644
index 000000000..b7855d592
--- /dev/null
+++ b/apps/labrinth/src/common/networking/mod.rs
@@ -0,0 +1,2 @@
+pub mod message;
+pub mod serialization;
diff --git a/apps/labrinth/src/common/networking/serialization.rs b/apps/labrinth/src/common/networking/serialization.rs
new file mode 100644
index 000000000..e39f5d44a
--- /dev/null
+++ b/apps/labrinth/src/common/networking/serialization.rs
@@ -0,0 +1,56 @@
+use super::message::{ClientToServerMessage, ServerToClientMessage};
+use either::Either;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum SerializationError {
+ #[error("Failed to (de)serialize message: {0}")]
+ SerializationFailed(#[from] serde_json::Error),
+
+ #[error("Failed to (de)serialize binary message: {0}")]
+ BinarySerializationFailed(#[from] serde_cbor::Error),
+}
+
+macro_rules! message_serialization {
+ ($message_enum:ty $(,$binary_pattern:pat_param)* $(,)?) => {
+ impl $message_enum {
+ pub fn is_binary(&self) -> bool {
+ match self {
+ $(
+ $binary_pattern => true,
+ )*
+ _ => false,
+ }
+ }
+
+ pub fn serialize(
+ &self,
+ ) -> Result>, SerializationError> {
+ Ok(match self {
+ $(
+ $binary_pattern => Either::Right(serde_cbor::to_vec(self)?),
+ )*
+ _ => Either::Left(serde_json::to_string(self)?),
+ })
+ }
+
+ pub fn deserialize(
+ msg: Either<&str, &[u8]>,
+ ) -> Result {
+ Ok(match msg {
+ Either::Left(text) => serde_json::from_str(&text)?,
+ Either::Right(bytes) => serde_cbor::from_slice(&bytes)?,
+ })
+ }
+ }
+ };
+}
+
+message_serialization!(
+ ClientToServerMessage,
+ ClientToServerMessage::SocketSend { .. },
+);
+message_serialization!(
+ ServerToClientMessage,
+ ServerToClientMessage::SocketData { .. },
+);
diff --git a/apps/labrinth/src/common/users.rs b/apps/labrinth/src/common/users.rs
new file mode 100644
index 000000000..2fe087283
--- /dev/null
+++ b/apps/labrinth/src/common/users.rs
@@ -0,0 +1,15 @@
+use super::ids::Base62Id;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+
+#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Debug, Hash)]
+#[serde(from = "Base62Id")]
+#[serde(into = "Base62Id")]
+pub struct UserId(pub u64);
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct UserStatus {
+ pub user_id: UserId,
+ pub profile_name: Option,
+ pub last_update: DateTime,
+}
diff --git a/apps/labrinth/src/database/models/ids.rs b/apps/labrinth/src/database/models/ids.rs
index aa1b99895..b7f338729 100644
--- a/apps/labrinth/src/database/models/ids.rs
+++ b/apps/labrinth/src/database/models/ids.rs
@@ -1,6 +1,6 @@
use super::DatabaseError;
-use crate::models::ids::base62_impl::to_base62;
-use crate::models::ids::{random_base62_rng, random_base62_rng_range};
+use crate::common::ids::base62_impl::to_base62;
+use crate::common::ids::{random_base62_rng, random_base62_rng_range};
use censor::Censor;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
diff --git a/apps/labrinth/src/database/models/organization_item.rs b/apps/labrinth/src/database/models/organization_item.rs
index b01052776..51a9c8c02 100644
--- a/apps/labrinth/src/database/models/organization_item.rs
+++ b/apps/labrinth/src/database/models/organization_item.rs
@@ -1,6 +1,5 @@
-use crate::{
- database::redis::RedisPool, models::ids::base62_impl::parse_base62,
-};
+use crate::common::ids::base62_impl::parse_base62;
+use crate::database::redis::RedisPool;
use dashmap::DashMap;
use futures::TryStreamExt;
use std::fmt::{Debug, Display};
diff --git a/apps/labrinth/src/database/models/pat_item.rs b/apps/labrinth/src/database/models/pat_item.rs
index 205a70e4b..2ae53666d 100644
--- a/apps/labrinth/src/database/models/pat_item.rs
+++ b/apps/labrinth/src/database/models/pat_item.rs
@@ -1,7 +1,7 @@
use super::ids::*;
+use crate::common::ids::base62_impl::parse_base62;
use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
-use crate::models::ids::base62_impl::parse_base62;
use crate::models::pats::Scopes;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
diff --git a/apps/labrinth/src/database/models/project_item.rs b/apps/labrinth/src/database/models/project_item.rs
index f7f3ab9ca..2a40b6865 100644
--- a/apps/labrinth/src/database/models/project_item.rs
+++ b/apps/labrinth/src/database/models/project_item.rs
@@ -3,10 +3,10 @@ use super::loader_fields::{
VersionField,
};
use super::{ids::*, User};
+use crate::common::ids::base62_impl::parse_base62;
use crate::database::models;
use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
-use crate::models::ids::base62_impl::parse_base62;
use crate::models::projects::{MonetizationStatus, ProjectStatus};
use chrono::{DateTime, Utc};
use dashmap::{DashMap, DashSet};
@@ -300,7 +300,7 @@ impl Project {
slug, color, monetization_status, organization_id
)
VALUES (
- $1, $2, $3, $4, $5, $6,
+ $1, $2, $3, $4, $5, $6,
$7, $8, $9, $10, $11,
$12, $13,
LOWER($14), $15, $16, $17
diff --git a/apps/labrinth/src/database/models/session_item.rs b/apps/labrinth/src/database/models/session_item.rs
index adb1659ea..4717c2a81 100644
--- a/apps/labrinth/src/database/models/session_item.rs
+++ b/apps/labrinth/src/database/models/session_item.rs
@@ -1,7 +1,7 @@
use super::ids::*;
+use crate::common::ids::base62_impl::parse_base62;
use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
-use crate::models::ids::base62_impl::parse_base62;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
diff --git a/apps/labrinth/src/database/models/user_item.rs b/apps/labrinth/src/database/models/user_item.rs
index b42fc4651..09f017594 100644
--- a/apps/labrinth/src/database/models/user_item.rs
+++ b/apps/labrinth/src/database/models/user_item.rs
@@ -1,9 +1,9 @@
use super::ids::{ProjectId, UserId};
use super::{CollectionId, ReportId, ThreadId};
+use crate::common::ids::base62_impl::{parse_base62, to_base62};
use crate::database::models;
use crate::database::models::{DatabaseError, OrganizationId};
use crate::database::redis::RedisPool;
-use crate::models::ids::base62_impl::{parse_base62, to_base62};
use crate::models::users::Badges;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
diff --git a/apps/labrinth/src/database/redis.rs b/apps/labrinth/src/database/redis.rs
index cc59ffef7..b28317fc0 100644
--- a/apps/labrinth/src/database/redis.rs
+++ b/apps/labrinth/src/database/redis.rs
@@ -1,5 +1,5 @@
use super::models::DatabaseError;
-use crate::models::ids::base62_impl::{parse_base62, to_base62};
+use crate::common::ids::base62_impl::{parse_base62, to_base62};
use chrono::{TimeZone, Utc};
use dashmap::DashMap;
use deadpool_redis::{Config, Runtime};
diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs
index db1416a71..9856cfb1d 100644
--- a/apps/labrinth/src/lib.rs
+++ b/apps/labrinth/src/lib.rs
@@ -25,6 +25,8 @@ use crate::{
util::env::{parse_strings_from_var, parse_var},
};
+pub mod common;
+
pub mod auth;
pub mod clickhouse;
pub mod database;
@@ -297,8 +299,10 @@ pub fn app_setup(
}
let ip_salt = Pepper {
- pepper: models::ids::Base62Id(models::ids::random_base62(11))
- .to_string(),
+ pepper: crate::common::ids::Base62Id(
+ crate::common::ids::random_base62(11),
+ )
+ .to_string(),
};
let payouts_queue = web::Data::new(PayoutsQueue::new());
diff --git a/apps/labrinth/src/models/v3/ids.rs b/apps/labrinth/src/models/v3/ids.rs
index d6530af2f..3d8a8cfd2 100644
--- a/apps/labrinth/src/models/v3/ids.rs
+++ b/apps/labrinth/src/models/v3/ids.rs
@@ -13,117 +13,13 @@ pub use super::teams::TeamId;
pub use super::threads::ThreadId;
pub use super::threads::ThreadMessageId;
pub use super::users::UserId;
+use crate::common::ids::base62_id_impl;
+pub use crate::common::ids::Base62Id;
pub use crate::models::billing::{
ChargeId, ProductId, ProductPriceId, UserSubscriptionId,
};
-use thiserror::Error;
-/// Generates a random 64 bit integer that is exactly `n` characters
-/// long when encoded as base62.
-///
-/// Uses `rand`'s thread rng on every call.
-///
-/// # Panics
-///
-/// This method panics if `n` is 0 or greater than 11, since a `u64`
-/// can only represent up to 11 character base62 strings
-#[inline]
-pub fn random_base62(n: usize) -> u64 {
- random_base62_rng(&mut rand::thread_rng(), n)
-}
-
-/// Generates a random 64 bit integer that is exactly `n` characters
-/// long when encoded as base62, using the given rng.
-///
-/// # Panics
-///
-/// This method panics if `n` is 0 or greater than 11, since a `u64`
-/// can only represent up to 11 character base62 strings
-pub fn random_base62_rng(rng: &mut R, n: usize) -> u64 {
- random_base62_rng_range(rng, n, n)
-}
-
-pub fn random_base62_rng_range(
- rng: &mut R,
- n_min: usize,
- n_max: usize,
-) -> u64 {
- use rand::Rng;
- assert!(n_min > 0 && n_max <= 11 && n_min <= n_max);
- // gen_range is [low, high): max value is `MULTIPLES[n] - 1`,
- // which is n characters long when encoded
- rng.gen_range(MULTIPLES[n_min - 1]..MULTIPLES[n_max])
-}
-
-const MULTIPLES: [u64; 12] = [
- 1,
- 62,
- 62 * 62,
- 62 * 62 * 62,
- 62 * 62 * 62 * 62,
- 62 * 62 * 62 * 62 * 62,
- 62 * 62 * 62 * 62 * 62 * 62,
- 62 * 62 * 62 * 62 * 62 * 62 * 62,
- 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62,
- 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62,
- 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62 * 62,
- u64::MAX,
-];
-
-/// An ID encoded as base62 for use in the API.
-///
-/// All ids should be random and encode to 8-10 character base62 strings,
-/// to avoid enumeration and other attacks.
-#[derive(Copy, Clone, PartialEq, Eq)]
-pub struct Base62Id(pub u64);
-
-/// An error decoding a number from base62.
-#[derive(Error, Debug)]
-pub enum DecodingError {
- /// Encountered a non-base62 character in a base62 string
- #[error("Invalid character {0:?} in base62 encoding")]
- InvalidBase62(char),
- /// Encountered integer overflow when decoding a base62 id.
- #[error("Base62 decoding overflowed")]
- Overflow,
-}
-
-macro_rules! from_base62id {
- ($($struct:ty, $con:expr;)+) => {
- $(
- impl From for $struct {
- fn from(id: Base62Id) -> $struct {
- $con(id.0)
- }
- }
- impl From<$struct> for Base62Id {
- fn from(id: $struct) -> Base62Id {
- Base62Id(id.0)
- }
- }
- )+
- };
-}
-
-macro_rules! impl_base62_display {
- ($struct:ty) => {
- impl std::fmt::Display for $struct {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str(&base62_impl::to_base62(self.0))
- }
- }
- };
-}
-impl_base62_display!(Base62Id);
-
-macro_rules! base62_id_impl {
- ($struct:ty, $cons:expr) => {
- from_base62id!($struct, $cons;);
- impl_base62_display!($struct);
- }
-}
base62_id_impl!(ProjectId, ProjectId);
-base62_id_impl!(UserId, UserId);
base62_id_impl!(VersionId, VersionId);
base62_id_impl!(CollectionId, CollectionId);
base62_id_impl!(TeamId, TeamId);
@@ -143,91 +39,3 @@ base62_id_impl!(ProductId, ProductId);
base62_id_impl!(ProductPriceId, ProductPriceId);
base62_id_impl!(UserSubscriptionId, UserSubscriptionId);
base62_id_impl!(ChargeId, ChargeId);
-
-pub mod base62_impl {
- use serde::de::{self, Deserializer, Visitor};
- use serde::ser::Serializer;
- use serde::{Deserialize, Serialize};
-
- use super::{Base62Id, DecodingError};
-
- impl<'de> Deserialize<'de> for Base62Id {
- fn deserialize(deserializer: D) -> Result
- where
- D: Deserializer<'de>,
- {
- struct Base62Visitor;
-
- impl Visitor<'_> for Base62Visitor {
- type Value = Base62Id;
-
- fn expecting(
- &self,
- formatter: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- formatter.write_str("a base62 string id")
- }
-
- fn visit_str(self, string: &str) -> Result
- where
- E: de::Error,
- {
- parse_base62(string).map(Base62Id).map_err(E::custom)
- }
- }
-
- deserializer.deserialize_str(Base62Visitor)
- }
- }
-
- impl Serialize for Base62Id {
- fn serialize(&self, serializer: S) -> Result
- where
- S: Serializer,
- {
- serializer.serialize_str(&to_base62(self.0))
- }
- }
-
- const BASE62_CHARS: [u8; 62] =
- *b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
-
- pub fn to_base62(mut num: u64) -> String {
- let length = (num as f64).log(62.0).ceil() as usize;
- let mut output = String::with_capacity(length);
-
- while num > 0 {
- // Could be done more efficiently, but requires byte
- // manipulation of strings & Vec -> String conversion
- output.insert(0, BASE62_CHARS[(num % 62) as usize] as char);
- num /= 62;
- }
- output
- }
-
- pub fn parse_base62(string: &str) -> Result {
- let mut num: u64 = 0;
- for c in string.chars() {
- let next_digit;
- if c.is_ascii_digit() {
- next_digit = (c as u8 - b'0') as u64;
- } else if c.is_ascii_uppercase() {
- next_digit = 10 + (c as u8 - b'A') as u64;
- } else if c.is_ascii_lowercase() {
- next_digit = 36 + (c as u8 - b'a') as u64;
- } else {
- return Err(DecodingError::InvalidBase62(c));
- }
-
- // We don't want this panicking or wrapping on integer overflow
- if let Some(n) =
- num.checked_mul(62).and_then(|n| n.checked_add(next_digit))
- {
- num = n;
- } else {
- return Err(DecodingError::Overflow);
- }
- }
- Ok(num)
- }
-}
diff --git a/apps/labrinth/src/models/v3/notifications.rs b/apps/labrinth/src/models/v3/notifications.rs
index 2d0813102..e47375afc 100644
--- a/apps/labrinth/src/models/v3/notifications.rs
+++ b/apps/labrinth/src/models/v3/notifications.rs
@@ -178,7 +178,7 @@ impl From for Notification {
name.clone(),
text.clone(),
link.clone(),
- actions.clone().into_iter().map(Into::into).collect(),
+ actions.clone().into_iter().collect(),
),
NotificationBody::Unknown => {
("".to_string(), "".to_string(), "#".to_string(), vec![])
diff --git a/apps/labrinth/src/models/v3/users.rs b/apps/labrinth/src/models/v3/users.rs
index 9bffd442f..8e9427dd9 100644
--- a/apps/labrinth/src/models/v3/users.rs
+++ b/apps/labrinth/src/models/v3/users.rs
@@ -1,14 +1,9 @@
-use super::ids::Base62Id;
+pub use crate::common::users::{UserId, UserStatus};
use crate::{auth::AuthProvider, bitflags_serde_impl};
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
-#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Debug, Hash)]
-#[serde(from = "Base62Id")]
-#[serde(into = "Base62Id")]
-pub struct UserId(pub u64);
-
pub const DELETED_USER: UserId = UserId(127155982985829);
bitflags::bitflags! {
@@ -211,10 +206,3 @@ impl UserFriend {
}
}
}
-
-#[derive(Serialize, Deserialize, Clone)]
-pub struct UserStatus {
- pub user_id: UserId,
- pub profile_name: Option,
- pub last_update: DateTime,
-}
diff --git a/apps/labrinth/src/queue/socket.rs b/apps/labrinth/src/queue/socket.rs
index c39c16b8c..f11d477ed 100644
--- a/apps/labrinth/src/queue/socket.rs
+++ b/apps/labrinth/src/queue/socket.rs
@@ -1,16 +1,68 @@
//! "Database" for Hydra
+
use crate::models::users::{UserId, UserStatus};
use actix_ws::Session;
-use dashmap::DashMap;
+use dashmap::{DashMap, DashSet};
+use std::sync::atomic::AtomicU32;
+use uuid::Uuid;
+
+pub type SocketId = u32;
pub struct ActiveSockets {
- pub auth_sockets: DashMap,
+ pub sockets: DashMap,
+ pub sockets_by_user_id: DashMap>,
+ pub next_socket_id: AtomicU32,
+ pub tunnel_sockets: DashMap,
}
impl Default for ActiveSockets {
fn default() -> Self {
Self {
- auth_sockets: DashMap::new(),
+ sockets: DashMap::new(),
+ sockets_by_user_id: DashMap::new(),
+ next_socket_id: AtomicU32::new(0),
+ tunnel_sockets: DashMap::new(),
}
}
}
+
+impl ActiveSockets {
+ pub fn get_status(&self, user: UserId) -> Option {
+ self.sockets_by_user_id
+ .get(&user)
+ .and_then(|x| x.iter().next().and_then(|x| self.sockets.get(&*x)))
+ .map(|x| x.status.clone())
+ }
+}
+
+pub struct ActiveSocket {
+ pub status: UserStatus,
+ pub socket: Session,
+ pub owned_tunnel_sockets: DashSet,
+}
+
+impl ActiveSocket {
+ pub fn new(status: UserStatus, session: Session) -> Self {
+ Self {
+ status,
+ socket: session,
+ owned_tunnel_sockets: DashSet::new(),
+ }
+ }
+}
+
+pub struct TunnelSocket {
+ pub owner: SocketId,
+ pub socket_type: TunnelSocketType,
+}
+
+impl TunnelSocket {
+ pub fn new(owner: SocketId, socket_type: TunnelSocketType) -> Self {
+ Self { owner, socket_type }
+ }
+}
+
+pub enum TunnelSocketType {
+ Listening,
+ Connected { connected_to: Uuid },
+}
diff --git a/apps/labrinth/src/routes/internal/admin.rs b/apps/labrinth/src/routes/internal/admin.rs
index 95b1eb920..62562ab1a 100644
--- a/apps/labrinth/src/routes/internal/admin.rs
+++ b/apps/labrinth/src/routes/internal/admin.rs
@@ -74,7 +74,7 @@ pub async fn count_download(
let project_id: crate::database::models::ids::ProjectId =
download_body.project_id.into();
- let id_option = crate::models::ids::base62_impl::parse_base62(
+ let id_option = crate::common::ids::base62_impl::parse_base62(
&download_body.version_name,
)
.ok()
diff --git a/apps/labrinth/src/routes/internal/billing.rs b/apps/labrinth/src/routes/internal/billing.rs
index 51b271ba8..380533cd3 100644
--- a/apps/labrinth/src/routes/internal/billing.rs
+++ b/apps/labrinth/src/routes/internal/billing.rs
@@ -1,4 +1,5 @@
use crate::auth::{get_user_from_headers, send_email};
+use crate::common::ids::base62_impl::{parse_base62, to_base62};
use crate::database::models::charge_item::ChargeItem;
use crate::database::models::{
generate_charge_id, generate_user_subscription_id, product_item,
@@ -10,7 +11,6 @@ use crate::models::billing::{
Product, ProductMetadata, ProductPrice, SubscriptionMetadata,
SubscriptionStatus, UserSubscription,
};
-use crate::models::ids::base62_impl::{parse_base62, to_base62};
use crate::models::pats::Scopes;
use crate::models::users::Badges;
use crate::queue::session::AuthQueue;
diff --git a/apps/labrinth/src/routes/internal/flows.rs b/apps/labrinth/src/routes/internal/flows.rs
index d0f830609..5d20b6c87 100644
--- a/apps/labrinth/src/routes/internal/flows.rs
+++ b/apps/labrinth/src/routes/internal/flows.rs
@@ -1,11 +1,11 @@
use crate::auth::email::send_email;
use crate::auth::validate::get_user_record_from_bearer_token;
use crate::auth::{get_user_from_headers, AuthProvider, AuthenticationError};
+use crate::common::ids::base62_impl::{parse_base62, to_base62};
+use crate::common::ids::random_base62_rng;
use crate::database::models::flow_item::Flow;
use crate::database::redis::RedisPool;
use crate::file_hosting::FileHost;
-use crate::models::ids::base62_impl::{parse_base62, to_base62};
-use crate::models::ids::random_base62_rng;
use crate::models::pats::Scopes;
use crate::models::users::{Badges, Role};
use crate::queue::session::AuthQueue;
diff --git a/apps/labrinth/src/routes/internal/moderation.rs b/apps/labrinth/src/routes/internal/moderation.rs
index 9f59e738e..28f604e31 100644
--- a/apps/labrinth/src/routes/internal/moderation.rs
+++ b/apps/labrinth/src/routes/internal/moderation.rs
@@ -1,7 +1,7 @@
use super::ApiError;
+use crate::common::ids::random_base62;
use crate::database;
use crate::database::redis::RedisPool;
-use crate::models::ids::random_base62;
use crate::models::projects::ProjectStatus;
use crate::queue::moderation::{ApprovalType, IdentifiedFile, MissingMetadata};
use crate::queue::session::AuthQueue;
diff --git a/apps/labrinth/src/routes/internal/statuses.rs b/apps/labrinth/src/routes/internal/statuses.rs
index 4b595b1cc..c531c5c7e 100644
--- a/apps/labrinth/src/routes/internal/statuses.rs
+++ b/apps/labrinth/src/routes/internal/statuses.rs
@@ -1,41 +1,33 @@
use crate::auth::validate::get_user_record_from_bearer_token;
use crate::auth::AuthenticationError;
+use crate::common::ids::UserId;
+use crate::common::networking::message::{
+ ClientToServerMessage, ServerToClientMessage,
+};
+use crate::common::users::UserStatus;
use crate::database::models::friend_item::FriendItem;
use crate::database::redis::RedisPool;
-use crate::models::ids::UserId;
use crate::models::pats::Scopes;
-use crate::models::users::{User, UserStatus};
+use crate::models::users::User;
use crate::queue::session::AuthQueue;
-use crate::queue::socket::ActiveSockets;
+use crate::queue::socket::{
+ ActiveSocket, ActiveSockets, SocketId, TunnelSocketType,
+};
use crate::routes::ApiError;
use actix_web::web::{Data, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_ws::Message;
use chrono::Utc;
+use either::Either;
use futures_util::{StreamExt, TryStreamExt};
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
use sqlx::PgPool;
+use std::sync::atomic::Ordering;
pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(ws_init);
}
-#[derive(Deserialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum ClientToServerMessage {
- StatusUpdate { profile_name: Option },
-}
-
-#[derive(Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum ServerToClientMessage {
- StatusUpdate { status: UserStatus },
- UserOffline { id: UserId },
- FriendStatuses { statuses: Vec },
- FriendRequest { from: UserId },
- FriendRequestRejected { from: UserId },
-}
-
#[derive(Deserialize)]
struct LauncherHeartbeatInit {
code: String,
@@ -71,10 +63,6 @@ pub async fn ws_init(
let user = User::from_full(db_user);
- if let Some((_, (_, session))) = db.auth_sockets.remove(&user.id) {
- let _ = session.close(None).await;
- }
-
let (res, mut session, msg_stream) = match actix_ws::handle(&req, body) {
Ok(x) => x,
Err(e) => return Ok(e.error_response()),
@@ -94,8 +82,8 @@ pub async fn ws_init(
friends
.iter()
.filter_map(|x| {
- db.auth_sockets.get(
- &if x.user_id == user.id.into() {
+ db.get_status(
+ if x.user_id == user.id.into() {
x.friend_id
} else {
x.user_id
@@ -103,7 +91,6 @@ pub async fn ws_init(
.into(),
)
})
- .map(|x| x.value().0.clone())
.collect::>()
} else {
Vec::new()
@@ -117,7 +104,17 @@ pub async fn ws_init(
)?)
.await;
- db.auth_sockets.insert(user.id, (status.clone(), session));
+ let db = db.clone();
+ let socket_id = db.next_socket_id.fetch_add(1, Ordering::Relaxed);
+ db.sockets
+ .insert(socket_id, ActiveSocket::new(status.clone(), session));
+ db.sockets_by_user_id
+ .entry(user.id)
+ .or_default()
+ .insert(socket_id);
+
+ #[cfg(debug_assertions)]
+ log::info!("Connection {socket_id} opened by {}", user.id);
broadcast_friends(
user.id,
@@ -133,68 +130,182 @@ pub async fn ws_init(
actix_web::rt::spawn(async move {
// receive messages from websocket
while let Some(msg) = stream.next().await {
- match msg {
+ let message = match msg {
Ok(Message::Text(text)) => {
- if let Ok(message) =
- serde_json::from_str::(&text)
- {
- match message {
- ClientToServerMessage::StatusUpdate {
- profile_name,
- } => {
- if let Some(mut pair) =
- db.auth_sockets.get_mut(&user.id)
- {
- let (status, _) = pair.value_mut();
+ ClientToServerMessage::deserialize(Either::Left(&text))
+ }
- if status
- .profile_name
- .as_ref()
- .map(|x| x.len() > 64)
- .unwrap_or(false)
- {
- continue;
- }
-
- status.profile_name = profile_name;
- status.last_update = Utc::now();
-
- let user_status = status.clone();
- // We drop the pair to avoid holding the lock for too long
- drop(pair);
-
- let _ = broadcast_friends(
- user.id,
- ServerToClientMessage::StatusUpdate {
- status: user_status,
- },
- &pool,
- &db,
- None,
- )
- .await;
- }
- }
- }
- }
+ Ok(Message::Binary(bytes)) => {
+ ClientToServerMessage::deserialize(Either::Right(&bytes))
}
Ok(Message::Close(_)) => {
- let _ = close_socket(user.id, &pool, &db).await;
+ let _ = close_socket(socket_id, &pool, &db).await;
+ continue;
}
Ok(Message::Ping(msg)) => {
- if let Some(socket) = db.auth_sockets.get(&user.id) {
- let (_, socket) = socket.value();
- let _ = socket.clone().pong(&msg).await;
+ if let Some(socket) = db.sockets.get(&socket_id) {
+ let _ = socket.socket.clone().pong(&msg).await;
+ }
+ continue;
+ }
+
+ _ => continue,
+ };
+
+ if message.is_err() {
+ continue;
+ }
+ let message = message.unwrap();
+
+ #[cfg(debug_assertions)]
+ if !message.is_binary() {
+ log::info!("Received message from {socket_id}: {:?}", message);
+ }
+
+ match message {
+ ClientToServerMessage::StatusUpdate { profile_name } => {
+ if let Some(mut pair) = db.sockets.get_mut(&socket_id) {
+ let ActiveSocket { status, .. } = pair.value_mut();
+
+ if status
+ .profile_name
+ .as_ref()
+ .map(|x| x.len() > 64)
+ .unwrap_or(false)
+ {
+ return;
+ }
+
+ status.profile_name = profile_name;
+ status.last_update = Utc::now();
+
+ let user_status = status.clone();
+ // We drop the pair to avoid holding the lock for too long
+ drop(pair);
+
+ let _ = broadcast_friends(
+ user.id,
+ ServerToClientMessage::StatusUpdate {
+ status: user_status,
+ },
+ &pool,
+ &db,
+ None,
+ )
+ .await;
}
}
- _ => {}
+ ClientToServerMessage::SocketListen { .. } => {
+ // TODO: Listen to socket
+ // The code below probably won't need changes, but there's no way to connect to
+ // a tunnel socket yet, so we shouldn't be storing them
+
+ // let Some(active_socket) = db.sockets.get(&socket_id) else {
+ // return;
+ // };
+ // let Vacant(entry) = db.tunnel_sockets.entry(socket) else {
+ // continue;
+ // };
+ // entry.insert(TunnelSocket::new(
+ // socket_id,
+ // TunnelSocketType::Listening,
+ // ));
+ // active_socket.owned_tunnel_sockets.insert(socket);
+ // let _ = broadcast_friends(
+ // user.id,
+ // ServerToClientMessage::FriendSocketListening {
+ // user: user.id,
+ // socket,
+ // },
+ // &pool,
+ // &db,
+ // None,
+ // )
+ // .await;
+ }
+ ClientToServerMessage::SocketClose { socket } => {
+ let Some(active_socket) = db.sockets.get(&socket_id) else {
+ return;
+ };
+ if active_socket
+ .owned_tunnel_sockets
+ .remove(&socket)
+ .is_none()
+ {
+ continue;
+ }
+ let Some((_, tunnel_socket)) =
+ db.tunnel_sockets.remove(&socket)
+ else {
+ continue;
+ };
+ match tunnel_socket.socket_type {
+ TunnelSocketType::Listening => {
+ let _ = broadcast_friends(
+ user.id,
+ ServerToClientMessage::FriendSocketStoppedListening { user: user.id },
+ &pool,
+ &db,
+ None,
+ )
+ .await;
+ }
+ TunnelSocketType::Connected { connected_to } => {
+ let Some((_, other)) =
+ db.tunnel_sockets.remove(&connected_to)
+ else {
+ continue;
+ };
+ let Some(other_user) = db.sockets.get(&other.owner)
+ else {
+ continue;
+ };
+ let _ = send_message(
+ &other_user,
+ &ServerToClientMessage::SocketClosed { socket },
+ )
+ .await;
+ }
+ }
+ }
+ ClientToServerMessage::SocketSend { socket, data } => {
+ let Some(tunnel_socket) = db.tunnel_sockets.get(&socket)
+ else {
+ continue;
+ };
+ if tunnel_socket.owner != socket_id {
+ continue;
+ }
+ let TunnelSocketType::Connected { connected_to } =
+ tunnel_socket.socket_type
+ else {
+ continue;
+ };
+ let Some(other_tunnel) =
+ db.tunnel_sockets.get(&connected_to)
+ else {
+ continue;
+ };
+ let Some(other_user) = db.sockets.get(&other_tunnel.owner)
+ else {
+ continue;
+ };
+ let _ = send_message(
+ &other_user,
+ &ServerToClientMessage::SocketData {
+ socket: connected_to,
+ data,
+ },
+ )
+ .await;
+ }
}
}
- let _ = close_socket(user.id, &pool, &db).await;
+ let _ = close_socket(socket_id, &pool, &db).await;
});
Ok(res)
@@ -207,6 +318,7 @@ pub async fn broadcast_friends(
sockets: &ActiveSockets,
friends: Option>,
) -> Result<(), crate::database::models::DatabaseError> {
+ // FIXME Probably shouldn't be using database errors for this. Maybe ApiError?
let friends = if let Some(friends) = friends {
friends
} else {
@@ -221,11 +333,46 @@ pub async fn broadcast_friends(
};
if friend.accepted {
- if let Some(socket) = sockets.auth_sockets.get(&friend_id.into()) {
- let (_, socket) = socket.value();
+ if let Some(socket_ids) =
+ sockets.sockets_by_user_id.get(&friend_id.into())
+ {
+ for socket_id in socket_ids.iter() {
+ if let Some(socket) = sockets.sockets.get(&socket_id) {
+ let _ = send_message(socket.value(), &message).await;
+ }
+ }
+ }
+ }
+ }
- let _ =
- socket.clone().text(serde_json::to_string(&message)?).await;
+ Ok(())
+}
+
+pub async fn send_message(
+ socket: &ActiveSocket,
+ message: &ServerToClientMessage,
+) -> Result<(), crate::database::models::DatabaseError> {
+ let mut socket = socket.socket.clone();
+
+ // FIXME Probably shouldn't swallow sending errors
+ let _ = match message.serialize() {
+ Ok(Either::Left(text)) => socket.text(text).await,
+ Ok(Either::Right(bytes)) => socket.binary(bytes).await,
+ Err(_) => Ok(()), // TODO: Maybe should log these? Though it is the backend
+ };
+
+ Ok(())
+}
+
+pub async fn send_message_to_user(
+ db: &ActiveSockets,
+ user: UserId,
+ message: &ServerToClientMessage,
+) -> Result<(), crate::database::models::DatabaseError> {
+ if let Some(socket_ids) = db.sockets_by_user_id.get(&user) {
+ for socket_id in socket_ids.iter() {
+ if let Some(socket) = db.sockets.get(&socket_id) {
+ send_message(&socket, message).await?;
}
}
}
@@ -234,21 +381,66 @@ pub async fn broadcast_friends(
}
pub async fn close_socket(
- id: UserId,
+ id: SocketId,
pool: &PgPool,
- sockets: &ActiveSockets,
+ db: &ActiveSockets,
) -> Result<(), crate::database::models::DatabaseError> {
- if let Some((_, (_, socket))) = sockets.auth_sockets.remove(&id) {
- let _ = socket.close(None).await;
+ if let Some((_, socket)) = db.sockets.remove(&id) {
+ let user_id = socket.status.user_id;
+ db.sockets_by_user_id.remove_if(&user_id, |_, sockets| {
+ sockets.remove(&id);
+ sockets.is_empty()
+ });
+
+ let _ = socket.socket.close(None).await;
broadcast_friends(
- id,
- ServerToClientMessage::UserOffline { id },
+ user_id,
+ ServerToClientMessage::UserOffline { id: user_id },
pool,
- sockets,
+ db,
None,
)
.await?;
+
+ for owned_socket in socket.owned_tunnel_sockets {
+ let Some((_, tunnel_socket)) =
+ db.tunnel_sockets.remove(&owned_socket)
+ else {
+ continue;
+ };
+ match tunnel_socket.socket_type {
+ TunnelSocketType::Listening => {
+ let _ = broadcast_friends(
+ user_id,
+ ServerToClientMessage::SocketClosed {
+ socket: owned_socket,
+ },
+ pool,
+ db,
+ None,
+ )
+ .await;
+ }
+ TunnelSocketType::Connected { connected_to } => {
+ let Some((_, other)) =
+ db.tunnel_sockets.remove(&connected_to)
+ else {
+ continue;
+ };
+ let Some(other_user) = db.sockets.get(&other.owner) else {
+ continue;
+ };
+ let _ = send_message(
+ &other_user,
+ &ServerToClientMessage::SocketClosed {
+ socket: connected_to,
+ },
+ )
+ .await;
+ }
+ }
+ }
}
Ok(())
diff --git a/apps/labrinth/src/routes/maven.rs b/apps/labrinth/src/routes/maven.rs
index 193b5b0f9..e53ec6a31 100644
--- a/apps/labrinth/src/routes/maven.rs
+++ b/apps/labrinth/src/routes/maven.rs
@@ -164,7 +164,7 @@ async fn find_version(
pool: &PgPool,
redis: &RedisPool,
) -> Result