Partially revert 1414641 to resurrect 2.1 upgrade code
dlmarion committed Nov 30, 2023
1 parent c615abf commit 8908e53
Showing 5 changed files with 474 additions and 6 deletions.
AccumuloDataVersion.java
@@ -37,7 +37,7 @@
public class AccumuloDataVersion {

/**
- * version (12) reflects On-Demand tablets starting with 4.0
+ * version (13) reflects On-Demand tablets starting with 4.0
*/
public static final int ONDEMAND_TABLETS_FOR_VERSION_4 = 13;

@@ -52,11 +52,16 @@ public class AccumuloDataVersion {
*/
public static final int REMOVE_DEPRECATIONS_FOR_VERSION_3 = 11;

+ /**
+ * version (10) reflects changes to how root tablet metadata is serialized in zookeeper starting
+ * with 2.1. See {@link org.apache.accumulo.core.metadata.schema.RootTabletMetadata}.
+ */
+ public static final int ROOT_TABLET_META_CHANGES = 10;
+
/**
* Historic data versions
*
* <ul>
- * <li>version (10) Changes to how root tablet metadata is serialized in zookeeper in 2.1.0</li>
* <li>version (9) RFiles and wal crypto serialization changes. RFile summary data in 2.0.0</li>
* <li>version (8) RFile index (ACCUMULO-1124) and wal tracking in ZK in 1.8.0</li>
* <li>version (7) also reflects the addition of a replication table in 1.7.0
@@ -78,6 +83,7 @@ public static int get() {
}

// ELASTICITY_TODO get upgrade working
+ // public static final Set<Integer> CAN_RUN = Set.of(ROOT_TABLET_META_CHANGES, CURRENT_VERSION);
public static final Set<Integer> CAN_RUN = Set.of(CURRENT_VERSION);

/**
@@ -99,6 +105,8 @@ public static String oldestUpgradeableVersionName() {

private static String dataVersionToReleaseName(final int version) {
switch (version) {
+ case ROOT_TABLET_META_CHANGES:
+ return "2.1.0";
case REMOVE_DEPRECATIONS_FOR_VERSION_3:
return "3.0.0";
case METADATA_FILE_JSON_ENCODING:
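The commented-out CAN_RUN line above is the heart of this change: once the 2.1 upgrade path works again, ROOT_TABLET_META_CHANGES is intended to rejoin the set of data versions a server may start from. A minimal sketch of how such a gate behaves (hypothetical standalone class, not code from this commit; the real check is ServerContext.ensureDataVersionCompatible):

// Sketch only; names mirror AccumuloDataVersion but this is an illustration.
import java.util.Set;

class DataVersionGate {
  static final int ONDEMAND_TABLETS_FOR_VERSION_4 = 13; // current data version
  static final int ROOT_TABLET_META_CHANGES = 10;       // 2.1.0 data version

  // With 2.1 upgrades re-enabled, both versions would be accepted at startup.
  static final Set<Integer> CAN_RUN =
      Set.of(ROOT_TABLET_META_CHANGES, ONDEMAND_TABLETS_FOR_VERSION_4);

  static void ensureDataVersionCompatible(int persistedVersion) {
    if (!CAN_RUN.contains(persistedVersion)) {
      throw new IllegalStateException("cannot run with data version " + persistedVersion);
    }
  }
}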
@@ -138,7 +138,7 @@ public void testCanRun() {

final int oldestSupported = AccumuloDataVersion.ONDEMAND_TABLETS_FOR_VERSION_4;
// ELASTICITY_TODO basically disable check until upgrade to 3.1 is supported. Should be:
- // final int oldestSupported = AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
+ // final int oldestSupported = AccumuloDataVersion.ROOT_TABLET_META_CHANGES;

final int currentVersion = AccumuloDataVersion.get();
IntConsumer shouldPass = ServerContext::ensureDataVersionCompatible;
@@ -18,7 +18,7 @@
*/
package org.apache.accumulo.manager.upgrade;

- import static org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3;
+ import static org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES;

import java.io.IOException;
import java.util.Collections;
@@ -112,8 +112,8 @@ public boolean isParentLevelUpgraded(Ample.DataLevel level) {
private int currentVersion;
// map of "current version" -> upgrader to next version.
// Sorted so upgrades execute in order from the oldest supported data version to current
- private final Map<Integer,Upgrader> upgraders = Collections.unmodifiableMap(
- new TreeMap<>(Map.of(REMOVE_DEPRECATIONS_FOR_VERSION_3, new Upgrader11to12())));
+ private final Map<Integer,Upgrader> upgraders = Collections
+ .unmodifiableMap(new TreeMap<>(Map.of(ROOT_TABLET_META_CHANGES, new Upgrader10to11())));

private volatile UpgradeStatus status;

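The comment above the upgraders map explains the ordering requirement: the map is a TreeMap so upgrade steps run from the oldest supported data version toward the current one, each upgrader advancing a single version. A rough sketch of that walk (hypothetical helper, not code from this commit):

// Sketch: apply upgrade steps in ascending version order from the persisted version.
import java.util.Map;
import java.util.TreeMap;

interface UpgradeStep {
  void upgrade();
}

class UpgradeWalk {
  static void run(TreeMap<Integer,UpgradeStep> upgraders, int persistedVersion) {
    // tailMap keeps only steps keyed at or above the persisted version, already sorted.
    for (Map.Entry<Integer,UpgradeStep> step : upgraders.tailMap(persistedVersion).entrySet()) {
      step.getValue().upgrade(); // e.g. version 10 -> 11 first, then 11 -> 12, ...
    }
  }
}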
Upgrader10to11.java (new file)
@@ -0,0 +1,268 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.upgrade;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.Constants.ZTABLES;
import static org.apache.accumulo.core.Constants.ZTABLE_STATE;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX;
import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.PropStore;
import org.apache.accumulo.server.conf.store.PropStoreKey;
import org.apache.accumulo.server.conf.store.TablePropKey;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Upgrader10to11 implements Upgrader {

private static final Logger log = LoggerFactory.getLogger(Upgrader10to11.class);

// Retained for upgrade code only; any other use post 3.0 is unsupported.
private static final TableId REPLICATION_ID = TableId.of("+rep");

private static final Range REP_TABLE_RANGE =
new Range(REPLICATION_ID.canonical() + ";", true, REPLICATION_ID.canonical() + "<", true);

// copied from MetadataSchema 2.1 (removed in 3.0)
private static final Range REP_WAL_RANGE =
new Range(RESERVED_PREFIX + "repl", true, RESERVED_PREFIX + "repm", false);

public Upgrader10to11() {
super();
}

@Override
public void upgradeZookeeper(final ServerContext context) {
log.info("upgrade of ZooKeeper entries");

var zrw = context.getZooReaderWriter();
var iid = context.getInstanceID();

// if the replication base path (../tables/+rep) is absent, assume replication was removed or never existed.
if (!checkReplicationTableInZk(iid, zrw)) {
log.debug("replication table root node does not exist in ZooKeeper - nothing to do");
return;
}

// if the replication table is online - stop. There could be data in transit.
if (!checkReplicationOffline(iid, zrw)) {
throw new IllegalStateException(
"Replication table is not offline. Cannot continue with upgrade that will remove replication with replication active");
}

cleanMetaConfig(iid, context.getPropStore());

deleteReplicationTableZkEntries(zrw, iid);

}

@Override
public void upgradeRoot(final ServerContext context) {
log.info("upgrade root - skipping, nothing to do");
}

@Override
public void upgradeMetadata(final ServerContext context) {
log.info("upgrade metadata entries");
List<String> replTableFiles = readReplFilesFromMetadata(context);
deleteReplMetadataEntries(context);
deleteReplTableFiles(context, replTableFiles);
}

List<String> readReplFilesFromMetadata(final ServerContext context) {
List<String> results = new ArrayList<>();
try (Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
scanner.setRange(REP_TABLE_RANGE);
for (Map.Entry<Key,Value> entry : scanner) {
String f = entry.getKey()
.getColumnQualifier(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME).toString();
results.add(f);
}
} catch (TableNotFoundException ex) {
throw new IllegalStateException("failed to read replication files from metadata", ex);
}
return results;
}

void deleteReplTableFiles(final ServerContext context, final List<String> replTableFiles) {
// short circuit if there are no files
if (replTableFiles.isEmpty()) {
return;
}
// write delete mutations
boolean haveFailures = false;
try (BatchWriter writer = context.createBatchWriter(MetadataTable.NAME)) {
for (String filename : replTableFiles) {
Mutation m = createDelMutation(filename);
log.debug("Adding delete marker for file: {}", filename);
writer.addMutation(m);
}
} catch (MutationsRejectedException ex) {
log.debug("Failed to write delete marker {}", ex.getMessage());
haveFailures = true;
} catch (TableNotFoundException ex) {
throw new IllegalStateException("failed to read replication files from metadata", ex);
}
if (haveFailures) {
throw new IllegalStateException(
"deletes rejected adding deletion marker for replication file entries, check log");
}
}

private Mutation createDelMutation(String path) {
Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.encodeRow(path)));
delFlag.put(EMPTY_TEXT, EMPTY_TEXT, MetadataSchema.DeletesSection.SkewedKeyValue.NAME);
return delFlag;
}

/**
* remove +rep entries from metadata.
*/
private void deleteReplMetadataEntries(final ServerContext context) {
try (BatchDeleter deleter =
context.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 10)) {
deleter.setRanges(List.of(REP_TABLE_RANGE, REP_WAL_RANGE));
deleter.delete();
} catch (TableNotFoundException | MutationsRejectedException ex) {
throw new IllegalStateException("failed to remove replication info from metadata table", ex);
}
}

private boolean checkReplicationTableInZk(final InstanceId iid, final ZooReaderWriter zrw) {
try {
String path = buildRepTablePath(iid);
return zrw.exists(path);
} catch (KeeperException ex) {
throw new IllegalStateException("ZooKeeper error - cannot determine replication table status",
ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("interrupted reading replication state from ZooKeeper", ex);
}
}

/**
* To protect against removing replication information while replication is in use and possibly
* active, check the replication table state in ZooKeeper to see if it is ONLINE (active) or
* OFFLINE (inactive). If the state node does not exist, the status is considered OFFLINE.
*
* @return true if the replication table state is OFFLINE, false otherwise
*/
private boolean checkReplicationOffline(final InstanceId iid, final ZooReaderWriter zrw) {
try {
String path = buildRepTablePath(iid) + ZTABLE_STATE;
byte[] bytes = zrw.getData(path);
if (bytes != null && bytes.length > 0) {
String status = new String(bytes, UTF_8);
return TableState.OFFLINE.name().equals(status);
}
return false;
} catch (KeeperException ex) {
throw new IllegalStateException("ZooKeeper error - cannot determine replication table status",
ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("interrupted reading replication state from ZooKeeper", ex);
}
}

/**
* Utility method to build the ZooKeeper replication table path. The path resolves to
* {@code /accumulo/INSTANCE_ID/tables/+rep}
*/
static String buildRepTablePath(final InstanceId iid) {
return ZooUtil.getRoot(iid) + ZTABLES + "/" + REPLICATION_ID.canonical();
}

private void deleteReplicationTableZkEntries(ZooReaderWriter zrw, InstanceId iid) {
String repTablePath = buildRepTablePath(iid);
try {
zrw.recursiveDelete(repTablePath, ZooUtil.NodeMissingPolicy.SKIP);
} catch (KeeperException ex) {
throw new IllegalStateException(
"ZooKeeper error - failed recursive deletion on " + repTablePath, ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("interrupted deleting " + repTablePath + " from ZooKeeper",
ex);
}
}

private void cleanMetaConfig(final InstanceId iid, final PropStore propStore) {
PropStoreKey<TableId> metaKey = TablePropKey.of(iid, MetadataTable.ID);
var p = propStore.get(metaKey);
var props = p.asMap();
List<String> filtered = filterReplConfigKeys(props.keySet());
// add the replication status formatter to the removal list.
String v = props.get("table.formatter");
if (v != null && v.compareTo("org.apache.accumulo.server.replication.StatusFormatter") == 0) {
filtered.add("table.formatter");
}

if (filtered.size() > 0) {
log.trace("Upgrade filtering replication iterators for id: {}", metaKey);
propStore.removeProperties(metaKey, filtered);
}
}

/**
* Return a list of property keys that match replication iterator settings. This is deliberately a
* narrow filter to avoid matching user-defined properties or properties that merely contain
* "replication" in the name (specifically table.file.replication, which sets HDFS block
* replication).
*/
private List<String> filterReplConfigKeys(Set<String> keys) {
String REPL_ITERATOR_PATTERN = "^table\\.iterator\\.(majc|minc|scan)\\.replcombiner$";
String REPL_COLUMN_PATTERN =
"^table\\.iterator\\.(majc|minc|scan)\\.replcombiner\\.opt\\.columns$";

Pattern p = Pattern.compile("(" + REPL_ITERATOR_PATTERN + "|" + REPL_COLUMN_PATTERN + ")");

return keys.stream().filter(e -> p.matcher(e).find()).collect(Collectors.toList());
}
}
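To make the reach of filterReplConfigKeys concrete, here is a small standalone demo (hypothetical class reusing the same two patterns from the method above) showing which property keys the filter selects:

import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ReplKeyFilterDemo {
  public static void main(String[] args) {
    Pattern p = Pattern.compile("(^table\\.iterator\\.(majc|minc|scan)\\.replcombiner$"
        + "|^table\\.iterator\\.(majc|minc|scan)\\.replcombiner\\.opt\\.columns$)");
    Set<String> keys = Set.of(
        "table.iterator.majc.replcombiner",             // matches: replication combiner
        "table.iterator.scan.replcombiner.opt.columns", // matches: combiner column option
        "table.file.replication",                       // no match: HDFS block replication
        "table.formatter");                             // no match: handled separately upstream
    List<String> filtered =
        keys.stream().filter(k -> p.matcher(k).find()).collect(Collectors.toList());
    System.out.println(filtered); // prints only the two replcombiner keys
  }
}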