Skip to content

Commit

Permalink
Merge branch 'main' into 4973-new-monitor-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 31, 2025
2 parents a4cbf62 + c438520 commit eeb3d42
Show file tree
Hide file tree
Showing 36 changed files with 3,108 additions and 403 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,23 @@ public static class HAServiceLockWatcher implements AccumuloLockWatcher {
private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class);

private final Type server;
private final Supplier<Boolean> shutdownComplete;
private volatile boolean acquiredLock = false;
private volatile boolean failedToAcquireLock = false;

public HAServiceLockWatcher(Type server) {
public HAServiceLockWatcher(Type server, Supplier<Boolean> shutdownComplete) {
this.server = server;
this.shutdownComplete = shutdownComplete;
}

@Override
public void lostLock(LockLossReason reason) {
Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
if (shutdownComplete.get()) {
LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server,
reason);
} else {
Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
}
}

@Override
Expand Down Expand Up @@ -147,24 +154,27 @@ public static class ServiceLockWatcher implements LockWatcher {
private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class);

private final Type server;
private final Supplier<Boolean> shuttingDown;
private final Supplier<Boolean> shutdownComplete;
private final Consumer<Type> lostLockAction;

public ServiceLockWatcher(Type server, Supplier<Boolean> shuttingDown,
public ServiceLockWatcher(Type server, Supplier<Boolean> shutdownComplete,
Consumer<Type> lostLockAction) {
this.server = server;
this.shuttingDown = shuttingDown;
this.shutdownComplete = shutdownComplete;
this.lostLockAction = lostLockAction;
}

@Override
public void lostLock(final LockLossReason reason) {
Halt.halt(1, () -> {
if (!shuttingDown.get()) {
if (shutdownComplete.get()) {
LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server,
reason);
} else {
Halt.halt(1, () -> {
LOG.error("{} lost lock (reason = {}), exiting.", server, reason);
}
lostLockAction.accept(server);
});
lostLockAction.accept(server);
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ default C getManagerConnection(Logger log, ThriftClientTypes<C> type, ClientCont
return null;
}

HostAndPort manager = HostAndPort.fromString(managers.iterator().next().toHostPortString());
final String managerLocation = managers.iterator().next().toHostPortString();
if (managerLocation.equals("0.0.0.0:0")) {
// The Manager creates the lock with an initial address of 0.0.0.0:0, then
// later updates the lock contents with the actual address after everything
// is started.
log.debug("Manager is up and lock acquired, waiting for address...");
return null;
}
HostAndPort manager = HostAndPort.fromString(managerLocation);
try {
// Manager requests can take a long time: don't ever time out
return ThriftUtil.getClientNoTimeout(type, manager, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc.clients;

import java.io.UncheckedIOException;
import java.net.UnknownHostException;

import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.process.thrift.ServerProcessService.Client;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;

import com.google.common.net.HostAndPort;

public class ServerProcessServiceThriftClient extends ThriftClientTypes<Client> {

protected ServerProcessServiceThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
}

public Client getServerProcessConnection(ClientContext context, Logger log, String hostname,
int port) {
HostAndPort serverProcess = HostAndPort.fromParts(hostname, port);
try {
// Manager requests can take a long time: don't ever time out
return ThriftUtil.getClientNoTimeout(this, serverProcess, context);
} catch (TTransportException tte) {
Throwable cause = tte.getCause();
if (cause instanceof UnknownHostException) {
// do not expect to recover from this
throw new UncheckedIOException((UnknownHostException) cause);
}
log.debug("Failed to connect to process at " + serverProcess + ", will retry... ", tte);
return null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public abstract class ThriftClientTypes<C extends TServiceClient> {
public static final TabletManagementClientServiceThriftClient TABLET_MGMT =
new TabletManagementClientServiceThriftClient("tablet");

public static final ServerProcessServiceThriftClient SERVER_PROCESS =
new ServerProcessServiceThriftClient("process");

/**
* execute method with supplied client returning object of type R
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public enum ThreadPoolNames {
TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"),
TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"),
TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"),
TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL("accumulo.pool.tserver.shutdown.tablet.unload"),
TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"),
TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"),
TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"),
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scripts/generate-thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
[[ -z $REQUIRED_THRIFT_VERSION ]] && REQUIRED_THRIFT_VERSION='0.17.0'
[[ -z $INCLUDED_MODULES ]] && INCLUDED_MODULES=()
[[ -z $BASE_OUTPUT_PACKAGE ]] && BASE_OUTPUT_PACKAGE='org.apache.accumulo.core'
[[ -z $PACKAGES_TO_GENERATE ]] && PACKAGES_TO_GENERATE=(gc manager tabletserver securityImpl clientImpl dataImpl compaction tabletingest tablet tabletscan metrics)
[[ -z $PACKAGES_TO_GENERATE ]] && PACKAGES_TO_GENERATE=(process gc manager tabletserver securityImpl clientImpl dataImpl compaction tabletingest tablet tabletscan metrics)
[[ -z $BUILD_DIR ]] && BUILD_DIR='target'
[[ -z $LANGUAGES_TO_GENERATE ]] && LANGUAGES_TO_GENERATE=(java)
[[ -z $FINAL_DIR ]] && FINAL_DIR='src/main'
Expand Down
1 change: 1 addition & 0 deletions core/src/main/spotbugs/exclude-filter.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<Package name="org.apache.accumulo.core.dataImpl.thrift" />
<Package name="org.apache.accumulo.core.gc.thrift" />
<Package name="org.apache.accumulo.core.manager.thrift" />
<Package name="org.apache.accumulo.core.process.thrift" />
<Package name="org.apache.accumulo.core.securityImpl.thrift" />
<Package name="org.apache.accumulo.core.tablet.thrift" />
<Package name="org.apache.accumulo.core.tabletingest.thrift" />
Expand Down
Loading

0 comments on commit eeb3d42

Please sign in to comment.