Skip to content

Commit

Permalink
🚧 bash is a good plumber
Browse files Browse the repository at this point in the history
Co-authored-by: Thomas Sibley <[email protected]>
  • Loading branch information
victorlin and tsibley committed Jan 31, 2025
1 parent 4ada7e3 commit 44218f9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 98 deletions.
139 changes: 58 additions & 81 deletions augur/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,26 +147,13 @@ def validate_sequences(sequences):
for s in sequences:
print_info(f"Validating {s!r}…")

with (NamedTemporaryFile() as dup_num_file,
seqkit('rmdup', '--dup-num-file', dup_num_file.name,
stdin =subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE) as seqkit_process,
):
read_process = subprocess.run([*augur, "read-file", s], stdout=seqkit_process.stdin)
with NamedTemporaryFile() as dup_num_file:
command = f"""
augur read-file {shquote(s)} |
{seqkit()} rmdup --dup-num-file {shquote(dup_num_file.name)}
"""

if read_process.returncode != 0:
raise AugurError(f"Unable to read file {s!r}.")

seqkit_process.stdin.close()

for line in iter(seqkit_process.stderr.readline, ''):
if line.startswith("[INFO]"):
print_debug(f"[SeqKit] {line.rstrip()}")
else:
print_info(f"[SeqKit] {line.rstrip()}")

returncode = seqkit_process.wait()
returncode = run_command(command, print_stdout=False)

if returncode != 0:
raise AugurError(f"Validation failed for {s!r}.")
Expand Down Expand Up @@ -459,78 +446,65 @@ def merge_sequences(args):
if args.quiet:
print_info = lambda *_: None

seqkit_process = seqkit('rmdup',
stdin =subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

read_processes = []
print_info(f"Merging sequences and writing to {args.output_sequences!r}…")

# Reversed because seqkit rmdup keeps the first entry but this command
# should keep the last entry.
for s in reversed(args.sequences):
print_info(f"Reading sequences from {s!r}…")
# "echo;" adds a newline character to support FASTA files that are missing one at the end.
# This is fine because extraneous newline characters are stripped by seqkit.
command = f"""
(for f in {" ".join(shquote(s) for s in reversed(args.sequences))}; do
augur read-file "$f";
echo;
done) |
{seqkit()} rmdup |
augur write-file {shquote(args.output_sequences)}
"""

# run instead of Popen to ensure that file contents are read in series, not in parallel.
# FIXME: this hangs in two different ways. why?
#
# 1. sometime after copyfileobj (cutoff is probably 256 * 1024 bytes, one less seq and it hangs later):
# for i in $(seq -w 1 26214); do echo -e ">s$(printf "%05d" $i)\nA"; done > sequences.fasta; augur merge --sequences sequences.fasta --output-sequences merged.fasta
#
# 2. during copyfileobj (cutoff is probably 272 * 1024 bytes; one less seq and it hangs later):
# for i in $(seq -w 1 27853); do echo -e ">s$(printf "%05d" $i)\nA"; done > sequences.fasta; augur merge --sequences sequences.fasta --output-sequences merged.fasta
read_processes.append(subprocess.run([*augur, "read-file", s], stdout=seqkit_process.stdin))
returncode = run_command(command, print_stdout=True)

# Add a newline character to support FASTA files that are missing one at the end.
# This is fine because extraneous newline characters are stripped by seqkit.
print(file=seqkit_process.stdin)
if returncode != 0:
# Ideally, read-file, seqkit, and write-file errors would all be handled
# separately. That is not possible because everything is done in one
# child process with one return code.
# FIXME: make this better?
if args.output_sequences == "-":
raise AugurError(f"Merging failed, see error(s) above. You are responsible for cleanup of any partial outputs.")
else:
os.remove(args.output_sequences)
raise AugurError(f"Merging failed, see error(s) above.")

# Let seqkit know that there are no more sequences.
try:
seqkit_process.stdin.close()
except BrokenPipeError:
# This can be caused by seqkit returning an error, shown upon continuing (pass).
pass

print_info(f"Merging sequences and writing to {args.output_sequences!r}…")
def run_command(command: str, print_stdout: bool):
    """
    Run a shell command, clearly marking any messages that resemble SeqKit output.

    The command is executed with ``bash -euo pipefail -c``, so the returned
    exit status is non-zero if any stage of a pipeline fails.

    If *print_stdout* is true, the command's stdout is printed line by line
    with trailing whitespace stripped; otherwise stdout is discarded.

    Each stderr line has ANSI color sequences removed and is then reported as
    follows: lines starting with ``[INFO]`` go to ``print_debug`` with a
    ``[SeqKit]`` prefix, lines starting with ``[ERRO]`` go to ``print_info``
    with a ``[SeqKit]`` prefix, and all other lines go to ``print_info``
    unchanged.

    Returns the exit status of the ``bash`` process.
    """
    # Imported locally so this function does not depend on a file-level import.
    import threading

    # The context manager closes both pipes and waits for the process on exit.
    with subprocess.Popen(
        ['bash', '-euo', 'pipefail', '-c', command],
        stdout=subprocess.PIPE if print_stdout else subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
    ) as process:
        if print_stdout:
            # Both pipes must be drained at the same time.  Reading stdout to
            # EOF before touching stderr would deadlock as soon as the child
            # fills the stderr pipe while it still has stdout open.  stderr is
            # collected in memory (it carries diagnostics, not sequence data)
            # so that messages are still reported after all of stdout, exactly
            # as before.
            buffered_stderr = []
            stderr_reader = threading.Thread(
                target=lambda: buffered_stderr.extend(process.stderr),
                daemon=True,
            )
            stderr_reader.start()

            for line in process.stdout:
                print(line.rstrip())

            stderr_reader.join()
            stderr_lines = buffered_stderr
        else:
            # stdout goes to DEVNULL, so stderr is the only pipe and can be
            # streamed directly as the child produces it.
            stderr_lines = process.stderr

        for line in stderr_lines:
            # Remove ANSI escape sequences common in SeqKit output.
            line = re.sub(r'\x1b\[[\d;]+m', '', line)

            # Detect messages from seqkit and append a prefix.
            if line.startswith("[INFO]"):
                print_debug(f"[SeqKit] {line.rstrip()}")
            elif line.startswith("[ERRO]"):
                print_info(f"[SeqKit] {line.rstrip()}")
            else:
                print_info(line.rstrip())

    # Both pipes were read to EOF above and Popen.__exit__ has already waited,
    # so the return code is available here.
    return process.returncode


def sqlite3(*args, **kwargs):
Expand Down Expand Up @@ -603,20 +577,23 @@ def sqlite_quote_string(x):
return "'" + x.replace("'", "''") + "'"


def seqkit():
    """
    Internal helper for locating ``seqkit``, the SeqKit CLI program.

    Unlike ``sqlite3()``, this function is not a wrapper around subprocess.run.
    It is meant to be called without any parameters and only returns the
    location of the executable, shell-quoted so it can be interpolated into a
    shell command string. This is due to differences in the way the two
    programs are invoked.

    The location is the value of the ``SEQKIT`` environment variable if it is
    set, otherwise the result of looking up ``seqkit`` with ``which()``.

    Raises AugurError if no location is found.
    """
    executable = os.environ.get("SEQKIT", which("seqkit"))

    # Check before quoting: quoting first would make this test useless.
    # NOTE(review): assumes shquote is shlex.quote, which raises TypeError on
    # None and turns "" into the truthy "''" — confirm against the file's imports.
    if not executable:
        raise AugurError(dedent(f"""\
            Unable to find the program `seqkit`. Is it installed?
            In order to use `augur merge`, the SeqKit CLI must be installed
            separately. It is typically provided by a Nextstrain runtime.
            """))

    return shquote(executable)


def pairs(xs: Iterable[str]) -> Iterable[Tuple[str, str]]:
Expand Down
2 changes: 0 additions & 2 deletions augur/read_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def run(args):
# and like most Unix programs. See also
# <https://docs.python.org/3/library/signal.html#note-on-sigpipe>.
try:
print("running copyfileobj...", file=sys.stderr)
copyfileobj(f, sys.stdout, BUFFER_SIZE)
print("finished running copyfileobj.", file=sys.stderr)

# Force a flush so if SIGPIPE is going to happen it happens now.
sys.stdout.flush()
Expand Down
16 changes: 3 additions & 13 deletions tests/functional/merge/cram/merge-sequences-errors.t
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ SeqKit error messages are shown directly.
> --sequences x.fasta y.fasta \
> --skip-input-sequences-validation \
> --output-sequences - > merged.fasta
Reading sequences from 'y.fasta'
Reading sequences from 'x.fasta'
Merging sequences and writing to '-'
[SeqKit] \x1b[31m[ERRO]\x1b[0m fastx: invalid FASTA/Q format (esc)
[SeqKit] [ERRO] fastx: invalid FASTA/Q format
ERROR: Merging failed, see error(s) above. You are responsible for cleanup of any partial outputs.
[2]

Expand All @@ -36,22 +34,14 @@ Input file doesn't exist
Validating 'x.fasta'
Validating 'z.fasta'
ERROR: No such file or directory: 'z.fasta'
ERROR: Unable to read file 'z.fasta'.
ERROR: Validation failed for 'z.fasta'.
[2]

$ ${AUGUR} merge \
> --sequences x.fasta z.fasta \
> --skip-input-sequences-validation \
> --output-sequences -
Reading sequences from 'z.fasta'
ERROR: No such file or directory: 'z.fasta'
Reading sequences from 'x.fasta'
Merging sequences and writing to '-'
>seq1
ATCG
>seq2
GCTA
>seq3
TCGA
ERROR: No such file or directory: 'z.fasta'
ERROR: Merging failed, see error(s) above. You are responsible for cleanup of any partial outputs.
[2]
2 changes: 0 additions & 2 deletions tests/functional/merge/cram/merge-sequences-only.t
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ Merge sequences without metadata.
> --output-sequences - > merged.fasta
Validating 'x.fasta'
Validating 'y.fasta'
Reading sequences from 'y.fasta'
Reading sequences from 'x.fasta'
Merging sequences and writing to '-'

$ cat merged.fasta
Expand Down

0 comments on commit 44218f9

Please sign in to comment.