Skip to content

Commit

Permalink
Merge branch '2.1' into 5137-minc-fail-no-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 29, 2025
2 parents 1d9e34f + 712e50e commit 1e360fb
Show file tree
Hide file tree
Showing 62 changed files with 1,853 additions and 1,386 deletions.
1,140 changes: 559 additions & 581 deletions assemble/bin/accumulo-cluster

Large diffs are not rendered by default.

112 changes: 87 additions & 25 deletions assemble/bin/accumulo-service
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Services:
gc Accumulo garbage collector
monitor Accumulo monitor
manager Accumulo manager
master Deprecated. Accumulo master
master Deprecated. Use 'manager' instead
tserver Accumulo tserver
compaction-coordinator Accumulo compaction coordinator (experimental)
compactor Accumulo compactor (experimental)
Expand Down Expand Up @@ -60,31 +60,69 @@ function rotate_log() {
fi
}

function get_group() {
# Find the group parameter if any
local group="default"
local group_param=""
local param
for param in "$@"; do
if [[ -n $group_param ]]; then
# grab the group if the previous arg was the group param
group="$param"
break
elif [[ $param == '-q' || $param == '-g' ]]; then
# found the group parameter, the next arg is the group
group_param=$param
fi
done
echo "$group"
}

function start_service() {
local service_type=$1
local service_name=$2
shift 2

local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
if [[ -f $pid_file ]]; then
pid=$(cat "$pid_file")
if kill -0 "$pid" 2>/dev/null; then
echo "$HOST : ${service_name} already running (${pid})"
exit 0
fi
local build_service_name="false"
if [[ -n $service_name ]]; then
# if service_name is supplied, then we are only starting one instance
servers_per_host=1
else
build_service_name="true"
servers_per_host=${ACCUMULO_CLUSTER_ARG:-1}
fi
echo "Starting $service_name on $HOST"

if [[ ${service_type} == "manager" ]]; then
"${bin}/accumulo" org.apache.accumulo.manager.state.SetGoalState NORMAL
fi
outfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.out"
errfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.err"
rotate_log "$outfile"
rotate_log "$errfile"
for ((process_num = 1; process_num <= servers_per_host; process_num++)); do
if [[ ${build_service_name} == "true" ]]; then
service_name="${service_type}_${group}_${process_num}"
fi
# The ACCUMULO_SERVICE_INSTANCE variable is used in
# accumulo-env.sh to set parameters on the command
# line.
export ACCUMULO_SERVICE_INSTANCE="${service_name}"

nohup "${bin}/accumulo" "$service_type" "$@" >"$outfile" 2>"$errfile" </dev/null &
echo "$!" >"${pid_file}"
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
if [[ -f $pid_file ]]; then
pid=$(cat "$pid_file")
if kill -0 "$pid" 2>/dev/null; then
echo "$HOST : ${service_name} already running (${pid})"
continue
fi
fi
echo "Starting $service_name on $HOST"

if [[ ${service_type} == "manager" ]]; then
"${bin}/accumulo" org.apache.accumulo.manager.state.SetGoalState NORMAL
fi
outfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.out"
errfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.err"
rotate_log "$outfile"
rotate_log "$errfile"

nohup "${bin}/accumulo" "$service_type" "$@" >"$outfile" 2>"$errfile" </dev/null &
echo "$!" >"${pid_file}"

done

# Check the max open files limit and selectively warn
max_files_open=$(ulimit -n)
Expand Down Expand Up @@ -127,6 +165,18 @@ function stop_service() {
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${process}.pid"
control_process "TERM" "$process" "$pid_file"
done
elif [[ -n $ACCUMULO_CLUSTER_ARG ]]; then

servers_per_host=${ACCUMULO_CLUSTER_ARG:-1}
group=$(get_group "$@")

for ((process_num = 1; process_num <= servers_per_host; process_num++)); do
service_name="${service_type}_${group}_${process_num}"
echo "Stopping service process: $service_name"
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
control_process "TERM" "$service_name" "$pid_file"
done

else
echo "Stopping service process: $service_name"
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
Expand All @@ -144,6 +194,18 @@ function kill_service() {
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${process}.pid"
control_process "KILL" "$process" "$pid_file"
done
elif [[ -n $ACCUMULO_CLUSTER_ARG ]]; then

servers_per_host=${ACCUMULO_CLUSTER_ARG:-1}
group=$(get_group "$@")

for ((process_num = 1; process_num <= servers_per_host; process_num++)); do
service_name="${service_type}_${group}_${process_num}"
echo "Stopping service process: $service_name"
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
control_process "KILL" "$service_name" "$pid_file"
done

else
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
control_process "KILL" "$service_name" "$pid_file"
Expand Down Expand Up @@ -182,6 +244,9 @@ function main() {
export conf="${ACCUMULO_CONF_DIR:-${basedir}/conf}"
export lib="${basedir}/lib"

group=$(get_group "$@")
export ACCUMULO_RESOURCE_GROUP="$group"

if [[ -f "${conf}/accumulo-env.sh" ]]; then
#shellcheck source=../conf/accumulo-env.sh
source "${conf}/accumulo-env.sh"
Expand All @@ -200,7 +265,7 @@ function main() {
local service_type="$1"
local command_name="$2"
shift 2
local service_name=$service_type
local service_name=""
local all_flag=false

if [[ $service_type == "master" ]]; then
Expand All @@ -209,7 +274,7 @@ function main() {
fi

# Check and see if accumulo-cluster is calling this script
if [[ -z $ACCUMULO_SERVICE_INSTANCE ]]; then
if [[ -z $ACCUMULO_CLUSTER_ARG ]]; then
# The rest of the arguments are from a user
if [[ $1 == "--all" ]]; then
all_flag=true
Expand All @@ -219,9 +284,6 @@ function main() {
service_name="$1"
fi
fi
# Use the special bash env var from accumulo-cluster
else
service_name=${service_type}${ACCUMULO_SERVICE_INSTANCE}
fi

case "$service_type" in
Expand All @@ -234,10 +296,10 @@ function main() {
start_service "$service_type" "$service_name" "$@"
;;
stop)
stop_service "$service_type" "$service_name" $all_flag
stop_service "$service_type" "$service_name" $all_flag "$@"
;;
kill)
kill_service "$service_type" "$service_name" $all_flag
kill_service "$service_type" "$service_name" $all_flag "$@"
;;
list)
list_processes "$service_type"
Expand Down
32 changes: 21 additions & 11 deletions assemble/conf/accumulo-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,33 @@ JAVA_OPTS=(
## JVM options set for individual applications
# cmd is set by calling script that sources this env file
#shellcheck disable=SC2154
case "$cmd" in
manager | master) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;;
monitor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;;
gc) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;;
tserver) JAVA_OPTS=('-Xmx768m' '-Xms768m' "${JAVA_OPTS[@]}") ;;
compaction-coordinator) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;;
compactor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;;
sserver) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;;
*) JAVA_OPTS=('-Xmx256m' '-Xms64m' "${JAVA_OPTS[@]}") ;;
case "${ACCUMULO_RESOURCE_GROUP:-default}" in
default)
# shellcheck disable=SC2154
# $cmd is exported in the accumulo script, but not the accumulo-service script
case "$cmd" in
manager | master) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;;
monitor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;;
gc) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;;
tserver) JAVA_OPTS=('-Xmx768m' '-Xms768m' "${JAVA_OPTS[@]}") ;;
compaction-coordinator) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;;
compactor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;;
sserver) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;;
*) JAVA_OPTS=('-Xmx256m' '-Xms64m' "${JAVA_OPTS[@]}") ;;
esac
;;
*)
echo "ACCUMULO_RESOURCE_GROUP named $ACCUMULO_RESOURCE_GROUP is not configured in accumulo-env.sh"
exit 1
;;
esac

## JVM options set for logging. Review log4j2.properties file to see how they are used.
JAVA_OPTS=("-Daccumulo.log.dir=${ACCUMULO_LOG_DIR}"
"-Daccumulo.application=${cmd}${ACCUMULO_SERVICE_INSTANCE}_$(hostname)"
"-Daccumulo.application=${ACCUMULO_SERVICE_INSTANCE}_$(hostname)"
"-Daccumulo.metrics.service.instance=${ACCUMULO_SERVICE_INSTANCE}"
"-Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
"-Dotel.service.name=${cmd}${ACCUMULO_SERVICE_INSTANCE}"
"-Dotel.service.name=${ACCUMULO_SERVICE_INSTANCE}"
"${JAVA_OPTS[@]}"
)

Expand Down
5 changes: 5 additions & 0 deletions assemble/conf/accumulo.properties
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ instance.secret=DEFAULT

## Set to false if 'accumulo-util build-native' fails
tserver.memory.maps.native.enabled=true

## (optional) include additional property files for a resource group
## based on the ACCUMULO_RESOURCE_GROUP env var set in accumulo-service
#include=group-${env:ACCUMULO_RESOURCE_GROUP}.properties
#includeOptional=group-${env:ACCUMULO_RESOURCE_GROUP}.properties
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static ContextClassLoaderFactory getContextFactory() {
}

// for testing
static synchronized void resetContextFactoryForTests() {
public static synchronized void resetContextFactoryForTests() {
FACTORY = null;
}

Expand Down
18 changes: 13 additions & 5 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ public enum Property {
+ " user-implementations of pluggable Accumulo features, such as the balancer"
+ " or volume chooser.",
"2.0.0"),
GENERAL_CACHE_MANAGER_IMPL("general.block.cache.manager.class",
"org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING,
"Specifies the class name of the block cache factory implementation."
+ " Alternative implementation is"
+ " org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager.",
"2.1.4"),
GENERAL_DELEGATION_TOKEN_LIFETIME("general.delegation.token.lifetime", "7d",
PropertyType.TIMEDURATION,
"The length of time that delegation tokens and secret keys are valid.", "1.7.0"),
Expand Down Expand Up @@ -532,7 +538,9 @@ public enum Property {
"Time to wait for clients to continue scans before closing a session.", "1.3.5"),
TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES,
"Specifies a default blocksize for the tserver caches.", "1.3.5"),
TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class",
@Deprecated(since = "2.1.4")
@ReplacedBy(property = Property.GENERAL_CACHE_MANAGER_IMPL)
TSERV_CACHE_MANAGER_IMPL("general.cache.manager.class",
"org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING,
"Specifies the class name of the block cache factory implementation."
+ " Alternative implementation is"
Expand Down Expand Up @@ -1873,8 +1881,9 @@ public static boolean isValidTablePropertyKey(String key) {
TSERV_MAX_MESSAGE_SIZE, GENERAL_MAX_MESSAGE_SIZE, RPC_MAX_MESSAGE_SIZE,

// block cache options
TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE,
GENERAL_CACHE_MANAGER_IMPL, TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE,
TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE,
SSERV_SUMMARYCACHE_SIZE,

// blocksize options
TSERV_DEFAULT_BLOCKSIZE, SSERV_DEFAULT_BLOCKSIZE,
Expand All @@ -1889,8 +1898,7 @@ public static boolean isValidTablePropertyKey(String key) {
COMPACTOR_MINTHREADS_TIMEOUT,

// others
TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME,
TSERV_SESSION_MAXIDLE, TSERV_UPDATE_SESSION_MAXIDLE);
TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME);

/**
* Checks if the given property may be changed via Zookeeper, but not recognized until the restart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.commons.configuration2.MapConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.configuration2.io.FileHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -222,8 +223,9 @@ private SiteConfiguration(Map<String,String> config) {
private static AbstractConfiguration getPropsFileConfig(URL accumuloPropsLocation) {
var config = new PropertiesConfiguration();
if (accumuloPropsLocation != null) {
var fileHandler = new FileHandler(config);
try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
config.read(reader);
fileHandler.load(reader);
} catch (ConfigurationException | IOException e) {
throw new IllegalArgumentException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,12 @@ private static void flatten(String parentKey, String key, Object value,
(parentKey == null || parentKey.equals("")) ? "" : parentKey + addTheDot(parentKey);
if (value instanceof String) {
results.put(parent + key, (String) value);
return;
} else if (value instanceof List) {
((List<?>) value).forEach(l -> {
if (l instanceof String) {
// remove the [] at the ends of toString()
String val = value.toString();
results.put(parent + key, val.substring(1, val.length() - 1).replace(", ", " "));
return;
} else {
flatten(parent, key, l, results);
}
Expand All @@ -102,9 +100,8 @@ private static void flatten(String parentKey, String key, Object value,
map.forEach((k, v) -> flatten(parent + key, k, v, results));
} else if (value instanceof Number) {
results.put(parent + key, value.toString());
return;
} else {
throw new RuntimeException("Unhandled object type: " + value.getClass());
throw new IllegalStateException("Unhandled object type: " + value.getClass());
}
}

Expand All @@ -121,7 +118,7 @@ public static void outputShellVariables(Map<String,String> config, PrintStream o
out.printf(PROPERTY_FORMAT, section.toUpperCase() + "_HOSTS", config.get(section));
} else {
if (section.equals("manager") || section.equals("tserver")) {
throw new RuntimeException("Required configuration section is missing: " + section);
throw new IllegalStateException("Required configuration section is missing: " + section);
}
System.err.println("WARN: " + section + " is missing");
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void startTransactionRunners(AccumuloConfiguration conf) {
// resize the pool if the property changed
ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
// If the pool grew, then ensure that there is a TransactionRunner for each thread
int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - pool.getQueue().size();
int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - pool.getActiveCount();
if (needed > 0) {
for (int i = 0; i < needed; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public void lock() {
return;
}
} catch (InterruptedException ex) {
// ignored
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting to acquire lock", ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
for (String name : children) {
// this try catch must be done inside the loop because some subset of the children may exist
try {
byte[] data = zoo.getData(path + "/" + name);
long order = Long.parseLong(name.substring(PREFIX.length()));
if (order <= entry) {
byte[] data = zoo.getData(path + "/" + name);
result.put(order, data);
}
} catch (KeeperException.NoNodeException ex) {
Expand Down
Loading

0 comments on commit 1e360fb

Please sign in to comment.