-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tests: implement ccm integration #1203
base: main
Are you sure you want to change the base?
Conversation
386ad62
to
a254460
Compare
|
281258a
to
2c4c00b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the first round of the review.
There are for sure more issues, but I'm unable to grasp them until I gain the high-level of the approach, which I failed to achieve so far due to no comments nor commit messages nor the cover letter.
stream: T, | ||
writer: Arc<Mutex<File>>, | ||
prefix: String, | ||
mut buffer: Option<&mut Vec<u8>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mut buffer: Option<&mut Vec<u8>>, | |
buffer: Option<&mut Vec<u8>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not work:
error[E0596]: cannot borrow `buffer.0` as mutable, as `buffer` is not declared as mutable
--> scylla/tests/ccm-integration/logged_cli.rs:249:25
|
249 | if let Some(ref mut buffer) = buffer {
| ^^^^^^^^^^^^^^ cannot borrow as mutable
|
help: consider changing this to be mutable
|
237 | mut buffer: Option<&mut Vec<u8>>,
| +++
If i change ref mut
to ref
it fails with:
error[E0596]: cannot borrow `**buffer` as mutable, as it is behind a `&` reference
--> scylla/tests/ccm-integration/logged_cli.rs:250:17
|
250 | buffer.extend_from_slice(line.as_bytes());
| ^^^^^^ `buffer` is a `&` reference, so the data it refers to cannot be borrowed as mutable
|
help: consider changing this to be a mutable reference
|
249 | if let Some(ref mut buffer) = buffer {
| +++
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try experimenting with matching on buffer.as_ref()
or buffer.as_mut()
instead of matching on plain buffer
. Might solve the issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as_mut
requires buffer
to be mut, as_ref
goes wrong way, but match buffer
worked well, take a look
Cleanup unused dependancies
f14fa11
to
b900f55
Compare
b900f55
to
0c9ff2a
Compare
13d7c5e
to
d97469d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By no means a thorough review, just posting a few things that I noticed during a brief read.
Few other things:
- The utils for tests should be put in some folder inside
ccm-integration
, instead of being on the same level as the test files. - Before merging, we will need to update
Makefile
andCONTRIBUTING.md
.
pub(crate) enum NodeStartOption { | ||
// Don't wait node to start | ||
NoWait, | ||
// Wait till other nodes recognize started node | ||
WaitOtherNotice, | ||
// Wait till started node report that client port is opened and operational | ||
WaitForBinaryProto, | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My first thought was that it is nice that you translated CCM options to enum, because Rust Driver contributor shouldn't have to know anything about CCM, but now I see that it is not that obvious. Each variant corresponds to a command line option, and Node::start
accepts Option<&[NodeStartOption]>
.
So this requires the user of those utilities to understand how CCM handles repeated / conflicting command line arguments. What happends if I pass Some(&[NodeStartOption::NoWait, NodeStartOption::WaitOtherNotice])
? It is not obvious to me after reading ccm's help and briefly even source code.
I think this should be a struct, with fields wait_other_notice: bool
and wait_for_binary_proto: bool
. This way the user specifices what they want to wait for, and only the ccm wrapper needs to know how to translate this to command line options.
pub(crate) enum NodeStopOption { | ||
// Don't wait node to properly stop | ||
NoWait, | ||
// Force-terminate node with `kill -9` | ||
Kill, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, this would be better expressed as a struct with boolean fields.
lazy_static! { | ||
static ref CLUSTER_VERSION: String = | ||
std::env::var("SCYLLA_TEST_CLUSTER").unwrap_or("release:6.2.2".to_string()); | ||
static ref TEST_KEEP_CLUSTER_ON_FAILURE: bool = !std::env::var("TEST_KEEP_CLUSTER_ON_FAILURE") | ||
.unwrap_or("".to_string()) | ||
.parse::<bool>() | ||
.unwrap_or(false); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The files with tests should not interact with environment at all. Cluster version selection (and thus reading the env var) should be done by the ccm utils.
Preferably, all interaction with env variables should be done in one place in the code.
async fn run_ccm_test<C, T, CFut, TFut>(cb: C, test_body: T) | ||
where | ||
C: FnOnce() -> CFut, | ||
CFut: std::future::Future<Output = Arc<RwLock<Cluster>>>, | ||
T: FnOnce(Arc<RwLock<Cluster>>) -> TFut, | ||
TFut: std::future::Future<Output = Result<(), Error>>, | ||
{ | ||
let cluster_arc = cb().await; | ||
{ | ||
let res = test_body(cluster_arc.clone()).await; | ||
if res.is_err() && *TEST_KEEP_CLUSTER_ON_FAILURE { | ||
println!("Test failed, keep cluster alive, TEST_KEEP_CLUSTER_ON_FAILURE=true"); | ||
cluster_arc.write().await.set_keep_on_drop(true); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also something that should not be present in the test code, but rather in a centralized location.
async fn get_session(cluster: &Cluster) -> Session { | ||
let endpoints = cluster.nodes.get_contact_endpoints().await; | ||
SessionBuilder::new() | ||
.known_nodes(endpoints) | ||
.build() | ||
.await | ||
.unwrap() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two issues with this:
- This should not be in test file, but rather a method on
Cluster
- This should return a
SessionBuilder
so that a test can customize the session further. Seescylla-rust-driver/scylla/tests/integration/utils.rs
Lines 137 to 167 in bceac6a
pub(crate) fn create_new_session_builder() -> GenericSessionBuilder<impl SessionBuilderKind> { let session_builder = { #[cfg(not(scylla_cloud_tests))] { use scylla::client::session_builder::SessionBuilder; let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); SessionBuilder::new().known_node(uri) } #[cfg(scylla_cloud_tests)] { use scylla::client::session_builder::{CloudMode, CloudSessionBuilder}; use std::path::Path; std::env::var("CLOUD_CONFIG_PATH") .map(|config_path| CloudSessionBuilder::new(Path::new(&config_path))) .expect("Failed to initialize CloudSessionBuilder") .expect("CLOUD_CONFIG_PATH environment variable is missing") } }; // The reason why we enable so long waiting for TracingInfo is... Cassandra. (Yes, again.) // In Cassandra Java Driver, the wait time for tracing info is 10 seconds, so here we do the same. // However, as Scylla usually gets TracingInfo ready really fast (our default interval is hence 3ms), // we stick to a not-so-much-terribly-long interval here. session_builder .tracing_info_fetch_attempts(NonZeroU32::new(200).unwrap()) .tracing_info_fetch_interval(Duration::from_millis(50)) }
version: "".to_string(), | ||
ip_prefix: NetPrefix::empty(), | ||
root_dir: "/tmp/ccm".to_string(), | ||
nodes: Vec::new(), | ||
smp: DEFAULT_SMP, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Putting it in /tmp/ccm
will prevent running multiple instances of tests simultaneously.
Either put in in some folder in the repo (probably the better option), or (my less preferred option) use some lib like https://docs.rs/mktemp/latest/mktemp/ to make a unique folder in tmp.
// A simple abstraction to run commands, log command, exit code its stderr, stdout to a file | ||
// and optionally to own stderr/stdout | ||
// It should allow to run multiple commands in parallel | ||
pub(crate) struct LoggedCmd { | ||
file: Arc<Mutex<File>>, | ||
run_id: AtomicI32, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment explains what does this struct do, but not why.
Why would we need to log ccm output to a file? In other words why is it not enough to log it to normal output, like other stuff in tests do? Note that if you write something to stdoud/stderr it will be properly caught, and printed in the test fails.
To demonstrate, I wrote a tiny integration test:
#[tokio::test]
async fn test_output() {
setup_tracing();
debug!("Some debug output");
warn!("Some warn output");
println!("Some stdout output");
eprintln!("Some stderr output");
panic!("To see if the test fails");
}
If I run it with cargo test test_output
I get:
And if I enable tracing with RUST_LOG=trace cargo test test_output
I get the tracing output too:
Is it for some reason not enough to just normally log ccm commands?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When test is complex it is very handy to be able to see a flow of ccm commands it executed.
Even if we go with dumping everything into stderr, stdout, we will still need this wrapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the complex test fails there are many things that are handy to see, I don't see a reason to single-out CCM to be dumped to the file.
The wrapper can still exists to dump CCM commands to the output.
I'd suggest using trace/debug for the actual output, and info for the command, env, and return code.
async fn sniff_ipv4_prefix() -> Result<NetPrefix, Error> { | ||
let mut used_ips: HashSet<IpAddr> = HashSet::new(); | ||
let file = File::open("/proc/net/tcp").await?; | ||
let mut lines = BufReader::new(file).lines(); | ||
while let Some(line) = lines.next_line().await? { | ||
let parts: Vec<&str> = line.split_whitespace().collect(); | ||
if let Some(ip_hex) = parts.get(1) { | ||
let ip_port: Vec<&str> = ip_hex.split(':').collect(); | ||
if let Some(ip_hex) = ip_port.get(0) { | ||
if let Some(ip) = u32::from_str_radix(ip_hex, 16).ok() { | ||
used_ips.insert( | ||
Ipv4Addr::new(ip as u8, (ip >> 8) as u8, (ip >> 16) as u8, 0).into(), | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
for a in 0..=255 { | ||
for b in 0..=255 { | ||
if a == 0 && b == 0 { | ||
continue; | ||
} | ||
let ip_prefix: IpAddr = Ipv4Addr::new(127, a, b, 0).into(); | ||
if !used_ips.contains(&ip_prefix) { | ||
return Ok(ip_prefix.into()); | ||
} | ||
} | ||
} | ||
Err(Error::msg("All ip ranges are busy")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its absurd that we have to do this, it should be ccm's job to find a free ips for the nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, it does not do that and we have to dial with it.
// CCM does not allow to have one active cluster within one config directory | ||
// To have more than two active CCM cluster at the same time we isolate each cluster into separate | ||
// config director, each config directory is created in `root_dir`. | ||
// Example: root_dir = `/tmp/ccm`, config directory for cluster `test_cluster_1` is going be `/tmp/ccm/test_cluster_1` | ||
// and cluster directory (that is created and managed by CCM) for this cluster is going to be `/tmp/ccm/test_cluster_1/test_cluster_1` | ||
pub(crate) root_dir: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, ccm should be changed so that the commands can specify which cluster to operate on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, but for now we have to dial with current behavior.
fn get_ccm_env(&self) -> HashMap<String, String> { | ||
let mut env: HashMap<String, String> = HashMap::new(); | ||
env.insert( | ||
"SCYLLA_EXT_OPTS".to_string(), | ||
format!("--smp={} --memory={}M", self.opts.smp, self.opts.memory), | ||
); | ||
env | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am unable to imagine why would someone think it is a good idea to pass some options throught env instead normal arguments.....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
)))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should firstly focus on having the CI pass. Currently, the code does not even compile. Once we fix this, it will be much easier to test the code locally (which will be helpful in understanding the logic). I checked out on this PR locally, to play around with new API/run some tests but I was not able to do so, because it does not compile.
I suggest fixing the issues listed by: cargo clippy --verbose --all-features --all-targets -- -D warnings
. Or if you want to not only do the checks, but run the tests as well, I suggest using make ci
.
Implement simple abstraction that allows to run commands logging command, return status, stderr and stdout to a file and optionally to stderr/stdout
In order to programatically manipulate by scylla.yaml and cassandra.yaml We need to have some struct that provides API for that. It also needs to represent it in the way that ccm updateconf understand
CCM is cli tool to ease scylla/cassandra/dse cluster management. Using it in integration suit will help to expand test coverage.
d97469d
to
f858370
Compare
Thanks, done |
fn get_run_options(opts: RunOptions) -> (HashMap<String, String>, bool) { | ||
match opts { | ||
RunOptions::None => (HashMap::new(), false), | ||
RunOptions::Env(new_env) => (new_env, false), | ||
RunOptions::AllowFailure => (HashMap::new(), true), | ||
RunOptions::AllowFailureWithEnv(new_env) => (new_env, true), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⛏️ I case all four combinations are valid, I'd just represent the type as:
pub(crate) struct RunOptions {
env: HashMap<String, String>,
allow_failure: bool,
}
async fn process_child_result( | ||
status: io::Result<ExitStatus>, | ||
allow_failure: bool, | ||
run_id: i32, | ||
stderr: Vec<u8>, | ||
command_with_args: String, | ||
writer: Arc<Mutex<File>>, | ||
) -> Result<ExitStatus, Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 As a rule of thumb, only pass ownership to a function if it requires it; default should to to pass heap-allocated objects by reference (mutable reference if needed).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn process_child_result(
status: io::Result<ExitStatus>,
allow_failure: bool,
run_id: i32,
stderr: &[u8],
command_with_args: &str,
writer: &Mutex<File>,
) -> Result<ExitStatus, Error> {
let tmp = stderr.deref(); | ||
return Err(Error::msg(format!( | ||
"Command `{}` failed: {}, stderr: \n{}", | ||
command_with_args, | ||
status, | ||
std::str::from_utf8(tmp)? | ||
))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This tmp
should be needed; deref
is often called automagically. Try putting &stderr
into from_utf8()
.
let command_with_args = std::iter::once(command.as_ref().to_string_lossy().into_owned()) | ||
.chain( | ||
args.clone() | ||
.into_iter() | ||
.map(|s| s.as_ref().to_string_lossy().into_owned()), | ||
) | ||
.collect::<Vec<String>>() | ||
.join(" "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 You allocate this String
unconditionally, and its only use is in process_child_result
upon error. I believe it should be built lazily, only in the error case, inside process_child_result
. Is this possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving this would also make this function yet shorter, which is good considering its importance.
match buffer { | ||
Some(buffer) => { | ||
while let Some(line) = lines.next_line().await.ok().flatten() { | ||
let _ = writer | ||
.lock() | ||
.await | ||
.write_all(format!("{} {}\n", prefix, line).as_bytes()) | ||
.await; | ||
buffer.extend_from_slice(line.as_bytes()); | ||
} | ||
} | ||
None => { | ||
while let Some(line) = lines.next_line().await.ok().flatten() { | ||
let _ = writer | ||
.lock() | ||
.await | ||
.write_all(format!("{} {}\n", prefix, line).as_bytes()) | ||
.await; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 The match
over buffer
should be inside the while let
, not outside. This would deduplicate the while let
.
impl Default for NodeConfig { | ||
fn default() -> NodeConfig { | ||
let mut val = HashMap::new(); | ||
val.insert( | ||
"endpoint_snitch".to_string(), | ||
NodeConfig::String( | ||
"org.apache.cassandra.locator.GossipingPropertyFileSnitch".to_owned(), | ||
), | ||
); | ||
NodeConfig::Map(val) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw is there a reason for making this a dynamic type instead of a struct with fields corresponding to config values, besides the fact that it would probably be more work?
It's arguably less work, as we let serde_yaml
do the struct populating and error handling job for us.
let keys: Vec<&str> = path.split('.').collect(); | ||
let mut current = self; | ||
|
||
for key in keys { | ||
match current { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 You don't need to collect
an iterator in order to iterate over it. Let's simply remove collect()
.
CCM is cli tool to ease scylla/cassandra/dse cluster management.
Using it in integration suit will help to expand test coverage.
Pre-review checklist
./docs/source/
.Fixes:
annotations to PR description.