diff --git a/coordination/pom.xml b/coordination/pom.xml new file mode 100644 index 0000000..cf8a6bc --- /dev/null +++ b/coordination/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + tech.ydb.examples + ydb-sdk-examples + 1.1.0-SNAPSHOT + + + tech.ydb.coordination.examples + ydb-coordination-examples + + YDB SDK Coordination Service examples + + pom + + + recipes + + diff --git a/coordination/recipes/pom.xml b/coordination/recipes/pom.xml new file mode 100644 index 0000000..9a6ed9c --- /dev/null +++ b/coordination/recipes/pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + + + tech.ydb.coordination.examples + ydb-coordination-examples + 1.1.0-SNAPSHOT + + + ydb-coordination-recipes-example + YDB Coordination Service recipes example + jar + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + tech.ydb + ydb-sdk-coordination + + + tech.ydb.auth + yc-auth-provider + + + + + jdbc-coordination-recipes-example + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + libs/ + tech.ydb.coordination.recipes.example.Main + + + + + + + + \ No newline at end of file diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java new file mode 100644 index 0000000..c7b4b92 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/LockApp.java @@ -0,0 +1,88 @@ +package tech.ydb.coordination.recipes.example; + +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.recipes.example.lib.locks.InterProcessLock; +import tech.ydb.coordination.recipes.example.lib.locks.InterProcessMutex; + +import java.time.Duration; +import java.util.Scanner; + +public class LockApp { + + InterProcessLock lock; + + LockApp(CoordinationClient client) { + client.createNode("examples/app").join().expectSuccess("cannot create coordination path"); + lock = new InterProcessMutex( + client, + "examples/app", + "data".getBytes(), + "default_lock" + ); + } + + public void lock(Duration duration) { + try { + if (duration == null) { + lock.acquire(); + } else { + lock.acquire(duration); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void release() { + try { + lock.release(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private boolean isAcquired() { + return lock.isAcquiredInThisProcess(); + } + + public void run() { + Scanner scanner = new Scanner(System.in); + System.out.println("Enter commands: lock [seconds] | release | reconnect | ?"); + + while (scanner.hasNextLine()) { + String commandLine = scanner.nextLine().trim(); + String[] commandParts = commandLine.split("\\s+"); + String command = commandParts[0]; + + switch (command.toLowerCase()) { + case "lock": + int seconds = -1; + if (commandParts.length > 1) { + try { + seconds = Integer.parseInt(commandParts[1]); + } catch (NumberFormatException e) { + System.out.println("Invalid number format, defaulting to 0 seconds"); + } + } + if (seconds == -1) { + lock(null); + } else { + lock(Duration.ofSeconds(seconds)); + } + break; + case "release": + release(); + break; + case "?": + System.out.println("Lock is acquired: " + isAcquired()); + break; + default: + System.out.println("Unknown command: " + command); + } + } + + scanner.close(); + } + +} + diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java new file mode 100644 index 0000000..a9c9579 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/Main.java @@ -0,0 +1,30 @@ +package tech.ydb.coordination.recipes.example; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.auth.iam.CloudAuthHelper; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.core.grpc.GrpcTransport; + +public class Main { + private final static Logger logger = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) { + if (args.length != 1) { + System.err.println("Usage: java -jar jdbc-coordination-api-example.jar "); + return; + } + + try (GrpcTransport transport = GrpcTransport.forConnectionString(args[0]) + .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron()) + .build()) { + + logger.info("run lock app example"); + CoordinationClient client = CoordinationClient.newClient(transport); + LockApp lockApp = new LockApp(client); + lockApp.run(); + logger.info("lock app example finished"); + } + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java new file mode 100644 index 0000000..957f403 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElectionListener.java @@ -0,0 +1,5 @@ +package tech.ydb.coordination.recipes.example.lib.election; + +public interface LeaderElectionListener { + void takeLeadership() throws Exception; +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java new file mode 100644 index 0000000..5ca863c --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/election/LeaderElector.java @@ -0,0 +1,194 @@ +package tech.ydb.coordination.recipes.example.lib.election; + +import java.io.Closeable; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.recipes.example.lib.watch.Participant; +import tech.ydb.coordination.recipes.example.lib.locks.InterProcessMutex; +import tech.ydb.coordination.recipes.example.lib.watch.SemaphoreWatchAdapter; +import tech.ydb.coordination.recipes.example.lib.util.Listenable; +import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider; + +public class LeaderElector implements Closeable, ListenableProvider { + private static final Logger logger = LoggerFactory.getLogger(LeaderElector.class); + + private final CoordinationClient client; + private final LeaderElectionListener leaderElectionListener; + private final String coordinationNodePath; + private final String semaphoreName; + private final ExecutorService electionExecutor; + private final InterProcessMutex lock; + private final SemaphoreWatchAdapter semaphoreWatchAdapter; + + private AtomicReference state = new AtomicReference<>(State.STARTED); + private volatile boolean autoRequeue = false; + private volatile boolean isLeader = false; + private Future electionTask = null; + + + private enum State { // TODO: needs third state (CREATED)? + STARTED, + CLOSED + } + + public LeaderElector( + CoordinationClient client, + LeaderElectionListener leaderElectionListener, + String coordinationNodePath, + String semaphoreName + ) { + this(client, leaderElectionListener, coordinationNodePath, semaphoreName, Executors.newSingleThreadExecutor()); + } + + public LeaderElector( + CoordinationClient client, + LeaderElectionListener leaderElectionListener, + String coordinationNodePath, + String semaphoreName, + ExecutorService executorService + ) { + this.client = client; + this.leaderElectionListener = leaderElectionListener; + this.coordinationNodePath = coordinationNodePath; + this.semaphoreName = semaphoreName; + this.electionExecutor = executorService; + this.lock = new InterProcessMutex( + client, + coordinationNodePath, + semaphoreName + ); + this.semaphoreWatchAdapter = new SemaphoreWatchAdapter(lock.getSession(), semaphoreName); + semaphoreWatchAdapter.start(); + } + + public boolean isLeader() { + return isLeader; + } + + public synchronized void interruptLeadership() { + Future task = electionTask; + if (task != null) { + task.cancel(true); + } + } + + /** + * Re-queue an attempt for leadership. If this instance is already queued, nothing + * happens and false is returned. If the instance was not queued, it is re-queued and true + * is returned + * + * @return true if re-enqueue was successful + */ + public boolean requeue() { + Preconditions.checkState(state.get() == State.STARTED, "Already closed or not yet started"); + + return enqueueElection(); + } + + public void autoRequeue() { + autoRequeue = true; + } + + private synchronized boolean enqueueElection() { + if (!isQueued() && state.get() == State.STARTED) { + electionTask = electionExecutor.submit(new Callable() { + @Override + public Void call() throws Exception { + try { + doWork(); + } finally { + finishTask(); + } + return null; + } + }); + return true; + } + + return false; + } + + private void doWork() throws Exception { + isLeader = false; + + try { + lock.acquire(); + isLeader = true; + try { + leaderElectionListener.takeLeadership(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } catch (Throwable e) { + logger.debug("takeLeadership exception", e); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } finally { + if (isLeader) { + isLeader = false; + boolean wasInterrupted = Thread.interrupted(); + try { + lock.release(); + } catch (Exception e) { + logger.error("Lock release exception for: " + coordinationNodePath); + } finally { + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } + } + } + } + } + + private synchronized void finishTask() { + electionTask = null; + if (autoRequeue) { // TODO: requeue if critical exception? + enqueueElection(); + } + } + + private boolean isQueued() { + return electionTask != null; + } + + public List getParticipants() { + return semaphoreWatchAdapter.getParticipants(); + } + + public Optional getLeader() { + return semaphoreWatchAdapter.getOwners().stream().findFirst(); + } + + @Override + public synchronized void close() { + Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed"); + + Future task = electionTask; + if (task != null) { + task.cancel(true); + } + + electionTask = null; + electionExecutor.close(); + semaphoreWatchAdapter.close(); + getListenable().clearListeners(); + } + + @Override + public Listenable getListenable() { + return lock.getListenable(); + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java new file mode 100644 index 0000000..b67d5ea --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessLock.java @@ -0,0 +1,25 @@ +package tech.ydb.coordination.recipes.example.lib.locks; + +import java.time.Duration; + +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.recipes.example.lib.util.Listenable; + +public interface InterProcessLock extends Listenable { + void acquire() throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException; + + /** + * @return true - if successfully acquired lock, false - if lock waiting time expired + */ + boolean acquire(Duration waitDuration) throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException; + + /** + * @return false if nothing to release + */ + boolean release() throws Exception; + + /** + * @return true if the lock is acquired by a thread in this JVM + */ + boolean isAcquiredInThisProcess(); +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java new file mode 100644 index 0000000..11b18e3 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/InterProcessMutex.java @@ -0,0 +1,251 @@ +package tech.ydb.coordination.recipes.example.lib.locks; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.annotation.concurrent.ThreadSafe; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.SemaphoreLease; +import tech.ydb.coordination.description.SemaphoreDescription; +import tech.ydb.coordination.recipes.example.lib.util.SessionListenerWrapper; +import tech.ydb.coordination.recipes.example.lib.util.Listenable; +import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; + +@ThreadSafe +public class InterProcessMutex implements InterProcessLock, ListenableProvider { + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); + private static final Logger logger = LoggerFactory.getLogger(InterProcessMutex.class); + + private final Lock leaseLock = new ReentrantLock(); + private final CoordinationSession session; + private final CompletableFuture sessionConnectionTask; + private final SessionListenerWrapper sessionListenerWrapper; + private final String semaphoreName; + private final String coordinationNodePath; + + private volatile SemaphoreLease processLease = null; + + public InterProcessMutex( + CoordinationClient client, + String coordinationNodePath, + String lockName + ) { + this.coordinationNodePath = coordinationNodePath; + this.session = client.createSession(coordinationNodePath); + this.sessionListenerWrapper = new SessionListenerWrapper(session); + this.semaphoreName = lockName; + + this.sessionConnectionTask = session.connect().thenApply(status -> { + logger.debug("Session connection status: " + status); + return status; + }); + session.addStateListener(state -> { + switch (state) { + case RECONNECTED: { + logger.debug("Session RECONNECTED"); + reconnect(); + break; + } + case CLOSED: { + logger.debug("Session CLOSED, releasing lock"); + internalRelease(); + break; + } + case LOST: { + logger.debug("Session LOST, releasing lock"); + internalRelease(); + break; + } + } + }); + } + + private CoordinationSession connectedSession() { + try { + sessionConnectionTask.get().expectSuccess("Unable to connect to session on: " + coordinationNodePath); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return session; + } + + private void reconnect() { + connectedSession().describeSemaphore( + semaphoreName, + DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS + ).thenAccept(result -> { + if (!result.isSuccess()) { + logger.error("Unable to describe semaphore {}", semaphoreName); + return; + } + SemaphoreDescription semaphoreDescription = result.getValue(); + SemaphoreDescription.Session owner = semaphoreDescription.getOwnersList().getFirst(); + if (owner.getId() != session.getId()) { + logger.warn( + "Current session with id: {} lost lease after reconnection on semaphore: {}", + owner.getId(), + semaphoreName + ); + internalRelease(); + } + }); + } + + @Override + public void acquire() throws Exception { + logger.debug("Trying to acquire without timeout"); + safeAcquire(null); + } + + @Override + public boolean acquire(Duration duration) throws Exception { + logger.debug("Trying to acquire with deadline: {}", duration); + Instant deadline = Instant.now().plus(duration); + return safeAcquire(deadline); + } + + @Override + public boolean release() throws Exception { + return internalRelease().get(); + } + + private CompletableFuture internalRelease() { + logger.debug("Trying to release"); + if (processLease == null) { + logger.debug("Already released"); + return CompletableFuture.completedFuture(false); + } + + leaseLock.lock(); + try { + if (processLease != null) { + return processLease.release().thenApply(it -> { + logger.debug("Released lock"); + processLease = null; + leaseLock.unlock(); + return true; + }); + } + } finally { + leaseLock.unlock(); + } + + logger.debug("Already released"); + return CompletableFuture.completedFuture(false); + } + + @Override + public boolean isAcquiredInThisProcess() { + return processLease != null; + } + + // TODO: implement interruption + + /** + * @param deadline + * @return true - if successfully acquired lock + * @throws Exception + * @throws LockAlreadyAcquiredException + */ + private boolean safeAcquire(@Nullable Instant deadline) throws Exception, LockAlreadyAcquiredException { + if (processLease != null) { + logger.debug("Already acquired lock: {}", semaphoreName); + throw new LockAlreadyAcquiredException(semaphoreName); + } + + leaseLock.lock(); + try { + if (processLease != null) { + logger.debug("Already acquired lock: {}", semaphoreName); + throw new LockAlreadyAcquiredException(semaphoreName); + } + + SemaphoreLease lease = internalLock(deadline); + if (lease != null) { + processLease = lease; + logger.debug("Successfully acquired lock: {}", semaphoreName); + return true; + } + } finally { + leaseLock.unlock(); + } + + logger.debug("Unable to acquire lock: {}", semaphoreName); + return false; + } + + private SemaphoreLease internalLock(@Nullable Instant deadline) throws ExecutionException, InterruptedException { + int retryCount = 0; + while (connectedSession().getState().isActive() && (deadline == null || Instant.now().isBefore(deadline))) { + retryCount++; + + Duration timeout; + if (deadline == null) { + timeout = DEFAULT_TIMEOUT; + } else { + timeout = Duration.between(Instant.now(), deadline); // TODO: use external Clock instead of Instant? + } + CompletableFuture> acquireTask = connectedSession().acquireEphemeralSemaphore( + semaphoreName, true, null, timeout // TODO: change Session API to use deadlines + ); + Result leaseResult; + try { + leaseResult = acquireTask.get(); + } catch (InterruptedException e) { + // If acquire is interrupted, then release immediately + Thread.currentThread().interrupt(); + acquireTask.thenAccept(acquireResult -> { + if (!acquireResult.getStatus().isSuccess()) { + return; + } + SemaphoreLease lease = acquireResult.getValue(); + lease.release(); + }); + throw e; + } + + Status status = leaseResult.getStatus(); + logger.debug("Lease result status: {}", status); + + if (status.isSuccess()) { + logger.debug("Successfully acquired the lock"); + return leaseResult.getValue(); + } + + if (status.getCode() == StatusCode.TIMEOUT) { + logger.debug("Trying to acquire again, retries: {}", retryCount); + continue; + } + + if (!status.getCode().isRetryable(true)) { + status.expectSuccess("Unable to retry acquiring semaphore"); + return null; + } + } + + // TODO: handle timeout and error differently + throw new LockAcquireFailedException(coordinationNodePath, semaphoreName); + } + + @Override + public Listenable getListenable() { + return sessionListenerWrapper; + } + + public CoordinationSession getSession() { + return session; + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java new file mode 100644 index 0000000..d4ea7b1 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAcquireFailedException.java @@ -0,0 +1,20 @@ +package tech.ydb.coordination.recipes.example.lib.locks; + +public class LockAcquireFailedException extends RuntimeException { + private final String coordinationNodePath; + private final String semaphoreName; + + public LockAcquireFailedException(String coordinationNodePath, String semaphoreName) { + super("Failed to acquire semaphore=" + semaphoreName + ", on coordination node=" + coordinationNodePath); + this.coordinationNodePath = coordinationNodePath; + this.semaphoreName = semaphoreName; + } + + public String getCoordinationNodePath() { + return coordinationNodePath; + } + + public String getSemaphoreName() { + return semaphoreName; + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java new file mode 100644 index 0000000..1d1009a --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockAlreadyAcquiredException.java @@ -0,0 +1,20 @@ +package tech.ydb.coordination.recipes.example.lib.locks; + +public class LockAlreadyAcquiredException extends RuntimeException { + private final String coordinationNodePath; + private final String semaphoreName; + + public LockAlreadyAcquiredException(String coordinationNodePath, String semaphoreName) { + super("Semaphore=" + semaphoreName + " on path=" + coordinationNodePath + " is already acquired"); + this.coordinationNodePath = coordinationNodePath; + this.semaphoreName = semaphoreName; + } + + public String getCoordinationNodePath() { + return coordinationNodePath; + } + + public String getSemaphoreName() { + return semaphoreName; + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java new file mode 100644 index 0000000..57d16bf --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/LockInternals.java @@ -0,0 +1,267 @@ +package tech.ydb.coordination.recipes.example.lib.locks; + +import java.io.Closeable; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import javax.annotation.concurrent.ThreadSafe; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.SemaphoreLease; +import tech.ydb.coordination.description.SemaphoreDescription; +import tech.ydb.coordination.recipes.example.lib.util.Listenable; +import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider; +import tech.ydb.coordination.recipes.example.lib.util.SessionListenerWrapper; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; + +@ThreadSafe +class LockInternals implements ListenableProvider, Closeable { + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); + private static final Logger logger = LoggerFactory.getLogger(LockInternals.class); + + private final String coordinationNodePath; + private final String semaphoreName; + private final CoordinationSession session; + private final SessionListenerWrapper sessionListenerWrapper; + + private CompletableFuture sessionConnectionTask = null; + private volatile SemaphoreLease processLease = null; // TODO: volatile? + + LockInternals( + CoordinationClient client, + String coordinationNodePath, + String lockName + ) { + this.coordinationNodePath = coordinationNodePath; + this.semaphoreName = lockName; + this.session = client.createSession(coordinationNodePath); + this.sessionListenerWrapper = new SessionListenerWrapper(session); + } + + public void start() { + this.sessionConnectionTask = session.connect().thenApply(status -> { + logger.debug("Session connection status: {}", status); + return status; + }); + + Consumer listener = state -> { + switch (state) { + case RECONNECTED: { + logger.debug("Session RECONNECTED"); + reconnect(); + break; + } + case CLOSED: { + logger.debug("Session CLOSED, releasing lock"); + internalRelease(); + break; + } + case LOST: { + logger.debug("Session LOST, releasing lock"); + internalRelease(); + break; + } + } + }; + + session.addStateListener(listener); + } + + private CoordinationSession connectedSession() { + try { + sessionConnectionTask.get().expectSuccess("Unable to connect to session on: " + coordinationNodePath); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return session; + } + + private void reconnect() { + CoordinationSession coordinationSession = connectedSession(); + coordinationSession.describeSemaphore( + semaphoreName, + DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS + ).thenAccept(result -> { + if (!result.isSuccess()) { + logger.error("Unable to describe semaphore {}", semaphoreName); + return; + } + SemaphoreDescription semaphoreDescription = result.getValue(); + SemaphoreDescription.Session owner = semaphoreDescription.getOwnersList().getFirst(); + if (owner.getId() != coordinationSession.getId()) { + logger.warn( + "Current session with id: {} lost lease after reconnection on semaphore: {}", + owner.getId(), + semaphoreName + ); + internalRelease(); + } + }); + } + + public boolean tryAcquire(@Nullable Duration duration, boolean exclusive, byte[] data) throws Exception { + logger.debug("Trying to acquire with deadline: {}", duration); + Instant deadline = Instant.now().plus(duration); + return safeAcquire(deadline, exclusive, data); + } + + public boolean release() { + return internalRelease(); + } + + // TODO: interruptible? + private synchronized boolean internalRelease() { + logger.debug("Trying to release"); + if (processLease == null) { + logger.debug("Already released"); + return false; + } + + try { + return processLease.release().thenApply(it -> { + logger.debug("Released lock"); + processLease = null; + return true; + }).get(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * @param deadline + * @return true - if successfully acquired lock + * @throws Exception + * @throws LockAlreadyAcquiredException + * @throws LockAcquireFailedException + */ + // TODO: deadlock? Move synchronized? + private synchronized boolean safeAcquire( + @Nullable Instant deadline, + boolean exclusive, + byte[] data + ) throws Exception { + if (processLease != null) { + logger.debug("Already acquired lock: {}", semaphoreName); + throw new LockAlreadyAcquiredException(coordinationNodePath, semaphoreName); + } + + Optional lease = tryBlockingLock(deadline, true, data); + if (lease.isPresent()) { + processLease = lease.get(); + logger.debug("Successfully acquired lock: {}", semaphoreName); + return true; + } + + logger.debug("Unable to acquire lock: {}", semaphoreName); + return false; + } + + private Optional tryBlockingLock( + @Nullable Instant deadline, + boolean exclusive, + byte[] data + ) throws Exception { + int retryCount = 0; + CoordinationSession coordinationSession = connectedSession(); + + while (coordinationSession.getState().isActive() && (deadline == null || Instant.now().isBefore(deadline))) { + retryCount++; + + Duration timeout; + if (deadline == null) { + timeout = DEFAULT_TIMEOUT; + } else { + timeout = Duration.between(Instant.now(), deadline); // TODO: use external Clock instead of Instant? + } + + CompletableFuture> acquireTask = coordinationSession.acquireEphemeralSemaphore( + semaphoreName, exclusive, data, timeout // TODO: change Session API to use deadlines + ); + Result leaseResult; + try { + leaseResult = acquireTask.get(); + } catch (InterruptedException e) { + // If acquire is interrupted, then release immediately + Thread.currentThread().interrupt(); + acquireTask.thenAccept(acquireResult -> { + if (!acquireResult.getStatus().isSuccess()) { + return; + } + SemaphoreLease lease = acquireResult.getValue(); + lease.release(); + }); + throw e; + } + + Status status = leaseResult.getStatus(); + logger.debug("Lease result status: {}", status); + + if (status.isSuccess()) { + logger.debug("Successfully acquired the lock"); + return Optional.of(leaseResult.getValue()); + } + + if (status.getCode() == StatusCode.TIMEOUT) { + logger.debug("Trying to acquire semaphore {} again, retries: {}", semaphoreName, retryCount); + continue; + } + + if (!status.getCode().isRetryable(true)) { + status.expectSuccess("Unable to retry acquiring semaphore"); + throw new LockAcquireFailedException(coordinationNodePath, semaphoreName); + } + } + + if (deadline != null && Instant.now().compareTo(deadline) >= 0) { + return Optional.empty(); + } + + throw new LockAcquireFailedException(coordinationNodePath, semaphoreName); + } + + public String getCoordinationNodePath() { + return coordinationNodePath; + } + + public String getSemaphoreName() { + return semaphoreName; + } + + public CoordinationSession getCoordinationSession() { + return connectedSession(); + } + + public @Nullable SemaphoreLease getProcessLease() { + return processLease; + } + + @Override + public Listenable getListenable() { + return sessionListenerWrapper; + } + + @Override + public void close() { + try { + release(); + } catch (Exception ignored) { + } + + session.close(); + sessionListenerWrapper.clearListeners(); + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java new file mode 100644 index 0000000..7b5ad67 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/locks/ReadWriteLock.java @@ -0,0 +1,79 @@ +package tech.ydb.coordination.recipes.example.lib.locks; + +import java.time.Duration; + +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.recipes.example.lib.util.Listenable; +import tech.ydb.coordination.recipes.example.lib.util.ListenableProvider; + +public class ReadWriteLock { + private final InternalLock readLock; + private final InternalLock writeLock; + + public ReadWriteLock( + CoordinationClient client, + String coordinationNodePath, + String lockName + ) { + LockInternals lockInternals = new LockInternals( + client, coordinationNodePath, lockName + ); + lockInternals.start(); + // TODO: Share same lockInternals? + this.readLock = new InternalLock(lockInternals, false); + this.writeLock = new InternalLock(lockInternals, true); + } + + public InterProcessLock writeLock() { + return readLock; + } + + public InterProcessLock readLock() { + return writeLock; + } + + private static class InternalLock implements InterProcessLock, ListenableProvider { + private final LockInternals lockInternals; + private final boolean isExclisive; + + private InternalLock(LockInternals lockInternals, boolean isExclisive) { + this.lockInternals = lockInternals; + this.isExclisive = isExclisive; + } + + @Override + public void acquire() throws Exception { + lockInternals.tryAcquire( + null, + isExclisive, + null + ); + } + + @Override + public boolean acquire(Duration waitDuration) throws Exception { + return lockInternals.tryAcquire( + waitDuration, + isExclisive, + null + ); + } + + @Override + public boolean release() { + return lockInternals.release(); + } + + @Override + public boolean isAcquiredInThisProcess() { + return lockInternals.getProcessLease() != null; + } + + @Override + public Listenable getListenable() { + return lockInternals.getListenable(); + } + } + +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java new file mode 100644 index 0000000..fa887a3 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/Listenable.java @@ -0,0 +1,19 @@ +package tech.ydb.coordination.recipes.example.lib.util; + +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +public interface Listenable { + void addListener(Consumer listener); + + /** + * Listener call will be processed in executor + * @param listener + * @param executor + */ + void addListener(Consumer listener, ExecutorService executor); + + void removeListener(Consumer listener); + + void clearListeners(); +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java new file mode 100644 index 0000000..f4b80f2 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/ListenableProvider.java @@ -0,0 +1,29 @@ +package tech.ydb.coordination.recipes.example.lib.util; + + +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +public interface ListenableProvider extends Listenable { + Listenable getListenable(); + + @Override + default void addListener(Consumer listener) { + getListenable().addListener(listener); + } + + @Override + default void addListener(Consumer listener, ExecutorService executor) { + getListenable().addListener(listener, executor); + } + + @Override + default void removeListener(Consumer listener) { + getListenable().removeListener(listener); + } + + @Override + default void clearListeners() { + getListenable().clearListeners(); + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java new file mode 100644 index 0000000..0ef56d7 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/util/SessionListenerWrapper.java @@ -0,0 +1,54 @@ +package tech.ydb.coordination.recipes.example.lib.util; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.CoordinationSession.State; + +public class SessionListenerWrapper implements Listenable { + private final CoordinationSession session; + /** + * key - original (external) consumer, value - consumer wrapper or original consumer depending on executor + */ + private final Map, Consumer> listenersMapping = new HashMap<>(); + + public SessionListenerWrapper(CoordinationSession session) { + this.session = session; + } + + @Override + public void addListener(Consumer listener) { + if (listenersMapping.containsKey(listener)) { + return; + } + + listenersMapping.put(listener, listener); + session.addStateListener(listener); + } + + @Override + public void addListener(Consumer listener, ExecutorService executor) { + if (listenersMapping.containsKey(listener)) { + return; + } + + Consumer wrapper = state -> executor.submit(() -> listener.accept(state)); + listenersMapping.put(listener, wrapper); + session.addStateListener(wrapper); + } + + @Override + public void removeListener(Consumer listener) { + Consumer removed = listenersMapping.remove(listener); + session.removeStateListener(removed); + } + + @Override + public void clearListeners() { + listenersMapping.keySet().forEach(this::removeListener); + listenersMapping.clear(); + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java new file mode 100644 index 0000000..568b6a3 --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/Participant.java @@ -0,0 +1,61 @@ +package tech.ydb.coordination.recipes.example.lib.watch; + +import java.util.Arrays; +import java.util.Objects; + +public class Participant { + private final long id; + private final byte[] data; + private final long count; + private final boolean isLeader; + + public Participant(long id, byte[] data, long count, boolean isLeader) { + this.id = id; + this.data = data; + this.count = count; + this.isLeader = isLeader; + } + + public long getId() { + return id; + } + + public byte[] getData() { + return data; + } + + public long getCount() { + return count; + } + + public boolean isLeader() { + return isLeader; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Participant that = (Participant) o; + return id == that.id && count == that.count && isLeader == that.isLeader && Objects.deepEquals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(id, Arrays.hashCode(data), count, isLeader); + } + + @Override + public String toString() { + return "Participant{" + + "id=" + id + + ", data=" + Arrays.toString(data) + + ", count=" + count + + ", isLeader=" + isLeader + + '}'; + } +} diff --git a/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java new file mode 100644 index 0000000..d1b982d --- /dev/null +++ b/coordination/recipes/src/main/java/tech/ydb/coordination/recipes/example/lib/watch/SemaphoreWatchAdapter.java @@ -0,0 +1,178 @@ +package tech.ydb.coordination.recipes.example.lib.watch; + +import java.io.Closeable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.description.SemaphoreDescription; +import tech.ydb.coordination.description.SemaphoreWatcher; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; +import tech.ydb.coordination.settings.WatchSemaphoreMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; + +public class SemaphoreWatchAdapter implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(SemaphoreWatchAdapter.class); + + private final CoordinationSession session; + private final String semaphoreName; + + private AtomicReference state; + private Future watchTask; + private volatile WatchData watchData; + + private enum State { + CREATED, + STARTED, + CLOSED + } + + private class WatchData { + final long count; + final byte[] data; + final List waiters; + final List owners; + final List participants; + + WatchData(long count, byte[] data, List waiters, List owners) { + this.count = count; + this.data = data; + this.waiters = waiters; + this.owners = owners; + this.participants = Stream.concat(owners.stream(), waiters.stream()).collect(Collectors.toList()); + } + } + + public SemaphoreWatchAdapter(CoordinationSession session, String semaphoreName) { + this.session = session; + this.semaphoreName = semaphoreName; + this.state = new AtomicReference<>(State.CREATED); + this.watchTask = null; + this.watchData = null; + } + + public List getOwners() { + // TODO: block until initialized or throw exception or return default value or return Optional.empty() + Preconditions.checkState(watchData == null, "Is not yet fetched state"); + + return Collections.unmodifiableList(watchData.owners); // TODO: copy Participant.data[]? + } + + public List getWaiters() { + Preconditions.checkState(watchData == null, "Is not yet fetched state"); + + return Collections.unmodifiableList(watchData.waiters); // TODO: copy Participant.data[]? + } + + public List getParticipants() { + Preconditions.checkState(watchData == null, "Is not yet fetched state"); + + return Collections.unmodifiableList(watchData.participants); // TODO: copy Participant.data[]? + } + + public long getCount() { + Preconditions.checkState(watchData == null, "Is not yet fetched state"); + + return watchData.count; + } + + public byte[] getData() { + Preconditions.checkState(watchData == null, "Is not yet fetched state"); + + return watchData.data.clone(); + } + + public boolean start() { + Preconditions.checkState(state.compareAndSet(State.CREATED, State.STARTED), "Already started or closed"); + + return enqueueWatch(); + } + + private synchronized boolean enqueueWatch() { + if (watchIsQueued() && state.get() == State.STARTED) { + return false; + } + + watchTask = watchSemaphore().thenCompose(status -> { + if (!status.isSuccess()) { + // TODO: stop watching on error? + logger.error("Wailed to watch semaphore: {} with status: {}", semaphoreName, status); + } + + finish(); + return null; + }); + return true; + } + + private boolean watchIsQueued() { + return watchTask != null; + } + + private synchronized void finish() { + watchTask = null; + enqueueWatch(); + } + + private CompletableFuture watchSemaphore() { + return session.watchSemaphore( + semaphoreName, + DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS, + WatchSemaphoreMode.WATCH_DATA_AND_OWNERS + ).thenCompose(result -> { + Status status = result.getStatus(); + if (!status.isSuccess()) { + return CompletableFuture.completedFuture(status); + } + SemaphoreWatcher watcher = result.getValue(); + saveWatchState(watcher.getDescription()); + return watcher.getChangedFuture().thenApply(Result::getStatus); + }); + } + + private void saveWatchState(SemaphoreDescription description) { + List waitersList = description.getWaitersList().stream().map(it -> new Participant( + it.getId(), + it.getData(), + it.getCount(), + false + )).collect(Collectors.toList()); + List ownersList = description.getOwnersList().stream().map(it -> new Participant( + it.getId(), + it.getData(), + it.getCount(), + true + )).collect(Collectors.toList()); + + watchData = new WatchData( + description.getCount(), + description.getData(), + waitersList, + ownersList + ); + } + + private synchronized void stopWatch() { + Future task = watchTask; + if (task != null) { + task.cancel(true); + } + watchTask = null; + } + + @Override + public void close() { + Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Is not yet started"); + + stopWatch(); + } +} diff --git a/coordination/recipes/src/main/resources/log4j2.xml b/coordination/recipes/src/main/resources/log4j2.xml new file mode 100644 index 0000000..1cf1cde --- /dev/null +++ b/coordination/recipes/src/main/resources/log4j2.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1655ee9..d05d409 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,7 @@ ydb-cookbook url-shortener-demo jdbc + coordination