-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add markov model #8
base: master
Are you sure you want to change the base?
Changes from all commits
98fd8bc
cb18811
ca089ee
1147617
4b091dd
6983d4b
90b6f48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import nerpa_utils | ||
import handle_rban | ||
import logger | ||
import src.markov_probability_model.main | ||
|
||
# for detecting and processing antiSMASH v.5 output | ||
site.addsitedir(os.path.join(nerpa_init.python_modules_dir, 'NRPSPredictor_utils')) | ||
|
@@ -59,6 +60,23 @@ def parse_args(log): | |
advanced_input_group.add_argument("--force-existing-outdir", dest="output_dir_reuse", action="store_true", default=False, | ||
help="don't crash if the output dir already exists") | ||
|
||
def _str2bool(value):
    """Convert a command-line token to a bool.

    argparse's ``type=bool`` calls ``bool()`` on the raw string, so every
    non-empty value -- including "False" and "0" -- would be parsed as True.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ('true', 't', 'yes', 'y', '1'):
        return True
    if token in ('false', 'f', 'no', 'n', '0'):
        return False
    # argparse turns a ValueError raised by a type callable into a usage error.
    raise ValueError('expected a boolean value, got %r' % (value,))

# Options of the additional (Markov) model; the boolean switches still take an
# explicit value (e.g. ``--use_bw True``) so existing invocations keep working.
alternative_model_group = parser.add_argument_group(
    'Alternative model parameters',
    'Additionally use Hidden Markov Model for calculating probabilities and compare results')
alternative_model_group.add_argument("--use_alternative_model", type=_str2bool, default=False,
                                     help="use additional model or not")
alternative_model_group.add_argument("--algo", nargs='+',
                                     help="list of algorithms to use for alignment",
                                     default=['viterbi', 'global_viterbi', 'maximum_accuracy', 'maximum_posterior_decoding'])
alternative_model_group.add_argument("--use_bw", type=_str2bool, default=False,
                                     help="use Baum-Welch for parameters estimation or not")
alternative_model_group.add_argument("--bw_iters", type=int, default=10,
                                     help="number of Baum-Welch iterations")
alternative_model_group.add_argument("--log_alignments", type=_str2bool, default=False,
                                     help="pretty log alignments with marginal probabilities or not")
# ``type=list`` would split the raw string into characters ("10" -> ['1', '0']);
# read one or more integers instead.
alternative_model_group.add_argument("--topk", type=int, nargs='+', default=[1, 3, 5, 10],
                                     help="k value for top-k-matching in computing results")
|
||
# parser.add_argument("--insertion", help="insertion score [default=-2.8]", default=-2.8, action="store") | ||
# parser.add_argument("--deletion", help="deletion score [default=-5]", default=-5, action="store") | ||
parser.add_argument('--rban-monomers-db', dest='rban_monomers', type=str, default=None, | ||
|
@@ -321,6 +339,13 @@ def run(args, log): | |
"--threads", str(args.threads)] | ||
log.info("\n======= Nerpa matching") | ||
nerpa_utils.sys_call(command, log, cwd=output_dir) | ||
if args.use_alternative_model: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Мне кажется всё-таки логичнее запускать что-то одно из этих двух в зависимости от параметров а не то и другое сразу. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. У меня просто в коде парсится файл с результатами нерпы, поэтому мне перед этим нужно ее запустить. Но вообще я могу сделать отдельный скрипт для своего кода, и в него передавать например папочку-результат работы нерпы? |
||
src.markov_probability_model.main.run( | ||
data_dir=output_dir, prob_gen_filepath=os.path.join(nerpa_init.configs_dir, 'prob_gen.cfg'), | ||
results_dir=os.path.join(output_dir, 'markov_probability_model_results'), | ||
mibig_path=os.path.join(nerpa_init.nerpa_root_dir, 'data', 'mibig.csv'), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Возможно, я бы этот файл передавала как параметр. Типа: если вы хотите, чтобы веса обучались, передайте файлик для обучения, к которому такие-то требования. Мне кажется, это весьма логичный аргумент. |
||
pool_sz=args.threads, algo=args.algo, use_bw=args.use_bw, bw_iters=args.bw_iters, | ||
log_alignments=args.log_alignments, topk=args.topk) | ||
log.info("RESULTS:") | ||
log.info("Main report is saved to " + os.path.join(output_dir, 'report.csv'), indent=1) | ||
log.info("Detailed reports are saved to " + output_dir, indent=1) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
pandas | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Полезный файлик :) Спасибо, что добавила :) |
||
typing | ||
numpy | ||
prettytable | ||
tqdm | ||
matplotlib |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import abc | ||
from typing import List, NewType, TypeVar, Optional | ||
|
||
|
||
class Symbol(abc.ABC):
    """A symbol described by a name, an optional modification and a methylation flag.

    The string form is the symbol's identity: equality, hashing and ``repr``
    all go through ``str(self)``.
    """

    def __init__(self, name: str, modification: Optional[str], methylation: bool):
        self.name = name
        self.modification = modification
        self.methylation = methylation

    def __str__(self) -> str:
        # Prefixes come in a fixed order: the modification first, then the 'NMe-' tag.
        prefixes = []
        if self.modification is not None:
            prefixes.append(self.modification + '-')
        if self.methylation:
            prefixes.append('NMe-')
        return ''.join(prefixes) + self.name

    def __eq__(self, other):
        mine, theirs = str(self), str(other)
        return mine == theirs

    def __hash__(self):
        text = str(self)
        return hash(text)

    def __repr__(self):
        return str(self)
|
||
|
||
class Gap(Symbol):
    """Gap symbol: named '-', with no modification and no methylation."""

    def __init__(self):
        super().__init__(name='-', modification=None, methylation=False)
|
||
|
||
class Aminoacid(Symbol):
    """Amino acid symbol; adds nothing on top of ``Symbol``."""
|
||
|
||
class ScoredAminoacid(Symbol):
    """Symbol whose initial name carries a score in parentheses, i.e. ``name(score)``.

    The part before the first '(' becomes the symbol name; the text between
    that '(' and the following ')' is kept verbatim and also parsed as a float.
    """

    def __init__(self, init_name: str, modification: Optional[str], methylation: bool):
        pieces = init_name.split('(')
        super().__init__(pieces[0], modification, methylation)
        # Keep the original score text so that __str__ reproduces it exactly.
        self._score_str: str = pieces[1].split(')')[0]
        self.score: float = float(self._score_str)

    def __str__(self):
        return '{}({})'.format(super().__str__(), self._score_str)
|
||
|
||
# Type variable constrained to Aminoacid or Gap.
AlignedAminoacid = TypeVar('AlignedAminoacid', Aminoacid, Gap) | ||
|
||
# Type variable constrained to ScoredAminoacid or Gap.
AlignedScoredAminoacid = TypeVar('AlignedScoredAminoacid', ScoredAminoacid, Gap) | ||
|
||
# Distinct static type for a list of Symbol objects.
Alphabet = NewType('Alphabet', List[Symbol]) | ||
|
||
# Distinct static type for a list of Aminoacid objects.
AminoacidAlphabet = NewType('AminoacidAlphabet', List[Aminoacid]) | ||
|
||
# Distinct static type for a list of ScoredAminoacid objects.
ScoredAminoacidAlphabet = NewType('ScoredAminoacidAlphabet', List[ScoredAminoacid]) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import abc | ||
import re | ||
from typing import NewType, List | ||
|
||
# Distinct static string types for a sequence id and for a base sequence id.
SequenceId = NewType('SequenceId', str)
BaseSequenceId = NewType('BaseSequenceId', str)


class BaseSequenceIdResolver(abc.ABC):
    """Interface of objects that map a sequence id to its base sequence id."""

    @abc.abstractmethod
    def resolve(self, sequence_id: SequenceId) -> SequenceId:
        """Return the base id for ``sequence_id``; concrete resolvers must override this."""
|
||
|
||
class SimpleBaseSequenceIdResolver(BaseSequenceIdResolver):
    """Resolver that finds the base id with a fixed, ordered list of regular expressions."""

    def __init__(self):
        # Patterns are tried in this order; the first one that matches wins.
        self._regexps: List[str] = [
            r'BGC[0-9]{7}',
            r'NPA[0-9]{6}',
            r'[A-Z]{3}[0-9]{5}_variant',
            r'[A-Z]{3}[0-9]{2}-[A-Z]{1}_variant',
            r'antimarin[0-9]{4}_[0-9]{4,5}_variant',
            r'streptomedb.[0-9]{2,4}_variant',
            r'mibig_[0-9]{3}_variant',
        ]

    def resolve(self, sequence_id: SequenceId) -> SequenceId:
        """Return the first pattern match found in ``sequence_id``, minus a trailing '_variant'.

        :raises IndexError: if none of the patterns matches.
        """
        text = str(sequence_id)
        suffix = '_variant'
        for pattern in self._regexps:
            found = re.search(pattern, text)
            if found is None:
                continue
            base = found.group(0)
            return base[:-len(suffix)] if base.endswith(suffix) else base
        raise IndexError(f'Cannot resolve base for {sequence_id}')
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from src.markov_probability_model.base.base_sequence_id_resolver import SequenceId, BaseSequenceIdResolver, \ | ||
SimpleBaseSequenceIdResolver | ||
from src.markov_probability_model.base.alphabet import Aminoacid, ScoredAminoacid, AlignedAminoacid, \ | ||
AlignedScoredAminoacid | ||
from typing import List, Optional, Generic, TypeVar | ||
|
||
S = TypeVar('S')


class Sequence(Generic[S]):
    """A list of symbols together with the id of the sequence they belong to.

    :param sequence_id: identifier of this sequence.
    :param symbols: the symbols of the sequence.
    :param base_seq_id_resolver: object whose ``resolve`` method maps
        ``sequence_id`` to the base sequence id. When omitted or ``None``,
        a ``SimpleBaseSequenceIdResolver`` is used.
    """

    def __init__(self, sequence_id: SequenceId, symbols: List[S],
                 base_seq_id_resolver: Optional[BaseSequenceIdResolver] = None):
        self.sequence_id = sequence_id
        self.symbols = symbols
        # The default is built here, not in the signature: a call in a default
        # argument runs once at definition time, and an explicit ``None``
        # (allowed by the Optional annotation) used to fail in base_sequence_id.
        if base_seq_id_resolver is None:
            base_seq_id_resolver = SimpleBaseSequenceIdResolver()
        self._base_seq_id_resolver = base_seq_id_resolver

    @property
    def base_sequence_id(self):
        """Base sequence id, as computed by the resolver from ``sequence_id``."""
        return self._base_seq_id_resolver.resolve(self.sequence_id)

    def __len__(self):
        """Number of symbols in the sequence."""
        return len(self.symbols)
|
||
|
||
class AminoacidSequence(Sequence[Aminoacid]):
    """``Sequence`` parameterized with ``Aminoacid``; adds no behavior."""
|
||
|
||
class ScoredAminoacidSequence(Sequence[ScoredAminoacid]):
    """``Sequence`` parameterized with ``ScoredAminoacid``; adds no behavior."""
|
||
|
||
class AlignedAminoacidSequence(Sequence[AlignedAminoacid]):
    """``Sequence`` parameterized with ``AlignedAminoacid``; adds no behavior."""
|
||
|
||
class AlignedScoredAminoacidSequence(Sequence[AlignedScoredAminoacid]):
    """``Sequence`` parameterized with ``AlignedScoredAminoacid``; adds no behavior."""
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import numpy as np | ||
|
||
from typing import List | ||
|
||
|
||
def my_log(a):
    """Return ``np.log(a)``, except that a value equal to zero gives ``-np.inf`` directly."""
    return -np.inf if a == 0 else np.log(a)
|
||
|
||
def my_exp(a):
    """Return ``np.exp(a)``, except that ``-np.inf`` gives the integer 0 directly.

    If numpy returns an array, only its first element is returned.
    """
    if a == -np.inf:
        return 0
    value = np.exp(a)
    return value[0] if isinstance(value, np.ndarray) else value
|
||
|
||
def log_add_exp(l: List[float]):
    """Fold ``np.logaddexp`` over the values of ``l``, ignoring ``-np.inf`` entries.

    The remaining values are combined in ascending order. Returns ``-np.inf``
    when nothing remains and the value itself when exactly one remains.
    """
    finite = sorted(x for x in l if x != -np.inf)
    if not finite:
        return -np.inf
    total = finite[0]
    for term in finite[1:]:
        total = np.logaddexp(total, term)
    return total
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import abc | ||
|
||
from typing import List, TypeVar | ||
from src.markov_probability_model.pairwise_alignment.sequence_aligner import PairwiseAlignmentOutputWithLogs | ||
from src.markov_probability_model.data_loader.data_loader import PairwiseAlignmentDataLoader, TwoSequenceListsData | ||
from src.markov_probability_model.base.sequence import AminoacidSequence, ScoredAminoacidSequence | ||
from src.markov_probability_model.base.alphabet import Gap, Aminoacid, ScoredAminoacid | ||
|
||
|
||
class AlignmentsLoader(PairwiseAlignmentDataLoader):
    """Data loader that rebuilds the two gap-free sequence lists from alignments."""

    @abc.abstractmethod
    def load_alignments(self) -> List[PairwiseAlignmentOutputWithLogs]:
        """Return the alignments to take the sequences from."""

    def load_data(self) -> TwoSequenceListsData:
        """Drop every symbol equal to ``Gap()`` from both sides of each alignment.

        Each stripped side keeps the sequence id of the aligned sequence it
        came from; the two lists have one entry per alignment, in order.
        """
        plain: List[AminoacidSequence] = []
        scored: List[ScoredAminoacidSequence] = []
        for aln in self.load_alignments():
            first: List[Aminoacid] = [
                sym for sym in aln.aligned_sequence1.symbols if sym != Gap()]
            plain.append(AminoacidSequence(aln.aligned_sequence1.sequence_id, first))
            second: List[ScoredAminoacid] = [
                sym for sym in aln.aligned_sequence2.symbols if sym != Gap()]
            scored.append(ScoredAminoacidSequence(aln.aligned_sequence2.sequence_id, second))
        return TwoSequenceListsData(plain, scored)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import abc | ||
|
||
from src.markov_probability_model.base.sequence import AminoacidSequence, ScoredAminoacidSequence | ||
from typing import List, Generic, TypeVar | ||
|
||
# Type of the data a loader produces.
D = TypeVar('D')


class DataLoader(abc.ABC, Generic[D]):
    """Abstract loader, generic in the type of the data it returns."""

    @abc.abstractmethod
    def load_data(self) -> D:
        """Load and return the data; concrete loaders must override this."""
|
||
|
||
class TwoSequenceListsData:
    """Plain holder for two sequence lists, exposed as ``sequences1`` and ``sequences2``."""

    def __init__(self, sequences1: List[AminoacidSequence], sequences2: List[ScoredAminoacidSequence]):
        self.sequences1, self.sequences2 = sequences1, sequences2
|
||
|
||
class PairwiseAlignmentDataLoader(DataLoader[TwoSequenceListsData], abc.ABC):
    """Abstract ``DataLoader`` whose data type is ``TwoSequenceListsData``."""
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
import abc | ||
import pandas as pd | ||
import os | ||
import copy | ||
|
||
from src.markov_probability_model.pairwise_alignment.fdr import FdrData, FdrParameters, FdrGenerator | ||
from src.markov_probability_model.pairwise_alignment.sequence_aligner import ScoredPairwiseAlignmentOutput | ||
from src.markov_probability_model.base.sequence import AlignedAminoacidSequence, AlignedScoredAminoacidSequence | ||
from src.markov_probability_model.base.base_sequence_id_resolver import SequenceId | ||
from typing import List, Dict | ||
|
||
|
||
class FdrLoader(abc.ABC):
    """Interface of objects that provide FDR data."""

    @abc.abstractmethod
    def load_fdr(self) -> List[Dict[str, FdrData]]:
        """Return a list of name -> ``FdrData`` mappings; concrete loaders must override this."""
|
||
|
||
class FdrGeneratorFromReport(FdrLoader):
    """Generates FDR data from the ``report.csv`` file found in a data directory."""

    def __init__(self, data_dir: str, fdr_parameters: List[FdrParameters]):
        self._data_dir = data_dir
        # Work on a deep copy so that clearing the log paths does not touch the caller's objects.
        self._fdr_parameters = copy.deepcopy(fdr_parameters)
        for params in self._fdr_parameters:
            params.pairs_df_logpath = params.best_pairs_df_logpath = None

    def load_fdr(self) -> List[Dict[str, FdrData]]:
        """Read the report, run ``FdrGenerator`` on its rows, and label each result 'NERPA'."""
        report_path = os.path.join(self._data_dir, 'report.csv')
        report = pd.read_csv(report_path)
        alignments: List[ScoredPairwiseAlignmentOutput] = []
        for row_score, mol_id, prediction_id in zip(
                report['score'], report['mol id'], report['prediction id']):
            # Only the part of the prediction id after the last '/' is used as the id.
            alignments.append(self.NerpaFdrAlignment(row_score, mol_id, prediction_id.rsplit('/', 1)[-1]))
        generated = FdrGenerator(alignments, self._fdr_parameters).generate_fdr()
        return [{'NERPA': fdr} for fdr in generated]

    class NerpaFdrAlignment(ScoredPairwiseAlignmentOutput):
        """Alignment output with empty symbol lists that only carries the two ids and a score."""

        def __init__(self, score: float, sequence_id1: SequenceId, sequence_id2: SequenceId):
            first = AlignedAminoacidSequence(sequence_id1, [])
            second = AlignedScoredAminoacidSequence(sequence_id2, [])
            super().__init__(first, second)
            self._score = score

        def score(self) -> float:
            return self._score
|
||
|
||
class CsvFdrLoader(FdrLoader):
    """Loads FDR data from CSV files in a data directory, one file per parameter set."""

    def __init__(self, data_dir: str, fdr_parameters: List[FdrParameters]):
        self._data_dir = data_dir
        self._fdr_parameters = fdr_parameters

    def load_fdr(self) -> List[Dict[str, FdrData]]:
        """Return one name -> ``FdrData`` mapping per parameter set, in order."""
        return list(map(self._load_single_fdr, self._fdr_parameters))

    def _load_single_fdr(self, p: FdrParameters) -> Dict[str, FdrData]:
        """Read the CSV named after ``p.topk`` and ``p.relative_to`` and wrap its two FDR columns."""
        csv_name = f'FDR_top{p.topk}_{p.relative_to}_scores_with_garlic.csv'
        table = pd.read_csv(os.path.join(self._data_dir, csv_name))
        columns = (('NERPA', 'FDR Nerpa'), ('GARLIC', 'FDR Garlic'))
        return {label: FdrData(_fdr_row_to_array(table[column])) for label, column in columns}
|
||
|
||
def _fdr_row_to_array(row, max_len=500) -> List[float]: | ||
a = [] | ||
for c in row: | ||
if c == '-': | ||
break | ||
a.append(float(c)) | ||
if len(a) > max_len: | ||
a = a[:max_len] | ||
return list(a) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Мне кажется, опять же, что это скорее не кусок Нерпы, а должны быть отдельные скрипты, которые это всё считают. Но, мне кажется, это стоит отдельно обсудить: как это лучше сделать.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Можно обсудить. Но мне в любом случае для выполнения своего кода нужна папочка с результатами нерпы. Чтобы из них данные NRP и BGC парсить, и чтобы с ней результаты сравнивать)