Make connections short-lived redis (#3509)

This commit is contained in:
Jai Agrawal 2025-04-15 15:39:13 -07:00 committed by GitHub
parent 04659a8198
commit 76be502e16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -224,8 +224,6 @@ impl RedisPool {
+ Serialize, + Serialize,
S: Display + Clone + DeserializeOwned + Serialize + Debug, S: Display + Clone + DeserializeOwned + Serialize + Debug,
{ {
let connection = self.connect().await?.connection;
let ids = keys let ids = keys
.iter() .iter()
.map(|x| (x.to_string(), x.clone())) .map(|x| (x.to_string(), x.clone()))
@ -235,49 +233,21 @@ impl RedisPool {
return Ok(HashMap::new()); return Ok(HashMap::new());
} }
let get_cached_values = let get_cached_values = |ids: DashMap<String, I>| async move {
|ids: DashMap<String, I>, let slug_ids = if let Some(slug_namespace) = slug_namespace {
mut connection: deadpool_redis::Connection| async move { let mut connection = self.pool.get().await?;
let slug_ids = if let Some(slug_namespace) = slug_namespace { cmd("MGET")
cmd("MGET")
.arg(
ids.iter()
.map(|x| {
format!(
"{}_{slug_namespace}:{}",
self.meta_namespace,
if case_sensitive {
x.value().to_string()
} else {
x.value().to_string().to_lowercase()
}
)
})
.collect::<Vec<_>>(),
)
.query_async::<Vec<Option<String>>>(&mut connection)
.await?
.into_iter()
.flatten()
.collect::<Vec<_>>()
} else {
Vec::new()
};
let cached_values = cmd("MGET")
.arg( .arg(
ids.iter() ids.iter()
.map(|x| x.value().to_string())
.chain(ids.iter().filter_map(|x| {
parse_base62(&x.value().to_string())
.ok()
.map(|x| x.to_string())
}))
.chain(slug_ids)
.map(|x| { .map(|x| {
format!( format!(
"{}_{namespace}:{x}", "{}_{slug_namespace}:{}",
self.meta_namespace self.meta_namespace,
if case_sensitive {
x.value().to_string()
} else {
x.value().to_string().to_lowercase()
}
) )
}) })
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
@ -285,23 +255,46 @@ impl RedisPool {
.query_async::<Vec<Option<String>>>(&mut connection) .query_async::<Vec<Option<String>>>(&mut connection)
.await? .await?
.into_iter() .into_iter()
.filter_map(|x| { .flatten()
x.and_then(|val| { .collect::<Vec<_>>()
serde_json::from_str::<RedisValue<T, K, S>>(&val) } else {
.ok() Vec::new()
})
.map(|val| (val.key.clone(), val))
})
.collect::<HashMap<_, _>>();
Ok::<_, DatabaseError>((cached_values, connection, ids))
}; };
let mut connection = self.pool.get().await?;
let cached_values = cmd("MGET")
.arg(
ids.iter()
.map(|x| x.value().to_string())
.chain(ids.iter().filter_map(|x| {
parse_base62(&x.value().to_string())
.ok()
.map(|x| x.to_string())
}))
.chain(slug_ids)
.map(|x| {
format!("{}_{namespace}:{x}", self.meta_namespace)
})
.collect::<Vec<_>>(),
)
.query_async::<Vec<Option<String>>>(&mut connection)
.await?
.into_iter()
.filter_map(|x| {
x.and_then(|val| {
serde_json::from_str::<RedisValue<T, K, S>>(&val).ok()
})
.map(|val| (val.key.clone(), val))
})
.collect::<HashMap<_, _>>();
Ok::<_, DatabaseError>((cached_values, ids))
};
let current_time = Utc::now(); let current_time = Utc::now();
let mut expired_values = HashMap::new(); let mut expired_values = HashMap::new();
let (cached_values_raw, mut connection, ids) = let (cached_values_raw, ids) = get_cached_values(ids).await?;
get_cached_values(ids, connection).await?;
let mut cached_values = cached_values_raw let mut cached_values = cached_values_raw
.into_iter() .into_iter()
.filter_map(|(key, val)| { .filter_map(|(key, val)| {
@ -352,9 +345,12 @@ impl RedisPool {
.with_expiration(SetExpiry::EX(60)), .with_expiration(SetExpiry::EX(60)),
); );
}); });
let results = pipe let results = {
.query_async::<Vec<Option<i32>>>(&mut connection) let mut connection = self.pool.get().await?;
.await?;
pipe.query_async::<Vec<Option<i32>>>(&mut connection)
.await?
};
for (idx, key) in fetch_ids.into_iter().enumerate() { for (idx, key) in fetch_ids.into_iter().enumerate() {
if let Some(locked) = results.get(idx) { if let Some(locked) = results.get(idx) {
@ -487,6 +483,7 @@ impl RedisPool {
)); ));
} }
let mut connection = self.pool.get().await?;
pipe.query_async::<()>(&mut connection).await?; pipe.query_async::<()>(&mut connection).await?;
Ok(return_values) Ok(return_values)
@ -495,28 +492,29 @@ impl RedisPool {
if !subscribe_ids.is_empty() { if !subscribe_ids.is_empty() {
fetch_tasks.push(Box::pin(async { fetch_tasks.push(Box::pin(async {
let mut connection = self.pool.get().await?;
let mut interval = let mut interval =
tokio::time::interval(Duration::from_millis(100)); tokio::time::interval(Duration::from_millis(100));
let start = Utc::now(); let start = Utc::now();
loop { loop {
let results = cmd("MGET") let results = {
.arg( let mut connection = self.pool.get().await?;
subscribe_ids cmd("MGET")
.iter() .arg(
.map(|x| { subscribe_ids
format!( .iter()
"{}_{namespace}:{}/lock", .map(|x| {
self.meta_namespace, format!(
// We lowercase key because locks are stored in lowercase "{}_{namespace}:{}/lock",
x.key().to_lowercase() self.meta_namespace,
) // We lowercase key because locks are stored in lowercase
}) x.key().to_lowercase()
.collect::<Vec<_>>(), )
) })
.query_async::<Vec<Option<String>>>(&mut connection) .collect::<Vec<_>>(),
.await?; )
.query_async::<Vec<Option<String>>>(&mut connection)
.await?
};
if results.into_iter().all(|x| x.is_none()) { if results.into_iter().all(|x| x.is_none()) {
break; break;
@ -529,8 +527,8 @@ impl RedisPool {
interval.tick().await; interval.tick().await;
} }
let (return_values, _, _) = let (return_values, _) =
get_cached_values(subscribe_ids, connection).await?; get_cached_values(subscribe_ids).await?;
Ok(return_values) Ok(return_values)
})); }));