From 35cd277fcfd98a16faf312e0324594f6b28c4e4d Mon Sep 17 00:00:00 2001 From: Wyatt Verchere Date: Wed, 13 Sep 2023 16:35:47 -0700 Subject: [PATCH] analytics (#695) * playtime * other routes, improvements * fmt clippy * revs --------- Co-authored-by: Geometrically <18202329+Geometrically@users.noreply.github.com> --- .env | 2 +- src/clickhouse/fetch.rs | 189 +++++++++++++++ src/clickhouse/mod.rs | 4 + src/models/analytics.rs | 10 +- src/models/projects.rs | 4 +- src/routes/analytics.rs | 8 +- src/routes/v2/analytics_get.rs | 423 +++++++++++++++++++++++++++++++++ src/routes/v2/mod.rs | 2 + 8 files changed, 630 insertions(+), 12 deletions(-) create mode 100644 src/clickhouse/fetch.rs create mode 100644 src/routes/v2/analytics_get.rs diff --git a/.env b/.env index 01e72f088..e89b7b9d7 100644 --- a/.env +++ b/.env @@ -84,7 +84,7 @@ BEEHIIV_API_KEY=none ANALYTICS_ALLOWED_ORIGINS='["http://127.0.0.1:3000", "http://localhost:3000", "https://modrinth.com", "https://www.modrinth.com", "*"]' -CLICKHOUSE_URL=http:/localhost:8123 +CLICKHOUSE_URL=http://localhost:8123 CLICKHOUSE_USER=default CLICKHOUSE_PASSWORD= CLICKHOUSE_DATABASE=staging_ariadne diff --git a/src/clickhouse/fetch.rs b/src/clickhouse/fetch.rs new file mode 100644 index 000000000..bee6fa122 --- /dev/null +++ b/src/clickhouse/fetch.rs @@ -0,0 +1,189 @@ +use std::sync::Arc; + +use crate::{ + models::ids::{ProjectId, VersionId}, + routes::ApiError, +}; +use chrono::NaiveDate; +use serde::{Deserialize, Serialize}; + +#[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)] +pub struct ReturnPlaytimes { + pub time: u64, + pub id: u64, + pub total_seconds: u64, +} + +#[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)] +pub struct ReturnCountry { + pub country: String, + pub id: u64, + pub total_views: u64, + pub total_downloads: u64, +} + +#[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)] +pub struct ReturnViews { + pub time: u64, + pub id: u64, + pub total_views: u64, +} + +// Only one of project_id or version_id should be used +// Fetches playtimes as a Vec of ReturnPlaytimes +pub async fn fetch_playtimes( + projects: Option>, + versions: Option>, + start_date: NaiveDate, + end_date: NaiveDate, + resolution_minute: u32, + client: Arc, +) -> Result, ApiError> { + let project_or_version = if projects.is_some() && versions.is_none() { + "project_id" + } else if versions.is_some() { + "version_id" + } else { + return Err(ApiError::InvalidInput( + "Only one of 'project_id' or 'version_id' should be used.".to_string(), + )); + }; + + let mut query = client + .query(&format!( + " + SELECT + toYYYYMMDDhhmmss(toStartOfInterval(recorded, toIntervalMinute(?)) AS time), + {project_or_version}, + SUM(seconds) AS total_seconds + FROM playtime + WHERE time >= toDate(?) AND time <= toDate(?) + AND {project_or_version} IN ? + GROUP BY + time, + project_id, + {project_or_version} + " + )) + .bind(resolution_minute) + .bind(start_date) + .bind(end_date); + + if projects.is_some() { + query = query.bind(projects.unwrap().iter().map(|x| x.0).collect::>()); + } else if versions.is_some() { + query = query.bind(versions.unwrap().iter().map(|x| x.0).collect::>()); + } + + Ok(query.fetch_all().await?) +} + +// Fetches views as a Vec of ReturnViews +pub async fn fetch_views( + projects: Option>, + versions: Option>, + start_date: NaiveDate, + end_date: NaiveDate, + resolution_minutes: u32, + client: Arc, +) -> Result, ApiError> { + let project_or_version = if projects.is_some() && versions.is_none() { + "project_id" + } else if versions.is_some() { + "version_id" + } else { + return Err(ApiError::InvalidInput( + "Only one of 'project_id' or 'version_id' should be used.".to_string(), + )); + }; + + let mut query = client + .query(&format!( + " + SELECT + toYYYYMMDDhhmmss((toStartOfInterval(recorded, toIntervalMinute(?)) AS time)), + {project_or_version}, + count(id) AS total_views + FROM views + WHERE time >= toDate(?) AND time <= toDate(?) + AND {project_or_version} IN ? + GROUP BY + time, + {project_or_version} + " + )) + .bind(resolution_minutes) + .bind(start_date) + .bind(end_date); + + if projects.is_some() { + query = query.bind(projects.unwrap().iter().map(|x| x.0).collect::>()); + } else if versions.is_some() { + query = query.bind(versions.unwrap().iter().map(|x| x.0).collect::>()); + } + + Ok(query.fetch_all().await?) +} + +// Fetches countries as a Vec of ReturnCountry +pub async fn fetch_countries( + projects: Option>, + versions: Option>, + start_date: NaiveDate, + end_date: NaiveDate, + client: Arc, +) -> Result, ApiError> { + let project_or_version = if projects.is_some() && versions.is_none() { + "project_id" + } else if versions.is_some() { + "version_id" + } else { + return Err(ApiError::InvalidInput( + "Only one of 'project_id' or 'version_id' should be used.".to_string(), + )); + }; + + let mut query = client.query(&format!( + " + WITH view_grouping AS ( + SELECT + country, + {project_or_version}, + count(id) AS total_views + FROM views + WHERE toYYYYMMDDhhmmss(recorded) >= toYYYYMMDDhhmmss(toDate(?)) AND toYYYYMMDDhhmmss(recorded) <= toYYYYMMDDhhmmss(toDate(?)) + GROUP BY + country, + {project_or_version} + ), + download_grouping AS ( + SELECT + country, + {project_or_version}, + count(id) AS total_downloads + FROM downloads + WHERE toYYYYMMDDhhmmss(recorded) >= toYYYYMMDDhhmmss(toDate(?)) AND toYYYYMMDDhhmmss(recorded) <= toYYYYMMDDhhmmss(toDate(?)) + GROUP BY + country, + {project_or_version} + ) + + SELECT + v.country, + v.{project_or_version}, + v.total_views, + d.total_downloads + FROM view_grouping AS v + LEFT JOIN download_grouping AS d ON (v.country = d.country) AND (v.{project_or_version} = d.{project_or_version}) + WHERE {project_or_version} IN ? + " + )).bind(start_date).bind(end_date).bind(start_date).bind(end_date); + + if projects.is_some() { + query = query.bind(projects.unwrap().iter().map(|x| x.0).collect::>()); + } else if versions.is_some() { + query = query.bind(versions.unwrap().iter().map(|x| x.0).collect::>()); + } + + Ok(query.fetch_all().await?) +} diff --git a/src/clickhouse/mod.rs b/src/clickhouse/mod.rs index 1ac9480f8..097224622 100644 --- a/src/clickhouse/mod.rs +++ b/src/clickhouse/mod.rs @@ -1,6 +1,10 @@ use hyper::client::HttpConnector; use hyper_tls::{native_tls, HttpsConnector}; +mod fetch; + +pub use fetch::*; + pub async fn init_client() -> clickhouse::error::Result { let database = dotenvy::var("CLICKHOUSE_DATABASE").unwrap(); diff --git a/src/models/analytics.rs b/src/models/analytics.rs index a401c4f5b..ea20da6b4 100644 --- a/src/models/analytics.rs +++ b/src/models/analytics.rs @@ -1,10 +1,10 @@ use clickhouse::Row; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::hash::{Hash, Hasher}; use std::net::Ipv6Addr; use uuid::Uuid; -#[derive(Row, Serialize, Clone)] +#[derive(Row, Serialize, Deserialize, Clone)] pub struct Download { #[serde(with = "uuid::serde::compact")] pub id: Uuid, @@ -41,7 +41,7 @@ impl Hash for Download { } } -#[derive(Row, Serialize, Clone)] +#[derive(Row, Serialize, Deserialize, Clone)] pub struct PageView { #[serde(with = "uuid::serde::compact")] pub id: Uuid, @@ -76,12 +76,12 @@ impl Hash for PageView { } } -#[derive(Row, Serialize, Clone)] +#[derive(Row, Serialize, Deserialize, Clone, Debug)] pub struct Playtime { #[serde(with = "uuid::serde::compact")] pub id: Uuid, pub recorded: i64, - pub seconds: u16, + pub seconds: u64, // Modrinth User ID for logged in users (unused atm) pub user_id: u64, diff --git a/src/models/projects.rs b/src/models/projects.rs index 9aaf2dea5..35f38fb06 100644 --- a/src/models/projects.rs +++ b/src/models/projects.rs @@ -9,13 +9,13 @@ use serde::{Deserialize, Serialize}; use validator::Validate; /// The ID of a specific project, encoded as base62 for usage in the API -#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] #[serde(from = "Base62Id")] #[serde(into = "Base62Id")] pub struct ProjectId(pub u64); /// The ID of a specific version of a project -#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] +#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash, Debug)] #[serde(from = "Base62Id")] #[serde(into = "Base62Id")] pub struct VersionId(pub u64); diff --git a/src/routes/analytics.rs b/src/routes/analytics.rs index d44ff7011..042032910 100644 --- a/src/routes/analytics.rs +++ b/src/routes/analytics.rs @@ -154,7 +154,7 @@ pub async fn page_view_ingest( Ok(HttpResponse::NoContent().body("")) } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] pub struct PlaytimeInput { seconds: u16, loader: String, @@ -205,10 +205,10 @@ pub async fn playtime_ingest( .add_playtime(Playtime { id: Default::default(), recorded: Utc::now().timestamp_nanos() / 100_000, - seconds: playtime.seconds, + seconds: playtime.seconds as u64, user_id: user.id.0, - project_id: version.inner.id.0 as u64, - version_id: version.inner.project_id.0 as u64, + project_id: version.inner.project_id.0 as u64, + version_id: version.inner.id.0 as u64, loader: playtime.loader, game_version: playtime.game_version, parent: playtime.parent.map(|x| x.0).unwrap_or(0), diff --git a/src/routes/v2/analytics_get.rs b/src/routes/v2/analytics_get.rs new file mode 100644 index 000000000..f1bd34b29 --- /dev/null +++ b/src/routes/v2/analytics_get.rs @@ -0,0 +1,423 @@ +use actix_web::{get, web, HttpRequest, HttpResponse}; +use chrono::{Duration, NaiveDate, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use std::collections::HashMap; + +use crate::{ + auth::{filter_authorized_projects, filter_authorized_versions, get_user_from_headers}, + database::models::{project_item, user_item, version_item}, + models::{ + ids::{ + base62_impl::{parse_base62, to_base62}, + ProjectId, VersionId, + }, + pats::Scopes, + }, + queue::session::AuthQueue, +}; + +use super::ApiError; + +pub fn config(cfg: &mut web::ServiceConfig) { + cfg.service( + web::scope("analytics") + .service(playtimes_get) + .service(views_get) + .service(countries_downloads_get) + .service(countries_views_get), + ); +} + +/// The json data to be passed to fetch analytic data +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +/// start_date and end_date are optional, and default to two weeks ago, and the maximum date respectively. +/// resolution_minutes is optional. This refers to the window by which we are looking (every day, every minute, etc) and defaults to 1440 (1 day) +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct GetData { + // only one of project_ids or version_ids should be used + // if neither are provided, all projects the user has access to will be used + pub project_ids: Option>, + pub version_ids: Option>, + + pub start_date: Option, // defaults to 2 weeks ago + pub end_date: Option, // defaults to now + + pub resolution_minutes: Option, // defaults to 1 day. Ignored in routes that do not aggregate over a resolution (eg: /countries) +} + +/// Get playtime data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of days to playtime data +/// eg: +/// { +/// "4N1tEhnO": { +/// "20230824": 23 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +#[derive(Serialize, Deserialize, Clone)] +pub struct FetchedPlaytime { + pub time: u64, + pub total_seconds: u64, + pub loader_seconds: HashMap, + pub game_version_seconds: HashMap, + pub parent_seconds: HashMap, +} +#[get("playtime")] +pub async fn playtimes_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Json, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user_option = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Some(&[Scopes::ANALYTICS]), + ) + .await + .map(|x| x.1) + .ok(); + + let project_ids = data.project_ids.clone(); + let version_ids = data.version_ids.clone(); + + if project_ids.is_some() && version_ids.is_some() { + return Err(ApiError::InvalidInput( + "Only one of 'project_ids' or 'version_ids' should be used.".to_string(), + )); + } + + let start_date = data + .start_date + .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + let resolution_minutes = data.resolution_minutes.unwrap_or(60 * 24); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let (project_ids, version_ids) = + filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + + // Get the views + let playtimes = crate::clickhouse::fetch_playtimes( + project_ids, + version_ids, + start_date, + end_date, + resolution_minutes, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for playtime in playtimes { + let id_string = to_base62(playtime.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(playtime.time.to_string(), playtime.total_seconds); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get view data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of days to views +/// eg: +/// { +/// "4N1tEhnO": { +/// "20230824": 1090 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +#[get("views")] +pub async fn views_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Json, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user_option = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Some(&[Scopes::ANALYTICS]), + ) + .await + .map(|x| x.1) + .ok(); + + let project_ids = data.project_ids.clone(); + let version_ids = data.version_ids.clone(); + + if project_ids.is_some() && version_ids.is_some() { + return Err(ApiError::InvalidInput( + "Only one of 'project_ids' or 'version_ids' should be used.".to_string(), + )); + } + + let start_date = data + .start_date + .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + let resolution_minutes = data.resolution_minutes.unwrap_or(60 * 24); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let (project_ids, version_ids) = + filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + + // Get the views + let views = crate::clickhouse::fetch_views( + project_ids, + version_ids, + start_date, + end_date, + resolution_minutes, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for views in views { + let id_string = to_base62(views.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(views.time.to_string(), views.total_views); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get country data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of coutnry to downloads. +/// Unknown countries are labeled "". +/// This is usuable to see significant performing countries per project +/// eg: +/// { +/// "4N1tEhnO": { +/// "CAN": 22 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +/// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch +#[get("countries/downloads")] +pub async fn countries_downloads_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Json, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user_option = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Some(&[Scopes::ANALYTICS]), + ) + .await + .map(|x| x.1) + .ok(); + + let project_ids = data.project_ids.clone(); + let version_ids = data.version_ids.clone(); + + if project_ids.is_some() && version_ids.is_some() { + return Err(ApiError::InvalidInput( + "Only one of 'project_ids' or 'version_ids' should be used.".to_string(), + )); + } + + let start_date = data + .start_date + .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let (project_ids, version_ids) = + filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + + // Get the countries + let countries = crate::clickhouse::fetch_countries( + project_ids, + version_ids, + start_date, + end_date, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for views in countries { + let id_string = to_base62(views.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(views.country, views.total_downloads); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get country data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of coutnry to views. +/// Unknown countries are labeled "". +/// This is usuable to see significant performing countries per project +/// eg: +/// { +/// "4N1tEhnO": { +/// "CAN": 56165 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +/// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch +#[get("countries/views")] +pub async fn countries_views_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Json, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user_option = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Some(&[Scopes::ANALYTICS]), + ) + .await + .map(|x| x.1) + .ok(); + + let project_ids = data.project_ids.clone(); + let version_ids = data.version_ids.clone(); + + if project_ids.is_some() && version_ids.is_some() { + return Err(ApiError::InvalidInput( + "Only one of 'project_ids' or 'version_ids' should be used.".to_string(), + )); + } + + let start_date = data + .start_date + .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let (project_ids, version_ids) = + filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + + // Get the countries + let countries = crate::clickhouse::fetch_countries( + project_ids, + version_ids, + start_date, + end_date, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for views in countries { + let id_string = to_base62(views.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(views.country, views.total_views); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +async fn filter_allowed_ids( + mut project_ids: Option>, + version_ids: Option>, + user_option: Option, + pool: &web::Data, + redis: &deadpool_redis::Pool, +) -> Result<(Option>, Option>), ApiError> { + if project_ids.is_some() && version_ids.is_some() { + return Err(ApiError::InvalidInput( + "Only one of 'project_ids' or 'version_ids' should be used.".to_string(), + )); + } + + // If no project_ids or version_ids are provided, we default to all projects the user has access to + if project_ids.is_none() && version_ids.is_none() { + if let Some(user) = &user_option { + project_ids = Some( + user_item::User::get_projects(user.id.into(), &***pool) + .await? + .into_iter() + .map(|x| ProjectId::from(x).to_string()) + .collect(), + ); + } + } + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + + let project_ids = if let Some(project_ids) = project_ids { + // Submitted project_ids are filtered by the user's permissions + let ids = project_ids + .iter() + .map(|id| Ok(ProjectId(parse_base62(id)?).into())) + .collect::, ApiError>>()?; + let projects = project_item::Project::get_many_ids(&ids, &***pool, redis).await?; + let ids: Vec = filter_authorized_projects(projects, &user_option, pool) + .await? + .into_iter() + .map(|x| x.id) + .collect::>(); + Some(ids) + } else { + None + }; + let version_ids = if let Some(version_ids) = version_ids { + // Submitted version_ids are filtered by the user's permissions + let ids = version_ids + .iter() + .map(|id| Ok(VersionId(parse_base62(id)?).into())) + .collect::, ApiError>>()?; + let versions = version_item::Version::get_many(&ids, &***pool, redis).await?; + let ids: Vec = filter_authorized_versions(versions, &user_option, pool) + .await? + .into_iter() + .map(|x| x.id) + .collect::>(); + Some(ids) + } else { + None + }; + + // Only one of project_ids or version_ids will be Some + Ok((project_ids, version_ids)) +} diff --git a/src/routes/v2/mod.rs b/src/routes/v2/mod.rs index 786450440..e3b769e83 100644 --- a/src/routes/v2/mod.rs +++ b/src/routes/v2/mod.rs @@ -1,4 +1,5 @@ mod admin; +mod analytics_get; mod moderation; mod notifications; pub(crate) mod project_creation; @@ -21,6 +22,7 @@ pub fn config(cfg: &mut actix_web::web::ServiceConfig) { actix_web::web::scope("v2") .wrap(default_cors()) .configure(admin::config) + .configure(analytics_get::config) .configure(crate::auth::session::config) .configure(crate::auth::flows::config) .configure(crate::auth::pats::config)