10
10
"""
11
11
12
12
from __future__ import annotations
13
+ from typing import TextIO
13
14
import sys
14
15
import os
15
16
import logging
16
17
import tempfile
17
18
from pathlib import Path
18
19
19
- from Bio import SeqIO
20
+ from Bio import SeqIO , SeqRecord , SeqFeature
20
21
import pyfastx
21
22
22
23
import pynteny .utils as utils
@@ -156,15 +157,13 @@ def remove_duplicates(
156
157
self ,
157
158
output_file : Path = None ,
158
159
export_duplicates : bool = False ,
159
- method : str = "seqkit" ,
160
160
point_to_new_file : bool = True ,
161
161
) -> None :
162
162
"""Removes duplicate entries (either by sequence or ID) from fasta.
163
163
164
164
Args:
165
165
output_file (Path, optional): path to output fasta file. Defaults to None.
166
166
export_duplicates (bool, optional): whether duplicated records are exported to a file. Defaults to False.
167
- method (str, optional): choose method to select duplicates: 'biopython' or 'seqkit'. Defaults to 'seqkit'.
168
167
point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
169
168
170
169
Yields:
@@ -175,24 +174,11 @@ def remove_duplicates(
175
174
Path (self ._input_file .parent )
176
175
/ f"{ self ._input_file .stem } _noduplicates{ self ._input_file .suffix } "
177
176
)
178
-
179
- if "bio" in method :
180
- seen_seqs , seen_ids = set (), set ()
181
-
182
- def unique_records ():
183
- for record in SeqIO .parse (self ._input_file , "fasta" ):
184
- if (record .seq not in seen_seqs ) and (record .id not in seen_ids ):
185
- seen_seqs .add (record .seq )
186
- seen_ids .add (record .id )
187
- yield record
188
-
189
- SeqIO .write (unique_records (), output_file , "fasta" )
190
- else :
191
- wrappers .run_seqkit_nodup (
192
- input_fasta = self ._input_file ,
193
- output_fasta = output_file ,
194
- export_duplicates = export_duplicates ,
195
- )
177
+ wrappers .run_seqkit_nodup (
178
+ input_fasta = self ._input_file ,
179
+ output_fasta = output_file ,
180
+ export_duplicates = export_duplicates ,
181
+ )
196
182
if point_to_new_file :
197
183
self .set_file_path (output_file )
198
184
@@ -381,49 +367,51 @@ def from_genbank(
381
367
Path (gbk_files .pop ().parent ) / f"{ prefix } sequence_database.fasta"
382
368
)
383
369
384
- def get_label_str (gbk_contig , feature ):
385
- name = feature .qualifiers ["locus_tag" ][0 ].replace ("_" , "." )
386
- start , end , strand = (
387
- str (feature .location .start ),
388
- str (feature .location .end ),
389
- feature .location .strand ,
390
- )
391
- start = start .replace (">" , "" ).replace ("<" , "" )
392
- end = end .replace (">" , "" ).replace ("<" , "" )
393
- strand_sense = "neg" if strand == - 1 else ("pos" if strand == 1 else "" )
394
- return f">{ name } __{ gbk_contig .name .replace ('_' , '' )} _{ gene_counter } _{ start } _{ end } _{ strand_sense } \n "
395
-
396
- if nucleotide :
397
-
398
- def write_record (gbk_contig , feature , outfile , gene_counter ):
399
- header = get_label_str (gbk_contig , feature )
400
- sequence = str (feature .extract (gbk_contig ).seq )
401
- outfile .write (header )
402
- outfile .write (sequence + "\n " )
403
- gene_counter += 1
404
- return gene_counter
405
-
406
- else :
407
-
408
- def write_record (gbk_contig , feature , outfile , gene_counter ):
409
- if "translation" in feature .qualifiers :
410
- header = get_label_str (gbk_contig , feature )
411
- sequence = feature .qualifiers ["translation" ][0 ]
412
- outfile .write (header )
413
- outfile .write (sequence + "\n " )
414
- gene_counter += 1
415
- return gene_counter
416
-
417
370
with open (output_file , "w+" ) as outfile :
418
371
for gbk_contig in gbk_contigs :
419
372
gene_counter = 0
420
373
for feature in gbk_contig .features :
421
374
if "cds" in feature .type .lower ():
422
- gene_counter = write_record (
423
- gbk_contig , feature , outfile , gene_counter
375
+ gene_counter = cls . write_record (
376
+ gbk_contig , feature , outfile , gene_counter , nucleotide
424
377
)
425
378
return cls (output_file )
426
379
380
@staticmethod
def get_label_str(
    gbk_contig: SeqRecord, feature: SeqFeature, gene_counter: int
) -> str:
    """Build the FASTA header line for one GenBank feature.

    The header has the form
    ``>{locus_tag}__{contig}_{gene_counter}_{start}_{end}_{strand}``
    followed by a newline, where underscores in the locus tag are
    replaced by dots and underscores in the contig name are removed, so
    that ``_`` is only ever used as the field delimiter.

    Args:
        gbk_contig (SeqRecord): contig the feature belongs to; only its
            ``name`` attribute is read.
        feature (SeqFeature): feature to label; its ``locus_tag``
            qualifier and ``location`` (start, end, strand) are read.
        gene_counter (int): index of the gene within the contig.

    Returns:
        str: header line, starting with ``>`` and ending with a newline.

    Raises:
        KeyError: if the feature has no ``locus_tag`` qualifier.
    """
    # Characters marking open-ended (fuzzy) positions in the string form
    # of a location boundary; they are dropped to leave the bare number.
    fuzzy_markers = str.maketrans("", "", "<>")
    location = feature.location

    name = feature.qualifiers["locus_tag"][0].replace("_", ".")
    contig_name = gbk_contig.name.replace("_", "")
    start = str(location.start).translate(fuzzy_markers)
    end = str(location.end).translate(fuzzy_markers)
    # Any strand other than -1 / 1 (e.g. None) yields an empty field.
    strand_sense = {-1: "neg", 1: "pos"}.get(location.strand, "")

    # No whitespace inside the label: the fields must stay contiguous and
    # the newline must be the last character of the header.
    return (
        f">{name}__{contig_name}_{gene_counter}_{start}_{end}_{strand_sense}\n"
    )
394
+
395
@staticmethod
def write_record(
    gbk_contig: SeqRecord,
    feature: SeqFeature,
    outfile: TextIO,
    gene_counter: int,
    nucleotide: bool = False,
) -> int:
    """Write one feature as a FASTA record and return the updated counter.

    In nucleotide mode the sequence is extracted from the contig using the
    feature's location. Otherwise the feature's ``translation`` qualifier
    is used, and features lacking it are skipped without writing anything.

    Args:
        gbk_contig (SeqRecord): contig the feature belongs to.
        feature (SeqFeature): feature to export.
        outfile (TextIO): open, writable text handle receiving the record.
        gene_counter (int): index of the gene within the contig; it is
            embedded in the record header.
        nucleotide (bool, optional): whether to write the nucleotide
            sequence instead of the translation. Defaults to False.

    Returns:
        int: ``gene_counter + 1`` if a record was written, otherwise
        ``gene_counter`` unchanged.
    """
    if nucleotide:
        sequence = str(feature.extract(gbk_contig).seq)
    elif "translation" in feature.qualifiers:
        sequence = feature.qualifiers["translation"][0]
    else:
        # Nothing to write: return before building the header, so skipped
        # features neither cost a label computation nor fail on a missing
        # locus_tag qualifier.
        return gene_counter

    header = LabelledFASTA.get_label_str(gbk_contig, feature, gene_counter)
    outfile.write(header)
    outfile.write(sequence + "\n")
    return gene_counter + 1
414
+
427
415
428
416
class GeneAnnotator :
429
417
"""Run prodigal on assembly, predict ORFs and extract location info"""
0 commit comments