Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Feb 26, 2022
1 parent 610b100 commit a5a150a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 81 deletions.
103 changes: 22 additions & 81 deletions src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import com.fulcrumgenomics.util.NumericTypes.PhredScore
import com.fulcrumgenomics.util.{Io, ProgressLogger}
import htsjdk.samtools.SAMFileHeader
import htsjdk.samtools.SAMFileHeader.GroupOrder
import htsjdk.samtools.reference.ReferenceSequenceFileWalker
import htsjdk.samtools.reference.ReferenceSequence
import htsjdk.samtools.util.SequenceUtil

import java.io.Closeable
Expand Down Expand Up @@ -119,8 +119,8 @@ class FilterConsensusReads
val minMeanBaseQuality: Option[PhredScore] = None,
@arg(flag='s', doc="Mask (make `N`) consensus bases where the AB and BA consensus reads disagree (for duplex-sequencing only).")
val requireSingleStrandAgreement: Boolean = false,
@arg(flag='S', doc="The sort order of the output, if `:none:` then the same as the input.") val sortOrder: Option[SamOrder] = Some(SamOrder.Coordinate),
@arg(flag='l', doc="Load the full reference sequence in memory") val loadFullReference: Boolean = false
@arg(flag='S', doc="The sort order of the output. If not given, query grouped if the input is also query grouped, otherwise queryname.")
val sortOrder: Option[SamOrder] = None
) extends FgBioTool with LazyLogging {
// Baseline input validation
Io.assertReadable(input)
Expand Down Expand Up @@ -175,9 +175,13 @@ class FilterConsensusReads
private val EmptyFilterResult = FilterResult(keepRead=true, maskedBases=0)

override def execute(): Unit = {
logger.info("Reading the reference into memory")
val refMap = ReferenceSequenceIterator(ref, stripComments=true).map { ref => ref.getContigIndex -> ref}.toMap
logger.info(f"Read ${refMap.size}%,d contigs.")

val progress = ProgressLogger(logger, verb="Filtered and masked")
val in = SamSource(input)
val out = buildOutputWriter(in.header)
val out = buildOutputWriter(in.header, refMap)

// Go through the reads by template and do the filtering
val templateIterator = Bams.templateIterator(in, maxInMemory=MaxRecordsInMemoryWhenSorting)
Expand Down Expand Up @@ -226,103 +230,40 @@ class FilterConsensusReads
logger.info(f"Masked $maskedBases%,d of $totalBases%,d bases in retained primary consensus reads.")
}

/** Builds a method to re-generate the NM/UQ/MD tags based on if we are loading the full reference or not. Also
* returns a method to close the underlying reference */
private def buildRegenerateNmUqMdTags(): (SamRecord => SamRecord, () => Unit) = {
if (loadFullReference) {
logger.info("Loading reference into memory")
val refMap = ReferenceSequenceIterator(ref, stripComments=true).map { ref => ref.getContigIndex -> ref}.toMap
val f = (rec: SamRecord) => Bams.regenerateNmUqMdTags(rec, refMap)
(f, () => ())
}
else {
logger.warning("Will require coordinate sorting to update tags, try --load-full-reference instead")
val walker = new ReferenceSequenceFileWalker(ref)
val f = (rec: SamRecord) => Bams.regenerateNmUqMdTags(rec, walker)
(f, () => walker.safelyClose())
}
}

/** Builds the writer to which filtered records should be written.
*
* The filtered records may be sorted once, twice, or never depending on (a) if the full reference is loaded into
* memory, (b) the order after filtering, and (c) the output order.
*
* The order after filtering is determined as follows:
* 1. If the input order is Queryname, or the input is query grouped, then use the input order.
* 2. Otherwise, Queryname.
* If the input order is [[SamOrder.Queryname]] or query grouped, then the filtered records will also be in the same
* order. So if the output order is specified AND does not match the input order, sorting will occur.
*
* The output order is determined as follows:
* 1. The order from the `--sort-order` option.
* 2. Otherwise, the order from the input file, if an order is present.
* 3. Otherwise, the order after filtering.
* If the input order is not [[SamOrder.Queryname]] or query grouped, then the input records will be resorted into
* [[SamOrder.Queryname]]. So if the output order is specified AND is not [[SamOrder.Queryname]], sorting will occur.
*
* If the full reference has not been loaded then:
* 1. the filtered records are sorted by coordinate to reset the SAM tags.
* 2. if the output order is coordinate, the records are then written directly to the output, otherwise they are
* re-sorted (for a second time) to the desired output order, and written to the output.
* Otherwise, we can skip sorting!
*
* If the full reference has been loaded then:
* 1. if the output order is the same as the order after filtering, the filtered records are written to the output,
* otherwise they re-sorted to the desired output order and written to the output.
* */
private def buildOutputWriter(header: SAMFileHeader): Closeable with Writer[SamRecord] = {
val (regenerateNmUqMdTags, refCloseMethod) = buildRegenerateNmUqMdTags()
val outHeader = header.clone()
private def buildOutputWriter(inHeader: SAMFileHeader, refMap: Map[Int, ReferenceSequence]): Closeable with Writer[SamRecord] = {
val outHeader = inHeader.clone()

// Check if the input will be re-sorted into QueryName, or if the input sort order will be kept
val orderAfterFiltering = SamOrder(header) match {
val orderAfterFiltering = SamOrder(inHeader) match {
case Some(order) if order == SamOrder.Queryname || order.groupOrder == GroupOrder.query => order
case None => SamOrder.Queryname
case _ => SamOrder.Queryname // input will we resorted to queryname
}

// Get the output order
val outputOrder = this.sortOrder
.orElse(SamOrder(header)) // use the input sort order
.getOrElse(orderAfterFiltering) // use the order after filtering, so no sort occurs
outputOrder.applyTo(outHeader) // remember to apply it

// Build the writer
val sort = {
if (loadFullReference) {
// If the full reference has been loaded, we need only sort the output if the order after filtering does not
// match the output order.
if (orderAfterFiltering == outputOrder) None else Some(outputOrder)
}
else {
// If the full reference has not been loaded, we will need to coordinate sort to reset
// the tags, then only re-sort in the output order if not in coordinate order.
if (outputOrder == SamOrder.Coordinate) None else Some(outputOrder)
}
}
sort.foreach(o => logger.info(f"Output will be sorted into $o order"))
val sort = if (orderAfterFiltering == outputOrder) None else Some(outputOrder)
val writer = SamWriter(output, outHeader, sort=sort, maxRecordsInRam=MaxRecordsInMemoryWhenSorting)
sort.foreach(o => logger.info(f"Output will be sorted into $o order"))

// Create the final writer based on if the full reference has been loaded, or not
if (loadFullReference) {
new Writer[SamRecord] with Closeable {
override def write(rec: SamRecord): Unit = writer += regenerateNmUqMdTags(rec)
def close(): Unit = {
writer.close()
refCloseMethod()
}
}
}
else {
val progress = ProgressLogger(this.logger, "records", "sorted")
new Writer[SamRecord] with Closeable {
private val _sorter = Bams.sorter(order=SamOrder.Coordinate, header=header, maxRecordsInRam=MaxRecordsInMemoryWhenSorting)
override def write(rec: SamRecord): Unit = {
progress.record(rec)
this._sorter += rec
}
def close(): Unit = {
this._sorter.foreach { rec => writer += regenerateNmUqMdTags(rec) }
writer.close()
refCloseMethod()
}
}
new Writer[SamRecord] with Closeable {
override def write(rec: SamRecord): Unit = writer += Bams.regenerateNmUqMdTags(rec, refMap)
def close(): Unit = writer.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,67 @@ class FilterConsensusReadsTest extends UnitSpec {
}
}

/** Pairs two sequences of read names: `in` for the names read from an input file and `out` for the names read
  * from the corresponding output file, each in the order the records were read. */
private case class ReadNames(
  in: Seq[String],
  out: Seq[String]
)

/** Runs [[FilterConsensusReads]] over a small BAM holding two read pairs and reports the read names seen in the
  * input and in the filtered output.
  *
  * The two pairs are added to a [[SamBuilder]] constructed with `sort=Some(inOrder)`, written to a temporary file,
  * and filtered with `sortOrder=outOrder`.  The output is asserted to contain four records, none of which has an
  * `N` in its bases.
  *
  * @param name1    the read name of the first pair added
  * @param start1R1 the start position of R1 of the first pair
  * @param start1R2 the start position of R2 of the first pair
  * @param name2    the read name of the second pair added
  * @param start2R1 the start position of R1 of the second pair
  * @param start2R2 the start position of R2 of the second pair
  * @param inOrder  the sort order given to the builder that produces the input BAM
  * @param outOrder the value passed to the tool's `sortOrder` argument; `None` by default
  * @return the read names of the input records and of the filtered output records, each in file order
  */
private def sortOrderTest(name1: String, start1R1: Int, start1R2: Int, name2: String, start2R1: Int, start2R2: Int,
                          inOrder: SamOrder, outOrder: Option[SamOrder] = None): ReadNames = {
  val samBuilder = new SamBuilder(readLength=10, baseQuality=45, sort=Some(inOrder))

  // Tag both reads of each pair; the two pairs differ only in the depth values that are applied
  for (rec <- samBuilder.addPair(name=name1, start1=start1R1, start2=start1R2)) {
    tag(rec, minDepth=4, depth=4, readErr=0f, depths=arr(4, 10), errors=arr(0,10))
  }
  for (rec <- samBuilder.addPair(name=name2, start1=start2R1, start2=start2R2)) {
    tag(rec, minDepth=5, depth=5, readErr=0f, depths=arr(5, 10), errors=arr(0,10))
  }

  val inputBam    = samBuilder.toTempFile()
  val filteredBam = makeTempFile("filtered.", ".bam")
  val tool = new FilterConsensusReads(
    input=inputBam, output=filteredBam, ref=ref, reversePerBaseTags=false,
    minBaseQuality=45.toByte, minReads=Seq(3), maxReadErrorRate=Seq(0.025), maxBaseErrorRate=Seq(0.1),
    maxNoCallFraction=0.1, sortOrder=outOrder
  )
  tool.execute()

  // All four records should be present in the output, and no base should have been masked to N
  val filtered = SamSource(filteredBam).toSeq
  filtered.size shouldBe 4
  filtered.exists(_.basesString.contains("N")) shouldBe false

  val inputNames = readBamRecs(inputBam).map(_.name)
  ReadNames(in=inputNames, out=filtered.map(_.name))
}

// No output order is requested (`outOrder` defaults to `None`).  The start positions are chosen so that q2 precedes
// q1 by position, so the expected q1-before-q2 ordering differs from a positional ordering.
it should "output queryname sorted if the input is queryname sorted" in {
  val result = sortOrderTest(
    name1="q1", start1R1=101, start1R2=201,
    name2="q2", start2R1=100, start2R2=200,
    inOrder=SamOrder.Queryname
  )
  result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name!
  result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name!
}

// No output order is requested (`outOrder` defaults to `None`).  The input is built in template-coordinate order
// with q2 ahead of q1, and the output is expected to keep that same grouping and order.
it should "output query grouped sorted if the input is query grouped sorted" in {
  val result = sortOrderTest(
    name1="q2", start1R1=100, start1R2=200,
    name2="q1", start2R1=101, start2R2=201,
    inOrder=SamOrder.TemplateCoordinate
  )
  result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
  result.out should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
}

// No output order is requested (`outOrder` defaults to `None`).  The input is declared unsorted with q2 added before
// q1, and the output is expected to come back in queryname order.
it should "output queryname sorted if the input is neither queryname nor query grouped sorted" in {
  val result = sortOrderTest(
    name1="q2", start1R1=100, start1R2=200,
    name2="q1", start2R1=101, start2R2=201,
    inOrder=SamOrder.Unsorted
  )
  result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
  result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name
}

// An explicit coordinate output order is requested for a queryname-sorted input.  The start positions interleave the
// two pairs (100, 101, 200, 201), so the expected output alternates between q1 and q2.
it should "output coordinate sorted if the output order is coordinate" in {
  val result = sortOrderTest(
    name1="q1", start1R1=100, start1R2=200,
    name2="q2", start2R1=101, start2R2=201,
    inOrder=SamOrder.Queryname,
    outOrder=Some(SamOrder.Coordinate)
  )
  result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name
  result.out should contain theSameElementsInOrderAs Seq("q1", "q2", "q1", "q2") // coordinate
}
}

//////////////////////////////////////////////////////////////////////////////
// Below this line are tests for filtering of duplex consensus reads.
//////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit a5a150a

Please sign in to comment.