Move downloads to queue for better performance (#367)

This commit is contained in:
Geometrically 2022-06-09 12:21:51 -07:00 committed by GitHub
parent 5c4a864680
commit 75614fb13c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 785 additions and 700 deletions

File diff suppressed because it is too large Load Diff

View File

@ -169,8 +169,8 @@ pub struct Project {
pub description: String,
pub body: String,
pub body_url: Option<String>,
pub published: time::OffsetDateTime,
pub updated: time::OffsetDateTime,
pub published: OffsetDateTime,
pub updated: OffsetDateTime,
pub status: StatusId,
pub downloads: i32,
pub follows: i32,
@ -537,30 +537,29 @@ impl Project {
}
pub async fn get_from_slug_or_project_id<'a, 'b, E>(
slug_or_project_id: String,
slug_or_project_id: &str,
executor: E,
) -> Result<Option<Project>, sqlx::error::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
let id_option = crate::models::ids::base62_impl::parse_base62(
&*slug_or_project_id.clone(),
)
.ok();
let id_option =
crate::models::ids::base62_impl::parse_base62(slug_or_project_id)
.ok();
if let Some(id) = id_option {
let mut project =
Project::get(ProjectId(id as i64), executor).await?;
if project.is_none() {
project = Project::get_from_slug(&slug_or_project_id, executor)
project = Project::get_from_slug(slug_or_project_id, executor)
.await?;
}
Ok(project)
} else {
let project =
Project::get_from_slug(&slug_or_project_id, executor).await?;
Project::get_from_slug(slug_or_project_id, executor).await?;
Ok(project)
}
@ -612,8 +611,8 @@ impl Project {
m.team_id team_id, m.client_side client_side, m.server_side server_side, m.license license, m.slug slug, m.moderation_message moderation_message, m.moderation_message_body moderation_message_body,
s.status status_name, cs.name client_side_type, ss.name server_side_type, l.short short, l.name license_name, pt.name project_type_name,
STRING_AGG(DISTINCT c.category, ' ~~~~ ') categories, STRING_AGG(DISTINCT v.id::text, ' ~~~~ ') versions,
STRING_AGG(DISTINCT mg.image_url || ' |||| ' || mg.featured || ' |||| ' || COALESCE(mg.title, ' ') || ' |||| ' || COALESCE(mg.description, ' ') || ' |||| ' || mg.created, ' ~~~~ ') gallery,
STRING_AGG(DISTINCT md.joining_platform_id || ' |||| ' || md.url || ' |||| ' || dp.short || ' |||| ' || dp.name, ' ~~~~ ') donations
STRING_AGG(DISTINCT mg.image_url || ' |||| ' || mg.featured || ' |||| ' || mg.created || ' |||| ' || COALESCE(mg.title, ' ') || ' |||| ' || COALESCE(mg.description, ' '), ' ~~~~ ') gallery,
STRING_AGG(DISTINCT md.joining_platform_id || ' |||| ' || dp.short || ' |||| ' || dp.name || ' |||| ' || md.url, ' ~~~~ ') donations
FROM mods m
INNER JOIN project_types pt ON pt.id = m.project_type
INNER JOIN statuses s ON s.id = m.status
@ -685,10 +684,12 @@ impl Project {
if strings.len() >= 3 {
Some(DonationUrl {
project_id: id,
platform_id: DonationPlatformId(strings[0].parse().unwrap_or(0)),
platform_short: strings[2].to_string(),
platform_name: strings[3].to_string(),
url: strings[1].to_string(),
platform_id: DonationPlatformId(
strings[0].parse().unwrap_or(0),
),
platform_short: strings[1].to_string(),
platform_name: strings[2].to_string(),
url: strings[3].to_string(),
})
} else {
None
@ -708,17 +709,21 @@ impl Project {
project_id: id,
image_url: strings[0].to_string(),
featured: strings[1].parse().unwrap_or(false),
title: if strings[2] == " " {
None
} else {
Some(strings[2].to_string())
},
description: if strings[3] == " " {
title: if strings[3] == " " {
None
} else {
Some(strings[3].to_string())
},
created: OffsetDateTime::parse(strings[4], time::Format::Rfc3339).unwrap_or_else(|_| OffsetDateTime::now_utc())
description: if strings[4] == " " {
None
} else {
Some(strings[4].to_string())
},
created: OffsetDateTime::parse(
strings[2],
time::Format::Rfc3339,
)
.unwrap_or_else(|_| OffsetDateTime::now_utc()),
})
} else {
None
@ -726,11 +731,17 @@ impl Project {
})
.flatten()
.collect(),
status: crate::models::projects::ProjectStatus::from_str(&m.status_name),
status: crate::models::projects::ProjectStatus::from_str(
&m.status_name,
),
license_id: m.short,
license_name: m.license_name,
client_side: crate::models::projects::SideType::from_str(&m.client_side_type),
server_side: crate::models::projects::SideType::from_str(&m.server_side_type),
client_side: crate::models::projects::SideType::from_str(
&m.client_side_type,
),
server_side: crate::models::projects::SideType::from_str(
&m.server_side_type,
),
}))
} else {
Ok(None)
@ -746,7 +757,8 @@ impl Project {
{
use futures::TryStreamExt;
let project_ids_parsed: Vec<i64> = project_ids.into_iter().map(|x| x.0).collect();
let project_ids_parsed: Vec<i64> =
project_ids.into_iter().map(|x| x.0).collect();
sqlx::query!(
"
SELECT m.id id, m.project_type project_type, m.title title, m.description description, m.downloads downloads, m.follows follows,
@ -756,8 +768,8 @@ impl Project {
m.team_id team_id, m.client_side client_side, m.server_side server_side, m.license license, m.slug slug, m.moderation_message moderation_message, m.moderation_message_body moderation_message_body,
s.status status_name, cs.name client_side_type, ss.name server_side_type, l.short short, l.name license_name, pt.name project_type_name,
STRING_AGG(DISTINCT c.category, ' ~~~~ ') categories, STRING_AGG(DISTINCT v.id::text, ' ~~~~ ') versions,
STRING_AGG(DISTINCT mg.image_url || ' |||| ' || mg.featured || ' |||| ' || COALESCE(mg.title, ' ') || ' |||| ' || COALESCE(mg.description, ' ') || ' |||| ' || mg.created, ' ~~~~ ') gallery,
STRING_AGG(DISTINCT md.joining_platform_id || ' |||| ' || md.url || ' |||| ' || dp.short || ' |||| ' || dp.name, ' ~~~~ ') donations
STRING_AGG(DISTINCT mg.image_url || ' |||| ' || mg.featured || ' |||| ' || mg.created || ' |||| ' || COALESCE(mg.title, ' ') || ' |||| ' || COALESCE(mg.description, ' '), ' ~~~~ ') gallery,
STRING_AGG(DISTINCT md.joining_platform_id || ' |||| ' || dp.short || ' |||| ' || dp.name || ' |||| ' || md.url, ' ~~~~ ') donations
FROM mods m
INNER JOIN project_types pt ON pt.id = m.project_type
INNER JOIN statuses s ON s.id = m.status
@ -821,9 +833,9 @@ impl Project {
project_id: ProjectId(id),
image_url: strings[0].to_string(),
featured: strings[1].parse().unwrap_or(false),
title: if strings[2] == " " { None } else { Some(strings[2].to_string()) },
description: if strings[3] == " " { None } else { Some(strings[3].to_string()) },
created: OffsetDateTime::parse(strings[4], time::Format::Rfc3339).unwrap_or_else(|_| OffsetDateTime::now_utc())
title: if strings[3] == " " { None } else { Some(strings[3].to_string()) },
description: if strings[4] == " " { None } else { Some(strings[4].to_string()) },
created: OffsetDateTime::parse(strings[2], time::Format::Rfc3339).unwrap_or_else(|_| OffsetDateTime::now_utc())
})
} else {
None
@ -842,9 +854,9 @@ impl Project {
Some(DonationUrl {
project_id: ProjectId(id),
platform_id: DonationPlatformId(strings[0].parse().unwrap_or(0)),
platform_short: strings[2].to_string(),
platform_name: strings[3].to_string(),
url: strings[1].to_string(),
platform_short: strings[1].to_string(),
platform_name: strings[2].to_string(),
url: strings[3].to_string(),
})
} else {
None

View File

@ -616,9 +616,9 @@ impl Version {
v.changelog changelog, v.changelog_url changelog_url, v.date_published date_published, v.downloads downloads,
v.version_type version_type, v.featured featured,
STRING_AGG(DISTINCT gv.version, ' ~~~~ ') game_versions, STRING_AGG(DISTINCT l.loader, ' ~~~~ ') loaders,
STRING_AGG(DISTINCT f.id || ' |||| ' || f.filename || ' |||| ' || f.is_primary || ' |||| ' || f.size || ' |||| ' || f.url, ' ~~~~ ') files,
STRING_AGG(DISTINCT f.id || ' |||| ' || f.is_primary || ' |||| ' || f.size || ' |||| ' || f.url || ' |||| ' || f.filename, ' ~~~~ ') files,
STRING_AGG(DISTINCT h.algorithm || ' |||| ' || encode(h.hash, 'escape') || ' |||| ' || h.file_id, ' ~~~~ ') hashes,
STRING_AGG(DISTINCT COALESCE(d.dependency_id, 0) || ' |||| ' || COALESCE(d.mod_dependency_id, 0) || ' |||| ' || COALESCE(d.dependency_file_name, ' ') || ' |||| ' || d.dependency_type, ' ~~~~ ') dependencies
STRING_AGG(DISTINCT COALESCE(d.dependency_id, 0) || ' |||| ' || COALESCE(d.mod_dependency_id, 0) || ' |||| ' || d.dependency_type || ' |||| ' || COALESCE(d.dependency_file_name, ' '), ' ~~~~ ') dependencies
FROM versions v
LEFT OUTER JOIN game_versions_versions gvv on v.id = gvv.joining_version_id
LEFT OUTER JOIN game_versions gv on gvv.game_version_id = gv.id
@ -679,17 +679,18 @@ impl Version {
for hash in &hashes {
if (hash.0).0 == file_id.0 {
file_hashes.insert(hash.1.clone(), hash.2.clone());
file_hashes
.insert(hash.1.clone(), hash.2.clone());
}
}
Some(QueryFile {
id: file_id,
url: file[4].to_string(),
filename: file[1].to_string(),
url: file[3].to_string(),
filename: file[4].to_string(),
hashes: file_hashes,
primary: file[2].parse().unwrap_or(false),
size: file[3].parse().unwrap_or(0)
primary: file[1].parse().unwrap_or(false),
size: file[2].parse().unwrap_or(0),
})
} else {
None
@ -733,8 +734,12 @@ impl Version {
Err(_) => None,
},
},
file_name: if dependency[2] == " " { None } else { Some(dependency[2].to_string())},
dependency_type: dependency[3].to_string(),
file_name: if dependency[3] == " " {
None
} else {
Some(dependency[3].to_string())
},
dependency_type: dependency[2].to_string(),
})
} else {
None
@ -758,16 +763,17 @@ impl Version {
{
use futures::stream::TryStreamExt;
let version_ids_parsed: Vec<i64> = version_ids.into_iter().map(|x| x.0).collect();
let version_ids_parsed: Vec<i64> =
version_ids.into_iter().map(|x| x.0).collect();
sqlx::query!(
"
SELECT v.id id, v.mod_id mod_id, v.author_id author_id, v.name version_name, v.version_number version_number,
v.changelog changelog, v.changelog_url changelog_url, v.date_published date_published, v.downloads downloads,
v.version_type version_type, v.featured featured,
STRING_AGG(DISTINCT gv.version, ' ~~~~ ') game_versions, STRING_AGG(DISTINCT l.loader, ' ~~~~ ') loaders,
STRING_AGG(DISTINCT f.id || ' |||| ' || f.filename || ' |||| ' || f.is_primary || ' |||| ' || f.size || ' |||| ' || f.url, ' ~~~~ ') files,
STRING_AGG(DISTINCT f.id || ' |||| ' || f.is_primary || ' |||| ' || f.size || ' |||| ' || f.url || ' |||| ' || f.filename, ' ~~~~ ') files,
STRING_AGG(DISTINCT h.algorithm || ' |||| ' || encode(h.hash, 'escape') || ' |||| ' || h.file_id, ' ~~~~ ') hashes,
STRING_AGG(DISTINCT COALESCE(d.dependency_id, 0) || ' |||| ' || COALESCE(d.mod_dependency_id, 0) || ' |||| ' || COALESCE(d.dependency_file_name, ' ') || ' |||| ' || d.dependency_type, ' ~~~~ ') dependencies
STRING_AGG(DISTINCT COALESCE(d.dependency_id, 0) || ' |||| ' || COALESCE(d.mod_dependency_id, 0) || ' |||| ' || d.dependency_type || ' |||| ' || COALESCE(d.dependency_file_name, ' '), ' ~~~~ ') dependencies
FROM versions v
LEFT OUTER JOIN game_versions_versions gvv on v.id = gvv.joining_version_id
LEFT OUTER JOIN game_versions gv on gvv.game_version_id = gv.id
@ -824,11 +830,11 @@ impl Version {
Some(QueryFile {
id: file_id,
url: file[4].to_string(),
filename: file[1].to_string(),
url: file[3].to_string(),
filename: file[4].to_string(),
hashes: file_hashes,
primary: file[2].parse().unwrap_or(false),
size: file[3].parse().unwrap_or(0)
primary: file[1].parse().unwrap_or(false),
size: file[2].parse().unwrap_or(0),
})
} else {
None
@ -859,8 +865,12 @@ impl Version {
Err(_) => None,
},
},
file_name: if dependency[2] == " " { None } else { Some(dependency[2].to_string())},
dependency_type: dependency[3].to_string(),
file_name: if dependency[3] == " " {
None
} else {
Some(dependency[3].to_string())
},
dependency_type: dependency[2].to_string(),
})
} else {
None

View File

@ -1,4 +1,5 @@
use crate::file_hosting::S3Host;
use crate::queue::download::DownloadQueue;
use crate::ratelimit::errors::ARError;
use crate::ratelimit::memory::{MemoryStore, MemoryStoreActor};
use crate::ratelimit::middleware::RateLimiter;
@ -16,6 +17,7 @@ mod database;
mod file_hosting;
mod health;
mod models;
mod queue;
mod ratelimit;
mod routes;
mod scheduler;
@ -183,11 +185,27 @@ async fn main() -> std::io::Result<()> {
scheduler::schedule_versions(&mut scheduler, pool.clone(), skip_initial);
let download_queue = Arc::new(DownloadQueue::new());
let pool_ref = pool.clone();
let download_queue_ref = download_queue.clone();
scheduler.run(std::time::Duration::from_secs(30), move || {
let pool_ref = pool_ref.clone();
let download_queue_ref = download_queue_ref.clone();
async move {
info!("Indexing download queue");
let result = download_queue_ref.index(&pool_ref).await;
if let Err(e) = result {
warn!("Indexing download queue failed: {:?}", e);
}
info!("Done indexing download queue");
}
});
let ip_salt = Pepper {
pepper: crate::models::ids::Base62Id(
crate::models::ids::random_base62(11),
)
.to_string(),
pepper: models::ids::Base62Id(models::ids::random_base62(11))
.to_string(),
};
let store = MemoryStore::new();
@ -243,6 +261,7 @@ async fn main() -> std::io::Result<()> {
.app_data(web::Data::new(pool.clone()))
.app_data(web::Data::new(file_host.clone()))
.app_data(web::Data::new(search_config.clone()))
.app_data(web::Data::new(download_queue.clone()))
.app_data(web::Data::new(ip_salt.clone()))
.configure(routes::v1_config)
.configure(routes::v2_config)

58
src/queue/download.rs Normal file
View File

@ -0,0 +1,58 @@
use crate::database::models::{DatabaseError, ProjectId, VersionId};
use sqlx::PgPool;
use tokio::sync::Mutex;
pub struct DownloadQueue {
queue: Mutex<Vec<(ProjectId, VersionId)>>,
}
// Batches download transactions every thirty seconds
impl DownloadQueue {
pub fn new() -> Self {
DownloadQueue {
queue: Mutex::new(Vec::with_capacity(1000)),
}
}
pub async fn add(&self, project_id: ProjectId, version_id: VersionId) {
self.queue.lock().await.push((project_id, version_id));
}
pub async fn take(&self) -> Vec<(ProjectId, VersionId)> {
let mut queue = self.queue.lock().await;
let len = queue.len();
std::mem::replace(&mut queue, Vec::with_capacity(len))
}
pub async fn index(&self, pool: &PgPool) -> Result<(), DatabaseError> {
let queue = self.take().await;
if queue.len() > 0 {
let mut transaction = pool.begin().await?;
for (project_id, version_id) in queue {
sqlx::query!(
"UPDATE versions
SET downloads = downloads + 1
WHERE (id = $1)",
version_id as VersionId
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"UPDATE mods
SET downloads = downloads + 1
WHERE (id = $1)",
project_id as ProjectId
)
.execute(&mut *transaction)
.await?;
}
transaction.commit().await?;
}
Ok(())
}
}

1
src/queue/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod download;

View File

@ -1,9 +1,11 @@
use crate::models::ids::ProjectId;
use crate::routes::ApiError;
use crate::util::guards::admin_key_guard;
use crate::DownloadQueue;
use actix_web::{patch, web, HttpResponse};
use serde::Deserialize;
use sqlx::PgPool;
use std::sync::Arc;
#[derive(Deserialize)]
pub struct DownloadBody {
@ -17,6 +19,7 @@ pub struct DownloadBody {
pub async fn count_download(
pool: web::Data<PgPool>,
download_body: web::Json<DownloadBody>,
download_queue: web::Data<Arc<DownloadQueue>>,
) -> Result<HttpResponse, ApiError> {
let project_id: crate::database::models::ids::ProjectId =
download_body.hash.into();
@ -49,27 +52,12 @@ pub async fn count_download(
));
};
let mut transaction = pool.begin().await?;
sqlx::query!(
"UPDATE versions
SET downloads = downloads + 1
WHERE (id = $1)",
version_id
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"UPDATE mods
SET downloads = downloads + 1
WHERE (id = $1)",
project_id
)
.execute(&mut *transaction)
.await?;
transaction.commit().await?;
download_queue
.add(
crate::database::models::ProjectId(project_id),
crate::database::models::VersionId(version_id),
)
.await;
Ok(HttpResponse::Ok().body(""))
}

View File

@ -38,11 +38,10 @@ pub async fn projects_get(
web::Query(ids): web::Query<ProjectIds>,
pool: web::Data<PgPool>,
) -> Result<HttpResponse, ApiError> {
let project_ids =
serde_json::from_str::<Vec<ProjectId>>(&*ids.ids)?
.into_iter()
.map(|x| x.into())
.collect();
let project_ids = serde_json::from_str::<Vec<ProjectId>>(&*ids.ids)?
.into_iter()
.map(|x| x.into())
.collect();
let projects_data =
database::models::Project::get_many_full(project_ids, &**pool).await?;
@ -871,8 +870,7 @@ pub async fn project_icon_edit(
let project_item =
database::models::Project::get_from_slug_or_project_id(
string.clone(),
&**pool,
&string, &**pool,
)
.await?
.ok_or_else(|| {
@ -963,8 +961,7 @@ pub async fn delete_project_icon(
let string = info.into_inner().0;
let project_item = database::models::Project::get_from_slug_or_project_id(
string.clone(),
&**pool,
&string, &**pool,
)
.await?
.ok_or_else(|| {
@ -1053,8 +1050,7 @@ pub async fn add_gallery_item(
let project_item =
database::models::Project::get_from_slug_or_project_id(
string.clone(),
&**pool,
&string, &**pool,
)
.await?
.ok_or_else(|| {
@ -1173,8 +1169,7 @@ pub async fn edit_gallery_item(
})?;
let project_item = database::models::Project::get_from_slug_or_project_id(
string.clone(),
&**pool,
&string, &**pool,
)
.await?
.ok_or_else(|| {
@ -1301,8 +1296,7 @@ pub async fn delete_gallery_item(
let string = info.into_inner().0;
let project_item = database::models::Project::get_from_slug_or_project_id(
string.clone(),
&**pool,
&string, &**pool,
)
.await?
.ok_or_else(|| {
@ -1385,8 +1379,7 @@ pub async fn project_delete(
let string = info.into_inner().0;
let project = database::models::Project::get_from_slug_or_project_id(
string.clone(),
&**pool,
&string, &**pool,
)
.await?
.ok_or_else(|| {
@ -1446,14 +1439,15 @@ pub async fn project_follow(
let user = get_user_from_headers(req.headers(), &**pool).await?;
let string = info.into_inner().0;
let result =
database::models::Project::get_from_slug_or_project_id(string, &**pool)
.await?
.ok_or_else(|| {
ApiError::InvalidInput(
"The specified project does not exist!".to_string(),
)
})?;
let result = database::models::Project::get_from_slug_or_project_id(
&string, &**pool,
)
.await?
.ok_or_else(|| {
ApiError::InvalidInput(
"The specified project does not exist!".to_string(),
)
})?;
let user_id: database::models::ids::UserId = user.id.into();
let project_id: database::models::ids::ProjectId = result.id;
@ -1514,14 +1508,15 @@ pub async fn project_unfollow(
let user = get_user_from_headers(req.headers(), &**pool).await?;
let string = info.into_inner().0;
let result =
database::models::Project::get_from_slug_or_project_id(string, &**pool)
.await?
.ok_or_else(|| {
ApiError::InvalidInput(
"The specified project does not exist!".to_string(),
)
})?;
let result = database::models::Project::get_from_slug_or_project_id(
&string, &**pool,
)
.await?
.ok_or_else(|| {
ApiError::InvalidInput(
"The specified project does not exist!".to_string(),
)
})?;
let user_id: database::models::ids::UserId = user.id.into();
let project_id = result.id;

View File

@ -20,7 +20,7 @@ pub async fn team_members_get_project(
let string = info.into_inner().0;
let project_data =
crate::database::models::Project::get_from_slug_or_project_id(
string, &**pool,
&string, &**pool,
)
.await?;

View File

@ -63,9 +63,10 @@ pub async fn version_list(
) -> Result<HttpResponse, ApiError> {
let string = info.into_inner().0;
let result =
database::models::Project::get_from_slug_or_project_id(string, &**pool)
.await?;
let result = database::models::Project::get_from_slug_or_project_id(
&string, &**pool,
)
.await?;
if let Some(project) = result {
let id = project.id;
@ -161,11 +162,10 @@ pub async fn versions_get(
ids: web::Query<VersionIds>,
pool: web::Data<PgPool>,
) -> Result<HttpResponse, ApiError> {
let version_ids =
serde_json::from_str::<Vec<models::ids::VersionId>>(&*ids.ids)?
.into_iter()
.map(|x| x.into())
.collect();
let version_ids = serde_json::from_str::<Vec<VersionId>>(&*ids.ids)?
.into_iter()
.map(|x| x.into())
.collect();
let versions_data =
database::models::Version::get_many_full(version_ids, &**pool).await?;
@ -180,7 +180,7 @@ pub async fn versions_get(
#[get("{version_id}")]
pub async fn version_get(
info: web::Path<(models::ids::VersionId,)>,
info: web::Path<(VersionId,)>,
pool: web::Data<PgPool>,
) -> Result<HttpResponse, ApiError> {
let id = info.into_inner().0;

View File

@ -756,12 +756,14 @@ pub async fn upload_file(
}
for file in files {
dependencies.push(DependencyBuilder {
project_id: None,
version_id: None,
file_name: Some(file.to_string()),
dependency_type: DependencyType::Required.to_string(),
});
if !file.is_empty() {
dependencies.push(DependencyBuilder {
project_id: None,
version_id: None,
file_name: Some(file.to_string()),
dependency_type: DependencyType::Required.to_string(),
});
}
}
}
}

View File

@ -135,7 +135,7 @@ pub async fn is_authorized(
if user.role.is_mod() {
authorized = true;
} else {
let user_id: database::models::ids::UserId = user.id.into();
let user_id: models::ids::UserId = user.id.into();
let project_exists = sqlx::query!(
"SELECT EXISTS(SELECT 1 FROM team_members WHERE team_id = $1 AND user_id = $2)",