Commit e1afe2d: CORTX-32762 RVF Codacy fixes and rebase with main check-in (#102)
* Fixed, updated, and improved the YAML config utilities.

Signed-off-by: Vishal Dhobale <[email protected]>

* Fixed Codacy and coding-style issues.

Signed-off-by: Vishal Dhobale <[email protected]>

* Added a library for operations on remote machines.

Signed-off-by: Vishal Dhobale <[email protected]>

* Added a loop to check completion of ongoing iterations.

Signed-off-by: Vishal Dhobale <[email protected]>

* Fixed Codacy and coding-style issues.

Signed-off-by: Vishal Dhobale <[email protected]>

Signed-off-by: Vishal Dhobale <[email protected]>
DhobaleVishal committed Aug 23, 2022
1 parent 9b52f43 commit e1afe2d
Showing 17 changed files with 528 additions and 523 deletions.
34 changes: 17 additions & 17 deletions config/__init__.py
@@ -24,9 +24,8 @@

 import munch

-from src.commons.constants import CLUSTER_CFG
-from src.commons.constants import CORIO_CFG_PATH
-from src.commons.constants import S3_CONFIG, S3_TOOL_PATH
+from src.commons import constants as const
+from src.commons import yaml_parser
 from src.commons.utils import config_utils
@@ -41,27 +40,28 @@ def split_args(sys_cmd: List):
     return _args


-CORIO_CFG = config_utils.get_config_yaml(CORIO_CFG_PATH)
-S3_CFG = config_utils.get_config_yaml(fpath=S3_CONFIG)
-CLUSTER_CFG = config_utils.get_config_yaml(fpath=CLUSTER_CFG)
-S3_TOOLS_CFG = config_utils.get_config_yaml(fpath=S3_TOOL_PATH)
+CORIO_CFG = yaml_parser.read_yaml(fpath=const.CORIO_CFG_PATH)
+S3_CFG = yaml_parser.read_yaml(fpath=const.S3_CONFIG)
+CLUSTER_CFG = yaml_parser.read_yaml(fpath=const.CLUSTER_CFG)
+S3_TOOLS_CFG = yaml_parser.read_yaml(fpath=const.S3_TOOL_PATH)
+MASTER_CFG = yaml_parser.read_yaml(fpath=const.CORIO_MASTER_CONFIG)


 IO_DRIVER_ARGS = split_args(sys.argv)
-_USE_SSL = '-us' if '-us' in IO_DRIVER_ARGS else '--use_ssl' if '--use_ssl' in IO_DRIVER_ARGS\
-    else None
+_USE_SSL = ('-us' if '-us' in IO_DRIVER_ARGS else '--use_ssl' if '--use_ssl' in IO_DRIVER_ARGS
+            else None)
 SSL_FLG = IO_DRIVER_ARGS[IO_DRIVER_ARGS.index(_USE_SSL) + 1] if _USE_SSL else True
-_ENDPOINT = '-ep' if '-ep' in IO_DRIVER_ARGS else '--endpoint' if '--endpoint' in IO_DRIVER_ARGS\
-    else None
+_ENDPOINT = ('-ep' if '-ep' in IO_DRIVER_ARGS else '--endpoint' if '--endpoint' in IO_DRIVER_ARGS
+             else None)
 S3_URL = IO_DRIVER_ARGS[IO_DRIVER_ARGS.index(_ENDPOINT) + 1] if _ENDPOINT else "s3.seagate.com"
-_ACCESS_KEY = "-ak" if '-ak' in IO_DRIVER_ARGS else '--access_key' if '--access_key' in\
-    IO_DRIVER_ARGS else None
+_ACCESS_KEY = ("-ak" if '-ak' in IO_DRIVER_ARGS else '--access_key' if '--access_key' in
+               IO_DRIVER_ARGS else None)
 ACCESS_KEY = IO_DRIVER_ARGS[IO_DRIVER_ARGS.index(_ACCESS_KEY) + 1] if _ACCESS_KEY else None
-_SECRT_KEY = "-sk" if '-sk' in IO_DRIVER_ARGS else '--secret_key' if '--secret_key' in\
-    IO_DRIVER_ARGS else None
+_SECRT_KEY = ("-sk" if '-sk' in IO_DRIVER_ARGS else '--secret_key' if '--secret_key' in
+              IO_DRIVER_ARGS else None)
 SECRT_KEY = IO_DRIVER_ARGS[IO_DRIVER_ARGS.index(_SECRT_KEY) + 1] if _SECRT_KEY else None
-_S3MAX_RETRY = "-mr" if '-mr' in IO_DRIVER_ARGS else '--s3max_retry' if '--s3max_retry' in\
-    IO_DRIVER_ARGS else None
+_S3MAX_RETRY = ("-mr" if '-mr' in IO_DRIVER_ARGS else '--s3max_retry' if '--s3max_retry' in
+                IO_DRIVER_ARGS else None)
 S3MAX_RETRY = IO_DRIVER_ARGS[IO_DRIVER_ARGS.index(_S3MAX_RETRY) + 1] if _S3MAX_RETRY else 1
 USE_SSL = ast.literal_eval(str(SSL_FLG).title())
 S3_ENDPOINT = f"{'https' if USE_SSL else 'http'}://{S3_URL}"
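Each `_USE_SSL`-style pair above scans the raw argv list for either the short or the long form of a flag and, when found, takes the token that follows it as the value; the final `ast.literal_eval(str(SSL_FLG).title())` line then normalizes "true"/"True"/True into a real boolean, since `.title()` yields "True" or "False" and `literal_eval` parses that into a Python bool. A minimal standalone sketch of the same lookup; the helper name `get_flag_value` and the sample argv are illustrative, not part of the repository:

```python
from typing import List


def get_flag_value(args: List[str], short: str, long: str, default=None):
    """Return the token following `short` or `long` in args, else default."""
    flag = short if short in args else long if long in args else None
    return args[args.index(flag) + 1] if flag else default


if __name__ == "__main__":
    # Example invocation: python corio.py -ep s3.example.com --use_ssl true
    argv = ["corio.py", "-ep", "s3.example.com", "--use_ssl", "true"]
    print(get_flag_value(argv, "-ep", "--endpoint", default="s3.seagate.com"))  # s3.example.com
    print(get_flag_value(argv, "-us", "--use_ssl", default=True))               # 'true'
```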
202 changes: 32 additions & 170 deletions corio.py
@@ -21,7 +21,6 @@
 """Perform parallel S3 operations as per the given test input YAML using Asyncio."""

 import argparse
-import glob
 import logging
 import multiprocessing
 import os
@@ -32,56 +31,28 @@
 from distutils.util import strtobool
 from pprint import pformat

-import munch
 import schedule

-from config import S3_CFG, CORIO_CFG
+from config import CORIO_CFG
+from config import S3_CFG
+from src.commons import cluster_health
 from src.commons import constants as const
-from src.commons.cluster_health import check_health
-from src.commons.cluster_health import health_check_process
-from src.commons.degrade_cluster import activate_degraded_mode_parallel
-from src.commons.degrade_cluster import get_degraded_mode
+from src.commons import degrade_cluster
+from src.commons import scheduler
+from src.commons import support_bundle
 from src.commons.exception import DegradedModeError
 from src.commons.exception import HealthCheckError
-from src.commons.logger import StreamToLogger
-from src.commons.scheduler import schedule_test_plan
-from src.commons.scheduler import schedule_test_status_update
-from src.commons.scheduler import terminate_update_test_status
-from src.commons.support_bundle import collect_upload_rotate_support_bundles
-from src.commons.support_bundle import support_bundle_process
+from src.commons.logger import initialize_loghandler
+from src.commons.utils import corio_utils
 from src.commons.utils.alerts import SendMailNotification
-from src.commons.utils.corio_utils import cpu_memory_details
-from src.commons.utils.corio_utils import log_cleanup
-from src.commons.utils.corio_utils import run_local_cmd
-from src.commons.utils.corio_utils import setup_environment
-from src.commons.utils.corio_utils import store_logs_to_nfs_local_server
 from src.commons.utils.jira_utils import JiraApp
 from src.commons.utils.resource_util import collect_resource_utilisation
 from src.commons.workload_mapping import SCRIPT_MAPPING
-from src.commons.yaml_parser import yaml_parser
 from src.commons.yaml_parser import test_parser

 LOGGER = logging.getLogger(const.ROOT)


-def initialize_loghandler(opt):
-    """Initialize io driver runner logging with stream and file handlers."""
-    # If log level provided then it will use DEBUG else will use default INFO.
-    dir_path = os.path.join(os.path.join(const.LOG_DIR, "latest"))
-    if not os.path.exists(dir_path):
-        os.makedirs(dir_path, exist_ok=True)
-    name = os.path.splitext(os.path.basename(__file__))[0]
-    if opt.verbose:
-        level = logging.getLevelName(logging.DEBUG)
-        log_path = os.path.join(dir_path, f"{name}_console_{const.DT_STRING}.DEBUG")
-    else:
-        level = logging.getLevelName(logging.INFO)
-        log_path = os.path.join(dir_path, f"{name}_console_{const.DT_STRING}.INFO")
-    os.environ["log_level"] = level
-    LOGGER.setLevel(level)
-    StreamToLogger(log_path, LOGGER, stream=True)
-    os.environ["log_path"] = log_path
-
-
 def parse_args():
     """Commandline arguments for CORIO Driver."""
     parser = argparse.ArgumentParser()
@@ -120,10 +91,10 @@ def parse_args():

 def pre_requisites(options):
     """Perform health check and start resource monitoring."""
-    setup_environment()
+    corio_utils.setup_environment()
     # Check cluster is healthy to start execution.
     if options.health_check:
-        check_health()
+        cluster_health.check_health()
     # start resource utilisation.
     collect_resource_utilisation(action="start")
     # Unique id for each run.
@@ -147,17 +118,6 @@ def get_parsed_input_details(flist: list, nodes: int) -> dict:
     return parsed


-def get_workload_list(path: str) -> list:
-    """Get all workload filepath list."""
-    if os.path.isdir(path):
-        file_list = glob.glob(path + "/*")
-    elif os.path.isfile(path):
-        file_list = [os.path.abspath(path)]
-    else:
-        raise IOError(f"Incorrect test input: {path}")
-    return file_list
-
-
 def check_report_duplicate_missing_ids(parsed_input, tests_details):
     """Check and report duplicate test ids from workload."""
     test_ids = []
@@ -188,43 +148,6 @@ def check_report_duplicate_missing_ids(parsed_input, tests_details):
     return tests_to_execute


-def schedule_execution_plan(parsed_input: dict, options: munch.Munch, return_dict):
-    """Schedule the execution plan."""
-    processes = {}
-    commons_params = {"access_key": S3_CFG.access_key,
-                      "secret_key": S3_CFG.secret_key,
-                      "endpoint_url": S3_CFG.endpoint,
-                      "use_ssl": S3_CFG.use_ssl,
-                      "seed": options.seed,
-                      "sequential_run": options.sequential_run}
-    for test_plan, test_plan_value in parsed_input.items():
-        processes[test_plan] = multiprocessing.Process(target=schedule_test_plan, name=test_plan,
-                                                       args=(test_plan, test_plan_value,
-                                                             commons_params,))
-    LOGGER.info("scheduled execution plan. Processes: %s", processes)
-    if options.support_bundle:
-        processes["support_bundle"] = multiprocessing.Process(target=support_bundle_process,
-                                                              name="support_bundle",
-                                                              args=(CORIO_CFG.sb_interval_mins *
-                                                                    60,))
-        LOGGER.info("Support bundle collection scheduled for every %s minutes",
-                    CORIO_CFG.sb_interval_mins)
-    if options.health_check:
-        processes["health_check"] = multiprocessing.Process(target=health_check_process,
-                                                            name="health_check",
-                                                            args=(CORIO_CFG.hc_interval_mins * 60,
-                                                                  return_dict,))
-        LOGGER.info("Health check scheduled for every %s minutes", CORIO_CFG.hc_interval_mins)
-
-    if options.degraded_mode:
-        master_config = yaml_parser("workload/master_config.yaml")
-        LOGGER.info("Master config data \n %s", master_config)
-        processes["degraded_mode"] = multiprocessing.Process(target=activate_degraded_mode_parallel,
-                                                             name="degraded_mode",
-                                                             args=(return_dict, master_config,))
-    return processes
-
-
 def get_test_ids_from_terminated_workload(workload_dict: dict, workload_key: str) -> list:
     """Get all test-id from terminated workload due to failure."""
     test_ids = []
@@ -233,68 +156,6 @@ def get_test_ids_from_terminated_workload(workload_dict: dict, workload_key: str
     return test_ids


-def monitor_processes(processes: dict, return_dict) -> str or None:
-    """Monitor the process."""
-    skip_process = []
-    for tp_key, process in processes.items():
-        if not process.is_alive():
-            if tp_key == "support_bundle":
-                LOGGER.critical(
-                    "Process with PID %s stopped Support bundle collection error.", process.pid)
-                skip_process.append(tp_key)
-                continue
-            if tp_key == "health_check":
-                raise HealthCheckError(
-                    f"Process with PID {process.pid} stopped. Health Check collection error.")
-            if os.path.exists(os.getenv("log_path")):
-                resp = run_local_cmd(
-                    f"grep 'topic {tp_key} completed successfully' {os.getenv('log_path')} ")
-                if resp[0] and resp[1]:
-                    skip_process.append(tp_key)
-                    continue
-            if tp_key == "degraded_mode":
-                if not return_dict['degraded_done']:
-                    LOGGER.warning("Process with PID for Cluster Degraded Mode "
-                                   "Transition %s stopped.", process.pid)
-                    raise DegradedModeError(f"Process with PID {process.pid} stopped.")
-                else:
-                    LOGGER.info("Process with PID for Cluster Degraded Mode"
-                                " Transition %s completed.", process.pid)
-                    skip_process.append(tp_key)
-                    continue
-            LOGGER.critical("Process with PID %s Name %s exited. Stopping other Process.",
-                            process.pid, process.name)
-            return tp_key
-    for proc in skip_process:
-        LOGGER.warning("Process '%s' removed from monitoring...", processes[proc].pid)
-        processes.pop(proc)
-    return None
-
-
-def terminate_processes(processes: dict) -> None:
-    """
-    Terminate Process on failure.
-
-    :param processes: List of process to be terminated.
-    """
-    LOGGER.debug("Processes to terminate: %s", processes)
-    for process in processes.values():
-        process.terminate()
-        process.join()
-
-
-def start_processes(processes: dict) -> None:
-    """
-    Trigger all proces from process list.
-
-    :param processes: List of process to start.
-    """
-    LOGGER.info("Processes to start: %s", processes)
-    for process in processes.values():
-        process.start()
-        LOGGER.info("Process started: %s", process)
-
-
 # pylint: disable=broad-except
 def main(options):
     """
@@ -306,32 +167,32 @@ def main(options):
     jira_obj = JiraApp() if options.test_plan else None
     if jira_obj:
         tests_details = jira_obj.get_all_tests_details_from_tp(options.test_plan, reset_status=True)
-    workload_list = get_workload_list(options.test_input)
+    workload_list = corio_utils.get_workload_list(options.test_input)
     LOGGER.info("Test YAML Files to be executed : %s", workload_list)
     parsed_input = get_parsed_input_details(workload_list, options.number_of_nodes)
     tests_to_execute = check_report_duplicate_missing_ids(parsed_input, tests_details)
     corio_start_time = datetime.now()
     LOGGER.info("Parsed files data:\n %s", pformat(parsed_input))
     return_dict = multiprocessing.Manager().dict()
-    processes = schedule_execution_plan(parsed_input, options, return_dict)
-    sched = schedule_test_status_update(parsed_input, corio_start_time,
-                                        periodic_time=CORIO_CFG.report_interval_mins,
-                                        sequential_run=options.sequential_run)
+    processes = scheduler.schedule_execution_plan(parsed_input, options, return_dict)
+    sched = scheduler.schedule_test_status_update(parsed_input, corio_start_time,
+                                                  periodic_time=CORIO_CFG.report_interval_mins,
+                                                  sequential_run=options.sequential_run)
     mobj = SendMailNotification(corio_start_time, options.test_plan,
                                 health_check=options.health_check, endpoint=S3_CFG.endpoint)
     mobj.email_alert(action="start")
     try:
         if options.degraded_mode:
-            get_degraded_mode()
-        start_processes(processes)
+            degrade_cluster.get_degraded_mode()
+        scheduler.start_processes(processes)
         while processes:
             time.sleep(1)
-            cpu_memory_details()
+            corio_utils.cpu_memory_details()
             schedule.run_pending()
             if jira_obj:
                 jira_obj.update_jira_status(
                     corio_start_time=corio_start_time, tests_details=tests_to_execute)
-            terminated_tp = monitor_processes(processes, return_dict)
+            terminated_tp = scheduler.monitor_processes(processes, return_dict)
             if terminated_tp:
                 test_ids = get_test_ids_from_terminated_workload(parsed_input, terminated_tp)
                 break
@@ -341,25 +202,26 @@ def main(options):
         LOGGER.exception(err)
         terminated_tp = type(err).__name__
     finally:
-        terminate_processes(processes)
-        terminate_update_test_status(parsed_input, corio_start_time, terminated_tp, test_ids,
-                                     sched, action="final", sequential_run=options.sequential_run,)
+        scheduler.terminate_processes(processes)
+        scheduler.terminate_update_test_status(parsed_input, corio_start_time, terminated_tp,
+                                               test_ids, sched, action="final",
+                                               sequential_run=options.sequential_run,)
         if jira_obj:
             jira_obj.update_jira_status(
                 corio_start_time=corio_start_time, tests_details=tests_to_execute, aborted=True,
                 terminated_tests=test_ids)
         if options.support_bundle:
-            collect_upload_rotate_support_bundles(const.CMN_LOG_DIR)
+            support_bundle.collect_upload_rotate_support_bundles(const.CMN_LOG_DIR)
         mobj.email_alert(action="stop", tp=terminated_tp, ids=test_ids)
         collect_resource_utilisation(action="stop")
-        store_logs_to_nfs_local_server()
+        corio_utils.store_logs_to_nfs_local_server()


 if __name__ == "__main__":
     # backup old execution logs.
-    log_cleanup()
-    OPTS = parse_args()
-    initialize_loghandler(OPTS)
-    LOGGER.info("Arguments: %s", OPTS)
-    pre_requisites(OPTS)
-    main(OPTS)
+    corio_utils.log_cleanup()
+    opts = parse_args()
+    initialize_loghandler(LOGGER, opts.verbose)
+    LOGGER.info("Arguments: %s", opts)
+    pre_requisites(opts)
+    main(opts)
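Note the changed call in the main guard: the removed in-file initialize_loghandler(opt) is replaced by initialize_loghandler(LOGGER, opts.verbose), now imported from src.commons.logger. The relocated implementation is not part of this diff; the sketch below is only a plausible reconstruction from the removed body above, and the file-naming scheme and StreamToLogger usage are assumptions carried over from the old code:

```python
import logging
import os

from src.commons import constants as const  # repository constants module, as used above
from src.commons.logger import StreamToLogger  # existing handler class in the same module


def initialize_loghandler(logger: logging.Logger, verbose: bool) -> None:
    """Set the log level from the verbose flag and attach stream/file handlers (sketch)."""
    dir_path = os.path.join(const.LOG_DIR, "latest")
    os.makedirs(dir_path, exist_ok=True)
    # DEBUG when --verbose is passed, INFO otherwise, as in the removed version.
    level = logging.getLevelName(logging.DEBUG if verbose else logging.INFO)
    log_path = os.path.join(dir_path, f"corio_console_{const.DT_STRING}.{level}")
    os.environ["log_level"] = level
    os.environ["log_path"] = log_path
    logger.setLevel(level)
    StreamToLogger(log_path, logger, stream=True)
```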
2 changes: 1 addition & 1 deletion scripts/s3/s3api/mix_object_crud_operations.py
@@ -54,7 +54,7 @@ def __init__(self, access_key: str, secret_key: str, endpoint_url: str, **kwargs
         self.access_key = access_key
         self.secret_key = secret_key
         self.endpoint_url = endpoint_url
-        self.iteration = 0
+        self.iteration = 1
         self.sessions = kwargs.get("sessions")
         if kwargs.get("duration"):
             self.finish_time = datetime.now() + kwargs.get("duration")
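The single-line change above seeds the workload's iteration counter at 1 instead of 0, so the first pass through the mixed CRUD loop is reported as iteration 1. A minimal illustrative sketch of how such a counter is typically consumed; the class and method names are stand-ins, not this file's actual API:

```python
import logging
import time
from datetime import datetime, timedelta

LOGGER = logging.getLogger(__name__)


class MixWorkload:
    """Minimal stand-in for the mixed object CRUD operations class."""

    def __init__(self, duration: timedelta) -> None:
        self.iteration = 1  # first pass reports as iteration 1, not 0
        self.finish_time = datetime.now() + duration

    def run(self) -> None:
        while datetime.now() < self.finish_time:
            LOGGER.info("Iteration %s started.", self.iteration)
            time.sleep(0.2)  # placeholder for the mixed read/write/delete operations
            LOGGER.info("Iteration %s completed.", self.iteration)
            self.iteration += 1


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    MixWorkload(duration=timedelta(seconds=1)).run()
```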