-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcat_alignments.py
executable file
·99 lines (82 loc) · 4.41 KB
/
concat_alignments.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python
import logging
import sys
import argparse
from Bio import AlignIO
from Bio.Align import MultipleSeqAlignment
from Bio.Seq import Seq #, UnknownSeq
from Bio.SeqRecord import SeqRecord
from collections import defaultdict
logger = logging.getLogger("concat")
def load_alignments(alignmentfiles, format):
    """Load every alignment contained in the given input files.

    Args:
        alignmentfiles: sized collection of paths to alignment files. Each
            file is parsed with ``AlignIO.parse`` and may therefore
            contribute more than one alignment.
        format: name of the input format, passed through to
            ``AlignIO.parse``.

    Returns:
        A list of all parsed alignments, in the order they were read.

    Raises:
        ValueError: re-raised after being logged when a file cannot be
            parsed.
    """
    alignments = []
    for path in alignmentfiles:
        try:
            for alignment in AlignIO.parse(path, format=format):
                # len() of an alignment is its number of rows (sequences);
                # the number of columns is reported separately so the
                # message is not mistaken for the sequence length.
                logger.debug(
                    "loaded alignment with %d sequences and %d columns from %s",
                    len(alignment), alignment.get_alignment_length(), path)
                alignments.append(alignment)
        except ValueError as e:
            logger.error("Cannot parse input file {}: {}".format(path, e))
            raise
    logger.info("Successfully loaded {} alignments from {} input files"
                .format(len(alignments), len(alignmentfiles)))
    return alignments
def concatenate(alignments):
    """Concatenate several alignments column-wise into one alignment.

    Sequences are matched across alignments by their record id. When an id
    is absent from an alignment, a run of ``'X'`` characters as long as that
    alignment is inserted in its place, so that every concatenated sequence
    has the same total length.

    Args:
        alignments: iterable of alignments (as produced by
            ``AlignIO.parse``). It is materialized once, so a generator is
            accepted too.

    Returns:
        A ``MultipleSeqAlignment`` with one record per distinct id. Records
        appear in the order in which their ids are first encountered in the
        input, which makes the output reproducible between runs. An empty
        input yields an empty alignment.
    """
    # The input is traversed twice (label collection, then concatenation);
    # materialize it so one-shot iterables are not exhausted by the first pass.
    alignments = list(alignments)

    # Collect the full set of labels (i.e. sequence ids) for all alignments,
    # remembering the order of first appearance. Iterating a plain set here
    # would make the row order of the result depend on string hashing.
    all_labels = []
    seen = set()
    for aln in alignments:
        for rec in aln:
            if rec.id not in seen:
                seen.add(rec.id)
                all_labels.append(rec.id)
    logger.debug("extracted {} different labels in all alignments: {}"
                 .format(len(all_labels), all_labels))

    # One list of sequence chunks per label, filled alignment by alignment.
    concat_buf = {label: [] for label in all_labels}
    for aln in alignments:
        length = aln.get_alignment_length()
        # check if any labels are missing in the current alignment
        these_labels = set(rec.id for rec in aln)
        missing = [label for label in all_labels if label not in these_labels]
        logger.debug("alignment of length {} with {} sequences, {} missing ({})"
                     .format(length, len(these_labels), len(missing), missing))
        # if any are missing, pad with unknown data of the right length
        for label in missing:
            concat_buf[label].append('X' * length)
        # otherwise store the string representation of the actual sequence
        for rec in aln:
            concat_buf[rec.id].append(str(rec.seq))

    # Stitch all the substrings together using join (most efficient way),
    # and build the Biopython data structures Seq, SeqRecord and
    # MultipleSeqAlignment, iterating labels in their first-seen order.
    msa = MultipleSeqAlignment(SeqRecord(Seq(''.join(concat_buf[label])), id=label)
                               for label in all_labels)
    # get_alignment_length() is used instead of len(msa[0]) so that an
    # alignment without any records does not raise IndexError here.
    logger.info("concatenated MSA of {} taxa and total length {} created"
                .format(len(msa), msa.get_alignment_length()))
    return msa
# Command-line entry point: parse the arguments, configure logging, load all
# input alignments, concatenate them and write the result.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Concatenate alignments",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # Adjacent string literals are joined without a separator, so each
    # fragment of a help text must carry its own trailing space.
    parser.add_argument('-f', '--format-in', default='fasta',
                        help="input format of the alignments. Any format that is understood "
                             "by Biopython's AlignIO module is possible.")
    parser.add_argument('-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
                        help="Path to the output file where the concatenated multiple "
                             "sequence alignment will be written")
    parser.add_argument('-u', '--format-output', default='phylip-relaxed',
                        help="output format of the concatenated multiple sequence alignment")
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help="Produce some output and status reports")
    parser.add_argument('-d', '--debug', action="store_true", default=False,
                        help="Be more verbose for debugging purposes")
    parser.add_argument('alignment', nargs='+', type=str,
                        help="Path to the alignment files. Use shell expansion to pass many files "
                             "in a simple way, e.g. \"/path/to/folder/*.fa\".")
    conf = parser.parse_args()

    # --debug takes precedence over --verbose; the default is warnings only.
    level = logging.WARNING
    if conf.verbose:
        level = logging.INFO
    if conf.debug:
        level = logging.DEBUG
    logging.basicConfig(level=level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    logger.debug("Concatenate alignments: arguments: {}".format(conf))

    try:
        alignments = load_alignments(conf.alignment, conf.format_in.lower())
        msa = concatenate(alignments)
        AlignIO.write(msa, conf.output, conf.format_output.lower())
    finally:
        # argparse.FileType opened the output file for us; close it
        # explicitly so the data is flushed, but leave stdout alone.
        if conf.output is not sys.stdout:
            conf.output.close()