Skip to content

Commit

Permalink
refactor(manager): extend and refactor prepare_and_run_stress_read
Browse files Browse the repository at this point in the history
Changes:
- include verification step into the method (verify_stress_thread);
- var names and docstring general refactoring
  • Loading branch information
mikliapko authored and vponomaryov committed Dec 13, 2024
1 parent 6378cc9 commit b11f8df
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions mgmt_cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,19 +258,25 @@ def run_verification_read_stress(self, ks_names=None):
stress_run_time = datetime.now() - stress_start_time
InfoEvent(message=f'The read stress run was completed. Total run time: {stress_run_time}').publish()

def prepare_and_run_stress_read(self, command_template, keyspace_name, number_of_rows):
def prepare_run_and_verify_stress_in_threads(self, cmd_template: str, keyspace_name: str, num_of_rows: int,
stop_on_failure: bool = False) -> None:
"""Prepares C-S commands, runs them in threads and verifies their execution results.
Stress operation can be either read or write, depending on the cmd_template.
"""
stress_queue = []
number_of_loaders = self.params.get("n_loaders")
rows_per_loader = int(number_of_rows / number_of_loaders)
for loader_index in range(number_of_loaders):
stress_command = command_template.format(num_of_rows=rows_per_loader,
keyspace_name=keyspace_name,
sequence_start=rows_per_loader * loader_index + 1,
sequence_end=rows_per_loader * (loader_index + 1),)
read_thread = self.run_stress_thread(stress_cmd=stress_command, round_robin=True,
stop_test_on_failure=False)
stress_queue.append(read_thread)
return stress_queue
num_of_loaders = self.params.get("n_loaders")
rows_per_loader = int(num_of_rows / num_of_loaders)
for loader_index in range(num_of_loaders):
stress_cmd = cmd_template.format(num_of_rows=rows_per_loader,
keyspace_name=keyspace_name,
sequence_start=rows_per_loader * loader_index + 1,
sequence_end=rows_per_loader * (loader_index + 1),)
_thread = self.run_stress_thread(stress_cmd=stress_cmd, round_robin=True,
stop_test_on_failure=stop_on_failure)
stress_queue.append(_thread)

for _thread in stress_queue:
assert self.verify_stress_thread(cs_thread_pool=_thread), "Stress thread verification failed"


class ClusterOperations(ClusterTester):
Expand Down Expand Up @@ -1531,11 +1537,9 @@ def test_restore_from_precreated_backup(self, snapshot_name: str, restore_outsid

if not (self.params.get('mgmt_skip_post_restore_stress_read') or snapshot_data.prohibit_verification_read):
self.log.info("Running verification read stress")
stress_queue = self.prepare_and_run_stress_read(command_template=snapshot_data.cs_read_cmd_template,
keyspace_name=snapshot_data.keyspaces[0],
number_of_rows=snapshot_data.number_of_rows)
for stress in stress_queue:
assert self.verify_stress_thread(cs_thread_pool=stress), "Data verification stress command"
self.prepare_run_and_verify_stress_in_threads(cmd_template=snapshot_data.cs_read_cmd_template,
keyspace_name=snapshot_data.keyspaces[0],
num_of_rows=snapshot_data.number_of_rows)
else:
self.log.info(f"Skipping verification read stress because of the test or snapshot configuration")

Expand Down

0 comments on commit b11f8df

Please sign in to comment.