Skip to content

Commit

Permalink
CURATOR-725: Allow for global compression
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman committed Dec 16, 2024
1 parent ad19795 commit 61f1e56
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static class Builder {
private List<AuthInfo> authInfos = null;
private byte[] defaultData = LOCAL_ADDRESS;
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
private boolean globalCompressionEnabled = false;
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
Expand Down Expand Up @@ -367,6 +368,18 @@ public Builder compressionProvider(CompressionProvider compressionProvider) {
return this;
}

/**
* By default, each write or read call must explicitly use compression.
* Call this method to enable compression on all read and write calls.
* <p>
* In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter.
* @return this
*/
public Builder enableGlobalCompression() {
this.globalCompressionEnabled = true;
return this;
}

/**
* @param zookeeperFactory the zookeeper factory to use
* @return this
Expand Down Expand Up @@ -542,6 +555,10 @@ public CompressionProvider getCompressionProvider() {
return compressionProvider;
}

public boolean globalCompressionEnabled() {
return globalCompressionEnabled;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class CreateBuilderImpl
acling = new ACLing(client.getAclProvider());
createParentsIfNeeded = false;
createParentsAsContainers = false;
compress = false;
compress = client.globalCompressionEnabled();
setDataIfExists = false;
storingStat = null;
ttl = -1;
Expand All @@ -123,7 +123,7 @@ public CreateBuilderImpl(
this.backgrounding = backgrounding;
this.createParentsIfNeeded = createParentsIfNeeded;
this.createParentsAsContainers = createParentsAsContainers;
this.compress = compress;
this.compress = client.globalCompressionEnabled() || compress;
this.setDataIfExists = setDataIfExists;
this.acling = new ACLing(client.getAclProvider(), aclList);
this.storingStat = storingStat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
private final FailedDeleteManager failedDeleteManager;
private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final boolean globalCompressionEnabled;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final boolean useContainerParentsIfAvailable;
Expand Down Expand Up @@ -184,6 +185,7 @@ public void process(WatchedEvent watchedEvent) {
builder.getSimulatedSessionExpirationPercent(),
builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
globalCompressionEnabled = builder.globalCompressionEnabled();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
Expand Down Expand Up @@ -283,6 +285,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
failedDeleteManager = parent.failedDeleteManager;
failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
globalCompressionEnabled = parent.globalCompressionEnabled;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
namespace = parent.namespace;
Expand Down Expand Up @@ -642,6 +645,10 @@ CompressionProvider getCompressionProvider() {
return compressionProvider;
}

boolean globalCompressionEnabled() {
return globalCompressionEnabled;
}

boolean useContainerParentsIfAvailable() {
return useContainerParentsIfAvailable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
responseStat = null;
watching = new Watching(client);
backgrounding = new Backgrounding();
decompress = false;
decompress = client.globalCompressionEnabled();
}

public GetDataBuilderImpl(
Expand All @@ -59,7 +59,7 @@ public GetDataBuilderImpl(
this.responseStat = responseStat;
this.watching = new Watching(client, watcher);
this.backgrounding = backgrounding;
this.decompress = decompress;
this.decompress = client.globalCompressionEnabled() || decompress;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ public class SetDataBuilderImpl
this.client = client;
backgrounding = new Backgrounding();
version = -1;
compress = false;
compress = client.globalCompressionEnabled();
}

public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) {
this.client = client;
this.backgrounding = backgrounding;
this.version = version;
this.compress = compress;
this.compress = client.globalCompressionEnabled() || compress;
}

<T> TransactionSetDataBuilder<T> asTransactionSetDataBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder {
TempGetDataBuilderImpl(CuratorFrameworkImpl client) {
this.client = client;
responseStat = null;
decompress = false;
decompress = client.globalCompressionEnabled();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,32 @@ public void testSetData() throws Exception {
}
}

@Test
public void testSetDataGlobalCompression() throws Exception {
final byte[] data = "here's a string".getBytes();

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableGlobalCompression()
.build();
try {
client.start();

// Write without explicit compression, read with explicit compression
client.create().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertNotEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());

// Write with explicit compression, read without explicit compression
client.setData().compressed().forPath("/a/b/c", data);
assertEquals(data.length, client.getData().forPath("/a/b/c").length);
assertNotEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());
} finally {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testSimple() throws Exception {
final byte[] data = "here's a string".getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,33 @@ public void testCreateCompressedAndUncompressed() throws Exception {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testGlobalCompression() throws Exception {
final String path = "/a";
final byte[] data = "here's a string".getBytes();

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableGlobalCompression()
.build();
try {
client.start();

// Create compressed data in a transaction
CuratorOp op = client.transactionOp().create().forPath(path, data);
client.transaction().forOperations(op);
assertArrayEquals(data, client.getData().decompressed().forPath(path));
assertNotEquals(data.length, client.checkExists().forPath(path).getDataLength());

// Set compressed data in transaction
op = client.transactionOp().setData().forPath(path, data);
client.transaction().forOperations(op);
assertArrayEquals(data, client.getData().decompressed().forPath(path));
assertNotEquals(data.length, client.checkExists().forPath(path).getDataLength());
} finally {
CloseableUtils.closeQuietly(client);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,59 @@ public void testCreateCompressedAndUncompressed() throws Exception {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testGlobalCompression() throws Exception {
final String path1 = "/a";
final String path2 = "/b";

final byte[] data1 = "here's a string".getBytes();
final byte[] data2 = "here's another string".getBytes();

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableGlobalCompression()
.build();
try {
client.start();

// Create the nodes
client.inTransaction()
.create()
.forPath(path1, data1)
.and()
.create()
.forPath(path2, data2)
.and()
.commit();

// Check they exist
assertNotNull(client.checkExists().forPath(path1));
assertNotEquals(data1.length, client.checkExists().forPath(path1).getDataLength());
assertNotNull(client.checkExists().forPath(path2));
assertNotEquals(data2.length, client.checkExists().forPath(path2).getDataLength());
assertArrayEquals(data1, client.getData().decompressed().forPath(path1));
assertArrayEquals(data2, client.getData().decompressed().forPath(path2));

// Set the nodes, path1 compressed, path2 uncompressed.
client.inTransaction()
.setData()
.forPath(path1, data2)
.and()
.setData()
.forPath(path2, data1)
.and()
.commit();

assertNotNull(client.checkExists().forPath(path1));
assertNotEquals(data2.length, client.checkExists().forPath(path1).getDataLength());
assertNotNull(client.checkExists().forPath(path2));
assertNotEquals(data1.length, client.checkExists().forPath(path2).getDataLength());
assertArrayEquals(data2, client.getData().decompressed().forPath(path1));
assertArrayEquals(data1, client.getData().decompressed().forPath(path2));
} finally {
CloseableUtils.closeQuietly(client);
}
}
}

0 comments on commit 61f1e56

Please sign in to comment.