Socket cleanup (#682)

* testing changes

* added success

* removed success

* Fix compile error

---------

Co-authored-by: Geometrically <18202329+Geometrically@users.noreply.github.com>
Co-authored-by: Jai A <jaiagr+gpg@pm.me>
This commit is contained in:
Wyatt Verchere 2023-08-21 11:21:05 -07:00 committed by GitHub
parent e9c7f5d664
commit a1cfdf1a5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 29 deletions

View File

@ -1000,15 +1000,19 @@ pub async fn ws_init(
pub async fn auth_callback(
req: HttpRequest,
Query(query): Query<HashMap<String, String>>,
sockets: Data<RwLock<ActiveSockets>>,
active_sockets: Data<RwLock<ActiveSockets>>,
client: Data<PgPool>,
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
redis: Data<deadpool_redis::Pool>,
) -> Result<HttpResponse, super::templates::ErrorPage> {
let res = async move {
let state = query
.get("state")
.ok_or_else(|| AuthenticationError::InvalidCredentials)?.clone();
let state_string = query
.get("state")
.ok_or_else(|| AuthenticationError::InvalidCredentials)?
.clone();
let sockets = active_sockets.clone();
let state = state_string.clone();
let res: Result<HttpResponse, AuthenticationError> = (|| async move {
let flow = Flow::get(&state, &redis).await?;
@ -1170,7 +1174,29 @@ pub async fn auth_callback(
} else {
Err::<HttpResponse, AuthenticationError>(AuthenticationError::InvalidCredentials)
}
}.await;
})().await;
// Because this is callback route, if we have an error, we need to ensure we close the original socket if it exists
if let Err(ref e) = res {
let db = active_sockets.read().await;
let mut x = db.auth_sockets.get_mut(&state_string);
if let Some(x) = x.as_mut() {
let mut ws_conn = x.value_mut().clone();
ws_conn
.text(
serde_json::json!({
"error": &e.error_name(),
"description": &e.to_string(),
} )
.to_string(),
)
.await
.map_err(|_| AuthenticationError::SocketError)?;
let _ = ws_conn.close(None).await;
}
}
Ok(res?)
}

View File

@ -47,7 +47,7 @@ pub enum AuthenticationError {
#[error("Invalid state sent, you probably need to get a new websocket")]
SocketError,
#[error("Invalid callback URL specified")]
Url
Url,
}
impl actix_web::ResponseError for AuthenticationError {
@ -72,23 +72,29 @@ impl actix_web::ResponseError for AuthenticationError {
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code()).json(ApiError {
error: match self {
AuthenticationError::Env(..) => "environment_error",
AuthenticationError::Sqlx(..) => "database_error",
AuthenticationError::Database(..) => "database_error",
AuthenticationError::SerDe(..) => "invalid_input",
AuthenticationError::Reqwest(..) => "network_error",
AuthenticationError::InvalidCredentials => "invalid_credentials",
AuthenticationError::Decoding(..) => "decoding_error",
AuthenticationError::Mail(..) => "mail_error",
AuthenticationError::InvalidAuthMethod => "invalid_auth_method",
AuthenticationError::InvalidClientId => "invalid_client_id",
AuthenticationError::Url => "url_error",
AuthenticationError::FileHosting(..) => "file_hosting",
AuthenticationError::DuplicateUser => "duplicate_user",
AuthenticationError::SocketError => "socket",
},
error: self.error_name(),
description: &self.to_string(),
})
}
}
impl AuthenticationError {
pub fn error_name(&self) -> &'static str {
match self {
AuthenticationError::Env(..) => "environment_error",
AuthenticationError::Sqlx(..) => "database_error",
AuthenticationError::Database(..) => "database_error",
AuthenticationError::SerDe(..) => "invalid_input",
AuthenticationError::Reqwest(..) => "network_error",
AuthenticationError::InvalidCredentials => "invalid_credentials",
AuthenticationError::Decoding(..) => "decoding_error",
AuthenticationError::Mail(..) => "mail_error",
AuthenticationError::InvalidAuthMethod => "invalid_auth_method",
AuthenticationError::InvalidClientId => "invalid_client_id",
AuthenticationError::Url => "url_error",
AuthenticationError::FileHosting(..) => "file_hosting",
AuthenticationError::DuplicateUser => "duplicate_user",
AuthenticationError::SocketError => "socket",
}
}
}

View File

@ -345,7 +345,7 @@ async fn main() -> std::io::Result<()> {
let client_ref = client_ref.clone();
async move {
info!("Done running payouts");
info!("Started running payouts");
let result = process_payout(&pool_ref, &redis_ref, &client_ref).await;
if let Err(e) = result {
warn!("Payouts run failed: {:?}", e);

View File

@ -2,7 +2,7 @@ use crate::models::projects::MonetizationStatus;
use crate::routes::ApiError;
use crate::util::env::parse_var;
use base64::Engine;
use chrono::{DateTime, Datelike, Duration, Utc, Weekday, NaiveDateTime, NaiveDate, NaiveTime};
use chrono::{DateTime, Datelike, Duration, Utc, Weekday};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use serde_json::json;

View File

@ -134,10 +134,11 @@ impl super::Validator for SpongeValidator {
&self,
archive: &mut ZipArchive<Cursor<bytes::Bytes>>,
) -> Result<ValidationResult, ValidationError> {
if !archive
.file_names()
.any(|name| name == "sponge_plugins.json" || name == "mcmod.info" || name == "META-INF/sponge_plugins.json")
{
if !archive.file_names().any(|name| {
name == "sponge_plugins.json"
|| name == "mcmod.info"
|| name == "META-INF/sponge_plugins.json"
}) {
return Ok(ValidationResult::Warning(
"No sponge_plugins.json or mcmod.info present for Sponge plugin.",
));