diff --git a/Cargo.lock b/Cargo.lock index bbb3f5529..eef7a0117 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -359,6 +359,17 @@ dependencies = [ "syn", ] +[[package]] +name = "actix-web-prom" +version = "0.5.1" +source = "git+https://github.com/nlopes/actix-web-prom?branch=master#05ca96dfb04c9d9c783dc658c7d1e12c1e8b1706" +dependencies = [ + "actix-web", + "futures", + "pin-project 1.0.7", + "prometheus", +] + [[package]] name = "actix_derive" version = "0.5.0" @@ -1954,6 +1965,7 @@ dependencies = [ "actix-ratelimit", "actix-rt", "actix-web", + "actix-web-prom", "async-trait", "base64 0.13.0", "bitflags", @@ -1966,6 +1978,7 @@ dependencies = [ "lazy_static", "log", "meilisearch-sdk", + "prometheus", "rand 0.7.3", "regex", "reqwest 0.10.10", @@ -2564,6 +2577,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "prometheus" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23129d50f2c9355ced935fce8a08bd706ee2e7ce2b3b33bf61dace0e379ac63a" + [[package]] name = "quick-error" version = "1.2.3" diff --git a/Cargo.toml b/Cargo.toml index d9e6492d9..5dfa74749 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,3 +56,6 @@ sqlx = { version = "0.4.2", features = ["runtime-actix-rustls", "postgres", "chr sentry = { version = "0.22.0", features = ["log"] } sentry-actix = "0.22.0" + +actix-web-prom = {git = "https://github.com/nlopes/actix-web-prom", branch = "master"} +prometheus = "0.12.0" diff --git a/src/database/postgres_database.rs b/src/database/postgres_database.rs index b04dbb959..49e9c7db1 100644 --- a/src/database/postgres_database.rs +++ b/src/database/postgres_database.rs @@ -8,7 +8,6 @@ const MIGRATION_FOLDER: &str = "migrations"; pub async fn connect() -> Result { info!("Initializing database connection"); - let database_url = dotenv::var("DATABASE_URL").expect("`DATABASE_URL` not in .env"); let pool = PgPoolOptions::new() .min_connections( diff --git a/src/health/mod.rs b/src/health/mod.rs new file mode 100644 index 000000000..9264e01e8 --- /dev/null +++ b/src/health/mod.rs @@ -0,0 +1,10 @@ +pub mod scheduler; +pub mod pod; +pub mod status; + +use lazy_static::lazy_static; +use std::sync::atomic::AtomicBool; + +lazy_static!{ + pub static ref SEARCH_READY: AtomicBool = AtomicBool::new(false); +} \ No newline at end of file diff --git a/src/health/pod.rs b/src/health/pod.rs new file mode 100644 index 000000000..083592075 --- /dev/null +++ b/src/health/pod.rs @@ -0,0 +1,33 @@ +use std::sync::{RwLock, Arc}; + +#[derive(Clone, Debug)] +pub struct PodInfo { + pub pod_name: String, + pub node_name: String, + pod_id: Arc>>, +} + +impl PodInfo { + pub fn new() -> Self { + Self { + pod_name: dotenv::var("POD_NAME").unwrap_or("DEV".to_string()), + node_name: dotenv::var("NODE_NAME").unwrap_or("self-hosted".to_string()), + pod_id: Arc::new(RwLock::new(None)) + } + } + pub fn get_id(&self) -> String { + { + let lock = self.pod_id.read().unwrap(); + if lock.is_some() { + return lock.clone().unwrap(); + } + } + let mut lock = self.pod_id.write().unwrap(); + let id = self.generate_id(); + lock.replace(id.clone()); + id + } + fn generate_id(&self) -> String { + base64::encode(format!("{}-{}", self.node_name, self.pod_name)) + } +} \ No newline at end of file diff --git a/src/health/scheduler.rs b/src/health/scheduler.rs new file mode 100644 index 000000000..a08a9295b --- /dev/null +++ b/src/health/scheduler.rs @@ -0,0 +1,127 @@ +use crate::scheduler::Scheduler; +use sqlx::{Pool, Postgres}; + +use prometheus::{opts, IntGaugeVec}; +use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; +use actix_web::Error; + +use std::pin::Pin; +use std::future::{Future}; +use std::task::{Context, Poll}; +use futures::future::{ok, Ready}; + +use crate::health::pod::PodInfo; +use actix_web::http::{HeaderName, HeaderValue}; +use actix_web_prom::{PrometheusMetrics}; + +pub struct HealthCounters { + pod: PodInfo, + idle_db_conn: IntGaugeVec, + opened_db_conn: IntGaugeVec, + current_requests: IntGaugeVec, +} +impl HealthCounters { + pub fn new() -> Self { + let idle_opts = opts!("idle_db_conn", "Amount of idle connections").namespace("api"); + let opened_opts = opts!("open_db_conn", "Amount of open connections").namespace("api"); + let current_opts = opts!("current_requests", "Currently open requests").namespace("api"); + Self { + pod: PodInfo::new(), + idle_db_conn: IntGaugeVec::new(idle_opts, &[]).unwrap(), + opened_db_conn: IntGaugeVec::new(opened_opts, &[]).unwrap(), + current_requests: IntGaugeVec::new(current_opts, &["endpoint", "method"]).unwrap(), + } + } + pub fn register(&self, builder: &mut PrometheusMetrics) { + builder + .registry + .register(Box::new(self.opened_db_conn.clone())).unwrap(); + builder + .registry + .register(Box::new(self.idle_db_conn.clone())).unwrap(); + builder + .registry + .register(Box::new(self.current_requests.clone())).unwrap(); + } + pub fn schedule(&self, pool: Pool, scheduler: &mut Scheduler) { + let this = self.clone(); + scheduler.run(std::time::Duration::from_secs(5), move || { + let idle = pool.num_idle(); + let total = pool.size(); + this.idle_db_conn.with_label_values(&[]).set(idle as i64); + this.opened_db_conn.with_label_values(&[]).set(total as i64); + async move { + ok::(1).await.unwrap(); + } + }); + } +} + +impl Clone for HealthCounters { + fn clone(&self) -> Self { + Self { + pod: self.pod.clone(), + idle_db_conn: self.idle_db_conn.clone(), + opened_db_conn: self.opened_db_conn.clone(), + current_requests: self.current_requests.clone(), + } + } +} + +impl Transform for HealthCounters + where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Request = ServiceRequest; + type Response = ServiceResponse; + type Error = Error; + type Transform = MonitoringMiddleware; + type InitError = (); + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ok(MonitoringMiddleware { service, counters: self.clone() }) + } +} + + + + +pub struct MonitoringMiddleware { + service: S, + counters: HealthCounters, +} + +impl Service for MonitoringMiddleware + where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Request = ServiceRequest; + type Response = ServiceResponse; + type Error = Error; + type Future = Pin>>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, req: ServiceRequest) -> Self::Future { + // The request has started. + let pattern_or_path = req.match_pattern().unwrap_or("unknown".to_string()); + let counter = self.counters.current_requests.with_label_values(&[&*pattern_or_path,req.method().as_str()]); + counter.inc(); + let pod = self.counters.pod.clone(); + let fut = self.service.call(req); + Box::pin(async move { + let mut res: Self::Response = fut.await?; + // The request finished, remove a counter + counter.dec(); + res.headers_mut().insert(HeaderName::from_static("x-server"), HeaderValue::from_str(&*pod.get_id()).unwrap()); + Ok(res) + }) + } +} \ No newline at end of file diff --git a/src/health/status.rs b/src/health/status.rs new file mode 100644 index 000000000..a84601ce3 --- /dev/null +++ b/src/health/status.rs @@ -0,0 +1,16 @@ +use sqlx::{PgPool}; +use actix_web::web; + +pub async fn test_database(postgres: web::Data) -> Result<(), sqlx::Error> { + let mut transaction = postgres.acquire().await?; + let result = sqlx::query( + " + SELECT 1 + " + ).execute(&mut transaction) + .await; + match result { + Ok(_) => Ok(()), + Err(e) => Err(e) + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 0305cea79..60c71f524 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,11 @@ use rand::Rng; use search::indexing::index_projects; use search::indexing::IndexingSettings; use std::sync::Arc; +use std::collections::HashMap; +use std::sync::atomic::Ordering; +use crate::health::pod::PodInfo; +use crate::health::scheduler::HealthCounters; +use actix_web_prom::{PrometheusMetricsBuilder}; mod database; mod file_hosting; @@ -17,6 +22,7 @@ mod models; mod routes; mod scheduler; mod search; +mod health; mod util; mod validate; @@ -232,6 +238,7 @@ async fn main() -> std::io::Result<()> { if let Err(e) = result { warn!("Indexing created projects failed: {:?}", e); } + crate::health::SEARCH_READY.store(true, Ordering::Release); info!("Done indexing created project queue"); } }); @@ -243,12 +250,29 @@ async fn main() -> std::io::Result<()> { }; let store = MemoryStore::new(); + // Generate pod id + let pod = PodInfo::new(); + // Init prometheus cluster + let mut labels = HashMap::new(); + labels.insert("pod".to_string(), pod.pod_name); + labels.insert("node".to_string(), pod.node_name); + // Get prometheus service + let mut prometheus = PrometheusMetricsBuilder::new("api") + .endpoint("/metrics") + .build() + .unwrap(); + // Get custom service + let health = HealthCounters::new(); + health.register(&mut prometheus); + health.schedule(pool.clone(), &mut scheduler); info!("Starting Actix HTTP server!"); // Init App HttpServer::new(move || { App::new() + .wrap(prometheus.clone()) + .wrap(health.clone()) .wrap( Cors::default() .allowed_methods(vec!["GET", "POST", "DELETE", "PATCH", "PUT"]) @@ -311,6 +335,7 @@ async fn main() -> std::io::Result<()> { .configure(routes::v1_config) .configure(routes::v2_config) .service(routes::index_get) + .service(routes::health_get) .service(web::scope("/maven/").configure(routes::maven_config)) .default_service(web::get().to(routes::not_found)) }) diff --git a/src/models/mod.rs b/src/models/mod.rs index 0c9b7c58e..f1cc1303e 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -4,4 +4,4 @@ pub mod notifications; pub mod projects; pub mod reports; pub mod teams; -pub mod users; +pub mod users; \ No newline at end of file diff --git a/src/routes/health.rs b/src/routes/health.rs new file mode 100644 index 000000000..cce6d2717 --- /dev/null +++ b/src/routes/health.rs @@ -0,0 +1,31 @@ +use actix_web::{get, HttpResponse}; +use serde_json::json; +use crate::health::status::test_database; +use actix_web::web::Data; +use sqlx::PgPool; +use crate::health::SEARCH_READY; +use std::sync::atomic::Ordering; + +#[get("/health")] +pub async fn health_get(client: Data) -> HttpResponse { + // Check database connection: + let result = test_database(client).await; + if result.is_err() { + let data = json!({ + "ready": false, + "reason": "Database connection error" + }); + return HttpResponse::InternalServerError().json(data) + } + if !SEARCH_READY.load(Ordering::Acquire) { + let data = json!({ + "ready": false, + "reason": "Indexing is not finished" + }); + return HttpResponse::InternalServerError().json(data) + } + HttpResponse::Ok().json(json!({ + "ready": true, + "reason": "Everything is OK" + })) +} \ No newline at end of file diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 7a1d54e9a..e490bb2ee 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -18,11 +18,13 @@ mod users; mod version_creation; mod version_file; mod versions; +mod health; pub use auth::config as auth_config; pub use tags::config as tags_config; pub use self::index::index_get; +pub use self::health::health_get; pub use self::not_found::not_found; use crate::file_hosting::FileHostingError;