Skip to content

Commit

Permalink
🚧 add sequence support
Browse files Browse the repository at this point in the history
  • Loading branch information
victorlin committed Oct 15, 2024
1 parent afd6fc7 commit 146084c
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 5 deletions.
168 changes: 163 additions & 5 deletions augur/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@
from shutil import which
from tempfile import mkstemp
from textwrap import dedent
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union

from augur.argparse_ import ExtendOverwriteDefault, SKIP_AUTO_DEFAULT_IN_HELP
from augur.errors import AugurError
from augur.io.metadata import DEFAULT_DELIMITERS, DEFAULT_ID_COLUMNS, Metadata
from augur.io.print import print_err, print_debug, _n
from augur.io.sequences import read_sequences
from augur.utils import first_line


Expand All @@ -74,6 +75,9 @@
augur = f"augur"


SEQUENCE_ID_COLUMN = 'id'


class Database:
fd: int
"""Database file descriptor"""
Expand Down Expand Up @@ -105,13 +109,41 @@ def cleanup(self):
os.unlink(self.path)


class UnnamedFile:
    """An input file given as a bare FILE path (no NAME= prefix) on the command line."""

    table_name: str
    """Generated SQLite table name for this file, based on *path*."""

    # Filesystem path to the input file, as given on the command line.
    path: str


class NamedFile:
    """An input file given as NAME=FILE on the command line."""

    name: str
    """User-provided descriptive name for this file."""

    table_name: str
    """Generated SQLite table name for this file, based on *name*."""

    # Filesystem path to the input file, as given on the command line.
    path: str


class NamedSequenceFile(NamedFile):
    """A sequence file given as NAME=FILE on the command line."""

    def __init__(self, name: str, path: str):
        # Table name is derived from the user-provided name, so named
        # sequence inputs can be matched against named metadata inputs.
        self.table_name = f"sequences_{name}"
        self.name = name
        self.path = path

    def __repr__(self):
        return f"<NamedSequenceFile {self.name}={self.path}>"


class UnnamedSequenceFile(UnnamedFile):
    """A sequence file given as a bare FILE path on the command line."""

    def __init__(self, path: str):
        self.path = path
        # Derive the table name from the basename, replacing anything that
        # is not alphanumeric so it forms a safe SQLite identifier component.
        self.table_name = f"sequences_{re.sub(r'[^a-zA-Z0-9]', '_', os.path.basename(self.path))}"

    def __repr__(self):
        # Fixed: this previously returned "<NamedSequenceFile {self.name}=...>"
        # which both misidentified the class and raised AttributeError, since
        # unnamed files have no `name` attribute.
        return f"<UnnamedSequenceFile {self.path}>"


class NamedMetadata(Metadata, NamedFile):
def __init__(self, name: str, *args, **kwargs):
Expand All @@ -128,6 +160,7 @@ def register_parser(parent_subparsers):

input_group = parser.add_argument_group("inputs", "options related to input")
input_group.add_argument("--metadata", nargs="+", action="extend", metavar="NAME=FILE", help="Required. Metadata table names and file paths. Names are arbitrary monikers used solely for referring to the associated input file in other arguments and in output column names. Paths must be to seekable files, not unseekable streams. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
input_group.add_argument("--sequences", nargs="+", action="extend", metavar="[NAME=]FILE", help="Sequence files, optionally named for validation with named metadata. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)

input_group.add_argument("--metadata-id-columns", default=DEFAULT_ID_COLUMNS, nargs="+", action=ExtendOverwriteDefault, metavar="[TABLE=]COLUMN", help=f"Possible metadata column names containing identifiers, considered in the order given. Columns will be considered for all metadata tables by default. Table-specific column names may be given using the same names assigned in --metadata. Only one ID column will be inferred for each table. (default: {' '.join(map(shquote_humanized, DEFAULT_ID_COLUMNS))})" + SKIP_AUTO_DEFAULT_IN_HELP)
input_group.add_argument("--metadata-delimiters", default=DEFAULT_DELIMITERS, nargs="+", action=ExtendOverwriteDefault, metavar="[TABLE=]CHARACTER", help=f"Possible field delimiters to use for reading metadata tables, considered in the order given. Delimiters will be considered for all metadata tables by default. Table-specific delimiters may be given using the same names assigned in --metadata. Only one delimiter will be inferred for each table. (default: {' '.join(map(shquote_humanized, DEFAULT_DELIMITERS))})" + SKIP_AUTO_DEFAULT_IN_HELP)
Expand All @@ -136,23 +169,26 @@ def register_parser(parent_subparsers):
output_group.add_argument('--output-metadata', metavar="FILE", help="Required. Merged metadata as TSV. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
output_group.add_argument('--source-columns', metavar="TEMPLATE", help=f"Template with which to generate names for the columns (described above) identifying the source of each row's data. Must contain a literal placeholder, {{NAME}}, which stands in for the metadata table names assigned in --metadata. (default: disabled)" + SKIP_AUTO_DEFAULT_IN_HELP)
output_group.add_argument('--no-source-columns', dest="source_columns", action="store_const", const=None, help=f"Suppress generated columns (described above) identifying the source of each row's data. This is the default behaviour, but it may be made explicit or used to override a previous --source-columns." + SKIP_AUTO_DEFAULT_IN_HELP)
output_group.add_argument('--output-sequences', metavar="FILE", help="Required. Merged sequences as FASTA. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
output_group.add_argument('--quiet', action="store_true", default=False, help="Suppress informational and warning messages normally written to stderr. (default: disabled)" + SKIP_AUTO_DEFAULT_IN_HELP)

return parser


def validate_arguments(args):
    """Check combinations of metadata/sequence input and output arguments.

    Raises AugurError on the first invalid combination; returns None when
    the arguments are acceptable.
    """
    if not any((args.metadata, args.sequences)):
        raise AugurError("At least one input must be specified.")
    if not any((args.output_metadata, args.output_sequences)):
        raise AugurError("At least one output must be specified.")

    # A single metadata table has nothing to merge with.
    if args.metadata and not len(args.metadata) >= 2:
        raise AugurError("At least two metadata inputs are required for merging.")

    # Each output requires its corresponding input.
    if args.output_metadata and not args.metadata:
        raise AugurError("--output-metadata requires --metadata.")
    if args.output_sequences and not args.sequences:
        raise AugurError("--output-sequences requires --sequences.")


def run(args: argparse.Namespace):
Expand All @@ -166,15 +202,65 @@ def run(args: argparse.Namespace):

db = Database()

metadata: Optional[List[NamedMetadata]] = None
sequences: Optional[List[Union[NamedSequenceFile, UnnamedSequenceFile]]] = None
if args.metadata:
metadata = get_metadata(args.metadata, args.metadata_id_columns, args.metadata_delimiters)
output_source_column = get_output_source_column(args.source_columns, metadata)
output_columns = get_output_columns(metadata, args.source_columns)

if args.sequences:
sequences = list(get_sequences(args.sequences))


# Perform checks on file names.

named_sequences = [s for s in sequences if isinstance(s, NamedSequenceFile)]

if unnamed_sequences := [s for s in sequences if isinstance(s, UnnamedSequenceFile)]:
for x in unnamed_sequences:
print_info(f"WARNING: Sequence file {x.path!r} is unnamed. Skipping validation with metadata.")

if metadata and named_sequences:
metadata_order = [m.name for m in metadata]
sequences_order = [s.name for s in named_sequences]

if metadata_order != sequences_order:
raise AugurError(f"Order of inputs differs between named metadata {metadata_order!r} and named sequences {sequences_order!r}.")

# FIXME: easy win that requires a few more conditions:
# ERROR: Sequence file 'c=c.fasta' does not have a corresponding metadata file.


# Load data.

if metadata:
load_metadata(db, metadata)
if sequences:
load_sequences(db, sequences)


# Perform checks on file contents.

if metadata and named_sequences:
metadata_by_name = {m.name: m for m in metadata}
sequences_by_name = {s.name: s for s in named_sequences}

for name in metadata_by_name.keys() & sequences_by_name.keys():
# FIXME: check database for matching entries
# WARNING: Sequence 'XXX' in a.tsv is missing from a.fasta. It will not be present in any output.
# WARNING: Sequence 'YYY' in b.fasta is missing from b.csv. It will not be present in any output.
...


# Handle outputs.

if args.output_metadata:
output_source_column = get_output_source_column(args.source_columns, metadata)
output_columns = get_output_columns(metadata, args.source_columns)
merge_metadata(db, metadata, output_columns, args.output_metadata, output_source_column)

if args.output_sequences:
merge_sequences(db, sequences, args.output_sequences)


def get_metadata(
input_metadata: Sequence[str],
Expand Down Expand Up @@ -387,6 +473,78 @@ def merge_metadata(
db.cleanup()


def get_sequences(input_sequences: List[str]):
    """Yield a sequence file object for each ``[NAME=]FILE`` input.

    Inputs with a name yield NamedSequenceFile; bare paths (empty name)
    yield UnnamedSequenceFile.
    """
    for name, path in parse_inputs(input_sequences):
        yield UnnamedSequenceFile(path) if name == "" else NamedSequenceFile(name, path)


def load_sequences(db: Database, sequences: List[Union[NamedSequenceFile, UnnamedSequenceFile]]):
    """Load the sequence ids from each file into its own SQLite table.

    Only ids are stored (in SEQUENCE_ID_COLUMN, uniquely indexed); the
    sequence data itself stays in the original files. Raises AugurError
    if a file contains duplicate ids.
    """
    for s in sequences:
        ids = [seq.id for seq in read_sequences(s.path)]

        if duplicates := [item for item, count in count_unique(ids) if count > 1]:
            raise AugurError(f"The following entries are duplicated in {s.path!r}:\n" + "\n".join(duplicates))

        db.run(f"create table {sqlite_quote_id(s.table_name)} ({sqlite_quote_id(SEQUENCE_ID_COLUMN)} text);")

        # Skip the insert entirely for an empty file: "insert ... values ;"
        # with no value list is a SQL syntax error.
        if ids:
            # Double embedded single quotes so ids cannot break out of the
            # SQL string literal (standard SQL escaping: ' becomes '').
            values = ", ".join("('" + id.replace("'", "''") + "')" for id in ids)
            db.run(f"insert into {sqlite_quote_id(s.table_name)} ({sqlite_quote_id(SEQUENCE_ID_COLUMN)}) values {values};")

        db.run(f'create unique index {sqlite_quote_id(f"{s.table_name}_id")} on {sqlite_quote_id(s.table_name)}({sqlite_quote_id(SEQUENCE_ID_COLUMN)});')


# FIXME: return a list of arguments and don't use shell
def cat(filepath: str):
    """Return a shell command string that streams *filepath* to stdout,
    decompressing based on the file extension (.gz, .xz, .zst).

    NOTE(review): assumes gzcat/xzcat/zstdcat are on PATH — not verified here.
    """
    from shlex import quote

    if filepath.endswith(".gz"):
        decompressor = "gzcat"
    elif filepath.endswith(".xz"):
        decompressor = "xzcat"
    elif filepath.endswith(".zst"):
        decompressor = "zstdcat"
    else:
        decompressor = "cat"

    # Quote the path: this string is interpolated into a shell command, so
    # spaces and shell metacharacters in filenames must be passed through
    # intact (and cannot inject extra commands).
    return f"{decompressor} {quote(filepath)}"


def merge_sequences(
    db: Database,
    sequences: List[Union[NamedSequenceFile, UnnamedSequenceFile]],
    output_sequences: str,
):
    """Merge *sequences* into one FASTA written to *output_sequences*
    (``-`` for stdout).

    When the same id appears in multiple files, the entry from the last
    file listed is kept. Raises AugurError if seqkit is not installed or
    the merge pipeline fails.
    """
    # Confirm that seqkit is installed.
    if which("seqkit") is None:
        raise AugurError("'seqkit' is not installed! This is required to merge sequences.")

    # Reversed because seqkit rmdup keeps the first entry but this command
    # should keep the last entry.
    # FIXME: don't use shell. just using it to get a sense of feasibility.
    # FIXME: is seqkit overkill here? compare to ncov's drop_duplicate_sequences which is plain Python.
    # https://github.com/nextstrain/ncov/blob/0769ac0429df8456ce70be2f74dc885d7b7fab12/scripts/sanitize_sequences.py#L127
    cat_processes = (f"<({cat(filepath)})" for filepath in reversed([sequence_input.path for sequence_input in sequences]))
    shell_cmd = f"cat {' '.join(cat_processes)} | seqkit rmdup"
    print_debug(f"running shell command {shell_cmd!r}")
    process = subprocess.Popen(shell_cmd, shell=True, executable="/bin/bash", stdout=subprocess.PIPE)

    # FIXME: output only the sequences that are also present in metadata
    # 1. before calling this function, create an index table mapping ID -> output (boolean)
    # 2. create a file with list of IDs to output
    # 3. use `seqkit grep` to filter the output of rmdup

    # FIXME: handle `-` better
    output = process.communicate()[0]

    # Fail loudly instead of silently writing empty/partial output when any
    # part of the pipeline (cat/decompression/seqkit) exits with an error.
    if process.returncode != 0:
        raise AugurError(f"Sequence merge failed: pipeline exited with code {process.returncode}.")

    if output_sequences == "-":
        sys.stdout.write(output.decode())
    else:
        with open(output_sequences, "w") as f:
            f.write(output.decode())


# FIXME: do this for seqkit too
def sqlite3(*args, **kwargs):
"""
Internal helper for invoking ``sqlite3``, the SQLite CLI program.
Expand Down
41 changes: 41 additions & 0 deletions tests/functional/merge/cram/merge-sequences-and-metadata.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
SETUP

$ export AUGUR="${AUGUR:-$TESTDIR/../../../../bin/augur}"

Merge sequences and metadata

$ cat >x.fasta <<~~
> >seq1
> ATCG
> >seq2
> GCTA
> >seq3
> TCGA
> ~~

$ cat >y.fasta <<~~
> >seq3
> ATCG
> >seq4
> GCTA
> ~~

$ cat >x.tsv <<~~
> strain a b c
> one X1a X1b X1c
> two X2a X2b X2c
> ~~

$ cat >y.tsv <<~~
> strain b c f e d
> two Y2c Y2f Y2e Y2d
> three Y3f Y3e Y3d
> ~~

$ ${AUGUR} merge \
> --metadata X=x.tsv Y=y.tsv \
> --sequences X=x.fasta Y=y.fasta \
> --output-metadata merged.tsv \
> --output-sequences merged.fasta \
> --quiet
[INFO]\x1b[0m 1 duplicated records removed (esc)
36 changes: 36 additions & 0 deletions tests/functional/merge/cram/merge-sequences.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
SETUP

$ export AUGUR="${AUGUR:-$TESTDIR/../../../../bin/augur}"

Merge sequences without metadata

$ cat >x.fasta <<~~
> >seq1
> ATCG
> >seq2
> GCTA
> >seq3
> TCGA
> ~~

$ cat >y.fasta <<~~
> >seq3
> ATCG
> >seq4
> GCTA
> ~~

$ ${AUGUR} merge \
> --sequences x=x.fasta y=y.fasta \
> --output-sequences - > merged.fasta
[INFO]\x1b[0m 1 duplicated records removed (esc)

$ cat merged.fasta
>seq3
ATCG
>seq4
GCTA
>seq1
ATCG
>seq2
GCTA

0 comments on commit 146084c

Please sign in to comment.