Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-added functionality back to lpad.get_fw_id_from_reservation_id(reservation_id) #534

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
26 changes: 12 additions & 14 deletions fireworks/core/launchpad.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from collections import defaultdict
from itertools import chain


import gridfs
from bson import ObjectId
from monty.os.path import zpath
Expand All @@ -20,6 +21,8 @@
from tqdm import tqdm

from fireworks.core.firework import Firework, FWAction, Launch, Tracker, Workflow
from fireworks.utilities import fw_id_from_reservation_id
from fireworks.utilities import reservation_id_from_fw_id
from fireworks.fw_config import MongoClient
from fireworks.fw_config import (
GRIDFS_FALLBACK_COLLECTION,
Expand Down Expand Up @@ -1192,18 +1195,17 @@ def reserve_fw(self, fworker, launch_dir, host=None, ip=None, fw_id=None):
"""
return self.checkout_fw(fworker, launch_dir, host=host, ip=ip, fw_id=fw_id, state="RESERVED")

def get_fw_ids_from_reservation_id(self, reservation_id):
def get_fw_id_from_reservation_id(self, reservation_id):
"""
Given the reservation id, return the list of firework ids.

Args:
reservation_id (int)

Returns:
[int]: list of firework ids.
[int]: Return the firework id.
"""
l_id = self.launches.find_one({"state_history.reservation_id": reservation_id}, {"launch_id": 1})["launch_id"]
return [fw["fw_id"] for fw in self.fireworks.find({"launches": l_id}, {"fw_id": 1})]
fw_id=fw_id_from_reservation_id.get_fwid(reservation_id)

return fw_id

def cancel_reservation_by_reservation_id(self, reservation_id) -> None:
"""Given the reservation id, cancel the reservation and rerun the corresponding fireworks."""
Expand All @@ -1217,14 +1219,10 @@ def cancel_reservation_by_reservation_id(self, reservation_id) -> None:

def get_reservation_id_from_fw_id(self, fw_id):
"""Given the firework id, return the reservation id."""
fw = self.fireworks.find_one({"fw_id": fw_id}, {"launches": 1})
if fw:
for launch in self.launches.find({"launch_id": {"$in": fw["launches"]}}, {"state_history": 1}):
for d in launch["state_history"]:
if "reservation_id" in d:
return d["reservation_id"]
return None
return None
jobid=reservation_id_from_fw_id.main(fw_id)
if jobid==None:
print('No matching fw_id-JobID pair. The firework may be a lost run')
return jobid

def cancel_reservation(self, launch_id) -> None:
"""Given the launch id, cancel the reservation and rerun the fireworks."""
Expand Down
169 changes: 169 additions & 0 deletions fireworks/utilities/fw_id_from_reservation_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import os
import subprocess
import json
import paramiko
import getpass
import re

# Function to execute local shell commands and return the output
def execute_command(command):
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
#raise Exception(f"Command failed: {command}\n{result.stderr}")
raise Exception('Running fireworks locally')
ssh=None
return result.stdout.strip(),ssh
except Exception as e:
result,ssh=ssh_login(command)
print(e)
return result,ssh


def extract_username_hostname(input_string):
# Define the regex pattern
pattern = r'(?P<username>[^@]+)@(?P<hostname>.+)'

# Search for the pattern in the input string
match = re.match(pattern, input_string)

if match:
# Extract username and hostname from named groups
username = match.group('username')
hostname = match.group('hostname')
return username, hostname
else:
raise ValueError("The input does not match the required format 'username@hostname'.")

# Get user input

# SSH login and execute remote command
def ssh_login(command):
input_string = input("Enter username@hostname: ").strip()
username, hostname = extract_username_hostname(input_string)
password = getpass.getpass('Enter password+OTP: ')

# Create an SSH client
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
# Connect to the server
ssh.connect(hostname, username=username, password=password)
# Execute the command
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()

if errors:
raise Exception(f"Command failed: {command}\n{errors}")

except Exception as e:
print(e)
return output, ssh


def get_fwid(jobid):
job_info,ssh = execute_command(f"scontrol show jobid {jobid}")
if ssh !=None:
fw_id=find_worker(job_info,ssh)
ssh.close()
else:
fw_id=find_worker(job_info,ssh)

return fw_id


def find_worker(job_info, ssh):
stdout_dir = ""
for line in job_info.splitlines():
if "StdOut=" in line:
stdout_dir = line.split("=", 1)[1]
break

if not stdout_dir:
print("StdOut path not found in job information")
return

base_dir = os.path.dirname(stdout_dir)

if ssh!=None:
# Change directory to the base directory on the remote server
stdin, stdout, stderr = ssh.exec_command(f"cd {base_dir} && pwd")
current_dir = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to change directory: {errors}")

print(f"Changed directory to: {current_dir}")

stdin, stdout, stderr = ssh.exec_command(f"find {current_dir} -type d -name 'launcher_*'")
launch_dirs = stdout.read().decode('utf-8').splitlines()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to find launch directories: {errors}")

largest_dir = max(launch_dirs, key=lambda d: d.split('_')[-1])

# Change to the largest directory
stdin, stdout, stderr = ssh.exec_command(f"cd {largest_dir} && pwd")
final_dir = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to change directory to {largest_dir}: {errors}")

print(f"Changed directory to: {final_dir}")

# Check for the JSON file in the directory
stdin, stdout, stderr = ssh.exec_command(f"cat {final_dir}/FW.json")
json_data = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to read FW.json: {errors}")

data = json.loads(json_data)
spec_mpid = data.get('spec', {}).get('MPID', 'N/A')
fw_id = data.get('fw_id', 'N/A')

print(f"spec.MPID: {spec_mpid}")
print(f"fw_id: {fw_id}")
else:
# Change directory to the extracted base directory
try:
os.chdir(base_dir)
except OSError:
print(f"Failed to change directory to {base_dir}")
exit(1)

# Print the current directory to confirm
print(f"Changed directory to: {os.getcwd()}")

# Find the largest directory with the pattern "launcher_*"
launch_dirs = subprocess.check_output(f"find {os.getcwd()} -type d -name 'launcher_*'", shell=True).decode().splitlines()
largest_dir = max(launch_dirs, key=lambda d: d.split('_')[-1])

try:
os.chdir(largest_dir)
except OSError:
print(f"Failed to change directory to {largest_dir}")
exit(1)

print(f"Changed directory to: {os.getcwd()}")

json_file = os.path.join(os.getcwd(), "FW.json")

# Check if the JSON file exists
if os.path.isfile(json_file):
with open(json_file, 'r') as f:
data = json.load(f)
spec_mpid = data.get('spec', {}).get('MPID', 'N/A')
fw_id = data.get('fw_id', 'N/A')

# Output the extracted values
print(f"spec.MPID: {spec_mpid}")
print(f"fw_id: {fw_id}")
else:
print(f"FW.json not found in {largest_dir}")

return fw_id
return fw_id
Loading