Skip to content

Commit

Permalink
feat: redirect BwaAlnInteractive stderr to debug logging (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval authored Oct 17, 2024
1 parent 364a9dc commit 5d79a52
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 4 deletions.
31 changes: 30 additions & 1 deletion prymer/offtarget/bwa.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@
```
""" # noqa: E501

import logging
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
from typing import ClassVar
from typing import Optional
from typing import cast
Expand All @@ -56,6 +59,7 @@
from fgpyo.sam import Cigar
from pysam import AlignedSegment
from pysam import AlignmentHeader
from typing_extensions import override

from prymer.api import coordmath
from prymer.util.executable_runner import ExecutableRunner
Expand Down Expand Up @@ -298,7 +302,7 @@ def __init__(
"/dev/stdin",
]

super().__init__(command=command)
super().__init__(command=command, stderr=subprocess.PIPE)

header = []
for line in self._subprocess.stdout:
Expand All @@ -309,6 +313,18 @@ def __init__(

self.header = AlignmentHeader.from_text("".join(header))

# NB: ExecutableRunner will default to redirecting stderr to /dev/null. However, we would
# like to preserve stderr messages from bwa for potential debugging. To do this, we create
# a single thread to continuously read from stderr and redirect text lines to a debug
# logger. The close() method of this class will additionally join the stderr logging thread.
self._logger = logging.getLogger(self.__class__.__qualname__)
self._stderr_thread = Thread(
daemon=True,
target=self._stream_to_sink,
args=(self._subprocess.stderr, self._logger.debug),
)
self._stderr_thread.start()

def __signal_bwa(self) -> None:
"""Signals BWA to process the queries."""
self._subprocess.stdin.flush()
Expand All @@ -317,6 +333,19 @@ def __signal_bwa(self) -> None:
self._subprocess.stdin.write("\n" * 16)
self._subprocess.stdin.flush()

@override
def close(self) -> bool:
    """
    Gracefully terminate the underlying subprocess, then wait for the stderr logging thread.

    The parent implementation of `close()` runs first and its result is returned unchanged.
    Afterwards, the thread that forwards the subprocess's stderr to the debug logger is joined.

    Returns:
        True: if the subprocess was terminated successfully
        False: if the subprocess failed to terminate or was not already running
    """
    terminated: bool = super().close()
    # NOTE(review): join() is called without a timeout; presumably the reader thread finishes
    # once the closed subprocess's stderr reaches end-of-stream -- confirm.
    self._stderr_thread.join()
    return terminated

def map_one(self, query: str, id: str = "unknown") -> BwaResult:
"""Maps a single query to the genome and returns the result.
Expand Down
26 changes: 23 additions & 3 deletions prymer/util/executable_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
from contextlib import AbstractContextManager
from pathlib import Path
from types import TracebackType
from typing import Callable
from typing import Optional
from typing import Self
from typing import TextIO


class ExecutableRunner(AbstractContextManager):
Expand All @@ -30,6 +32,12 @@ class ExecutableRunner(AbstractContextManager):
Subclasses of [`ExecutableRunner`][prymer.util.executable_runner.ExecutableRunner]
provide additional type checking of inputs and orchestrate parsing output data from specific
command-line tools.
Warning:
Users of this class must be acutely aware of deadlocks that can exist when manually
writing and reading to subprocess pipes. The Python documentation for subprocess and PIPE
has warnings to this effect as well as recommended workarounds and alternatives.
https://docs.python.org/3/library/subprocess.html
"""

__slots__ = ("_command", "_subprocess", "_name")
Expand All @@ -40,9 +48,13 @@ class ExecutableRunner(AbstractContextManager):
def __init__(
self,
command: list[str],
# NB: users of this class must be acutely aware of deadlocks that can exist when manually
# writing and reading to subprocess pipes. The Python documentation for subprocess and PIPE
# has warnings to this effect as well as recommended workarounds and alternatives.
# https://docs.python.org/3/library/subprocess.html
stdin: int = subprocess.PIPE,
stdout: int = subprocess.PIPE,
stderr: int = subprocess.PIPE,
stderr: int = subprocess.DEVNULL,
) -> None:
if len(command) == 0:
raise ValueError(f"Invocation must not be empty, received {command}")
Expand Down Expand Up @@ -71,6 +83,15 @@ def __exit__(
super().__exit__(exc_type, exc_value, traceback)
self.close()

@staticmethod
def _stream_to_sink(stream: TextIO, sink: Callable[[str], None]) -> None:
"""Redirect a text IO stream to a text sink."""
while True:
if line := stream.readline():
sink(line.rstrip())
else:
break

@classmethod
def validate_executable_path(cls, executable: str | Path) -> Path:
"""Validates user-provided path to an executable.
Expand Down Expand Up @@ -115,8 +136,7 @@ def is_alive(self) -> bool:

def close(self) -> bool:
"""
Gracefully terminates the underlying subprocess if it is still
running.
Gracefully terminates the underlying subprocess if it is still running.
Returns:
True: if the subprocess was terminated successfully
Expand Down
20 changes: 20 additions & 0 deletions tests/offtarget/test_bwa.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import shutil
from dataclasses import replace
from pathlib import Path
Expand Down Expand Up @@ -99,6 +100,25 @@ def test_map_one_uniquely_mapped(ref_fasta: Path) -> None:
assert result.query == query


def test_stderr_redirected_to_logger(ref_fasta: Path, caplog: pytest.LogCaptureFixture) -> None:
    """Tests that we redirect the stderr of the bwa executable to a logger."""
    # Debug level is required because bwa's stderr is forwarded through a debug logger.
    caplog.set_level(logging.DEBUG)
    query = Query(bases="TCTACTAAAAATACAAAAAATTAGCTGGGCATGATGGCATGCACCTGTAATCCCGCTACT", id="NA")
    expected_messages = (
        "[bwa_aln_core] calculate SA coordinate...",
        "[bwa_aln_core] convert to sequence coordinate...",
        "[bwa_aln_core] refine gapped alignments...",
        "[bwa_aln_core] print alignments...",
        "[bwa_aln_core] 1 sequences have been processed",
    )
    with BwaAlnInteractive(ref=ref_fasta, max_hits=1) as bwa:
        result = bwa.map_one(query=query.bases, id=query.id)

        # The alignment itself must still be the single expected hit.
        assert result.hit_count == 1
        first_hit = result.hits[0]
        assert first_hit.refname == "chr1"
        assert first_hit.start == 61
        assert first_hit.negative is False
        assert f"{first_hit.cigar}" == "60M"
        assert result.query == query

        # NOTE(review): stderr is forwarded on a separate thread; confirm that every message
        # is reliably captured by the time these checks run.
        for message in expected_messages:
            assert message in caplog.text


def test_map_one_unmapped(ref_fasta: Path) -> None:
"""Tests that bwa returns an unmapped alignment. The hit count should be zero and the list
of hits empty."""
Expand Down

0 comments on commit 5d79a52

Please sign in to comment.