Commit
optimization to watch for logs before create
mukrishn committed Nov 1, 2023
1 parent b524580 commit 55666ed
Showing 2 changed files with 82 additions and 84 deletions.
154 changes: 76 additions & 78 deletions libs/platforms/rosa/terraform/terraform.py
@@ -64,7 +64,6 @@ def _oidc_tf_template(self, action, tf_path, myenv):
def apply_tf_template(self, platform):
loop_counter = 0
while loop_counter < platform.environment["clusters_per_apply_count"]:
tf_counter = 0
self.logging.debug(platform.environment["clusters"])
if self.utils.force_terminate:
loop_counter += 1
@@ -149,14 +148,6 @@ def apply_tf_template(self, platform):
self.logging.error(f"Failed to apply with cluster seed {tf_name} looping {loop_counter + 1}")
self.logging.error(err)
return 1

while tf_counter < platform.environment["clusters_per_apply"]:
cluster_name = platform.environment["cluster_name_seed"] + "-" + str((loop_counter * self.environment['clusters_per_apply']) + (tf_counter + 1)).zfill(4)
platform.environment["clusters"][cluster_name] = {}
platform.environment["clusters"][cluster_name]["tf_index"] = loop_counter
platform.environment["clusters"][cluster_name]["cluster_start_time"] = cluster_start_time
tf_counter += 1

loop_counter += 1
return 0
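
The block deleted in this hunk (mirrored in destroy_tf_template below) pre-registered one platform.environment["clusters"] entry per cluster, deriving each name from the name seed plus a zero-padded global index; after this commit the start timestamps are captured inside create_cluster and delete_cluster instead. A minimal standalone sketch of the naming scheme, with illustrative values in place of the real platform environment:

    # Sketch of the removed naming logic; the values below are illustrative.
    cluster_name_seed = "perf"     # platform.environment["cluster_name_seed"]
    clusters_per_apply = 2         # clusters created by each terraform apply
    clusters_per_apply_count = 3   # number of terraform applies

    clusters = {}
    for loop_counter in range(clusters_per_apply_count):
        for tf_counter in range(clusters_per_apply):
            # 1-based index across all applies, zero-padded to four digits
            index = loop_counter * clusters_per_apply + tf_counter + 1
            clusters[f"{cluster_name_seed}-{index:04d}"] = {"tf_index": loop_counter}

    print(sorted(clusters))  # ['perf-0001', 'perf-0002', ..., 'perf-0006']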

@@ -165,7 +156,6 @@ def apply_tf_template(self, platform):
def destroy_tf_template(self, platform, tf_module="cluster"):
loop_counter = 0
while loop_counter < platform.environment["clusters_per_apply_count"]:
tf_counter = 0
self.logging.debug(platform.environment["clusters"])
if self.utils.force_terminate:
loop_counter += 1
@@ -228,13 +218,6 @@ def destroy_tf_template(self, platform, tf_module="cluster"):
self.logging.error(f"Failed to apply with cluster seed {tf_name} looping {loop_counter + 1}")
self.logging.error(err)
return 1

while tf_counter < platform.environment["clusters_per_apply"]:
cluster_name = platform.environment["cluster_name_seed"] + "-" + str((loop_counter * self.environment['clusters_per_apply']) + (tf_counter + 1)).zfill(4)
platform.environment["clusters"][cluster_name]["tf_index"] = loop_counter
platform.environment["clusters"][cluster_name]["cluster_delete_start_time"] = cluster_start_time
tf_counter += 1

loop_counter += 1
return 0

@@ -243,36 +226,45 @@ def destroy_tf_template(self, platform, tf_module="cluster"):
# No actual delete logic in this function, unlike other subplatforms
def delete_cluster(self, platform, cluster_name):
super().delete_cluster(platform, cluster_name)
retry_loop = 0
cluster_info = platform.environment["clusters"][cluster_name]
cluster_delete_start_time = platform.environment["clusters"][cluster_name]["cluster_delete_start_time"]
cluster_info["uuid"] = self.environment["uuid"]
cluster_info["install_method"] = "terraform"
cluster_info["per_template_count"] = platform.environment['clusters_per_apply_count']
cluster_info["per_template_count"] = platform.environment['clusters_per_apply']
cluster_info["tf_count"] = platform.environment['clusters_per_apply_count']
cluster_info["total_count"] = platform.environment['cluster_count']
self.logging.info(f"Checking uninstall log for cluster {cluster_name}")

watch_code, watch_out, watch_err = self.utils.subprocess_exec("rosa logs uninstall -c " + cluster_name + " --watch", cluster_info["path"] + "/cleanup.log", {'preexec_fn': self.utils.disable_signals})
if watch_code != 0:
cluster_info['status'] = "not deleted"
self.logging.debug(watch_out)
self.logging.error(watch_err)
return 1
else:
cluster_delete_end_time = int(datetime.datetime.utcnow().timestamp())
self.logging.debug(
f"Confirm cluster {cluster_name} deleted by attempting to describe the cluster. This should fail if the cluster is removed."
)
check_code, check_out, check_err = self.utils.subprocess_exec(
"rosa describe cluster -c " + cluster_name, log_output=False
)
if check_code != 0:
cluster_info["status"] = "deleted"
while retry_loop <= 7200: # 2hr timeout
retry_loop += 1
cluster_delete_start_time = int(datetime.datetime.utcnow().timestamp())
watch_code, watch_out, watch_err = self.utils.subprocess_exec("rosa logs uninstall -c " + cluster_name + " --watch", cluster_info["path"] + "/cleanup.log", {'preexec_fn': self.utils.disable_signals})
if watch_code != 0:
if retry_loop <= 7200:
self.logging.debug(f"ROSA cluster {cluster_name} is not available yet, retrying..")
self.logging.debug(watch_out)
time.sleep(1)
else:
cluster_info['status'] = "not deleted"
self.logging.debug(watch_out)
self.logging.error(watch_err)
return 1
else:
cluster_info["status"] = "not deleted"
cluster_delete_end_time = int(datetime.datetime.utcnow().timestamp())
self.logging.debug(
f"Confirm cluster {cluster_name} deleted by attempting to describe the cluster. This should fail if the cluster is removed."
)
check_code, check_out, check_err = self.utils.subprocess_exec(
"rosa describe cluster -c " + cluster_name, log_output=False
)
if check_code != 0:
cluster_info["status"] = "deleted"
else:
cluster_info["status"] = "not deleted"

cluster_end_time = int(datetime.datetime.utcnow().timestamp())
cluster_info["destroy_duration"] = cluster_delete_end_time - cluster_delete_start_time
cluster_info["destroy_all_duration"] = cluster_end_time - cluster_delete_start_time
cluster_end_time = int(datetime.datetime.utcnow().timestamp())
cluster_info["destroy_duration"] = cluster_delete_end_time - cluster_delete_start_time
cluster_info["destroy_all_duration"] = cluster_end_time - cluster_delete_start_time
try:
with open(cluster_info['path'] + "/metadata_destroy.json", "w") as metadata_file:
json.dump(cluster_info, metadata_file)
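
The rewritten delete_cluster wraps rosa logs uninstall --watch in a retry loop so the watcher can start before the uninstall log exists, polling for up to the 2-hour window before giving up; the delete start time is now captured inside the loop rather than read from the environment. A condensed sketch of the pattern, with a stand-in run_cmd helper in place of utils.subprocess_exec:

    import datetime
    import subprocess
    import time

    def run_cmd(cmd):
        # Stand-in for utils.subprocess_exec: (returncode, stdout, stderr)
        proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        return proc.returncode, proc.stdout, proc.stderr

    def watch_uninstall(cluster_name, max_retries=7200):
        retry_loop = 0
        while retry_loop <= max_retries:  # ~2hr timeout at 1s per retry
            retry_loop += 1
            start = int(datetime.datetime.utcnow().timestamp())
            code, out, err = run_cmd(f"rosa logs uninstall -c {cluster_name} --watch")
            if code != 0:
                if retry_loop <= max_retries:
                    time.sleep(1)  # log stream not available yet; retry
                    continue
                return None        # timed out waiting for the uninstall logs
            # Watch finished: describing the cluster should now fail
            check_code, _, _ = run_cmd(f"rosa describe cluster -c {cluster_name}")
            end = int(datetime.datetime.utcnow().timestamp())
            return {"deleted": check_code != 0, "destroy_duration": end - start}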
@@ -321,45 +313,51 @@ def create_cluster(self, platform, cluster_name):
os.mkdir(cluster_info["path"])
self.logging.debug("Output directory set to %s" % cluster_info["path"])

cluster_start_time = platform.environment["clusters"][cluster_name]["cluster_start_time"]

watch_code, watch_out, watch_err = self.utils.subprocess_exec("rosa logs install -c " + cluster_name + " --watch", cluster_info["path"] + "/installation.log", {'preexec_fn': self.utils.disable_signals})
if watch_code != 0:
cluster_info['status'] = "not ready"
self.logging.debug(watch_out)
self.logging.error(watch_err)
return 1
else:
cluster_info['status'] = "installed"
cluster_end_time = int(datetime.datetime.utcnow().timestamp())
# Getting metadata again to update the cluster status
cluster_info["metadata"] = self.get_metadata(cluster_name)
cluster_info["install_duration"] = cluster_end_time - cluster_start_time
access_timers = self.get_cluster_admin_access(cluster_name, cluster_info["path"])
cluster_info["kubeconfig"] = access_timers.get("kubeconfig", None)
cluster_info["cluster_admin_create"] = access_timers.get("cluster_admin_create", None)
cluster_info["cluster_admin_login"] = access_timers.get("cluster_admin_login", None)
cluster_info["cluster_oc_adm"] = access_timers.get("cluster_oc_adm", None)
if not cluster_info["kubeconfig"]:
self.logging.error(f"Failed to download kubeconfig file for cluster {cluster_name}. Disabling wait for workers and workload execution")
cluster_info["workers_wait_time"] = None
cluster_info["status"] = "Ready. Not Access"
return 1
if cluster_info["workers_wait_time"]:
with concurrent.futures.ThreadPoolExecutor() as wait_executor:
futures = [wait_executor.submit(self._wait_for_workers, cluster_info["kubeconfig"], cluster_info["workers"], cluster_info["workers_wait_time"], cluster_name, "workers")]
futures.append(wait_executor.submit(self._wait_for_workers, cluster_info["kubeconfig"], platform.environment["extra_machinepool"]["replicas"], cluster_info["workers_wait_time"], cluster_name, platform.environment["extra_machinepool"]["name"])) if "extra_machinepool" in platform.environment else None
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result[0] == "workers":
default_pool_workers = int(result[1])
if default_pool_workers == cluster_info["workers"]:
cluster_info["workers_ready"] = result[2] - cluster_start_time
else:
cluster_info['workers_ready'] = None
cluster_info['status'] = "Ready, missing workers"
return 1
cluster_info['status'] = "ready"
while retry_loop <= 7200: # 2hr timeout
retry_loop += 1
cluster_start_time = int(datetime.datetime.utcnow().timestamp())
watch_code, watch_out, watch_err = self.utils.subprocess_exec("rosa logs install -c " + cluster_name + " --watch", cluster_info["path"] + "/installation.log", {'preexec_fn': self.utils.disable_signals})
if watch_code != 0:
if retry_loop <= 7200:
self.logging.debug(f"ROSA cluster {cluster_name} is not available yet, retrying..")
self.logging.debug(watch_out)
time.sleep(1)
else:
cluster_info['status'] = "not ready"
self.logging.debug(watch_out)
self.logging.error(watch_err)
return 1
else:
cluster_info['status'] = "installed"
cluster_end_time = int(datetime.datetime.utcnow().timestamp())
# Getting metadata again to update the cluster status
cluster_info["metadata"] = self.get_metadata(cluster_name)
cluster_info["install_duration"] = cluster_end_time - cluster_start_time
access_timers = self.get_cluster_admin_access(cluster_name, cluster_info["path"])
cluster_info["kubeconfig"] = access_timers.get("kubeconfig", None)
cluster_info["cluster_admin_create"] = access_timers.get("cluster_admin_create", None)
cluster_info["cluster_admin_login"] = access_timers.get("cluster_admin_login", None)
cluster_info["cluster_oc_adm"] = access_timers.get("cluster_oc_adm", None)
if not cluster_info["kubeconfig"]:
self.logging.error(f"Failed to download kubeconfig file for cluster {cluster_name}. Disabling wait for workers and workload execution")
cluster_info["workers_wait_time"] = None
cluster_info["status"] = "Ready. Not Access"
return 1
if cluster_info["workers_wait_time"]:
with concurrent.futures.ThreadPoolExecutor() as wait_executor:
futures = [wait_executor.submit(self._wait_for_workers, cluster_info["kubeconfig"], cluster_info["workers"], cluster_info["workers_wait_time"], cluster_name, "workers")]
futures.append(wait_executor.submit(self._wait_for_workers, cluster_info["kubeconfig"], platform.environment["extra_machinepool"]["replicas"], cluster_info["workers_wait_time"], cluster_name, platform.environment["extra_machinepool"]["name"])) if "extra_machinepool" in platform.environment else None
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result[0] == "workers":
default_pool_workers = int(result[1])
if default_pool_workers == cluster_info["workers"]:
cluster_info["workers_ready"] = result[2] - cluster_start_time
else:
cluster_info['workers_ready'] = None
cluster_info['status'] = "Ready, missing workers"
return 1
cluster_info['status'] = "ready"
try:
with open(cluster_info['path'] + "/metadata_install.json", "w") as metadata_file:
json.dump(cluster_info, metadata_file)
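
Once the install log watch succeeds, create_cluster waits on the default worker pool and, when configured, an extra machine pool concurrently. A simplified sketch of that fan-out, assuming a wait_for_workers(kubeconfig, expected, timeout, cluster, pool) helper that returns (pool_name, ready_count, ready_timestamp):

    import concurrent.futures

    def wait_all_pools(wait_for_workers, kubeconfig, cluster_name,
                       workers, wait_time, extra_machinepool=None):
        # Wait for the default pool and an optional extra pool in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(wait_for_workers, kubeconfig, workers,
                                       wait_time, cluster_name, "workers")]
            if extra_machinepool:  # e.g. {"name": "extra-pool", "replicas": 3}
                futures.append(executor.submit(
                    wait_for_workers, kubeconfig, extra_machinepool["replicas"],
                    wait_time, cluster_name, extra_machinepool["name"]))
            results = {}
            for future in concurrent.futures.as_completed(futures):
                pool, ready, ready_at = future.result()
                results[pool] = (int(ready), ready_at)
            return results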
12 changes: 6 additions & 6 deletions libs/utils.py
@@ -93,9 +93,6 @@ def cleanup_scheduler(self, platform):
time.sleep(platform.environment["wait_before_cleanup"] * 60)
self.logging.info(f"Attempting to start cleanup process of {len(platform.environment['clusters'])} clusters waiting {platform.environment['delay_between_cleanup']} minutes between each deletion")
delete_cluster_thread_list = []
if platform.environment["subplatform"] and platform.environment["subplatform"] == "terraform":
if platform.destroy_tf_template(platform) != 0:
return 1
for cluster_name, cluster_info in platform.environment["clusters"].items():
self.logging.info(f"Attempting to start cleanup process of {cluster_name} on status: {cluster_info['status']}")
try:
@@ -116,6 +113,9 @@ def cleanup_scheduler(self, platform):
f"Waiting {platform.environment['delay_between_cleanup']} minutes before deleting the next cluster"
)
time.sleep(platform.environment["delay_between_cleanup"])
if platform.environment["subplatform"] and platform.environment["subplatform"] == "terraform":
if platform.destroy_tf_template(platform) != 0:
return 1
return delete_cluster_thread_list

# To form the cluster_info dict for cleanup functions
@@ -155,9 +155,6 @@ def install_scheduler(self, platform):
batch_count = 0
loop_counter = 0
try:
if platform.environment["subplatform"] and platform.environment["subplatform"] == "terraform":
if platform.apply_tf_template(platform) != 0:
return 1
while loop_counter < platform.environment["cluster_count"]:
self.logging.debug(platform.environment["clusters"])
if self.force_terminate:
@@ -204,6 +201,9 @@ def install_scheduler(self, platform):
cluster_thread_list.append(thread)
thread.start()
self.logging.debug("Number of alive threads %d" % threading.active_count())
if platform.environment["subplatform"] and platform.environment["subplatform"] == "terraform":
if platform.apply_tf_template(platform) != 0:
return 1
except Exception as err:
self.logging.error(err)
self.logging.error("Thread creation failed")
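
The libs/utils.py half of the change is pure reordering: cleanup_scheduler and install_scheduler now start the per-cluster watcher threads first and call destroy_tf_template/apply_tf_template afterwards, so the rosa logs --watch retry loops above are already polling when terraform actually creates or destroys the clusters. A minimal sketch of the reordered install path, with hypothetical platform hooks standing in for the real scheduler plumbing:

    import threading

    def install_scheduler(platform):
        # Start log watchers first; each blocks in the retry loop above
        threads = []
        for cluster_name in platform.environment["clusters"]:
            t = threading.Thread(target=platform.create_cluster,
                                 args=(platform, cluster_name))
            threads.append(t)
            t.start()
        # Only now kick off the terraform applies (previously done first)
        if platform.environment.get("subplatform") == "terraform":
            if platform.apply_tf_template(platform) != 0:
                return 1
        for t in threads:
            t.join()
        return 0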
