New EPP for parsing VC100 CSV file
chuan-wang committed Sep 20, 2024
1 parent 3784cda commit 63c1802
Showing 2 changed files with 121 additions and 0 deletions.
4 changes: 4 additions & 0 deletions VERSIONLOG.md
@@ -1,5 +1,9 @@
# Scilifelab_epps Version Log

## 20240920.1

New EPP for parsing VC100 CSV file

## 20240913.1

Generate composite run manifests for AVITI based on index lengths.
117 changes: 117 additions & 0 deletions scripts/parse_vc100_results.py
@@ -0,0 +1,117 @@
#!/usr/bin/env python

DESC = """EPP for parsing VC100 output CSV file
Volumes are filled in by matching well positions.
A warning is given for wells that report a volume above the
threshold but should not have been used.
Author: Chuan Wang, Science for Life Laboratory, Stockholm, Sweden
"""

import csv
import sys
from argparse import ArgumentParser

from genologics.config import BASEURI, PASSWORD, USERNAME
from genologics.entities import Process
from genologics.lims import Lims

from scilifelab_epps.epp import EppLogger, set_field

# Wells expected to be empty but reporting more than this volume (same unit
# as the "Volume (ul)" UDF, i.e. ul) are flagged with a warning
VOL_WARNING_THRESHOLD = 5


def get_vc100_file(process, log):
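    # Find the "VC100 CSV File" output artifact on the step and return its
    # contents decoded as text; returns None if the file is missing or unreadable.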
output = None
for outart in process.all_outputs():
# get the right output artifact
if outart.type == "ResultFile" and outart.name == "VC100 CSV File":
try:
fid = outart.files[0].id
content = lims.get_file_contents(id=fid)
if isinstance(content.data, bytes):
output = content.data.decode("utf-8")
            except Exception:
                log.append("Cannot parse VC100 output file")
break
return output


def get_data(content, log):
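    # Build a {well: volume} dict from the CSV text, keyed by LIMS-style well
    # positions ("A:1"), using the TUBE and VOLAVG columns.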
data = dict()
headers = dict()
dialect = csv.Sniffer().sniff(content)
pf = csv.reader(content.splitlines(), dialect=dialect)
for line in pf:
# this is the header row
if "TUBE" in line:
            for idx, item in enumerate(line):
                headers[item] = idx
else:
well = line[headers["TUBE"]]
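            # convert e.g. "A01" to the LIMS location format "A:1"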
row = well[0]
col = str(int(well[1:]))
new_well = row + ":" + col
volume = line[headers["VOLAVG"]]
data[new_well] = volume
return data


def parse_vc100_results(process):
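    # Parse the VC100 CSV attached to the step and write the measured volumes
    # to the "Volume (ul)" UDF of the matching per-sample result files.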
# strings returned to the EPP user
log = []
    # Get file contents by parsing lims artifacts
    content = get_vc100_file(process, log)
    if content is None:
        # nothing to parse; report the problem and stop
        sys.exit("".join(log) or "Cannot find the VC100 CSV file.")
    # parse the file and get the interesting data out
    data = get_data(content, log)

# Fill in LIMS field Volume (ul)
for target_file in process.result_files():
well = target_file.samples[0].artifact.location[1]
if well in data:
if float(data[well]) > 0:
target_file.udf["Volume (ul)"] = float(data[well])
else:
target_file.udf["Volume (ul)"] = 0
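            # drop matched wells so only unexpected ones remain for the warning check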
del data[well]
else:
log.append(f"Cannot find volume for well {well} in the VC100 CSV file.")
target_file.put()
set_field(target_file)

    # Warn about wells that are supposed to be empty but report a volume
    wells_with_warning = []
    for k, v in data.items():
        if float(v) > VOL_WARNING_THRESHOLD:
            wells_with_warning.append(k)
    if wells_with_warning:
        log.append(
            f"The following wells are supposed to be empty but give a volume higher than {VOL_WARNING_THRESHOLD}: {','.join(wells_with_warning)}"
        )

print("".join(log), file=sys.stderr)


def main(lims, pid, epp_logger):
process = Process(lims, id=pid)
parse_vc100_results(process)


if __name__ == "__main__":
parser = ArgumentParser(description=DESC)
parser.add_argument(
"--pid", default="24-38458", dest="pid", help="Lims id for current Process"
)
parser.add_argument(
"--log",
dest="log",
help=(
"File name for standard log file, " "for runtime information and problems."
),
)

args = parser.parse_args()

lims = Lims(BASEURI, USERNAME, PASSWORD)
lims.check_version()

with EppLogger(log_file=args.log, lims=lims, prepend=True) as epp_logger:
main(lims, args.pid, epp_logger)
