Skip to content

Commit

Permalink
Merge branch 'elasticity' into 3991-add-resource-group-tag
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Nov 30, 2023
2 parents a724224 + 4346e74 commit c0c790a
Show file tree
Hide file tree
Showing 29 changed files with 740 additions and 408 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private static Predicate<String> boundedUnits(final long lowerBound, final long
|| (suffixCheck.test(x) && new Bounds(lowerBound, upperBound).test(stripUnits.apply(x)));
}

private static final Pattern SUFFIX_REGEX = Pattern.compile("[^\\d]*$");
private static final Pattern SUFFIX_REGEX = Pattern.compile("\\D*$"); // match non-digits at end
private static final Function<String,String> stripUnits =
x -> x == null ? null : SUFFIX_REGEX.matcher(x.trim()).replaceAll("");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ interface TabletUpdates<T> {
* is not empty
*/
T deleteAll(Set<Key> keys);

T setMerged();

T deleteMerged();
}

interface TabletMutator extends TabletUpdates<TabletMutator> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,16 @@ public static class LogColumnFamily {
public static final Text NAME = new Text(STR_NAME);
}

/**
* Column family for indicating that the files in a tablet have been trimmed to only include
* data for the current tablet, so that they are safe to merge
*/
public static class ChoppedColumnFamily {
// kept to support upgrades to 3.1; name is used for both col fam and col qual
@Deprecated(since = "3.1.0")
public static final Text NAME = new Text("chopped");
}

public static class ExternalCompactionColumnFamily {
public static final String STR_NAME = "ecomp";
public static final Text NAME = new Text(STR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used to serialize and deserialize root tablet metadata using GSon. The only data
Expand All @@ -52,19 +49,14 @@
*/
public class RootTabletMetadata {

private static final Logger log = LoggerFactory.getLogger(RootTabletMetadata.class);
private static final CharsetDecoder UTF8_error_detecting_decoder = UTF_8.newDecoder();
private static final Predicate<Entry<String,TreeMap<String,String>>> isLocationCF = e -> {
String fam = e.getKey();
return fam.equals(CurrentLocationColumnFamily.STR_NAME)
|| fam.equals(FutureLocationColumnFamily.STR_NAME);
};

// JSON Mapping Version 1. Released with Accumulo version 2.1.0
private static final int VERSION_1 = 1;
// JSON Mapping Version 2. Released with Accumulo version 3,1
private static final int VERSION_2 = 2;
private static final int VERSION = VERSION_2;
private static final int VERSION = 1;

// This class is used to serialize and deserialize root tablet metadata using GSon. Any changes to
// this class must consider persisted data.
Expand Down Expand Up @@ -127,40 +119,6 @@ public RootTabletMetadata() {
data = new Data(VERSION, new TreeMap<>());
}

public static RootTabletMetadata upgrade(final String json) {
Data data = GSON.get().fromJson(json, Data.class);
int currVersion = data.getVersion();
switch (currVersion) {
case VERSION_1:
RootTabletMetadata rtm = new RootTabletMetadata();
Mutation m = convert1To2(data);
rtm.update(m);
return rtm;
case VERSION_2:
log.debug("no metadata version conversion required for {}", currVersion);
return new RootTabletMetadata(data);
default:
throw new IllegalArgumentException("Unsupported data version: " + currVersion);
}
}

private static Mutation convert1To2(final Data data) {
Mutation mutation =
MetadataSchema.TabletsSection.TabletColumnFamily.createPrevRowMutation(RootTable.EXTENT);
data.columnValues.forEach((colFam, colQuals) -> {
if (colFam.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.STR_NAME)) {
colQuals.forEach((colQual, value) -> {
mutation.put(colFam, StoredTabletFile.serialize(colQual), value);
});
} else {
colQuals.forEach((colQual, value) -> {
mutation.put(colFam, colQual, value);
});
}
});
return mutation;
}

/**
* Apply a metadata table mutation to update internal entries.
*/
Expand Down Expand Up @@ -195,6 +153,15 @@ public void update(Mutation m) {
}
}

public Stream<SimpleImmutableEntry<Key,Value>> getKeyValues() {
String row = RootTable.EXTENT.toMetaRow().toString();
return data.columnValues.entrySet().stream()
.flatMap(famToQualVal -> famToQualVal.getValue().entrySet().stream()
.map(qualVal -> new SimpleImmutableEntry<>(
new Key(row, famToQualVal.getKey(), qualVal.getKey(), 1),
new Value(qualVal.getValue()))));
}

public SortedMap<Key,Value> toKeyValues() {
TreeMap<Key,Value> metamap = new TreeMap<>();
getKeyValues().forEach(e -> metamap.put(e.getKey(), e.getValue()));
Expand All @@ -205,21 +172,11 @@ public SortedMap<Key,Value> toKeyValues() {
* Convert this class to a {@link TabletMetadata}
*/
public TabletMetadata toTabletMetadata() {
Stream<SimpleImmutableEntry<Key,Value>> entries = getKeyValues();
return TabletMetadata.convertRow(entries.iterator(),
// use a stream so we don't have to re-sort in a new TreeMap<Key,Value> structure
return TabletMetadata.convertRow(getKeyValues().iterator(),
EnumSet.allOf(TabletMetadata.ColumnType.class), false, false);
}

private Stream<SimpleImmutableEntry<Key,Value>> getKeyValues() {
String row = RootTable.EXTENT.toMetaRow().toString();
Stream<SimpleImmutableEntry<Key,Value>> entries = data.columnValues.entrySet().stream()
.flatMap(famToQualVal -> famToQualVal.getValue().entrySet().stream()
.map(qualVal -> new SimpleImmutableEntry<>(
new Key(row, famToQualVal.getKey(), qualVal.getKey(), 1),
new Value(qualVal.getValue()))));
return entries;
}

public static boolean needsUpgrade(final String json) {
return Data.needsUpgrade(json);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
Expand Down Expand Up @@ -273,6 +274,18 @@ public TabletMetadataBuilder deleteAll(Set<Key> keys) {
throw new UnsupportedOperationException();
}

@Override
public TabletMetadataBuilder setMerged() {
fetched.add(MERGED);
internalBuilder.setMerged();
return this;
}

@Override
public TabletMetadataBuilder deleteMerged() {
throw new UnsupportedOperationException();
}

/**
* @param extraFetched Anything that was put on the builder will automatically be added to the
* fetched set. However, for the case where something was not put and it needs to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
Expand Down Expand Up @@ -329,6 +330,18 @@ public T deleteAll(Set<Key> keys) {
return getThis();
}

@Override
public T setMerged() {
MergedColumnFamily.MERGED_COLUMN.put(mutation, MergedColumnFamily.MERGED_VALUE);
return getThis();
}

@Override
public T deleteMerged() {
MergedColumnFamily.MERGED_COLUMN.putDelete(mutation);
return getThis();
}

public void setCloseAfterMutate(AutoCloseable closeable) {
this.closeAfterMutate = closeable;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,21 @@ public class MetricsUtil {
private static Pattern camelCasePattern = Pattern.compile("[a-z][A-Z][a-z]");

public static void initializeMetrics(final AccumuloConfiguration conf, final String appName,
final HostAndPort address, final String resourceGroup) throws ClassNotFoundException,
InstantiationException, IllegalAccessException, IllegalArgumentException,
InvocationTargetException, NoSuchMethodException, SecurityException {
final HostAndPort address, final String instanceName, final String resourceGroup)
throws ClassNotFoundException, InstantiationException, IllegalAccessException,
IllegalArgumentException, InvocationTargetException, NoSuchMethodException,
SecurityException {
initializeMetrics(conf.getBoolean(Property.GENERAL_MICROMETER_ENABLED),
conf.getBoolean(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED),
conf.get(Property.GENERAL_MICROMETER_FACTORY), appName, address, resourceGroup);
conf.get(Property.GENERAL_MICROMETER_FACTORY), appName, address, instanceName,
resourceGroup);
}

private static void initializeMetrics(boolean enabled, boolean jvmMetricsEnabled,
String factoryClass, String appName, HostAndPort address, String resourceGroup)
throws ClassNotFoundException, InstantiationException, IllegalAccessException,
IllegalArgumentException, InvocationTargetException, NoSuchMethodException,
SecurityException {
String factoryClass, String appName, HostAndPort address, String instanceName,
String resourceGroup) throws ClassNotFoundException, InstantiationException,
IllegalAccessException, IllegalArgumentException, InvocationTargetException,
NoSuchMethodException, SecurityException {

LOG.info("initializing metrics, enabled:{}, class:{}", enabled, factoryClass);

Expand All @@ -78,6 +80,7 @@ private static void initializeMetrics(boolean enabled, boolean jvmMetricsEnabled
}

List<Tag> tags = new ArrayList<>();
tags.add(Tag.of("instance.name", instanceName));
tags.add(Tag.of("process.name", processName));
tags.add(Tag.of("resource.group", resourceGroup));
if (address != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ private static void validateFilePath(String filePath) {
String uuidPart = parts[parts.length - 1];

try {
var ignored = HostAndPort.fromString(tserverPart);
HostAndPort.fromString(tserverPart);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Invalid tserver format in filePath. Expected format: host:port. Found '" + tserverPart
+ "'");
}

try {
var ignored = UUID.fromString(uuidPart);
UUID.fromString(uuidPart);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Expected valid UUID. Found '" + uuidPart + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,21 +50,12 @@ public class CompactionServicesConfig {
@SuppressWarnings("removal")
private static final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX;
private static final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX;
public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default");

private static Map<String,Map<String,String>>
stripPropPrefix(PluginEnvironment.Configuration conf) {
@SuppressWarnings("removal")
String prefix = Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey();
var confMap = new HashMap<String,String>();
conf.getWithPrefix(prefix).forEach((prop, val) -> {
var suffix = prop.substring(prefix.length());
confMap.put(suffix, val);
});
return Map.of(prefix, confMap);

private interface ConfigIndirection {
Map<String,String> getAllPropertiesWithPrefixStripped(Property p);
}

private static Map<String,Map<String,String>> getConfiguration(AccumuloConfiguration aconf) {
private static Map<String,Map<String,String>> getConfiguration(ConfigIndirection aconf) {
Map<String,Map<String,String>> properties = new HashMap<>();

var newProps = aconf.getAllPropertiesWithPrefixStripped(newPrefix);
Expand Down Expand Up @@ -95,11 +85,16 @@ private static Map<String,Map<String,String>> getConfiguration(AccumuloConfigura

public CompactionServicesConfig(PluginEnvironment.Configuration conf) {
// TODO will probably not need rate limit eventually and the 2nd param predicate can go away
this(stripPropPrefix(conf), property -> conf.isSet(property.getKey()));
this(getConfiguration(prefix -> {
var props = conf.getWithPrefix(prefix.getKey());
Map<String,String> stripped = new HashMap<>();
props.forEach((k, v) -> stripped.put(k.substring(prefix.getKey().length()), v));
return stripped;
}), property -> conf.isSet(property.getKey()));
}

public CompactionServicesConfig(AccumuloConfiguration aconf) {
this(getConfiguration(aconf), aconf::isPropertySet);
this(getConfiguration(aconf::getAllPropertiesWithPrefixStripped), aconf::isPropertySet);
}

@SuppressWarnings("removal")
Expand Down
Loading

0 comments on commit c0c790a

Please sign in to comment.