Skip to content

Commit

Permalink
Iterating on FeedRange Api for Java SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianMeiswinkel committed Jul 10, 2020
1 parent 56519f4 commit 52e182a
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,5 +314,6 @@ public static class SubStatusCodes {
public static class HeaderValues {
public static final String NO_CACHE = "no-cache";
public static final String PREFER_RETURN_MINIMAL = "return=minimal";
public static final String IF_NONE_MATCH_ALL = "*";
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,48 @@
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.feedranges.FeedRangeRequestMessagePopulatorVisitor;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedRange;

import java.time.Instant;

public final class ChangeFeedRequestOptionsImpl {
public static void populateRequestOptions(
CosmosChangeFeedRequestOptions requestOptions,
RxDocumentServiceRequest request,
ChangeFeedStartFromInternal startFromInternal,
FeedRangeInternal feedRange)
{
// TODO fabianm - Implement
FeedRangeInternal feedRange) {
if (requestOptions == null) {
throw new NullPointerException("requestOptions");
}

if (request == null) {
throw new NullPointerException("request");
}

if (startFromInternal == null) {
throw new NullPointerException("startFromInternal");
}

final PopulateStartFromRequestOptionVisitor populateRequestOptionsVisitor =
new PopulateStartFromRequestOptionVisitor(request);
startFromInternal.accept(populateRequestOptionsVisitor);

Integer maxItemCount = requestOptions.getMaxItemCount();
if (maxItemCount != null) {
request.getHeaders().put(
HttpConstants.HttpHeaders.PAGE_SIZE,
maxItemCount.toString());
}

if (feedRange != null) {
final FeedRangeRequestMessagePopulatorVisitor feedRangeVisitor =
new FeedRangeRequestMessagePopulatorVisitor(request);
feedRange.accept(feedRangeVisitor);
}

request.getHeaders().put(
HttpConstants.HttpHeaders.A_IM,
HttpConstants.A_IMHeaderValues.INCREMENTAL_FEED);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.Strings;

class ChangeFeedStartFromContinuationImpl extends ChangeFeedStartFromInternal {
private final String continuation;

Expand All @@ -10,6 +12,11 @@ public ChangeFeedStartFromContinuationImpl(String continuation) {
throw new NullPointerException("continuation");
}

if (Strings.isNullOrWhiteSpace(continuation)) {
throw new IllegalArgumentException(
"Continuation token must not be empty.");
}

this.continuation = continuation;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.azure.cosmos.implementation.changefeed.implementation;

import java.time.Instant;
import java.time.format.DateTimeFormatter;

import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;

class PopulateStartFromRequestOptionVisitor extends ChangeFeedStartFromVisitor {
private static final long START_FROM_BEGINNING_EPOCH_SECONDS = -62135596800L;
private static final Instant START_FROM_BEGINNING_TIME =
Instant.ofEpochSecond(START_FROM_BEGINNING_EPOCH_SECONDS);

private final RxDocumentServiceRequest request;

public PopulateStartFromRequestOptionVisitor(final RxDocumentServiceRequest request) {
if (request == null) {
throw new NullPointerException("request");
}

this.request = request;
}

@Override
public void Visit(ChangeFeedStartFromNowImpl startFromNow) {
this.request.getHeaders().put(
HttpConstants.HttpHeaders.IF_NONE_MATCH,
HttpConstants.HeaderValues.IF_NONE_MATCH_ALL);
}

@Override
public void Visit(ChangeFeedStartFromPointInTimeImpl startFromTime) {
// Our current public contract for ChangeFeedProcessor uses DateTime.MinValue.ToUniversalTime as beginning.
// We need to add a special case here, otherwise it would send it as normal StartTime.
// The problem is Multi master accounts do not support StartTime header on ReadFeed, and thus,
// it would break multi master Change Feed Processor users using Start From Beginning semantics.
// It's also an optimization, since the backend won't have to binary search for the value.
Instant pointInTime = startFromTime.getPointInTime();
if (pointInTime != START_FROM_BEGINNING_TIME)
{
this.request.getHeaders().put(
HttpConstants.HttpHeaders.IF_MODIFIED_SINCE,
DateTimeFormatter.RFC_1123_DATE_TIME.format(pointInTime));
}
}

@Override
public void Visit(ChangeFeedStartFromContinuationImpl startFromContinuation) {
// On REST level, change feed is using IfNoneMatch/ETag instead of continuation
this.request.getHeaders().put(
HttpConstants.HttpHeaders.IF_NONE_MATCH,
startFromContinuation.getContinuation());
}

@Override
public void Visit(ChangeFeedStartFromBeginningImpl startFromBeginning) {
// We don't need to set any headers to start from the beginning
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
package com.azure.cosmos.models;

import com.azure.cosmos.implementation.ChangeFeedOptions;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedRequestOptionsImpl;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedStartFromInternal;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

public final class CosmosChangeFeedRequestOptions {
private static final Integer DEFAULT_MAX_ITEM_COUNT = 1000;

private Integer maxItemCount;
private FeedRangeInternal feedRangeInternal;
private Integer maxItemCount;
private ChangeFeedStartFromInternal startFromInternal;

private CosmosChangeFeedRequestOptions(
Expand All @@ -39,15 +36,6 @@ public Integer getMaxItemCount() {
return this.maxItemCount;
}

void populateRequestOptions(RxDocumentServiceRequest request)
{
ChangeFeedRequestOptionsImpl.populateRequestOptions(
request,
this.startFromInternal,
this.feedRangeInternal
);
}

/**
* Sets the maximum number of items to be returned in the enumeration
* operation.
Expand All @@ -70,11 +58,41 @@ public static CosmosChangeFeedRequestOptions createForProcessingFromBeginning(Fe
ChangeFeedStartFromInternal.createFromBeginning());
}

public static CosmosChangeFeedRequestOptions createForProcessingFromContinuation(
String continuation) {

if (continuation == null) {
throw new NullPointerException("continuation");
}

final FeedRangeContinuation feedRangeContinuation =
FeedRangeContinuation.tryParse(continuation);

if (feedRangeContinuation == null) {
final String message = String.format(
"The provided string '%s' does not represent any known format.",
continuation);
throw new IllegalArgumentException(message);
}

final FeedRangeInternal feedRange = feedRangeContinuation.getFeedRangeInternal();
final String continuationToken = feedRangeContinuation.getContinuation();
if (continuation != null) {
return new CosmosChangeFeedRequestOptions(
feedRange,
ChangeFeedStartFromInternal.createFromContinuation(continuationToken));
}

return new CosmosChangeFeedRequestOptions(
feedRange,
ChangeFeedStartFromInternal.createFromBeginning());
}

public static CosmosChangeFeedRequestOptions createForProcessingFromNow(FeedRange feedRange) {
if (feedRange == null) {
throw new NullPointerException("feedRange");
}

return new CosmosChangeFeedRequestOptions(
FeedRangeInternal.convert(feedRange),
ChangeFeedStartFromInternal.createFromNow());
Expand All @@ -97,8 +115,12 @@ public static CosmosChangeFeedRequestOptions createForProcessingFromPointInTime(
ChangeFeedStartFromInternal.createFromPointInTime(pointInTime));
}

public static CosmosChangeFeedRequestOptions createForProcessingFromContinuation(String continuationToken) {
// TODO fabianm - Implement
return null;
void populateRequestOptions(RxDocumentServiceRequest request) {
ChangeFeedRequestOptionsImpl.populateRequestOptions(
this,
request,
this.startFromInternal,
this.feedRangeInternal
);
}
}

0 comments on commit 52e182a

Please sign in to comment.