fix views analytics (#885)

* fix views analytics

* update ip stripping

* update clickhouse tables

* fix broken queries

* Fix panics

* fix download undercounting

* fix packerator failing sometimes

* run prep
This commit is contained in:
Geometrically 2024-03-02 14:04:46 -07:00 committed by GitHub
parent 04d834187b
commit e2ffeab8fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 207 additions and 140 deletions

View File

@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE users\n SET balance = balance + $1\n WHERE id = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Numeric",
"Int8"
]
},
"nullable": []
},
"hash": "7ab21e7613dd88e97cf602e76bff62170c13ceef8104a4ce4cb2d101f8ce4f48"
}

View File

@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE versions\n SET downloads = downloads + 1\n WHERE id = ANY($1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8Array"
]
},
"nullable": []
},
"hash": "b993ec7579f06603a2a308dccd1ea1fbffd94286db48bc0e36a30f4f6a9d39af"
}

View File

@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE mods\n SET downloads = downloads + 1\n WHERE id = ANY($1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8Array"
]
},
"nullable": []
},
"hash": "d08c9ef6a8829ce1d23d66f27c58f4b9b64f4ce985e60ded871d1f31eb0c818b"
}

View File

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO moderation_external_files (sha1, external_license_id)\n SELECT * FROM UNNEST ($1::bytea[], $2::bigint[])\n ",
"query": "\n INSERT INTO moderation_external_files (sha1, external_license_id)\n SELECT * FROM UNNEST ($1::bytea[], $2::bigint[])\n ON CONFLICT (sha1) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
@ -11,5 +11,5 @@
},
"nullable": []
},
"hash": "3d535886d8a239967e6556fb0cd0588b79a7787b9b3cbbd4f8968cd0d99ed49d"
"hash": "f297b517bc3bbd8628c0c222c0e3daf8f4efbe628ee2e8ddbbb4b9734cc9c915"
}

4
Cargo.lock generated
View File

@ -690,9 +690,9 @@ dependencies = [
[[package]]
name = "bstr"
version = "1.9.0"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c48f0051a4b4c5e0b6d365cd04af53aeaa209e3cc15ec2cdb69e73cc87fbd0dc"
checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706"
dependencies = [
"memchr",
]

View File

@ -42,14 +42,15 @@ pub async fn init_client_with_database(
user_id UInt64,
project_id UInt64,
monetized Bool DEFAULT True,
ip IPv6,
country String,
user_agent String,
headers Array(Tuple(String, String)),
headers Array(Tuple(String, String))
)
ENGINE = MergeTree()
PRIMARY KEY (project_id, recorded)
PRIMARY KEY (project_id, recorded, ip)
"
))
.execute()
@ -71,10 +72,10 @@ pub async fn init_client_with_database(
ip IPv6,
country String,
user_agent String,
headers Array(Tuple(String, String)),
headers Array(Tuple(String, String))
)
ENGINE = MergeTree()
PRIMARY KEY (project_id, recorded)
PRIMARY KEY (project_id, recorded, ip)
"
))
.execute()
@ -94,10 +95,10 @@ pub async fn init_client_with_database(
loader String,
game_version String,
parent UInt64,
parent UInt64
)
ENGINE = MergeTree()
PRIMARY KEY (project_id, recorded)
PRIMARY KEY (project_id, recorded, user_id)
"
))
.execute()

View File

@ -16,7 +16,7 @@ use util::cors::default_cors;
use crate::queue::moderation::AutomatedModerationQueue;
use crate::{
// queue::payouts::process_payout,
queue::payouts::process_payout,
search::indexing::index_projects,
util::env::{parse_strings_from_var, parse_var},
};
@ -214,25 +214,25 @@ pub fn app_setup(
});
}
// {
// let pool_ref = pool.clone();
// let redis_ref = redis_pool.clone();
// let client_ref = clickhouse.clone();
// scheduler.run(std::time::Duration::from_secs(60 * 60 * 6), move || {
// let pool_ref = pool_ref.clone();
// let redis_ref = redis_ref.clone();
// let client_ref = client_ref.clone();
//
// async move {
// info!("Started running payouts");
// let result = process_payout(&pool_ref, &redis_ref, &client_ref).await;
// if let Err(e) = result {
// warn!("Payouts run failed: {:?}", e);
// }
// info!("Done running payouts");
// }
// });
// }
{
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
let client_ref = clickhouse.clone();
scheduler.run(std::time::Duration::from_secs(60 * 60 * 6), move || {
let pool_ref = pool_ref.clone();
let redis_ref = redis_ref.clone();
let client_ref = client_ref.clone();
async move {
info!("Started running payouts");
let result = process_payout(&pool_ref, &redis_ref, &client_ref).await;
if let Err(e) = result {
warn!("Payouts run failed: {:?}", e);
}
info!("Done running payouts");
}
});
}
let ip_salt = Pepper {
pepper: models::ids::Base62Id(models::ids::random_base62(11)).to_string(),

View File

@ -34,6 +34,8 @@ pub struct PageView {
pub user_id: u64,
// Modrinth Project ID (used for payouts)
pub project_id: u64,
// whether this view will be monetized / counted for payouts
pub monetized: bool,
// The below information is used exclusively for data aggregation and fraud detection
// (ex: page view botting).

View File

@ -5,12 +5,15 @@ use crate::routes::ApiError;
use dashmap::{DashMap, DashSet};
use redis::cmd;
use sqlx::PgPool;
use std::collections::HashMap;
use std::net::Ipv6Addr;
const DOWNLOADS_NAMESPACE: &str = "downloads";
const VIEWS_NAMESPACE: &str = "views";
pub struct AnalyticsQueue {
views_queue: DashSet<PageView>,
downloads_queue: DashMap<String, Download>,
views_queue: DashMap<(u64, u64), Vec<PageView>>,
downloads_queue: DashMap<(u64, u64), Download>,
playtime_queue: DashSet<Playtime>,
}
@ -24,26 +27,37 @@ impl Default for AnalyticsQueue {
impl AnalyticsQueue {
pub fn new() -> Self {
AnalyticsQueue {
views_queue: DashSet::with_capacity(1000),
views_queue: DashMap::with_capacity(1000),
downloads_queue: DashMap::with_capacity(1000),
playtime_queue: DashSet::with_capacity(1000),
}
}
pub fn add_view(&self, page_view: PageView) {
self.views_queue.insert(page_view);
/// Collapse an IPv6 address into a 64-bit prefix used to bucket
/// analytics events coming from the same network.
///
/// IPv4-mapped addresses (`::ffff:a.b.c.d`) keep the full 32-bit IPv4
/// address, placed in the high bytes of the result; native IPv6
/// addresses keep their upper 64 bits (the /64 network prefix).
fn strip_ip(ip: Ipv6Addr) -> u64 {
    match ip.to_ipv4_mapped() {
        // IPv4: big-endian 32-bit address shifted into the high half,
        // equivalent to from_be_bytes([o0, o1, o2, o3, 0, 0, 0, 0]).
        Some(v4) => u64::from(u32::from_be_bytes(v4.octets())) << 32,
        // IPv6: first 8 of the 16 address bytes, big-endian.
        None => {
            let bytes = ip.octets();
            let mut prefix = [0u8; 8];
            prefix.copy_from_slice(&bytes[..8]);
            u64::from_be_bytes(prefix)
        }
    }
}
/// Queue a page view, bucketed by (stripped IP, project id) so that
/// repeated views from the same network can be deduplicated when the
/// queue is flushed.
pub fn add_view(&self, page_view: PageView) {
    let bucket = (Self::strip_ip(page_view.ip), page_view.project_id);
    self.views_queue.entry(bucket).or_default().push(page_view);
}
pub fn add_download(&self, download: Download) {
let ip_stripped = if let Some(ip) = download.ip.to_ipv4_mapped() {
let octets = ip.octets();
u64::from_be_bytes([0, 0, 0, 0, octets[0], octets[1], octets[2], octets[3]])
} else {
let octets = download.ip.octets();
u64::from_be_bytes([0, 0, 0, 0, octets[0], octets[1], octets[2], octets[3]])
};
let ip_stripped = Self::strip_ip(download.ip);
self.downloads_queue
.insert(format!("{}-{}", ip_stripped, download.project_id), download);
.insert((ip_stripped, download.project_id), download);
}
pub fn add_playtime(&self, playtime: Playtime) {
@ -65,16 +79,6 @@ impl AnalyticsQueue {
let playtime_queue = self.playtime_queue.clone();
self.playtime_queue.clear();
if !views_queue.is_empty() {
let mut views = client.insert("views")?;
for view in views_queue {
views.write(&view).await?;
}
views.end().await?;
}
if !playtime_queue.is_empty() {
let mut playtimes = client.insert("playtime")?;
@ -85,6 +89,78 @@ impl AnalyticsQueue {
playtimes.end().await?;
}
if !views_queue.is_empty() {
let mut views_keys = Vec::new();
let mut raw_views = Vec::new();
for (key, views) in views_queue {
views_keys.push(key);
raw_views.push((views, true));
}
let mut redis = redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
let results = cmd("MGET")
.arg(
views_keys
.iter()
.map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<u32>>>(&mut redis)
.await
.map_err(DatabaseError::CacheError)?;
let mut pipe = redis::pipe();
for (idx, count) in results.into_iter().enumerate() {
let key = &views_keys[idx];
let new_count = if let Some((views, monetized)) = raw_views.get_mut(idx) {
if let Some(count) = count {
println!("len: {} count: {}", views.len(), count);
if count > 3 {
*monetized = false;
continue;
}
if (count + views.len() as u32) > 3 {
*monetized = false;
}
count + (views.len() as u32)
} else {
views.len() as u32
}
} else {
1
};
pipe.atomic().set_ex(
format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1),
new_count,
6 * 60 * 60,
);
}
pipe.query_async(&mut *redis)
.await
.map_err(DatabaseError::CacheError)?;
let mut views = client.insert("views")?;
for (all_views, monetized) in raw_views {
for (idx, mut view) in all_views.into_iter().enumerate() {
if idx != 0 || !monetized {
view.monetized = false;
}
views.write(&view).await?;
}
}
views.end().await?;
}
if !downloads_queue.is_empty() {
let mut downloads_keys = Vec::new();
let raw_downloads = DashMap::new();
@ -100,7 +176,7 @@ impl AnalyticsQueue {
.arg(
downloads_keys
.iter()
.map(|x| format!("{}:{}", DOWNLOADS_NAMESPACE, x))
.map(|x| format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<u32>>>(&mut redis)
@ -123,7 +199,7 @@ impl AnalyticsQueue {
};
pipe.atomic().set_ex(
format!("{}:{}", DOWNLOADS_NAMESPACE, key),
format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1),
new_count,
6 * 60 * 60,
);
@ -132,37 +208,46 @@ impl AnalyticsQueue {
.await
.map_err(DatabaseError::CacheError)?;
let version_ids = raw_downloads
.iter()
.map(|x| x.version_id as i64)
.collect::<Vec<_>>();
let project_ids = raw_downloads
.iter()
.map(|x| x.project_id as i64)
.collect::<Vec<_>>();
let mut transaction = pool.begin().await?;
let mut downloads = client.insert("downloads")?;
let mut version_downloads: HashMap<i64, i32> = HashMap::new();
let mut project_downloads: HashMap<i64, i32> = HashMap::new();
for (_, download) in raw_downloads {
*version_downloads
.entry(download.version_id as i64)
.or_default() += 1;
*project_downloads
.entry(download.project_id as i64)
.or_default() += 1;
downloads.write(&download).await?;
}
sqlx::query!(
"UPDATE versions
SET downloads = downloads + 1
WHERE id = ANY($1)",
&version_ids
sqlx::query(
"
UPDATE versions v
SET downloads = v.downloads + x.amount
FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
WHERE v.id = x.id
",
)
.bind(version_downloads.keys().copied().collect::<Vec<_>>())
.bind(version_downloads.values().copied().collect::<Vec<_>>())
.execute(&mut *transaction)
.await?;
sqlx::query!(
"UPDATE mods
SET downloads = downloads + 1
WHERE id = ANY($1)",
&project_ids
sqlx::query(
"
UPDATE mods m
SET downloads = m.downloads + x.amount
FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
WHERE m.id = x.id
",
)
.bind(project_downloads.keys().copied().collect::<Vec<_>>())
.bind(project_downloads.values().copied().collect::<Vec<_>>())
.execute(&mut *transaction)
.await?;

View File

@ -511,6 +511,7 @@ impl AutomatedModerationQueue {
"
INSERT INTO moderation_external_files (sha1, external_license_id)
SELECT * FROM UNNEST ($1::bytea[], $2::bigint[])
ON CONFLICT (sha1) DO NOTHING
",
&insert_hashes[..],
&insert_ids[..]

View File

@ -6,6 +6,8 @@ use crate::util::env::parse_var;
use crate::{database::redis::RedisPool, models::projects::MonetizationStatus};
use base64::Engine;
use chrono::{DateTime, Datelike, Duration, Utc, Weekday};
use dashmap::DashMap;
use futures::TryStreamExt;
use reqwest::Method;
use rust_decimal::Decimal;
use serde::de::DeserializeOwned;
@ -548,7 +550,7 @@ pub async fn process_payout(
r#"
SELECT COUNT(1) page_views, project_id
FROM views
WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0)
WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0) AND (monetized = TRUE)
GROUP BY project_id
ORDER BY page_views DESC
"#,
@ -557,7 +559,7 @@ pub async fn process_payout(
.bind(end.timestamp())
.fetch_all::<ProjectMultiplier>(),
client
.query("SELECT COUNT(1) FROM views WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0)")
.query("SELECT COUNT(1) FROM views WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0) AND (monetized = TRUE)")
.bind(start.timestamp())
.bind(end.timestamp())
.fetch_one::<u64>(),
@ -636,7 +638,13 @@ pub async fn process_payout(
.map(|x| x.to_string())
.collect::<Vec<String>>(),
)
.fetch_all(&mut *transaction)
.fetch(&mut *transaction)
.try_fold(DashMap::new(), |acc: DashMap<i64, HashMap<i64, Decimal>>, r| {
acc.entry(r.id)
.or_default()
.insert(r.user_id, r.payouts_split);
async move { Ok(acc) }
})
.await?;
let project_team_members = sqlx::query!(
@ -653,20 +661,27 @@ pub async fn process_payout(
.map(|x| x.to_string())
.collect::<Vec<String>>(),
)
.fetch_all(&mut *transaction)
.fetch(&mut *transaction)
.try_fold(
DashMap::new(),
|acc: DashMap<i64, HashMap<i64, Decimal>>, r| {
acc.entry(r.id)
.or_default()
.insert(r.user_id, r.payouts_split);
async move { Ok(acc) }
},
)
.await?;
for project_id in project_ids {
let team_members: HashMap<i64, Decimal> = project_team_members
.iter()
.filter(|r| r.id == project_id)
.map(|r| (r.user_id, r.payouts_split))
.collect();
.remove(&project_id)
.unwrap_or((0, HashMap::new()))
.1;
let org_team_members: HashMap<i64, Decimal> = project_org_members
.iter()
.filter(|r| r.id == project_id)
.map(|r| (r.user_id, r.payouts_split))
.collect();
.remove(&project_id)
.unwrap_or((0, HashMap::new()))
.1;
let mut all_team_members = vec![];
@ -711,6 +726,7 @@ pub async fn process_payout(
let mut clear_cache_users = Vec::new();
let (mut insert_user_ids, mut insert_project_ids, mut insert_payouts, mut insert_starts) =
(Vec::new(), Vec::new(), Vec::new(), Vec::new());
let (mut update_user_ids, mut update_user_balances) = (Vec::new(), Vec::new());
for (id, project) in projects_map {
if let Some(value) = &multipliers.values.get(&(id as u64)) {
let project_multiplier: Decimal =
@ -728,17 +744,8 @@ pub async fn process_payout(
insert_payouts.push(payout);
insert_starts.push(start);
sqlx::query!(
"
UPDATE users
SET balance = balance + $1
WHERE id = $2
",
payout,
user_id
)
.execute(&mut *transaction)
.await?;
update_user_ids.push(user_id);
update_user_balances.push(payout);
clear_cache_users.push(user_id);
}
@ -747,6 +754,19 @@ pub async fn process_payout(
}
}
sqlx::query(
"
UPDATE users u
SET balance = u.balance + v.amount
FROM unnest($1::BIGINT[], $2::NUMERIC[]) AS v(id, amount)
WHERE u.id = v.id
",
)
.bind(&update_user_ids)
.bind(&update_user_balances)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
INSERT INTO payouts_values (user_id, mod_id, amount, created)

View File

@ -118,6 +118,7 @@ pub async fn page_view_ingest(
.into_iter()
.filter(|x| !FILTERED_HEADERS.contains(&&*x.0))
.collect(),
monetized: true,
};
if let Some(segments) = url.path_segments() {