From b781cd676e99dd2f1ef8ceb159d0e53c8f8d82fa Mon Sep 17 00:00:00 2001 From: Tim Fennell Date: Fri, 13 Dec 2024 14:00:26 -0700 Subject: [PATCH] Emit primer pairs in penalty order. (#87) This PR does two things: 1. Fix a bug where amplicons that were too _small_ were still emitted 2. Ensure that primer pairs are emitted in penalty order (2) requires materializing a small tuple for all _valid_ pairs, and then sorting by score. The tuple contains two ints (indices into the primer sequences) and two floats (the penalty and the tm, the last for convenience so we don't have to recompute it). It the sorts the tuples by penalty, and starts generating PrimerPairs in penalty order. If you have e.g. 500 left and 500 right primers, this could construct ~250k tuples and calculate 250k Tms, but in reality the number is probably substantially smaller constrained by amplicon sizes. --- prymer/api/picking.py | 124 +++++++++++++++++++++++++------------- tests/api/test_picking.py | 4 +- 2 files changed, 85 insertions(+), 43 deletions(-) diff --git a/prymer/api/picking.py b/prymer/api/picking.py index fa00ae2..ce65059 100644 --- a/prymer/api/picking.py +++ b/prymer/api/picking.py @@ -21,6 +21,7 @@ from pathlib import Path from typing import Iterator from typing import Optional +from typing import Tuple from pysam import FastaFile @@ -80,18 +81,19 @@ def score( # The penalty for the amplicon melting temperature. # The difference in melting temperature between the calculated and optimal is weighted by the # product melting temperature. - tm = amplicon_tm tm_penalty: float - if tm > amplicon_tms.opt: - tm_penalty = (tm - amplicon_tms.opt) * weights.product_tm_gt + if amplicon_tms.opt == 0.0: + tm_penalty = 0.0 + elif amplicon_tm > amplicon_tms.opt: + tm_penalty = (amplicon_tm - amplicon_tms.opt) * weights.product_tm_gt else: - tm_penalty = (amplicon_tms.opt - tm) * weights.product_tm_lt + tm_penalty = (amplicon_tms.opt - amplicon_tm) * weights.product_tm_lt # Put it all together return left_primer.penalty + right_primer.penalty + size_penalty + tm_penalty -def build_primer_pairs( +def build_primer_pairs( # noqa: C901 left_primers: Sequence[Oligo], right_primers: Sequence[Oligo], target: Span, @@ -119,7 +121,8 @@ def build_primer_pairs( fasta_path: the path to the FASTA file from which the amplicon sequence will be retrieved. Returns: - an iterator over all the valid primer pairs, unsorted + An iterator over all the valid primer pairs, sorted by primer pair penalty. + Primer pairs with smaller penalties are returned first. """ # Short circuit if we have no left primers or no right primers if not any(left_primers) or not any(right_primers): @@ -131,50 +134,89 @@ def build_primer_pairs( if any(p.span.refname != target.refname for p in right_primers): raise ValueError("Right primers exist on different reference than target.") + # Sort the left and right primers + left_primers = sorted(left_primers, key=lambda p: p.span.start) + right_primers = sorted(right_primers, key=lambda p: p.span.end) + # Grab the sequence we'll use to fill in the amplicon sequence with FastaFile(f"{fasta_path}") as fasta: region_start = min(p.span.start for p in left_primers) region_end = max(p.span.end for p in right_primers) bases = fasta.fetch(target.refname, region_start - 1, region_end) + # Each tuple is left_idx, right_idx, penalty, tm + pairings: list[Tuple[int, int, float, float]] = [] + + # generate all the primer pairs that don't violate hard size and Tm constraints + first_right_primer_idx = 0 + + # Nested loops over indices are used here so that we can skip potentially large chunks of + # the cartesian product, based on the fact that we're sorting the left and right primers. + # Two things are relied upon: + # 1. If we encounter a left/right combo that either has the right primer leftward of the + # left primer _or_ generates a too-short amplicon, the neither that right primer nor + # any previous right primer can make a valid combination with any subsequent left primer. + # 2. If we encounter a left/right combo that generates a too-large amplicon, then no + # subsequent right-primer can make a valid combination with that left primer + for i in range(0, len(left_primers)): + for j in range(first_right_primer_idx, len(right_primers)): + lp = left_primers[i] + rp = right_primers[j] + + # If the right primer isn't "to the right" of the left primer, move on + if rp.span.start < lp.span.start or lp.span.end > rp.span.end: + first_right_primer_idx = max(first_right_primer_idx, j+1) + continue + + amp_span = PrimerPair.calculate_amplicon_span(lp, rp) + + if amp_span.length < amplicon_sizes.min: + first_right_primer_idx = max(first_right_primer_idx, j+1) + continue + + if amp_span.length > amplicon_sizes.max: + break # break in this case because all subsequent rps will yield longer amplicons + + # Since the amplicon span and the region_start are both 1-based, the minuend + # becomes a zero-based offset + amp_bases = bases[amp_span.start - region_start : amp_span.end - region_start + 1] + amp_tm = calculate_long_seq_tm(amp_bases) + + if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max: + continue + + penalty = score( + left_primer=lp, + right_primer=rp, + amplicon=amp_span, + amplicon_tm=amp_tm, + amplicon_sizes=amplicon_sizes, + amplicon_tms=amplicon_tms, + weights=weights, + ) + + pairings.append((i, j, penalty, amp_tm)) + + # Sort by the penalty, ascending + pairings.sort(key=lambda tup: tup[2]) + with NtThermoAlign() as ntthal: - # generate all the primer pairs that don't violate hard size and Tm constraints - for lp in left_primers: - for rp in right_primers: - amp_span = PrimerPair.calculate_amplicon_span(lp, rp) + for i, j, penalty, tm in pairings: + lp = left_primers[i] + rp = right_primers[j] - # Ignore pairings with amplicon sizes out of the range specified - if not amplicon_sizes.min <= amp_span.length <= amplicon_sizes.max: + if max_heterodimer_tm is not None: + if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm: continue - # Since the amplicon span and the region_start are both 1-based, the minuend - # becomes a zero-based offset - amp_bases = bases[amp_span.start - region_start : amp_span.end - region_start + 1] - amp_tm = calculate_long_seq_tm(amp_bases) + amp_bases = bases[lp.span.start - region_start : rp.span.end - region_start + 1] - if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max: - continue + pp = PrimerPair( + left_primer=lp, + right_primer=rp, + amplicon_sequence=amp_bases, + amplicon_tm=tm, + penalty=penalty, + ) - if max_heterodimer_tm is not None: - if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm: - continue - - penalty = score( - left_primer=lp, - right_primer=rp, - amplicon=amp_span, - amplicon_tm=amp_tm, - amplicon_sizes=amplicon_sizes, - amplicon_tms=amplicon_tms, - weights=weights, - ) - - pp = PrimerPair( - left_primer=lp, - right_primer=rp, - amplicon_sequence=amp_bases, - amplicon_tm=amp_tm, - penalty=penalty, - ) - - yield pp + yield pp diff --git a/tests/api/test_picking.py b/tests/api/test_picking.py index e69e5d0..7d2548a 100644 --- a/tests/api/test_picking.py +++ b/tests/api/test_picking.py @@ -273,8 +273,8 @@ def test_build_primers_amplicon_size_filtering( ) -> None: pairs = list( picking.build_primer_pairs( - left_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(1, 151, 10)], - right_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(151, 301, 10)], + left_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(51, 301, 10)], + right_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(1, 401, 10)], target=Span("chr1", 150, 160), amplicon_sizes=MinOptMax(100, 150, 200), amplicon_tms=MinOptMax(0.0, 0.0, 100.0),