Skip to content

Commit

Permalink
chore(proxy): disallow unwrap and unimplemented (#10142)
Browse files Browse the repository at this point in the history
As the title says, I updated the lint rules to no longer allow unwrap or
unimplemented.

Three special cases:
* Tests are allowed to use them
* std::sync::Mutex lock().unwrap() is common because it's usually
correct to continue panicking on poison
* `tokio::spawn_blocking(...).await.unwrap()` is common because it will
only error if the blocking fn panics, so continuing the panic is also
correct

I've introduced two extension traits to help with these last two, that
are a bit more explicit so they don't need an expect message every time.
  • Loading branch information
conradludgate authored Dec 16, 2024
1 parent 2e4c9c5 commit 59b7ff8
Show file tree
Hide file tree
Showing 39 changed files with 178 additions and 113 deletions.
1 change: 1 addition & 0 deletions proxy/src/auth/backend/jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ impl From<&jose_jwk::Key> for KeyType {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::future::IntoFuture;
use std::net::SocketAddr;
Expand Down
2 changes: 2 additions & 0 deletions proxy/src/auth/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,8 @@ impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {

#[cfg(test)]
mod tests {
#![allow(clippy::unimplemented, clippy::unwrap_used)]

use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
Expand Down
1 change: 1 addition & 0 deletions proxy/src/auth/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ fn project_name_valid(name: &str) -> bool {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use serde_json::json;
use ComputeUserInfoParseError::*;
Expand Down
4 changes: 3 additions & 1 deletion proxy/src/cache/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::info;

use crate::config::EndpointCacheConfig;
use crate::context::RequestContext;
use crate::ext::LockExt;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
use crate::rate_limiter::GlobalRateLimiter;
Expand Down Expand Up @@ -96,7 +97,7 @@ impl EndpointsCache {

// If the limiter allows, we can pretend like it's valid
// (incase it is, due to redis channel lag).
if self.limiter.lock().unwrap().check() {
if self.limiter.lock_propagate_poison().check() {
return true;
}

Expand Down Expand Up @@ -258,6 +259,7 @@ impl EndpointsCache {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use super::*;

Expand Down
1 change: 1 addition & 0 deletions proxy/src/cache/project_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ impl Cache for ProjectInfoCacheImpl {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::scram::ServerSecret;
Expand Down
4 changes: 3 additions & 1 deletion proxy/src/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use uuid::Uuid;

use crate::auth::{check_peer_addr_is_in_list, IpPattern};
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::cancellation_publisher::{
Expand Down Expand Up @@ -114,7 +115,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
};
if !self.limiter.lock().unwrap().check(subnet_key, 1) {
if !self.limiter.lock_propagate_poison().check(subnet_key, 1) {
// log only the subnet part of the IP address to know which subnet is rate limited
tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}");
Metrics::get()
Expand Down Expand Up @@ -283,6 +284,7 @@ impl<P> Drop for Session<P> {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use super::*;

Expand Down
13 changes: 4 additions & 9 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,10 @@ impl CertResolver {
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec();

ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
PrivateKeyDer::Pkcs8(
keys.pop()
.unwrap()
.context(format!("Failed to parse TLS keys at '{key_path}'"))?,
)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};

let cert_chain_bytes = std::fs::read(cert_path)
Expand Down
14 changes: 10 additions & 4 deletions proxy/src/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use utils::backoff;
use super::{RequestContextInner, LOG_CHAN};
use crate::config::remote_storage_from_toml;
use crate::context::LOG_CHAN_DISCONNECT;
use crate::ext::TaskExt;

#[derive(clap::Args, Clone, Debug)]
pub struct ParquetUploadArgs {
Expand Down Expand Up @@ -171,7 +172,9 @@ pub async fn worker(
};

let (tx, mut rx) = mpsc::unbounded_channel();
LOG_CHAN.set(tx.downgrade()).unwrap();
LOG_CHAN
.set(tx.downgrade())
.expect("only one worker should set the channel");

// setup row stream that will close on cancellation
let cancellation_token2 = cancellation_token.clone();
Expand Down Expand Up @@ -207,7 +210,9 @@ pub async fn worker(
config.parquet_upload_disconnect_events_remote_storage
{
let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap();
LOG_CHAN_DISCONNECT
.set(tx_disconnect.downgrade())
.expect("only one worker should set the channel");

// setup row stream that will close on cancellation
tokio::spawn(async move {
Expand Down Expand Up @@ -326,7 +331,7 @@ where
Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
})
.await
.unwrap()?;
.propagate_task_panic()?;

rows.clear();
Ok((rows, w, rg_meta))
Expand All @@ -352,7 +357,7 @@ async fn upload_parquet(
Ok((buffer, metadata))
})
.await
.unwrap()?;
.propagate_task_panic()?;

let data = buffer.split().freeze();

Expand Down Expand Up @@ -409,6 +414,7 @@ async fn upload_parquet(
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::net::Ipv4Addr;
use std::num::NonZeroUsize;
Expand Down
4 changes: 3 additions & 1 deletion proxy/src/control_plane/client/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ impl MockControlPlane {
Some(s) => {
info!("got allowed_ips: {s}");
s.split(',')
.map(|s| IpPattern::from_str(s).unwrap())
.map(|s| {
IpPattern::from_str(s).expect("mocked ip pattern should be correct")
})
.collect()
}
None => vec![],
Expand Down
41 changes: 41 additions & 0 deletions proxy/src/ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::panic::resume_unwind;
use std::sync::{Mutex, MutexGuard};

use tokio::task::JoinError;

pub(crate) trait LockExt<T> {
fn lock_propagate_poison(&self) -> MutexGuard<'_, T>;
}

impl<T> LockExt<T> for Mutex<T> {
/// Lock the mutex and panic if the mutex was poisoned.
#[track_caller]
fn lock_propagate_poison(&self) -> MutexGuard<'_, T> {
match self.lock() {
Ok(guard) => guard,
// poison occurs when another thread panicked while holding the lock guard.
// since panicking is often unrecoverable, propagating the poison panic is reasonable.
Err(poison) => panic!("{poison}"),
}
}
}

pub(crate) trait TaskExt<T> {
fn propagate_task_panic(self) -> T;
}

impl<T> TaskExt<T> for Result<T, JoinError> {
/// Unwrap the result and panic if the inner task panicked.
/// Also panics if the task was cancelled
#[track_caller]
fn propagate_task_panic(self) -> T {
match self {
Ok(t) => t,
// Using resume_unwind prevents the panic hook being called twice.
// Since we use this for structured concurrency, there is only
// 1 logical panic, so this is more correct.
Err(e) if e.is_panic() => resume_unwind(e.into_panic()),
Err(e) => panic!("unexpected task error: {e}"),
}
}
}
7 changes: 4 additions & 3 deletions proxy/src/http/health_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use utils::http::error::ApiError;
use utils::http::json::json_response;
use utils::http::{RouterBuilder, RouterService};

use crate::ext::{LockExt, TaskExt};
use crate::jemalloc;

async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Expand Down Expand Up @@ -76,7 +77,7 @@ async fn prometheus_metrics_handler(
let body = tokio::task::spawn_blocking(move || {
let _span = span.entered();

let mut state = state.lock().unwrap();
let mut state = state.lock_propagate_poison();
let PrometheusHandler { encoder, metrics } = &mut *state;

metrics
Expand All @@ -94,13 +95,13 @@ async fn prometheus_metrics_handler(
body
})
.await
.unwrap();
.propagate_task_panic();

let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, "text/plain; version=0.0.4")
.body(Body::from(body))
.unwrap();
.expect("response headers should be valid");

Ok(response)
}
3 changes: 2 additions & 1 deletion proxy/src/intern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<Id: InternId> StringInterner<Id> {
pub(crate) fn new() -> Self {
StringInterner {
inner: ThreadedRodeo::with_capacity_memory_limits_and_hasher(
Capacity::new(2500, NonZeroUsize::new(1 << 16).unwrap()),
Capacity::new(2500, NonZeroUsize::new(1 << 16).expect("value is nonzero")),
// unbounded
MemoryLimits::for_memory_usage(usize::MAX),
BuildHasherDefault::<FxHasher>::default(),
Expand Down Expand Up @@ -207,6 +207,7 @@ impl From<ProjectId> for ProjectIdInt {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::sync::OnceLock;

Expand Down
5 changes: 3 additions & 2 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
clippy::string_add,
clippy::string_to_string,
clippy::todo,
// TODO: consider clippy::unimplemented
// TODO: consider clippy::unwrap_used
clippy::unimplemented,
clippy::unwrap_used,
)]
// List of permanently allowed lints.
#![allow(
Expand Down Expand Up @@ -82,6 +82,7 @@ pub mod console_redirect_proxy;
pub mod context;
pub mod control_plane;
pub mod error;
mod ext;
pub mod http;
pub mod intern;
pub mod jemalloc;
Expand Down
12 changes: 10 additions & 2 deletions proxy/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy()
.add_directive("aws_config=info".parse().unwrap())
.add_directive("azure_core::policies::transport=off".parse().unwrap());
.add_directive(
"aws_config=info"
.parse()
.expect("this should be a valid filter directive"),
)
.add_directive(
"azure_core::policies::transport=off"
.parse()
.expect("this should be a valid filter directive"),
);

let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
Expand Down
15 changes: 0 additions & 15 deletions proxy/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@ pub(crate) fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
Some((cstr, other))
}

/// See <https://doc.rust-lang.org/std/primitive.slice.html#method.split_array_ref>.
pub(crate) fn split_at_const<const N: usize>(bytes: &[u8]) -> Option<(&[u8; N], &[u8])> {
(bytes.len() >= N).then(|| {
let (head, tail) = bytes.split_at(N);
(head.try_into().unwrap(), tail)
})
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -33,11 +25,4 @@ mod tests {
assert_eq!(cstr.to_bytes(), b"foo");
assert_eq!(rest, b"bar");
}

#[test]
fn test_split_at_const() {
assert!(split_at_const::<0>(b"").is_some());
assert!(split_at_const::<1>(b"").is_none());
assert!(matches!(split_at_const::<1>(b"ok"), Some((b"o", b"k"))));
}
}
1 change: 1 addition & 0 deletions proxy/src/protocol2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ impl NetworkEndianIpv6 {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use tokio::io::AsyncReadExt;

Expand Down
1 change: 1 addition & 0 deletions proxy/src/proxy/copy_bidirectional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ impl CopyBuffer {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use tokio::io::AsyncWriteExt;

Expand Down
2 changes: 1 addition & 1 deletion proxy/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ impl NeonOptions {

pub(crate) fn neon_option(bytes: &str) -> Option<(&str, &str)> {
static RE: OnceCell<Regex> = OnceCell::new();
let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").unwrap());
let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").expect("regex should be correct"));

let cap = re.captures(bytes)?;
let (_, [k, v]) = cap.extract();
Expand Down
1 change: 1 addition & 0 deletions proxy/src/proxy/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! A group of high-level tests for connection establishing logic and auth.
#![allow(clippy::unimplemented, clippy::unwrap_used)]

mod mitm;

Expand Down
2 changes: 1 addition & 1 deletion proxy/src/rate_limiter/leaky_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
}

#[cfg(test)]
#[allow(clippy::float_cmp)]
#[allow(clippy::float_cmp, clippy::unwrap_used)]
mod tests {
use std::time::Duration;

Expand Down
1 change: 1 addition & 0 deletions proxy/src/rate_limiter/limit_algorithm/aimd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl LimitAlgorithm for Aimd {
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::time::Duration;

Expand Down
4 changes: 3 additions & 1 deletion proxy/src/rate_limiter/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use rand::{Rng, SeedableRng};
use tokio::time::{Duration, Instant};
use tracing::info;

use crate::ext::LockExt;
use crate::intern::EndpointIdInt;

pub struct GlobalRateLimiter {
Expand Down Expand Up @@ -246,12 +247,13 @@ impl<K: Hash + Eq, R: Rng, S: BuildHasher + Clone> BucketRateLimiter<K, R, S> {
let n = self.map.shards().len();
// this lock is ok as the periodic cycle of do_gc makes this very unlikely to collide
// (impossible, infact, unless we have 2048 threads)
let shard = self.rand.lock().unwrap().gen_range(0..n);
let shard = self.rand.lock_propagate_poison().gen_range(0..n);
self.map.shards()[shard].write().clear();
}
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use std::hash::BuildHasherDefault;
use std::time::Duration;
Expand Down
Loading

1 comment on commit 59b7ff8

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7095 tests run: 6796 passed, 1 failed, 298 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_timeline_archival_chaos[release-pg17]"
Flaky tests (4)

Postgres 17

Postgres 15

  • test_pgdata_import_smoke[None-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64

Postgres 14

Test coverage report is not available

The comment gets automatically updated with the latest test results
59b7ff8 at 2024-12-16T17:32:29.513Z :recycle:

Please sign in to comment.