Skip to content

Commit

Permalink
Merge pull request googlegenomics#196 from googlegenomics/passCallSets
Browse files Browse the repository at this point in the history
Pass call sets and use variant merge strategies.
  • Loading branch information
deflaux committed Jun 1, 2016
2 parents a9498bd + b8153e3 commit 41a63b3
Show file tree
Hide file tree
Showing 18 changed files with 404 additions and 186 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
<dependency>
<groupId>com.google.cloud.genomics</groupId>
<artifactId>google-genomics-utils</artifactId>
<version>v1-0.2</version>
<version>v1-0.3</version>
<exclusions>
<!-- Exclude an old version of guava which is being pulled
in by a transitive dependency google-api-client. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.OfflineAuth;
import com.google.cloud.genomics.utils.ShardBoundary;
import com.google.cloud.genomics.utils.grpc.MergeNonVariantSegmentsWithSnps;
import com.google.cloud.genomics.utils.grpc.VariantEmitterStrategy;
import com.google.cloud.genomics.utils.grpc.VariantMergeStrategy;
import com.google.cloud.genomics.utils.grpc.VariantStreamIterator;
import com.google.cloud.genomics.utils.grpc.VariantUtils;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.genomics.v1.StreamVariantsRequest;
import com.google.genomics.v1.StreamVariantsResponse;
import com.google.genomics.v1.Variant;
import com.google.genomics.v1.Variant.Builder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
Expand All @@ -66,6 +62,11 @@ public static interface Options extends PipelineOptions {
int getBinSize();
void setBinSize(int binSize);

@Description("The class that determines the strategy for merging non-variant segments and variants.")
@Default.Class(MergeNonVariantSegmentsWithSnps.class)
Class<? extends VariantMergeStrategy> getVariantMergeStrategy();
void setVariantMergeStrategy(Class<? extends VariantMergeStrategy> mergeStrategy);

public static class Methods {
public static void validateOptions(Options options) {
Preconditions.checkArgument(0 < options.getBinSize(), "binSize must be greater than zero");
Expand Down Expand Up @@ -211,85 +212,32 @@ public void processElement(DoFn<StreamVariantsRequest, Iterable<Variant>>.Proces
}
}

/**
* This DoFn converts data with non-variant segments (such as data that was in
* source format Genome VCF (gVCF) or Complete Genomics) to variant-only data with calls from
* non-variant-segments merged into the variants with which they overlap.
*
* This is currently done only for SNP variants. Indels and structural variants are left as-is.
*/
public static final class CombineVariantsFn extends DoFn<Iterable<Variant>, Variant> {
private VariantMergeStrategy merger;

/**
* Dev note: this code aims to minimize the amount of data held in memory. It should only
* be the current variant we are considering and any non-variant segments that overlap it.
*/
@Override
public void processElement(ProcessContext context) throws Exception {
List<Variant> records = Lists.newArrayList(context.element());

// The sort order is critical here so that candidate overlapping reference matching blocks
// occur prior to any variants they may overlap.
Collections.sort(records, NON_VARIANT_SEGMENT_COMPARATOR);

// The upper bound on potential overlaps is the sample size plus the number of
// block records that occur between actual variants.
List<Variant> blockRecords = new LinkedList<>();

for (Variant record : records) {
if (!VariantUtils.IS_NON_VARIANT_SEGMENT.apply(record)) {
// Dataflow does not allow the output of modified input items, so we make a copy and
// modify that, if applicable.
Builder updatedRecord = Variant.newBuilder(record);
// TODO: determine and implement the correct criteria for overlaps of non-SNP variants
if (VariantUtils.IS_SNP.apply(record)) {
for (Iterator<Variant> iterator = blockRecords.iterator(); iterator.hasNext();) {
Variant blockRecord = iterator.next();
if (isOverlapping(blockRecord, record)) {
updatedRecord.addAllCalls(blockRecord.getCallsList());
} else {
// Remove the current element from the iterator and the list since it is
// left of the genomic region we are currently working on due to our sort.
iterator.remove();
}
}
}
// Emit this variant and move on (no need to hang onto it in memory).
context.output(updatedRecord.build());
} else {
blockRecords.add(record);
}
}
public void startBundle(DoFn<Iterable<Variant>, Variant>.Context c) throws Exception {
super.startBundle(c);
Options options = c.getPipelineOptions().as(Options.class);
merger = options.getVariantMergeStrategy().newInstance();
}

static final Ordering<Variant> BY_START = Ordering.natural().onResultOf(
new Function<Variant, Long>() {
@Override
public Long apply(Variant variant) {
return variant.getStart();
}
});
@Override
public void processElement(ProcessContext context) throws Exception {
merger.merge(context.element(), new DataflowVariantEmitter(context));
}
}

static final Ordering<Variant> BY_FIRST_OF_ALTERNATE_BASES = Ordering.natural()
.nullsFirst().onResultOf(new Function<Variant, String>() {
@Override
public String apply(Variant variant) {
if (null == variant.getAlternateBasesList() || variant.getAlternateBasesList().isEmpty()) {
return null;
}
return variant.getAlternateBases(0);
}
});
public static class DataflowVariantEmitter implements VariantEmitterStrategy {
private final DoFn<Iterable<Variant>, Variant>.ProcessContext context;

// Special-purpose comparator for use in dealing with both variant and non-variant segment data.
// Sort by start position ascending and ensure that if a variant and a ref-matching block are at
// the same position, the non-variant segment record comes first.
static final Comparator<Variant> NON_VARIANT_SEGMENT_COMPARATOR = BY_START
.compound(BY_FIRST_OF_ALTERNATE_BASES);
public DataflowVariantEmitter(DoFn<Iterable<Variant>, Variant>.ProcessContext context) {
this.context = context;
}

static final boolean isOverlapping(Variant blockRecord, Variant variant) {
return blockRecord.getStart() <= variant.getStart()
&& blockRecord.getEnd() >= variant.getStart() + 1;
@Override
public void emit(Variant variant) {
context.output(variant);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.utils.Contig;
import com.google.genomics.v1.StreamReadsRequest;
import com.google.genomics.v1.StreamVariantsRequest;

import java.util.logging.Logger;
Expand All @@ -45,7 +46,6 @@
* </ol>
*
* The fields may be comma, tab, or whitespace delimited.
*
*/
public class SitesToShards {

Expand All @@ -62,6 +62,9 @@ public static interface Options extends PipelineOptions {

private static final Pattern SITE_PATTERN = Pattern.compile("^\\s*([\\w\\.]+)\\W+(\\d+)\\W+(\\d+).*$");

/**
* Given a string encoding a site, parse it into a Contig object.
*/
public static class SitesToContigsFn extends DoFn<String, Contig> {

@Override
Expand All @@ -78,41 +81,95 @@ public void processElement(DoFn<String, Contig>.ProcessContext context) throws E
}
}

/**
* Given a contig object and request prototype, construct a request spanning the region
* defined by the contig.
*/
public static class ContigsToStreamVariantsRequestsFn extends
SimpleFunction<Contig, StreamVariantsRequest> {

private final String variantSetId;
private final StreamVariantsRequest prototype;

public ContigsToStreamVariantsRequestsFn(String variantSetId) {
public ContigsToStreamVariantsRequestsFn(StreamVariantsRequest prototype) {
super();
this.variantSetId = variantSetId;
this.prototype = prototype;
}

@Override
public StreamVariantsRequest apply(Contig contig) {
if (null == contig) {
return null;
}
return contig.getStreamVariantsRequest(variantSetId);
return contig.getStreamVariantsRequest(prototype);
}

}

/**
* Use this transform when you have file(s) of sites that should be converted into
* streaming requests that each span the region for a site.
*/
public static class SitesToStreamVariantsShardsTransform extends
PTransform<PCollection<String>, PCollection<StreamVariantsRequest>> {

private final String variantSetId;
private final StreamVariantsRequest prototype;

public SitesToStreamVariantsShardsTransform(String variantSetId) {
public SitesToStreamVariantsShardsTransform(StreamVariantsRequest prototype) {
super();
this.variantSetId = variantSetId;
this.prototype = prototype;
}

@Override
public PCollection<StreamVariantsRequest> apply(PCollection<String> lines) {
return lines.apply(ParDo.of(new SitesToContigsFn()))
.apply("Contigs to StreamVariantsRequests",
MapElements.via(new ContigsToStreamVariantsRequestsFn(variantSetId)));
MapElements.via(new ContigsToStreamVariantsRequestsFn(prototype)));
}
}

/**
* Given a contig object and request prototype, construct a request spanning the region
* defined by the contig.
*/
/**
 * Maps a {@link Contig} to a {@link StreamReadsRequest} covering exactly the
 * genomic region that the contig describes.
 *
 * All non-region request fields (e.g. the read group set and call set
 * configuration) are copied from the prototype request supplied at
 * construction time.
 */
public static class ContigsToStreamReadsRequestsFn extends
    SimpleFunction<Contig, StreamReadsRequest> {

  // Template request; only the region fields are overwritten per contig.
  private final StreamReadsRequest prototype;

  /**
   * @param prototype the request carrying all fields other than the region
   */
  public ContigsToStreamReadsRequestsFn(StreamReadsRequest prototype) {
    super();
    this.prototype = prototype;
  }

  @Override
  public StreamReadsRequest apply(Contig contig) {
    // Propagate null rather than throwing; upstream parsing may emit none.
    return (null == contig) ? null : contig.getStreamReadsRequest(prototype);
  }
}

/**
* Use this transform when you have file(s) of sites that should be converted into
* streaming requests that each span the region for a site.
*/
/**
 * Converts a collection of textual site descriptions (one site per line) into
 * {@link StreamReadsRequest}s, each spanning the region of one site.
 *
 * Use this transform when you have file(s) of sites that should become
 * per-region streaming requests.
 */
public static class SitesToStreamReadsShardsTransform extends
    PTransform<PCollection<String>, PCollection<StreamReadsRequest>> {

  // Template request; only the region fields vary per site.
  private final StreamReadsRequest prototype;

  /**
   * @param prototype the request carrying all fields other than the region
   */
  public SitesToStreamReadsShardsTransform(StreamReadsRequest prototype) {
    super();
    this.prototype = prototype;
  }

  @Override
  public PCollection<StreamReadsRequest> apply(PCollection<String> lines) {
    // Stage 1: parse each line of text into a Contig.
    PCollection<Contig> contigs = lines.apply(ParDo.of(new SitesToContigsFn()));
    // Stage 2: expand each Contig into a region-scoped streaming request.
    return contigs.apply("Contigs to StreamReadsRequests",
        MapElements.via(new ContigsToStreamReadsRequestsFn(prototype)));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.genomics.dataflow.utils.CallFilters;
import com.google.cloud.genomics.dataflow.utils.PairGenerator;
import com.google.cloud.genomics.utils.grpc.VariantUtils;
import com.google.cloud.genomics.utils.grpc.VariantCallUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.genomics.v1.Variant;
Expand Down Expand Up @@ -56,7 +56,7 @@ public void processElement(ProcessContext context) {
CallSimilarityCalculator callSimilarityCalculator =
callSimilarityCalculatorFactory.get(isReferenceMajor(variant));
for (KV<VariantCall, VariantCall> pair : PairGenerator.WITHOUT_REPLACEMENT.allPairs(
getSamplesWithVariant(variant), VariantUtils.CALL_COMPARATOR)) {
getSamplesWithVariant(variant), VariantCallUtils.CALL_COMPARATOR)) {
accumulateCallSimilarity(callSimilarityCalculator, pair.getKey(), pair.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils;
import com.google.cloud.genomics.dataflow.utils.AnnotationUtils.VariantEffect;
import com.google.cloud.genomics.dataflow.utils.CallSetNamesOptions;
import com.google.cloud.genomics.dataflow.utils.GCSOutputOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.dataflow.utils.ShardOptions;
Expand Down Expand Up @@ -87,20 +88,25 @@
*/
public final class AnnotateVariants extends DoFn<StreamVariantsRequest, KV<String, VariantAnnotation>> {

public static interface Options extends ShardOptions, GCSOutputOptions {
public static interface Options extends
// Options for call set names.
CallSetNamesOptions,
// Options for calculating over regions, chromosomes, or whole genomes.
ShardOptions,
// Options for the output destination.
GCSOutputOptions {

@Override
@Description("The ID of the Google Genomics variant set this pipeline is accessing. "
+ "Defaults to 1000 Genomes.")
@Default.String("10473108253681171589")
String getVariantSetId();

void setVariantSetId(String variantSetId);

@Description("The IDs of the Google Genomics call sets this pipeline is working with, comma "
+ "delimited.Defaults to 1000 Genomes HG00261.")
@Default.String("10473108253681171589-0")
String getCallSetIds();
void setCallSetIds(String callSetIds);
@Override
@Description("The names of the Google Genomics call sets this pipeline is working with, comma "
+ "delimited. Defaults to 1000 Genomes HG00261.")
@Default.String("HG00261")
String getCallSetNames();

@Description("The IDs of the Google Genomics transcript sets this pipeline is working with, "
+ "comma delimited. Defaults to UCSC refGene (hg19).")
Expand Down Expand Up @@ -296,23 +302,22 @@ public static void main(String[] args) throws Exception {
// Option validation is not yet automatic, we make an explicit call here.
Options.Methods.validateOptions(options);

// Set up the prototype request and auth.
StreamVariantsRequest prototype = CallSetNamesOptions.Methods.getRequestPrototype(options);
OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);
Genomics genomics = GenomicsFactory.builder().build().fromOfflineAuth(auth);

List<String> callSetIds = ImmutableList.of();
if (!Strings.isNullOrEmpty(options.getCallSetIds().trim())) {
callSetIds = ImmutableList.copyOf(options.getCallSetIds().split(","));
}
List<String> callSetIds = CallSetNamesOptions.Methods.getCallSetIds(options);
List<String> transcriptSetIds =
validateAnnotationSetsFlag(genomics, options.getTranscriptSetIds(), "TRANSCRIPT");
List<String> variantAnnotationSetIds =
validateAnnotationSetsFlag(genomics, options.getVariantAnnotationSetIds(), "VARIANT");
validateRefsetForAnnotationSets(genomics, transcriptSetIds);

List<StreamVariantsRequest> requests = options.isAllReferences() ?
ShardUtils.getVariantRequests(options.getVariantSetId(), ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
ShardUtils.getVariantRequests(prototype, ShardUtils.SexChromosomeFilter.EXCLUDE_XY,
options.getBasesPerShard(), auth) :
ShardUtils.getVariantRequests(options.getVariantSetId(), options.getReferences(), options.getBasesPerShard());
ShardUtils.getVariantRequests(prototype, options.getBasesPerShard(), options.getReferences());

Pipeline p = Pipeline.create(options);
p.getCoderRegistry().setFallbackCoderProvider(GenericJsonCoder.PROVIDER);
Expand Down
Loading

0 comments on commit 41a63b3

Please sign in to comment.