Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added an optimisation to avoid an unnecessary shuffle of unpaired reads #2282

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ class TransformFragments(protected val args: TransformFragmentsArgs) extends BDG

val rdd = if (args.loadAsAlignments) {
sc.loadAlignments(args.inputPath)
.toFragments
.toFragments()
} else {
sc.loadFragments(args.inputPath)
}
Original file line number Diff line number Diff line change
@@ -3680,7 +3680,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
.querynameSortedToFragments
} else {
info(s"Loading $pathName as BAM/CRAM/SAM and converting to Fragments.")
loadBam(pathName, stringency).toFragments
loadBam(pathName, stringency).toFragments()
}
} else if (isInterleavedFastqExt(trimmedPathName)) {
info(s"Loading $pathName as interleaved FASTQ and converting to Fragments.")
Original file line number Diff line number Diff line change
@@ -463,11 +463,17 @@ sealed abstract class AlignmentDataset extends AvroReadGroupGenomicDataset[Align
/**
* Convert this set of reads into fragments.
*
* @param mergePairedReads Merge paired reads into a fragment. Can be set to false when only unpaired
* reads are in the AlignmentDataset. Defaults to true.
* @return Returns a FragmentDataset where all reads have been grouped together by
* the original sequence fragment they come from.
*/
def toFragments(): FragmentDataset = {
FragmentDataset(groupReadsByFragment().map(_.toFragment),
def toFragments(mergePairedReads: Boolean = true): FragmentDataset = {
val fragmentRDD = if (mergePairedReads) groupReadsByFragment().map(_.toFragment)
else
rdd.map(SingleReadBucket.buildFragmentFromSingleAlignment)

FragmentDataset(fragmentRDD,
sequences,
readGroups,
processingSteps)
Original file line number Diff line number Diff line change
@@ -105,6 +105,27 @@ private[read] object SingleReadBucket {
/**
 * Builds a SingleReadBucket from the alignments attached to a fragment.
 *
 * Delegates to fromGroupedReads, treating all alignments on the fragment as
 * a single pre-grouped collection.
 *
 * @param fragment The fragment whose attached alignments are bucketed.
 * @return Returns a SingleReadBucket built from the fragment's alignments.
 */
def apply(fragment: Fragment): SingleReadBucket = {
fromGroupedReads(fragment.getAlignments.toIterable)
}

/**
 * Builds a fragment wrapping exactly one alignment.
 *
 * Intended for unpaired reads, where no grouping of mates is required: the
 * fragment's name, read group, and insert size are copied straight from the
 * single alignment (the latter two only when present).
 *
 * @param alignment The single alignment to wrap in a fragment.
 * @return Returns a Fragment containing just this alignment.
 */
def buildFragmentFromSingleAlignment(alignment: Alignment): Fragment = {
  // read group and insert size may be unset on the record; wrap in Option
  val maybeReadGroup = Option(alignment.getReadGroupId)
  val maybeInsertSize = Option(alignment.getInsertSize).map(_.toInt)
  buildFragmentFromAlignments(List(alignment),
    alignment.getReadName,
    maybeReadGroup,
    maybeInsertSize)
}

/**
 * Builds a fragment from a set of alignments that share a read name.
 *
 * @param reads The alignments to attach to the fragment.
 * @param readName The name to assign to the fragment.
 * @param readGroupId The read group identifier, if known.
 * @param insertSize The fragment insert size, if defined.
 * @return Returns a Fragment carrying the given alignments and metadata.
 */
def buildFragmentFromAlignments(reads: Seq[Alignment], readName: String, readGroupId: Option[String], insertSize: Option[Int]): Fragment = {
  // mandatory fields first: fragment name and the alignments it contains
  val fragmentBuilder = Fragment.newBuilder()
    .setName(readName)
    .setAlignments(seqAsJavaList(reads))

  // only record an insert size when one is defined for this fragment
  insertSize.foreach(size => fragmentBuilder.setInsertSize(size))

  // attach the read group identifier when it is known
  readGroupId.foreach(groupId => fragmentBuilder.setReadGroupId(groupId))

  fragmentBuilder.build()
}
}

/**
@@ -139,24 +160,17 @@ private[adam] case class SingleReadBucket(
// want to pay the cost exactly once
val unionReads = allReads

// start building fragment
val builder = Fragment.newBuilder()
.setName(unionReads.head.getReadName)
.setAlignments(seqAsJavaList(allReads.toSeq))
val firstRead = unionReads.head

// is an insert size defined for this fragment?
primaryMapped.headOption
.foreach(r => {
Option(r.getInsertSize).foreach(is => {
builder.setInsertSize(is.toInt)
})
})
val insertSize = primaryMapped.headOption match {
case Some(alignment) => Option(alignment.getInsertSize).map(_.toInt)
case None => None
}

// set read group name, if known
Option(unionReads.head.getReadGroupId)
.foreach(n => builder.setReadGroupId(n))
SingleReadBucket.buildFragmentFromAlignments(unionReads.toSeq,
firstRead.getReadName, Option(firstRead.getReadGroupId),
insertSize)

builder.build()
}
}

592 changes: 592 additions & 0 deletions adam-core/src/test/resources/unpaired.sam

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -335,8 +335,8 @@ class FragmentDatasetSuite extends ADAMFunSuite {
}

sparkTest("union two genomic datasets of fragments together") {
val reads1 = sc.loadAlignments(testFile("bqsr1.sam")).toFragments
val reads2 = sc.loadAlignments(testFile("small.sam")).toFragments
val reads1 = sc.loadAlignments(testFile("bqsr1.sam")).toFragments()
val reads2 = sc.loadAlignments(testFile("small.sam")).toFragments()
val union = reads1.union(reads2)
assert(union.rdd.count === (reads1.rdd.count + reads2.rdd.count))
// all of the contigs small.sam has are in bqsr1.sam
@@ -636,4 +636,18 @@ class FragmentDatasetSuite extends ADAMFunSuite {

assert(convertedRdd.rdd.collect().toSet == convertedDataset.rdd.collect().toSet)
}

sparkTest("Compare unpaired FragmentDataset with optimisation to non-optimised") {
  // load a file that contains only unpaired reads
  val unpairedReads = sc.loadAlignments(testFile("unpaired.sam"))

  // default path groups reads by fragment; the optimised path skips that work
  val withGrouping = unpairedReads.toFragments()
  val withoutGrouping = unpairedReads.toFragments(mergePairedReads = false)

  // both paths must produce the same number of fragments...
  assert(withGrouping.rdd.count === withoutGrouping.rdd.count)

  // ...and the same fragment contents
  assert(withGrouping.toDF().except(withoutGrouping.toDF()).count() === 0)
}
}
Original file line number Diff line number Diff line change
@@ -208,7 +208,7 @@ class MarkDuplicatesSuite extends ADAMFunSuite {

private def markDuplicateFragments(reads: Alignment*): Array[Alignment] = {
AlignmentDataset(sc.parallelize(reads), SequenceDictionary.empty, rgd, Seq.empty)
.toFragments
.toFragments()
.markDuplicates()
.toAlignments
.rdd
6 changes: 4 additions & 2 deletions adam-python/bdgenomics/adam/rdd.py
Original file line number Diff line number Diff line change
@@ -870,16 +870,18 @@ def _inferConversionFn(self, destClass):
return "org.bdgenomics.adam.api.java.AlignmentsTo%s" % self._destClassSuffix(destClass)


def toFragments(self):
def toFragments(self, mergePairedReads=True):
"""
Convert this set of reads into fragments.

:param mergePairedReads: Merge paired reads into a fragment. Can be set to false when only unpaired
reads are in the AlignmentDataset. Defaults to true.
:return: Returns a FragmentDataset where all reads have been grouped
together by the original sequence fragment they come from.
:rtype: bdgenomics.adam.rdd.FragmentDataset
"""

return FragmentDataset(self._jvmRdd.toFragments(), self.sc)
return FragmentDataset(self._jvmRdd.toFragments(mergePairedReads), self.sc)


def toCoverage(self, collapse = True):