Skip to content

Commit

Permalink
Improvements to ClientContext for ensureOpen (apache#5258)
Browse files Browse the repository at this point in the history
These changes are small quality fixes to ensure that
ClientContext.ensureOpen is used when it is needed, and not used when it
isn't. This fixes an issue seen where the client RPC timeout value is
being retrieved from a supplier in a thread pool when returning RPC
transports after a client is closed. In these cases, ensureOpen does not
need to be checked. However, there were a few context API methods where
it was not checked but should have been.

Also, improved the close method to ensure close activities are only
called at most once, and made private and renamed an internal method to
get the client properties from the ClientInfo object, so it's more clear
which properties the method is returning and isn't exposed for misuse.
  • Loading branch information
ctubbsii authored Jan 22, 2025
1 parent 27e5fe7 commit e1038d4
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public class ClientContext implements AccumuloClient {
private ThriftTransportPool thriftTransportPool;
private ZookeeperLockChecker zkLockChecker;

private volatile boolean closed = false;
private final AtomicBoolean closed = new AtomicBoolean();

private SecurityOperations secops = null;
private final TableOperationsImpl tableops;
Expand All @@ -157,22 +157,21 @@ public class ClientContext implements AccumuloClient {
private final Supplier<ZooSession> zooSession;

private void ensureOpen() {
if (closed) {
if (closed.get()) {
throw new IllegalStateException("This client was closed.");
}
}

private ScanServerSelector createScanServerSelector() {
String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(getClientProperties());
try {
Class<? extends ScanServerSelector> impl =
Class.forName(clazz).asSubclass(ScanServerSelector.class);
ScanServerSelector scanServerSelector = impl.getDeclaredConstructor().newInstance();

Map<String,String> sserverProps = new HashMap<>();
ClientProperty
.getPrefix(info.getProperties(), ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey())
.forEach((k, v) -> {
ClientProperty.getPrefix(getClientProperties(),
ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey()).forEach((k, v) -> {
sserverProps.put(
k.toString()
.substring(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey().length()),
Expand Down Expand Up @@ -311,9 +310,8 @@ public AuthenticationToken getAuthenticationToken() {
return getCredentials().getToken();
}

public Properties getProperties() {
ensureOpen();
return info.getProperties();
private Properties getClientProperties() {
return info.getClientProperties();
}

/**
Expand Down Expand Up @@ -396,7 +394,7 @@ static BatchWriterConfig getBatchWriterConfig(Properties props) {
public synchronized BatchWriterConfig getBatchWriterConfig() {
ensureOpen();
if (batchWriterConfig == null) {
batchWriterConfig = getBatchWriterConfig(info.getProperties());
batchWriterConfig = getBatchWriterConfig(getClientProperties());
}
return batchWriterConfig;
}
Expand All @@ -405,6 +403,7 @@ public synchronized BatchWriterConfig getBatchWriterConfig() {
* @return map of live scan server addresses to lock uuids.
*/
public Map<String,Pair<UUID,String>> getScanServers() {
ensureOpen();
Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>();
String root = this.getZooKeeperRoot() + Constants.ZSSERVERS;
var addrs = this.getZooCache().getChildren(root);
Expand Down Expand Up @@ -455,7 +454,7 @@ static ConditionalWriterConfig getConditionalWriterConfig(Properties props) {
public synchronized ConditionalWriterConfig getConditionalWriterConfig() {
ensureOpen();
if (conditionalWriterConfig == null) {
conditionalWriterConfig = getConditionalWriterConfig(info.getProperties());
conditionalWriterConfig = getConditionalWriterConfig(getClientProperties());
}
return conditionalWriterConfig;
}
Expand Down Expand Up @@ -621,6 +620,7 @@ public Map<String,TableId> getTableNameToIdMap() {
}

public Map<NamespaceId,String> getNamespaceIdToNameMap() {
ensureOpen();
return Namespaces.getIdToNameMap(this);
}

Expand Down Expand Up @@ -694,14 +694,15 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz
throws TableNotFoundException {
ensureOpen();
Integer numQueryThreads =
ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getProperties());
ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getClientProperties());
Objects.requireNonNull(numQueryThreads);
return createBatchScanner(tableName, authorizations, numQueryThreads);
}

@Override
public BatchScanner createBatchScanner(String tableName)
throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
ensureOpen();
Authorizations auths = securityOperations().getUserAuthorizations(getPrincipal());
return createBatchScanner(tableName, auths);
}
Expand All @@ -718,7 +719,6 @@ public BatchDeleter createBatchDeleter(String tableName, Authorizations authoriz
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
int numQueryThreads) throws TableNotFoundException {
ensureOpen();
return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig());
}

Expand Down Expand Up @@ -773,7 +773,7 @@ public Scanner createScanner(String tableName, Authorizations authorizations)
checkArgument(authorizations != null, "authorizations is null");
Scanner scanner =
new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations);
Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getClientProperties());
if (batchSize != null) {
scanner.setBatchSize(batchSize);
}
Expand Down Expand Up @@ -829,7 +829,7 @@ public synchronized InstanceOperations instanceOperations() {
public Properties properties() {
ensureOpen();
Properties result = new Properties();
getProperties().forEach((key, value) -> {
getClientProperties().forEach((key, value) -> {
if (!key.equals(ClientProperty.AUTH_TOKEN.getKey())) {
result.setProperty((String) key, (String) value);
}
Expand All @@ -844,23 +844,24 @@ public AuthenticationToken token() {

@Override
public synchronized void close() {
closed = true;
if (zooKeeperOpened.get()) {
zooSession.get().close();
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
}
if (tableZooHelper != null) {
tableZooHelper.close();
}
if (scannerReadaheadPool != null) {
scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down
}
if (cleanupThreadPool != null) {
cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute
if (closed.compareAndSet(false, true)) {
if (zooKeeperOpened.get()) {
zooSession.get().close();
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
}
if (tableZooHelper != null) {
tableZooHelper.close();
}
if (scannerReadaheadPool != null) {
scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down
}
if (cleanupThreadPool != null) {
cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute
}
singletonReservation.close();
}
singletonReservation.close();
}

public static class ClientBuilderImpl<T>
Expand Down Expand Up @@ -896,7 +897,7 @@ public static AccumuloClient buildClient(ClientBuilderImpl<AccumuloClient> cbi)
try {
// ClientContext closes reservation unless a RuntimeException is thrown
ClientInfo info = cbi.getClientInfo();
AccumuloConfiguration config = ClientConfConverter.toAccumuloConf(info.getProperties());
var config = ClientConfConverter.toAccumuloConf(info.getClientProperties());
return new ClientContext(reservation, info, config, cbi.getUncaughtExceptionHandler());
} catch (RuntimeException e) {
reservation.close();
Expand Down Expand Up @@ -1080,8 +1081,7 @@ public ZooSession getZooSession() {
}

protected long getTransportPoolMaxAgeMillis() {
ensureOpen();
return ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getProperties());
return ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getClientProperties());
}

public synchronized ThriftTransportPool getTransportPool() {
Expand All @@ -1108,6 +1108,7 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
}

public NamespaceMapping getNamespaces() {
ensureOpen();
return namespaces;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public interface ClientInfo {
/**
* @return All Accumulo client properties set for this connection
*/
Properties getProperties();
Properties getClientProperties();

/**
* @return hadoop Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public String getPrincipal() {
}

@Override
public Properties getProperties() {
public Properties getClientProperties() {
Properties result = new Properties();
properties.forEach((key, value) -> result.setProperty((String) key, (String) value));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public AccumuloClient createAccumuloClient(String user, AuthenticationToken toke

@Override
public Properties getClientProperties() {
return info.getProperties();
return info.getClientProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public boolean saslEnabled() {
}

@Override
public Properties getProperties() {
public Properties getClientProperties() {
Properties properties = ClientConfConverter.toProperties(getSiteConfiguration());
properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), getZooKeepers());
properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;

import java.util.Properties;

import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
Expand All @@ -43,7 +41,6 @@ public static ServerContext get() {
ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
conf.set(Property.INSTANCE_VOLUMES, "file:///");
expect(context.getConfiguration()).andReturn(conf).anyTimes();
expect(context.getProperties()).andReturn(new Properties()).anyTimes();
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Properties;

import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor;
Expand Down Expand Up @@ -70,7 +69,6 @@ public void createMockServerContext() {
expect(context.getZooSession()).andReturn(zk).anyTimes();
expect(zk.asReader()).andReturn(null).anyTimes();
expect(zk.asReaderWriter()).andReturn(null).anyTimes();
expect(context.getProperties()).andReturn(new Properties()).anyTimes();
expect(context.getZooKeepers()).andReturn("").anyTimes();
expect(context.getInstanceName()).andReturn("instance").anyTimes();
expect(context.getZooKeepersSessionTimeOut()).andReturn(1).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static void main(final String[] args) throws AccumuloException, TableNotF
default:
throw new RuntimeException("Incorrect usage; expected to be run by test only");
}
try (AccumuloClient client = Accumulo.newClient().from(context.getProperties())
try (AccumuloClient client = Accumulo.newClient().from(context.properties())
.as(creds.getPrincipal(), creds.getToken()).build()) {
client.securityOperations().authenticateUser(creds.getPrincipal(), creds.getToken());
try (Scanner scan =
Expand Down

0 comments on commit e1038d4

Please sign in to comment.