Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #576 from zalando/ARUHA-657
Browse files Browse the repository at this point in the history
ARUHA-657 Fix committing to subscription for closed stream
  • Loading branch information
adyach authored Mar 6, 2017
2 parents 6b0fbb0 + a9b3200 commit d0863a7
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 69 deletions.
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Fri Jul 08 16:30:40 CEST 2016
#Fri Feb 24 13:27:52 CET 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-bin.zip
46 changes: 25 additions & 21 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,30 @@
##
##############################################################################

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null

APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

Expand All @@ -30,6 +48,7 @@ die ( ) {
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
Expand All @@ -40,26 +59,11 @@ case "`uname`" in
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac

# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar

# Determine the Java command to use to start the JVM.
Expand All @@ -85,7 +89,7 @@ location of your Java installation."
fi

# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
Expand Down
6 changes: 3 additions & 3 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=

set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=

@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.http.HttpStatus;
Expand Down Expand Up @@ -73,6 +74,25 @@ public void before() throws IOException {
this.subscription = createSubscription(subscription);
}

@Test(timeout = 10000)
public void whenStreamTimeoutReachedPossibleToCommit() throws Exception {
final TestStreamingClient client = TestStreamingClient
.create(URL, subscription.getId(), "batch_limit=1&stream_limit=2&stream_timeout=1")
.start();
waitFor(() -> assertThat(client.getSessionId(), not(equalTo(SESSION_ID_UNKNOWN))));

publishEvent(eventType.getName(),"{\"foo\":\"bar\"}");
waitFor(() -> Assert.assertFalse(client.getBatches().isEmpty()), TimeUnit.SECONDS.toMillis(2), 100);
final SubscriptionCursor toCommit = client.getBatches().get(0).getCursor();
client.close(); // connection is closed, and stream as well
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
final int statusCode = commitCursors(
subscription.getId(),
Collections.singletonList(toCommit),
client.getSessionId());
Assert.assertEquals(SC_NO_CONTENT, statusCode);
}

@Test(timeout = 30000)
public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted() throws Exception {
// write 4 events to event-type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TestStreamingClient implements Runnable {
private volatile boolean running;

private final List<StreamBatch> batches;
private InputStream inputStream;
private HttpURLConnection connection;
private String sessionId;
private Optional<String> token;
private volatile int responseCode;
Expand Down Expand Up @@ -60,17 +60,17 @@ public static TestStreamingClient create(final String baseUrl, final String subs
public void run() {
try {
final String url = format("{0}/subscriptions/{1}/events?{2}", baseUrl, subscriptionId, params);
final HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
token.ifPresent(token -> conn.setRequestProperty("Authorization", "Bearer " + token));
responseCode = conn.getResponseCode();
conn.getHeaderFields().entrySet().stream()
connection = (HttpURLConnection) new URL(url).openConnection();
token.ifPresent(token -> connection.setRequestProperty("Authorization", "Bearer " + token));
responseCode = connection.getResponseCode();
connection.getHeaderFields().entrySet().stream()
.filter(entry -> entry.getKey() != null)
.forEach(entry -> headers.put(entry.getKey(), entry.getValue()));
if (responseCode != HttpURLConnection.HTTP_OK) {
throw new IOException("Response code is " + responseCode);
}
sessionId = conn.getHeaderField("X-Nakadi-StreamId");
inputStream = conn.getInputStream();
sessionId = connection.getHeaderField("X-Nakadi-StreamId");
final InputStream inputStream = connection.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
running = true;

Expand Down Expand Up @@ -118,9 +118,7 @@ public TestStreamingClient start() {
public boolean close() {
if (running) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
connection.disconnect();
} finally {
running = false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
package org.zalando.nakadi.service.subscription.state;

import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;

class ClosingState extends State {
private final Map<Partition.PartitionKey, Long> uncommitedOffsets;
private final Supplier<Map<Partition.PartitionKey, Long>> uncommittedOffsetsSupplier;
private final LongSupplier lastCommitSupplier;
private Map<Partition.PartitionKey, Long> uncommittedOffsets;
private final Map<Partition.PartitionKey, ZKSubscription> listeners = new HashMap<>();
private final long lastCommitMillis;
private ZKSubscription topologyListener;

ClosingState(final Map<Partition.PartitionKey, Long> uncommitedOffsets, final long lastCommitMillis) {
this.uncommitedOffsets = uncommitedOffsets;
this.lastCommitMillis = lastCommitMillis;
ClosingState(final Supplier<Map<Partition.PartitionKey, Long>> uncommittedOffsetsSupplier,
final LongSupplier lastCommitSupplier) {
this.uncommittedOffsetsSupplier = uncommittedOffsetsSupplier;
this.lastCommitSupplier = lastCommitSupplier;
}

@Override
Expand All @@ -39,9 +43,10 @@ public void onExit() {

@Override
public void onEnter() {
final long timeToWaitMillis = getParameters().commitTimeoutMillis - (System.currentTimeMillis()
- lastCommitMillis);
if (timeToWaitMillis > 0) {
final long timeToWaitMillis = getParameters().commitTimeoutMillis -
(System.currentTimeMillis() - lastCommitSupplier.getAsLong());
uncommittedOffsets = uncommittedOffsetsSupplier.get();
if (!uncommittedOffsets.isEmpty() && timeToWaitMillis > 0) {
scheduleTask(() -> switchState(new CleanupState()), timeToWaitMillis, TimeUnit.MILLISECONDS);
topologyListener = getZk().subscribeForTopologyChanges(() -> addTask(this::onTopologyChanged));
reactOnTopologyChange();
Expand Down Expand Up @@ -72,20 +77,20 @@ private void reactOnTopologyChange() {
final Set<Partition.PartitionKey> addListeners = new HashSet<>();
for (final Partition p : partitions.values()) {
if (Partition.State.REASSIGNING.equals(p.getState())) {
if (!uncommitedOffsets.containsKey(p.getKey())) {
if (!uncommittedOffsets.containsKey(p.getKey())) {
freeRightNow.add(p.getKey());
} else {
if (!listeners.containsKey(p.getKey())) {
addListeners.add(p.getKey());
}
}
} else { // ASSIGNED
if (uncommitedOffsets.containsKey(p.getKey()) && !listeners.containsKey(p.getKey())) {
if (uncommittedOffsets.containsKey(p.getKey()) && !listeners.containsKey(p.getKey())) {
addListeners.add(p.getKey());
}
}
}
uncommitedOffsets.keySet().stream().filter(p -> !partitions.containsKey(p)).forEach(freeRightNow::add);
uncommittedOffsets.keySet().stream().filter(p -> !partitions.containsKey(p)).forEach(freeRightNow::add);
freePartitions(freeRightNow);
addListeners.forEach(this::registerListener);
tryCompleteState();
Expand All @@ -108,22 +113,22 @@ private void offsetChanged(final Partition.PartitionKey key) {

private void reactOnOffset(final Partition.PartitionKey key) {
final long newOffset = getZk().getOffset(key);
if (uncommitedOffsets.containsKey(key) && uncommitedOffsets.get(key) <= newOffset) {
if (uncommittedOffsets.containsKey(key) && uncommittedOffsets.get(key) <= newOffset) {
freePartitions(Collections.singletonList(key));
}
tryCompleteState();
}

private void tryCompleteState() {
if (uncommitedOffsets.isEmpty()) {
if (uncommittedOffsets.isEmpty()) {
switchState(new CleanupState());
}
}

private void freePartitions(final Collection<Partition.PartitionKey> keys) {
RuntimeException exceptionCaught = null;
for (final Partition.PartitionKey partitionKey : keys) {
uncommitedOffsets.remove(partitionKey);
uncommittedOffsets.remove(partitionKey);
final ZKSubscription listener = listeners.remove(partitionKey);
if (null != listener) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.nakadi.service.subscription.state;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -38,6 +39,8 @@ class StreamingState extends State {
private long committedEvents;
private long sentEvents;
private long batchesSent;
// Uncommitted offsets are calculated right on exiting from Streaming state.
private Map<Partition.PartitionKey, Long> uncommittedOffsets;

@Override
public void onEnter() {
Expand Down Expand Up @@ -85,17 +88,18 @@ private void sendMetadata(final String metadata) {
.ifPresent(pk -> flushData(pk.getKey(), new TreeMap<>(), Optional.of(metadata)));
}

private long getLastCommitMillis() {
return lastCommitMillis;
}

private Map<Partition.PartitionKey, Long> getUncommittedOffsets() {
Preconditions.checkNotNull(uncommittedOffsets, "uncommittedOffsets should not be null on time of call");
return uncommittedOffsets;
}

private void shutdownGracefully(final String reason) {
getLog().info("Shutting down gracefully. Reason: {}", reason);

final Map<Partition.PartitionKey, Long> uncommitted = offsets.entrySet().stream()
.filter(e -> !e.getValue().isCommitted())
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getSentOffset()));
if (uncommitted.isEmpty()) {
switchState(new CleanupState());
} else {
switchState(new ClosingState(uncommitted, lastCommitMillis));
}
switchState(new ClosingState(this::getUncommittedOffsets, this::getLastCommitMillis));
}

private void pollDataFromKafka() {
Expand Down Expand Up @@ -222,6 +226,10 @@ private String serializeBatch(final Partition.PartitionKey partitionKey, final l

@Override
public void onExit() {
uncommittedOffsets = offsets.entrySet().stream()
.filter(e -> !e.getValue().isCommitted())
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getSentOffset()));

if (null != topologyChangeSubscription) {
try {
topologyChangeSubscription.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,19 @@ public CuratorZkSubscriptionClient(final String subscriptionId, final CuratorFra

@Override
public void runLocked(final Runnable function) {
log.info("Taking lock for " + function.hashCode());
try {
Exception releaseException = null;

lock.acquire();
log.debug("Lock taken " + function.hashCode());
try {
function.run();
} finally {
log.info("Releasing lock for " + function.hashCode());
try {
lock.release();
} catch (final Exception e) {
log.error("Failed to release lock", e);
releaseException = e;
}
log.debug("Lock released " + function.hashCode());
}
if (releaseException != null) {
throw releaseException;
Expand Down
Loading

0 comments on commit d0863a7

Please sign in to comment.