Merge pull request #300 from kedhammar/parse-anglerfish-df
Parse Anglerfish dataframe instead of textfile and improve code factoring
kedhammar authored Jan 4, 2024
2 parents 74aba6a + 1a83733 commit 3399e0b
Showing 1 changed file with 121 additions and 137 deletions.
258 changes: 121 additions & 137 deletions scripts/parse_anglerfish_results.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
import glob
import logging
import os
from argparse import ArgumentParser
from datetime import datetime as dt
@@ -13,127 +14,107 @@
from epp_utils.udf_tools import fetch, put


def get_anglerfish_output_file(lims: Lims, currentStep: Process, log: list):
def find_latest_flowcell_run(currentStep: Process) -> str:
flowcell_id: str = currentStep.udf["ONT flow cell ID"].upper().strip()
run_query = f"/srv/ngi-nas-ns/minion_data/qc/*{flowcell_id}*"
logging.info(f"Looking for path {run_query}")

run_glob = glob.glob(run_query)
assert (
len(run_glob) != 0
), f"No runs with flowcell ID {flowcell_id} found on path {run_query}"
if len(run_glob) > 1:
runs_list = "\n".join(run_glob)
logging.warning(
f"Multiple runs with flowcell ID {flowcell_id} detected:\n{runs_list}"
)
latest_flowcell_run_path = max(run_glob, key=os.path.getctime)

logging.info(f"Using latest flowcell run {latest_flowcell_run_path}")
return latest_flowcell_run_path


def find_latest_anglerfish_run(latest_flowcell_run_path: str) -> str:
anglerfish_query = f"{latest_flowcell_run_path}/*anglerfish_run*"
anglerfish_glob = glob.glob(anglerfish_query)

assert (
len(anglerfish_glob) != 0
), f"No Anglerfish runs found for query {anglerfish_query}"

if len(anglerfish_glob) > 1:
runs_list = "\n".join(anglerfish_glob)
logging.warning(f"Multiple Anglerfish runs detected:\n{runs_list}")
latest_anglerfish_run_path = max(anglerfish_glob, key=os.path.getctime)
logging.info(f"Using latest Anglerfish run {latest_anglerfish_run_path}")

return latest_anglerfish_run_path
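
Both helpers above resolve "the latest run" with the same glob-then-pick-newest idiom. A minimal standalone sketch of that idiom, with the query pattern borrowed from the code above; note that on Linux, os.path.getctime returns inode-change time rather than true creation time, so a renamed or re-permissioned older run can outrank a newer one:

```python
import glob
import os


def latest_path(pattern: str) -> str:
    """Return the most recently changed path matching a glob pattern."""
    matches = glob.glob(pattern)
    if not matches:
        raise FileNotFoundError(f"No paths matching {pattern}")
    # On Linux, getctime is inode-change time, not creation time
    return max(matches, key=os.path.getctime)


# e.g. latest_path("/srv/ngi-nas-ns/minion_data/qc/*<flowcell_id>*")
```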


def upload_anglerfish_text_results(
lims: Lims, currentStep: Process, latest_anglerfish_run_path: str
):
logging.info("Uploading Anglerfish results .txt-file to LIMS")

anglerfish_file_slot: Artifact = [
outart
for outart in currentStep.all_outputs()
if outart.name == "Anglerfish Result File"
][0]

# Try to load file from LIMS
if anglerfish_file_slot.files:
loaded_file_name = anglerfish_file_slot.files[0].original_location.split("/")[
-1
]
log.append(
f"Anglerfish Result File '{loaded_file_name}' detected in the step, loading it directly"
)
bytes_content = lims.get_file_contents(
id=anglerfish_file_slot.files[0].id
).readlines()
content = [x.decode("utf-8") for x in bytes_content]

# Try to load file from ngi-nas-ns
else:
log.append(
"No 'Anglerfish Result File' detected in the step, trying to fetch it from ngi-nas-ns"
)

# Find latest run
run_query = f"/srv/ngi-nas-ns/minion_data/qc/*{flowcell_id}*"
log.append(f"Looking for path {run_query}")
run_glob = glob.glob(run_query)
assert (
len(run_glob) != 0
), f"No runs with flowcell ID {flowcell_id} found on path {run_query}"
if len(run_glob) > 1:
runs_list = "\n".join(run_glob)
log.append(
f"WARNING: Multiple runs with flowcell ID {flowcell_id} detected:\n{runs_list}"
)
latest_run_path = max(run_glob, key=os.path.getctime)
log.append(f"Using latest run {latest_run_path}")

# Find latest Anglerfish results of run
anglerfish_results_query = (
f"{latest_run_path}/*anglerfish*/anglerfish_stats.txt"
)
anglerfish_results_glob = glob.glob(anglerfish_results_query)
assert (
len(anglerfish_results_glob) != 0
), f"No Anglerfish results found for query {anglerfish_results_query}"
if len(anglerfish_results_glob) > 1:
results_list = "\n".join(anglerfish_results_glob)
log.append(
f"WARNING: Multiple Anglerfish results detected:\n{results_list}"
)
latest_anglerfish_results_path = max(
anglerfish_results_glob, key=os.path.getctime
)
log.append(f"Using latest Anglerfish results {latest_anglerfish_results_path}")

# Upload results to LIMS
lims.upload_new_file(anglerfish_file_slot, latest_anglerfish_results_path)
file_name = os.path.join(latest_anglerfish_run_path, "anglerfish_stats.txt")
assert os.path.exists(file_name), f"File {file_name} does not exist"

# Load file
content = open(latest_anglerfish_results_path).readlines()
# Upload results to LIMS
lims.upload_new_file(anglerfish_file_slot, file_name)

return content
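
The upload half of the function above follows a common genologics pattern: locate the step output artifact that acts as a file slot, then attach the file to it. A condensed sketch of just that pattern, using the same slot name as the code above:

```python
from genologics.entities import Process
from genologics.lims import Lims


def upload_to_slot(lims: Lims, step: Process, slot_name: str, path: str) -> None:
    # The file slot is the step output artifact with the matching name
    slot = [out for out in step.all_outputs() if out.name == slot_name][0]
    lims.upload_new_file(slot, path)


# upload_to_slot(lims, currentStep, "Anglerfish Result File", file_name)
```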

def get_anglerfish_dataframe(latest_anglerfish_run_path: str) -> pd.DataFrame:
file_name = "anglerfish_dataframe.csv"
file_path = os.path.join(latest_anglerfish_run_path, file_name)
assert os.path.exists(file_path), f"File {file_path} does not exist"

def get_data(content: list, log: list):
data = []
header = None
df_raw = pd.read_csv(file_path)

# Extract sample data
for line in content:
# Search for header
if "sample_name" in line and "#reads" in line:
header = [e.strip() for e in line.split("\t")]
continue
return df_raw

# Parse tsv body
if (header) and (line != "\n"):
data.append([e.strip() for e in line.split("\t")])

# Read tsv body until an empty line
if (header) and (line == "\n"):
break

else:
continue

# Compile data into dataframe
df = pd.DataFrame(data, columns=header)
df = df.astype(
{
"sample_name": str,
"#reads": int,
"mean_read_len": float,
"std_read_len": float,
"i5_reversed": bool,
"ont_barcode": str,
}
)
def parse_data(df_raw: pd.DataFrame):
df = df_raw.copy()

# Add additional metrics
df["repr_total_pc"] = df["#reads"] / df["#reads"].sum() * 100
df["repr_total_pc"] = df["num_reads"] / df["num_reads"].sum() * 100
df["repr_within_barcode_pc"] = df.apply(
# Sample reads divided by sum of all sample reads w. the same barcode
lambda row: row["#reads"]
/ df[df["ont_barcode"] == row["ont_barcode"]]["#reads"].sum()
lambda row: row["num_reads"]
/ df[df["ont_barcode"] == row["ont_barcode"]]["num_reads"].sum()
* 100,
axis=1,
)

return df
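
parse_data derives two percentages from the raw Anglerfish table: each sample's share of all reads in the run, and its share within its own ONT barcode. A toy example on a three-sample dataframe, using groupby-transform as an equivalent vectorized alternative to the row-wise apply above:

```python
import pandas as pd

df = pd.DataFrame(
    {
        "sample_name": ["s1", "s2", "s3"],
        "num_reads": [100, 300, 600],
        "ont_barcode": ["barcode01", "barcode01", "barcode02"],
    }
)

df["repr_total_pc"] = df["num_reads"] / df["num_reads"].sum() * 100
df["repr_within_barcode_pc"] = (
    df["num_reads"] / df.groupby("ont_barcode")["num_reads"].transform("sum") * 100
)
# repr_total_pc:           10.0, 30.0,  60.0
# repr_within_barcode_pc:  25.0, 75.0, 100.0
```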


def fill_udfs(currentStep: Process, df: pd.DataFrame, log: list):
def ont_barcode_well2name(barcode_well: str) -> str:
# Add colon if not present
if ":" not in barcode_well:
barcode_well = f"{barcode_well[0]}:{barcode_well[1:]}"

# Get the number corresponding to the well (column-wise)
barcode_num_str = str(formula.well_name2num_96plate[barcode_well])

# Pad barcode number with leading zero if necessary
if len(barcode_num_str) < 2:
barcode_num_str = f"0{barcode_num_str}"
barcode_name = f"barcode{barcode_num_str}"

return barcode_name
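
ont_barcode_well2name delegates the column-wise well-to-number mapping to formula.well_name2num_96plate. A hypothetical standalone equivalent, assuming the standard column-wise numbering of a 96-well plate (A:1 → 1, B:1 → 2, ..., H:12 → 96):

```python
def well2barcode(barcode_well: str) -> str:
    # Normalize "A1" to "A:1", mirroring the function above
    if ":" not in barcode_well:
        barcode_well = f"{barcode_well[0]}:{barcode_well[1:]}"
    row, col = barcode_well.split(":")
    # Column-wise numbering: walk down each column before moving right
    num = (int(col) - 1) * 8 + "ABCDEFGH".index(row.upper()) + 1
    return f"barcode{num:02d}"


assert well2barcode("A1") == "barcode01"
assert well2barcode("B:1") == "barcode02"
assert well2barcode("A:2") == "barcode09"
```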


def fill_udfs(currentStep: Process, df: pd.DataFrame):
# Dictate which LIMS UDF corresponds to which column in the dataframe
udfs_to_cols = {
"# Reads": "#reads",
"# Reads": "num_reads",
"Avg. Read Length": "mean_read_len",
"Std. Read Length": "std_read_len",
"Representation Within Run (%)": "repr_total_pc",
@@ -160,67 +141,53 @@ def fill_udfs(currentStep: Process, df: pd.DataFrame, log: list):

for illumina_sample in illumina_samples:
try:
# Translate the ONT barcode well to the barcode string used by Anglerfish
barcode_well: str = fetch(illumina_sample, "ONT Barcode Well")
# Add colon if not present
if ":" not in barcode_well:
barcode_well = f"{barcode_well[0]}:{barcode_well[1:]}"
# Get the number corresponding to the well (column-wise)
barcode_num_str = str(formula.well_name2num_96plate[barcode_well])
# Pad barcode number with leading zero if necessary
if len(barcode_num_str) < 2:
barcode_num_str = f"0{barcode_num_str}"
barcode_name = f"barcode{barcode_num_str}"

# Find the dataframe row matching the LIMS output artifact
barcode_name = ont_barcode_well2name(
fetch(illumina_sample, "ONT Barcode Well")
)

# Subset df to the current ONT barcode
df_barcode = df[df["ont_barcode"] == barcode_name]
df_match = df_barcode[

# Further subset df to the current Illumina sample
df_sample = df_barcode[
df_barcode["sample_name"] == illumina_sample.name
]

assert (
len(df_match) == 1
len(df_sample) == 1
), f"Multiple entries matching both Illumina sample name {illumina_sample.name} and ONT barcode {barcode_name} was found in the dataframe."

# Start putting UDFs
for udf, col in udfs_to_cols.items():
try:
value = float(df_match[col].values[0])
value = float(df_sample[col].values[0])
put(
illumina_sample,
udf,
value,
)
except:
log.append(
f"ERROR: Could not assign UDF '{udf}' value '{value}' for sample {illumina_sample.name}"
logging.error(
f"Could not assign UDF '{udf}' value '{value}' for sample {illumina_sample.name}"
)
continue

except:
log.append(
f"ERROR: Could not process sample {illumina_sample.name}"
)
logging.error(f"Could not process sample {illumina_sample.name}")
continue

except:
log.append(f"ERROR: Could not process pool {illumina_pool.name}")
logging.error(f"Could not process pool {illumina_pool.name}")
continue
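
fetch and put come from NGI's epp_utils.udf_tools and are not shown in this diff; judging from the call sites above, they read and write a named UDF on an artifact. A minimal sketch of the write half using the genologics client directly, assuming the UDF is already defined for the artifact's type in Clarity:

```python
from genologics.entities import Artifact


def put_udf(artifact: Artifact, udf_name: str, value) -> None:
    # Set the UDF locally, then push the change back to the LIMS
    artifact.udf[udf_name] = value
    artifact.put()
```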


def write_log(log, currentStep):
timestamp = dt.now().strftime("%y%m%d_%H%M%S")
log_filename = f"parse_anglerfish_results_log_{currentStep.id}_{timestamp}_{currentStep.technician.name.replace(' ','')}"
with open(log_filename, "w") as logContext:
logContext.write("\n".join(log))
return log_filename


def upload_log(currentStep, lims, log_filename):
def upload_log(currentStep: Process, lims: Lims, log_filename: str):
log_file_slot = [
slot
for slot in currentStep.all_outputs()
if slot.name == "Parse Anglerfish Results Log"
][0]

for f in log_file_slot.files:
lims.request_session.delete(f.uri)
lims.upload_new_file(log_file_slot, log_filename)
@@ -230,38 +197,55 @@ def upload_log(currentStep, lims, log_filename):


def main(lims: Lims, currentStep: Process):
# Instantiate log file
log: list = []
latest_flowcell_run_path = find_latest_flowcell_run(currentStep)
latest_anglerfish_run_path = find_latest_anglerfish_run(latest_flowcell_run_path)

upload_anglerfish_text_results(lims, currentStep, latest_anglerfish_run_path)

# Get file contents
file_content: list = get_anglerfish_output_file(lims, currentStep, log)
df_raw: pd.DataFrame = get_anglerfish_dataframe(latest_anglerfish_run_path)

# Parse the Anglerfish output
df: pd.DataFrame = get_data(file_content, log)
df_parsed: pd.DataFrame = parse_data(df_raw)

# Populate sample fields with Anglerfish results
fill_udfs(currentStep, df, log)

# Add sample comments
# TODO

# Write log
log_filename = write_log(log, currentStep)
fill_udfs(currentStep, df_parsed)

# Upload log
upload_log(currentStep, lims, log_filename)


if __name__ == "__main__":
# Parse script arguments
parser = ArgumentParser()
parser.add_argument(
"--pid", default="24-594126", dest="pid", help="Lims id for current Process"
)
args = parser.parse_args()

# Set up LIMS instance
lims = Lims(BASEURI, USERNAME, PASSWORD)
lims.check_version()

currentStep = Process(lims, id=args.pid)

# Set up logging
log_filename = (
"_".join(
[
"parse-anglerfish-results",
currentStep.id,
dt.now().strftime("%y%m%d-%H%M%S"),
currentStep.technician.name.replace(" ", ""),
]
)
+ ".log"
)

logging.basicConfig(
filename=log_filename,
filemode="w",
format="%(levelname)s: %(message)s",
level=logging.INFO,
)

main(lims, currentStep)
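
To make the log naming concrete: for the script's example process ID and a hypothetical technician name, the assembly above yields the following filename (the snippet mirrors the strftime pattern and whitespace stripping in the code):

```python
from datetime import datetime as dt

pid = "24-594126"        # the script's example default for --pid
technician = "Jane Doe"  # hypothetical technician name
timestamp = dt(2024, 1, 4, 15, 30).strftime("%y%m%d-%H%M%S")

log_filename = (
    "_".join(["parse-anglerfish-results", pid, timestamp, technician.replace(" ", "")])
    + ".log"
)
assert log_filename == "parse-anglerfish-results_24-594126_240104-153000_JaneDoe.log"
```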
