Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1063 from zalando/aruha-2306-revert-2
Browse files Browse the repository at this point in the history
revert zookeeper lock PR
  • Loading branch information
adyach authored Jun 12, 2019
2 parents 77add95 + 1aa7dc4 commit 162ecd1
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ public void testDeleteSubscription() throws Exception {
assertThat(
CURATOR.checkExists().forPath(format("/nakadi/subscriptions/{0}", subscription.getId())),
not(nullValue()));
assertThat(
CURATOR.checkExists().forPath(format("/nakadi/locks/subscription_{0}", subscription.getId())),
not(nullValue()));

// delete subscription
when().delete("/subscriptions/{sid}", subscription.getId())
Expand All @@ -408,6 +411,9 @@ public void testDeleteSubscription() throws Exception {
assertThat(
CURATOR.checkExists().forPath(format("/nakadi/subscriptions/{0}", subscription.getId())),
nullValue());
assertThat(
CURATOR.checkExists().forPath(format("/nakadi/locks/subscription_{0}", subscription.getId())),
nullValue());
}

@Test
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/zalando/nakadi/config/NakadiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException;
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory;
import org.zalando.nakadi.service.StorageService;

import java.util.Optional;
Expand All @@ -33,6 +34,11 @@ public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}

@Bean
public ZooKeeperLockFactory zooKeeperLockFactory(final ZooKeeperHolder zooKeeperHolder) {
return new ZooKeeperLockFactory(zooKeeperHolder);
}

@Bean
@Qualifier("default_storage")
public DefaultStorage defaultStorage(final StorageDbRepository storageDbRepository,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.zalando.nakadi.repository.zookeeper;

import com.google.common.collect.ImmutableList;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.KeeperException;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedCountStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;

import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;

public class ZkChildrenCache extends PathChildrenCache {

private static final Logger LOG = LoggerFactory.getLogger(ZkChildrenCache.class);

public static final int MAX_NUMBER_OF_RETRIES = 5;
public static final int WAIT_BETWEEN_TRIES_MS = 100;

public ZkChildrenCache(final CuratorFramework client, final String path) {
super(client, path, false);
}

@Override
public void start() throws Exception {
try {
super.start(BUILD_INITIAL_CACHE);
} catch (final Exception e) {
close();
throw e;
}
}

public static ZkChildrenCache createCache(final CuratorFramework client, final String key) {
try {
// in some rare case the cache start can fail because the node can be removed
// in specific moment by other thread/instance, then we need to retry
return executeWithRetry(
() -> {
final ZkChildrenCache newCache = new ZkChildrenCache(client, key);
try {
newCache.start();
return newCache;
} catch (final KeeperException.NoNodeException e) {
throw e; // throw it to activate retry
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
},
new RetryForSpecifiedCountStrategy<ZkChildrenCache>(MAX_NUMBER_OF_RETRIES)
.withExceptionsThatForceRetry(ImmutableList.of(KeeperException.NoNodeException.class))
.withWaitBetweenEachTry(WAIT_BETWEEN_TRIES_MS));
} catch (final Exception e) {
LOG.error("Zookeeper error when creating cache for children", e);
throw new NakadiRuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collection;

Expand Down Expand Up @@ -77,28 +71,6 @@ public CuratorFramework get() {
return zooKeeper;
}


public Closeable newZookeeperLock(final String lockObject, final long timeoutMs) throws RuntimeException {
try {
final ZookeeperLock zookeeperLock = new ZookeeperLock(new ZooKeeper(
zooKeeper.getZookeeperClient().getCurrentConnectionString(),
sessionTimeoutMs,
new NakadiZookeeperWatcher()));
return zookeeperLock.tryLock(lockObject, timeoutMs);
} catch (final Exception e) {
throw new RuntimeException("Failed to get zookeeper client", e);
}
}

private static class NakadiZookeeperWatcher implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(NakadiZookeeperWatcher.class);

@Override
public void process(final WatchedEvent event) {
LOG.debug("{}", event);
}
}

private class ExhibitorEnsembleProvider extends org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider {

ExhibitorEnsembleProvider(final Exhibitors exhibitors, final ExhibitorRestClient restClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.zalando.nakadi.repository.zookeeper;

import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;

public class ZooKeeperLockFactory {

private final ZooKeeperHolder zkHolder;

public ZooKeeperLockFactory(final ZooKeeperHolder zkHolder) {
this.zkHolder = zkHolder;
}

public InterProcessLock createLock(final String path) {
return new InterProcessSemaphoreMutex(zkHolder.get(), path);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.zalando.nakadi.repository.zookeeper;

import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;

public class ZookeeperUtils {

private static final Logger LOG = LoggerFactory.getLogger(ZookeeperUtils.class);

private ZookeeperUtils() {
}

public static <V> V runLocked(final Callable<V> callable, final InterProcessLock lock) throws Exception {
lock.acquire();
try {
return callable.call();
} finally {
try {
lock.release();
} catch (final Exception e) {
LOG.warn("Error occurred when releasing ZK lock", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

Expand All @@ -47,30 +46,30 @@
import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;

public abstract class AbstractZkSubscriptionClient implements ZkSubscriptionClient {
public static final int TIMEOUT_AQUIRE_LOCK_MS = 15_000;
public static final int SECONDS_TO_WAIT_FOR_LOCK = 15;
protected static final String NODE_TOPOLOGY = "/topology";
private static final String STATE_INITIALIZED = "INITIALIZED";
private static final int COMMIT_CONFLICT_RETRY_TIMES = 5;
private static final int MAX_ZK_RESPONSE_SECONDS = 5;

private final String subscriptionId;
private final ZooKeeperHolder zooKeeperHolder;
private final CuratorFramework curatorFramework;
private final String resetCursorPath;
private final Logger log;
private InterProcessSemaphoreMutex lock;

public AbstractZkSubscriptionClient(
final String subscriptionId,
final ZooKeeperHolder zooKeeperHolder,
final CuratorFramework curatorFramework,
final String loggingPath) {
this.subscriptionId = subscriptionId;
this.zooKeeperHolder = zooKeeperHolder;
this.curatorFramework = curatorFramework;
this.resetCursorPath = getSubscriptionPath("/cursor_reset");
this.log = LoggerFactory.getLogger(loggingPath + ".zk");
}

protected CuratorFramework getCurator() throws RuntimeException {
return this.zooKeeperHolder.get();
protected CuratorFramework getCurator() {
return this.curatorFramework;
}

protected String getSubscriptionId() {
Expand All @@ -81,14 +80,42 @@ protected String getSubscriptionPath(final String value) {
return "/nakadi/subscriptions/" + subscriptionId + value;
}

protected String getSubscriptionLockPath() {
return "/nakadi/locks/subscription_" + subscriptionId;
}

protected Logger getLog() {
return log;
}

@Override
public final <T> T runLocked(final Callable<T> function) {
try (Closeable closeable = zooKeeperHolder.newZookeeperLock(subscriptionId, TIMEOUT_AQUIRE_LOCK_MS)) {
return function.call();
try {
Exception releaseException = null;
if (null == lock) {
lock = new InterProcessSemaphoreMutex(curatorFramework, getSubscriptionLockPath());
}

final boolean acquired = lock.acquire(SECONDS_TO_WAIT_FOR_LOCK, TimeUnit.SECONDS);
if (!acquired) {
throw new ServiceTemporarilyUnavailableException("Failed to acquire subscription lock within " +
SECONDS_TO_WAIT_FOR_LOCK + " seconds");
}
final T result;
try {
result = function.call();
} finally {
try {
lock.release();
} catch (final Exception e) {
log.error("Failed to release lock", e);
releaseException = e;
}
}
if (releaseException != null) {
throw releaseException;
}
return result;
} catch (final NakadiRuntimeException | NakadiBaseException e) {
throw e;
} catch (final Exception e) {
Expand All @@ -100,6 +127,7 @@ public final <T> T runLocked(final Callable<T> function) {
public final void deleteSubscription() {
try {
getCurator().delete().guaranteed().deletingChildrenIfNeeded().forPath(getSubscriptionPath(""));
getCurator().delete().guaranteed().deletingChildrenIfNeeded().forPath(getSubscriptionLockPath());
} catch (final KeeperException.NoNodeException nne) {
getLog().warn("Subscription to delete is not found in Zookeeper: {}", subscriptionId);
} catch (final Exception e) {
Expand Down
Loading

0 comments on commit 162ecd1

Please sign in to comment.