update to hive-metastore 4
Abacn committed Jan 27, 2025
1 parent 301b5ae commit e21a71c
Showing 5 changed files with 1,489 additions and 293 deletions.
sdks/java/io/iceberg/build.gradle (2 changes: 1 addition & 1 deletion)
@@ -74,7 +74,7 @@ dependencies {
// Hive catalog test dependencies
testImplementation project(path: ":sdks:java:io:iceberg:hive")
testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
testImplementation ("org.apache.hive:hive-iceberg-catalog:$hive_version")
testImplementation ("org.apache.hive:hive-metastore:$hive_version")
testImplementation ("org.apache.hive:hive-standalone-metastore-server:$hive_version")
testImplementation "org.assertj:assertj-core:3.11.1"
TestHiveMetastore.java
@@ -30,23 +30,28 @@
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HMSHandler;
import org.apache.hadoop.hive.metastore.HMSHandlerProxyFactory;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.hive.HiveClientPool;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
@@ -58,7 +63,7 @@
* HiveMetastoreExtension} instead.
*
* <p>Copied over from <a
* href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java">Iceberg's
* href="https://github.com/apache/hive/blob/branch-4.0/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java">Iceberg's
* integration testing util</a>
*/
public class TestHiveMetastore {
@@ -76,10 +81,22 @@ public class TestHiveMetastore {

private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER =
DynMethods.builder("getProxy")
.impl(RetryingHMSHandler.class, Configuration.class, IHMSHandler.class, boolean.class)
.impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class)
.impl(HMSHandlerProxyFactory.class, Configuration.class, IHMSHandler.class, boolean.class)
.impl(HMSHandlerProxyFactory.class, HiveConf.class, IHMSHandler.class, boolean.class)
.buildStatic();
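
For orientation, a minimal sketch of how a handle built this way is invoked, mirroring the upstream utility; serverConf and baseHandler are illustrative local names rather than part of this diff:

// Illustrative: build a Hive 4 HMSHandler and wrap it through whichever
// getProxy overload the classpath provides.
HiveConf serverConf = new HiveConf(new Configuration(), TestHiveMetastore.class);
HMSHandler baseHandler = new HMSHandler("new db based metaserver", serverConf);
IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false);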

// Hive3 introduces background metastore tasks (MetastoreTaskThread) for performing various
// cleanup duties. These threads are scheduled and executed in a static thread pool
// (org.apache.hadoop.hive.metastore.ThreadPool). That pool is normally shut down by a JVM
// shutdown hook, but since we create and tear down multiple metastore instances within the same
// JVM, we have to call this cleanup method manually; otherwise threads from a previous test
// suite stay stuck in the pool with stale config and keep getting scheduled. This can lead to
// issues, e.g. accidental Persistence Manager closure by ScheduledQueryExecutionsMaintTask.
private static final DynMethods.StaticMethod METASTORE_THREADS_SHUTDOWN =
DynMethods.builder("shutdown")
.impl("org.apache.hadoop.hive.metastore.ThreadPool")
@@ -89,13 +106,15 @@ public class TestHiveMetastore
// It's tricky to clear all static fields in an HMS instance in order to switch derby root dir.
// Therefore, we reuse the same derby root between tests and remove it after JVM exits.
private static final File HIVE_LOCAL_DIR;
private static final File HIVE_EXTERNAL_WAREHOUSE_DIR;
private static final String DERBY_PATH;

static {
try {
HIVE_LOCAL_DIR =
createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
DERBY_PATH = HIVE_LOCAL_DIR + "/metastore_db";
DERBY_PATH = new File(HIVE_LOCAL_DIR, "metastore_db").getPath();
HIVE_EXTERNAL_WAREHOUSE_DIR = new File(HIVE_LOCAL_DIR, "external");
File derbyLogFile = new File(HIVE_LOCAL_DIR, "derby.log");
System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath());
setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
@@ -127,9 +146,16 @@ public class TestHiveMetastore {
TestHiveMetastore(String hiveWarehousePath) {
this.hiveWarehousePath = hiveWarehousePath;
}
/**
* Starts a TestHiveMetastore with the default connection pool size (5) and the default HiveConf.
*/
public void start() {
start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE);
}

/**
* Starts a TestHiveMetastore with the default connection pool size with the provided HiveConf.
Starts a TestHiveMetastore with the default connection pool size (5) and the provided
HiveConf.
*
* @param conf The hive configuration to use
*/
@@ -143,7 +169,6 @@ public void start(HiveConf conf) {
* @param conf The hive configuration to use
* @param poolSize The number of threads in the executor pool
*/
@SuppressWarnings("FutureReturnValueIgnored")
public void start(HiveConf conf, int poolSize) {
try {
TServerSocket socket = new TServerSocket(0);
@@ -153,7 +178,14 @@
this.hiveConf = conf;
this.server = newThriftServer(socket, poolSize, hiveConf);
this.executorService = Executors.newSingleThreadExecutor();
this.executorService.submit(() -> server.serve());
Future<?> ignored = this.executorService.submit(() -> server.serve());

// in Hive3, setting this as a system prop ensures that it will be picked up whenever a new
// HiveConf is created
System.setProperty(
HiveConf.ConfVars.METASTORE_URIS.varname,
hiveConf.getVar(HiveConf.ConfVars.METASTORE_URIS));

this.clientPool = new HiveClientPool(1, hiveConf);
} catch (Exception e) {
throw new RuntimeException("Cannot start TestHiveMetastore", e);
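
A hypothetical end-to-end sketch of driving this lifecycle from a test; the no-arg constructor and the hiveConf() accessor exist in the upstream utility but are assumptions with respect to this copy:

// Hypothetical test skeleton, inside a method that declares throws Exception.
TestHiveMetastore metastore = new TestHiveMetastore(); // assumed no-arg constructor
metastore.start(); // binds a free port and publishes METASTORE_URIS as a system property
try {
  HiveConf conf = metastore.hiveConf(); // assumed accessor, present upstream
  // ... point an Iceberg HiveCatalog at conf and run assertions ...
} finally {
  metastore.stop();
}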
@@ -169,13 +201,7 @@ public void stop() throws Exception {
server.stop();
}
if (executorService != null) {
executorService.shutdownNow();
try {
// Give it a reasonable timeout
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executorService.shutdown();
}
if (baseHandler != null) {
baseHandler.shutdown();
@@ -215,9 +241,6 @@ public void reset() throws Exception {

Path warehouseRoot = new Path(hiveWarehousePath);
FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
if (!fs.exists(warehouseRoot)) {
return;
}
for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
if (!fileStatus.getPath().getName().equals("derby.log")
&& !fileStatus.getPath().getName().equals("metastore_db")) {
@@ -226,6 +249,19 @@
}
}

public Table getTable(String dbName, String tableName) throws TException, InterruptedException {
return clientPool.run(client -> client.getTable(new GetTableRequest(dbName, tableName)));
}

public Table getTable(TableIdentifier identifier) throws TException, InterruptedException {
return getTable(identifier.namespace().toString(), identifier.name());
}

public <R> R run(ClientPool.Action<R, IMetaStoreClient, TException> action)
throws InterruptedException, TException {
return clientPool.run(action, false);
}
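
For illustration, a hypothetical caller of these helpers; "db" and "tbl" are placeholder names, and the enclosing test method would declare throws TException, InterruptedException:

// Fetch the raw HMS table and inspect its storage descriptor.
Table hmsTable = metastore.getTable("db", "tbl");
String location = hmsTable.getSd().getLocation();

// Run an arbitrary client action through the shared client pool.
java.util.List<String> databases = metastore.run(client -> client.getAllDatabases());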

private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf)
throws Exception {
HiveConf serverConf = new HiveConf(conf);
@@ -249,20 +285,24 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con
private void initConf(HiveConf conf, int port) {
conf.set(HiveConf.ConfVars.METASTORE_URIS.varname, "thrift://localhost:" + port);
conf.set(HiveConf.ConfVars.METASTORE_WAREHOUSE.varname, hiveWarehousePath);
conf.set(
HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL.varname,
"file:" + HIVE_EXTERNAL_WAREHOUSE_DIR.getAbsolutePath());
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
conf.set("iceberg.hive.client-pool-size", "2");
// Setting this to avoid thrift exception during running Iceberg tests outside Iceberg.
conf.set(
HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());
// set to false so that TxnManager#checkLock does not throw an exception when using an UNSET
// data type operation in the requested lock component
conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, false);
}

private static void setupMetastoreDB(String dbURL) throws SQLException, IOException {
private static void setupMetastoreDB(String dbURL) throws Exception {
Connection connection = DriverManager.getConnection(dbURL);
ScriptRunner scriptRunner = new ScriptRunner(connection, true, true);

ClassLoader classLoader = ClassLoader.getSystemClassLoader();
InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql");
InputStream inputStream = classLoader.getResourceAsStream("hive-schema-4.0.0.derby.sql");
try (Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
scriptRunner.runScript(reader);
}
(diffs for the remaining 3 changed files not shown)
