Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CURATOR-719: Fix orSetData for parallel create calls #510

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1147,23 +1147,26 @@ public String call() throws Exception {

if (createdPath == null) {
try {
if (failBeforeNextCreateForTesting) {
failBeforeNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}
createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
} catch (KeeperException.NoNodeException e) {
if (createParentsIfNeeded) {
ZKPaths.mkdirs(
client.getZooKeeper(),
path,
false,
acling.getACLProviderForParents(),
createParentsAsContainers);
createdPath = client.getZooKeeper()
.create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
} else {
throw e;
try {
if (failBeforeNextCreateForTesting) {
failBeforeNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}
createdPath =
client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
} catch (KeeperException.NoNodeException e) {
if (createParentsIfNeeded) {
ZKPaths.mkdirs(
client.getZooKeeper(),
path,
false,
acling.getACLProviderForParents(),
createParentsAsContainers);
createdPath = client.getZooKeeper()
.create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
} else {
throw e;
}
}
} catch (KeeperException.NodeExistsException e) {
if (setDataIfExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
Expand All @@ -33,6 +37,7 @@
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateBuilderMain;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
Expand Down Expand Up @@ -365,18 +370,37 @@ public void testCreateWithParentsWithoutAclInNamespaceBackground() throws Except
}

private void check(
CuratorFramework client, CreateBuilderMain builder, String path, byte[] data, boolean expectedSuccess)
CuratorFramework client,
PathAndBytesable<String> builder,
String path,
byte[] data,
boolean expectedSuccess)
throws Exception {
check(
client,
builder,
path,
data,
0,
(expectedSuccess) ? KeeperException.Code.OK : KeeperException.Code.NODEEXISTS);
}

private void check(
CuratorFramework client,
PathAndBytesable<String> builder,
String path,
byte[] data,
int expectedVersion,
KeeperException.Code expectedCode)
throws Exception {
int expectedCode =
(expectedSuccess) ? KeeperException.Code.OK.intValue() : KeeperException.Code.NODEEXISTS.intValue();
try {
builder.forPath(path, data);
assertEquals(expectedCode, KeeperException.Code.OK.intValue());
assertEquals(expectedCode, KeeperException.Code.OK);
Stat stat = new Stat();
byte[] actualData = client.getData().storingStatIn(stat).forPath(path);
assertTrue(IdempotentUtils.matches(0, data, stat.getVersion(), actualData));
assertTrue(IdempotentUtils.matches(expectedVersion, data, stat.getVersion(), actualData));
} catch (KeeperException e) {
assertEquals(expectedCode, e.getCode());
assertEquals(expectedCode.intValue(), e.getCode());
}
}

Expand Down Expand Up @@ -540,6 +564,65 @@ public void testIdempotentCreateConnectionLoss() throws Exception {
}
}

/**
* Tests all cases of create().orSetData()
*/
@Test
public void testOrSetData() throws Exception {
CuratorFramework client = createClient(new DefaultACLProvider());
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
try {
client.start();

Stat stat = new Stat();

String path = "/idpcreate";
String pathWithParents1 = "/idpcreate/1/a/b/c/d";
String pathWithParents2 = "/idpcreate/2/a/b/c/d";
byte[] data1 = new byte[] {1, 2, 3};
byte[] data2 = new byte[] {4, 5, 6};

// first and second create should succeed with the same path and different data
check(client, client.create().orSetData(), path, data1, 0, KeeperException.Code.OK);
check(client, client.create().orSetData(), path, data2, 1, KeeperException.Code.OK);
check(client, client.create(), path, data2, false);

// without creatingParentsIfNeeded, it should fail
check(client, client.create().orSetData(), pathWithParents1, data1, 0, KeeperException.Code.NONODE);

// with creatingParentsIfNeeded, it should succeed and succeed a second time as well
check(
client,
client.create().orSetData().creatingParentsIfNeeded(),
pathWithParents1,
data1,
0,
KeeperException.Code.OK);
check(client, client.create().orSetData(), pathWithParents1, data2, 1, KeeperException.Code.OK);

// Check that calling the same create().orSetData() in parallel is ok
Callable<Exception> setData = () -> {
try {
client.create().orSetData().creatingParentsIfNeeded().forPath(pathWithParents2, data2);
} catch (Exception e) {
return e;
}
return null;
};
Future<Exception> f1 = executor.submit(setData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may be very lucky and not reproduce the problem (if the machine is "slow" and the two operations don't run concurrently)

maybe we can make this a little bit nasty and schedule many operations in parallel.
It would be great to have a way to intercept the code paths and verify that we have exercised all the possible branches. (Mockito?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may be very lucky and not reproduce the problem (if the machine is "slow" and the two operations don't run concurrently)

For concurrent sensitive test, I usually run with help from IDE "until failure" or "run x times". I, personally, think "reproduce" means that it will reproduce the bug given enough runs(tens to hundreds depends on context).

Mockito?

I prefer to run multiple times over mock if the number of runs is small. Mock forces us to think different and complicate things.

I tested this after reverting the fix, it failed in the first runs without resorting to "run x times" on my 2015 macbook pro.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with Mockito I mean that we can use a spy or override some method to ensure that we are hitting the expected code path.

I tested this after reverting the fix, it failed in the first runs without resorting to "run x times" on my 2015 macbook pro

let me try

Future<Exception> f2 = executor.submit(setData);
assertNull(f1.get(), "Exception thrown during 1st parallel create");
assertNull(f2.get(), "Exception thrown during 2nd parallel create");

byte[] foundData = client.getData().storingStatIn(stat).forPath(pathWithParents2);
assertArrayEquals(data2, foundData, "Data does not match after parallel creates");
assertEquals(1, stat.getVersion(), "Version should be 1 after 2 parallel creates");
} finally {
CloseableUtils.closeQuietly(client);
executor.shutdownNow();
}
}

@Test
public void testCreateProtectedUtils() throws Exception {
try (CuratorFramework client = CuratorFrameworkFactory.builder()
Expand Down
Loading