Commit

Zouxxyy committed Nov 7, 2024
1 parent f0e928a commit f604a02
Showing 5 changed files with 81 additions and 58 deletions.
@@ -88,14 +88,16 @@ private void addPartition(BinaryRow partition) {

     private void addPartitions(Set<BinaryRow> partitions) {
         try {
-            List<BinaryRow> filteredPartitions = new ArrayList<>();
+            List<BinaryRow> newPartitions = new ArrayList<>();
             for (BinaryRow partition : partitions) {
                 if (!cache.get(partition, () -> false)) {
-                    filteredPartitions.add(partition);
+                    newPartitions.add(partition);
                 }
             }
-            client.addPartitions(filteredPartitions);
-            filteredPartitions.forEach(partition -> cache.put(partition, true));
+            if (!newPartitions.isEmpty()) {
+                client.addPartitions(newPartitions);
+                newPartitions.forEach(partition -> cache.put(partition, true));
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
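The change in this file makes partition sync short-circuit when there is nothing new to register: partitions already marked in the cache are filtered out, and the metastore call is skipped entirely when the remaining list is empty. Below is a minimal, self-contained sketch of the same guard pattern, assuming a Guava Cache as the seen-set and a hypothetical MetastoreClient interface with String keys standing in for Paimon's BinaryRow and internal client (the names are illustrative, not the actual Paimon classes).

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch only: String replaces BinaryRow, and MetastoreClient is a stand-in interface.
public class PartitionSyncSketch {

    interface MetastoreClient {
        void addPartitions(List<String> partitions) throws Exception;
    }

    private final Cache<String, Boolean> cache =
            CacheBuilder.newBuilder().maximumSize(1_000).build();

    private final MetastoreClient client;

    PartitionSyncSketch(MetastoreClient client) {
        this.client = client;
    }

    void addPartitions(Set<String> partitions) {
        try {
            List<String> newPartitions = new ArrayList<>();
            for (String partition : partitions) {
                // A cache miss loads "false" (not yet synced) without marking the partition as synced.
                if (!cache.get(partition, () -> false)) {
                    newPartitions.add(partition);
                }
            }
            // Only touch the metastore when at least one partition is actually new.
            if (!newPartitions.isEmpty()) {
                client.addPartitions(newPartitions);
                newPartitions.forEach(p -> cache.put(p, true));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

With this guard, repeated commits that touch only already-registered partitions no longer issue redundant add-partition calls to the metastore.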
@@ -103,49 +103,20 @@ public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exc
             // do nothing if the partition already exists
         } catch (NoSuchObjectException e) {
             // partition not found, create new partition
-            StorageDescriptor newSd = new StorageDescriptor(sd);
-            newSd.setLocation(
-                    sd.getLocation()
-                            + "/"
-                            + PartitionPathUtils.generatePartitionPath(partitionSpec));
-
-            Partition hivePartition = new Partition();
-            hivePartition.setDbName(identifier.getDatabaseName());
-            hivePartition.setTableName(identifier.getTableName());
-            hivePartition.setValues(partitionValues);
-            hivePartition.setSd(newSd);
-            int currentTime = (int) (System.currentTimeMillis() / 1000);
-            hivePartition.setCreateTime(currentTime);
-            hivePartition.setLastAccessTime(currentTime);
-
+            Partition hivePartition =
+                    toHivePartition(partitionSpec, (int) (System.currentTimeMillis() / 1000));
             clients.execute(client -> client.add_partition(hivePartition));
         }
     }
 
     @Override
     public void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
             throws Exception {
-        String databaseName = identifier.getDatabaseName();
-        String tableName = identifier.getTableName();
-        ArrayList<Partition> hivePartitions = new ArrayList<>();
         int currentTime = (int) (System.currentTimeMillis() / 1000);
-        for (LinkedHashMap<String, String> partitionSpec : partitionSpecsList) {
-            List<String> partitionValues = new ArrayList<>(partitionSpec.values());
-            StorageDescriptor newSd = new StorageDescriptor(sd);
-            newSd.setLocation(
-                    sd.getLocation()
-                            + "/"
-                            + PartitionPathUtils.generatePartitionPath(partitionSpec));
-            Partition hivePartition = new Partition();
-            hivePartition.setDbName(databaseName);
-            hivePartition.setTableName(tableName);
-            hivePartition.setValues(partitionValues);
-            hivePartition.setSd(newSd);
-            hivePartition.setCreateTime(currentTime);
-            hivePartition.setLastAccessTime(currentTime);
-            hivePartitions.add(hivePartition);
-        }
-
+        List<Partition> hivePartitions =
+                partitionSpecsList.stream()
+                        .map(partitionSpec -> toHivePartition(partitionSpec, currentTime))
+                        .collect(Collectors.toList());
         clients.execute(client -> client.add_partitions(hivePartitions, true, false));
     }

@@ -215,6 +186,21 @@ public IMetaStoreClient client() throws TException, InterruptedException {
         return clients.run(client -> client);
     }
 
+    private Partition toHivePartition(
+            LinkedHashMap<String, String> partitionSpec, int currentTime) {
+        Partition hivePartition = new Partition();
+        StorageDescriptor newSd = new StorageDescriptor(sd);
+        newSd.setLocation(
+                sd.getLocation() + "/" + PartitionPathUtils.generatePartitionPath(partitionSpec));
+        hivePartition.setDbName(identifier.getDatabaseName());
+        hivePartition.setTableName(identifier.getTableName());
+        hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
+        hivePartition.setSd(newSd);
+        hivePartition.setCreateTime(currentTime);
+        hivePartition.setLastAccessTime(currentTime);
+        return hivePartition;
+    }
+
     /** Factory to create {@link HiveMetastoreClient}. */
     public static class Factory implements MetastoreClient.Factory {
 
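The refactor in this file consolidates the duplicated Partition construction into the new toHivePartition helper, so the single-partition and batch paths now build identical metadata: a StorageDescriptor copied from the table with the generated partition path appended to its location, plus create/access times in epoch seconds. As a usage illustration only, the sketch below shows how a caller might assemble ordered partition specs and register them in one batch; syncPartitions and MetastoreClientLike are hypothetical names, not Paimon APIs, and the insertion order of each LinkedHashMap is what determines the generated partition path.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Sketch only: MetastoreClientLike mirrors the addPartitionsSpec signature shown above.
public class AddPartitionsSpecExample {

    interface MetastoreClientLike {
        void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
                throws Exception;
    }

    static void syncPartitions(MetastoreClientLike metastoreClient) throws Exception {
        List<LinkedHashMap<String, String>> specs = new ArrayList<>();
        for (String day : new String[] {"2024-11-06", "2024-11-07"}) {
            // Insertion order defines the partition path, e.g. dt=2024-11-06/hr=00.
            LinkedHashMap<String, String> spec = new LinkedHashMap<>();
            spec.put("dt", day);
            spec.put("hr", "00");
            specs.add(spec);
        }
        // One metastore round trip registers all partitions; partitions that already exist
        // are tolerated because the implementation above calls add_partitions with ifNotExists = true.
        metastoreClient.addPartitionsSpec(specs);
    }
}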
@@ -18,9 +18,8 @@

package org.apache.paimon.spark

-import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier}
-import org.apache.paimon.options.{CatalogOptions, Options}
-import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.catalog.{Catalog, Identifier}
+import org.apache.paimon.spark.catalog.WithPaimonCatalog
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions}
import org.apache.paimon.table.FileStoreTable
@@ -36,7 +35,6 @@ import org.scalactic.source.Position
import org.scalatest.Tag

import java.io.File
-import java.util.{HashMap => JHashMap}
import java.util.TimeZone

import scala.util.Random
@@ -49,7 +47,9 @@ class PaimonSparkTestBase

protected lazy val tempDBDir: File = Utils.createTempDir

-protected lazy val catalog: Catalog = initCatalog()
+protected def paimonCatalog: Catalog = {
+  spark.sessionState.catalogManager.currentCatalog.asInstanceOf[WithPaimonCatalog].paimonCatalog()
+}

protected val dbName0: String = "test"

@@ -122,18 +122,12 @@
super.test(testName, testTags: _*)(testFun)(pos)
}

-private def initCatalog(): Catalog = {
-  val currentCatalog = spark.sessionState.catalogManager.currentCatalog.name()
-  val options =
-    new JHashMap[String, String](Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf))
-  options.put(CatalogOptions.CACHE_ENABLED.key(), "false")
-  val catalogContext =
-    CatalogContext.create(Options.fromMap(options), spark.sessionState.newHadoopConf())
-  CatalogFactory.createCatalog(catalogContext)
+def loadTable(tableName: String): FileStoreTable = {
+  loadTable(dbName0, tableName)
}

-def loadTable(tableName: String): FileStoreTable = {
-  catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[FileStoreTable]
+def loadTable(dbName: String, tableName: String): FileStoreTable = {
+  paimonCatalog.getTable(Identifier.create(dbName, tableName)).asInstanceOf[FileStoreTable]
}

protected def createRelationV2(tableName: String): LogicalPlan = {
@@ -352,7 +352,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
.column("ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.column("ts_ntz", DataTypes.TIMESTAMP())
.build
-catalog.createTable(identifier, schema, false)
+paimonCatalog.createTable(identifier, schema, false)
sql(
s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00', timestamp_ntz'2024-01-01 00:00:00')")

@@ -370,7 +370,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
// Due to previous design, read timestamp ltz type with spark 3.3 and below will cause problems,
// skip testing it
if (gteqSpark3_4) {
-val table = catalog.getTable(identifier)
+val table = paimonCatalog.getTable(identifier)
val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
val splits = builder.newScan().plan().splits()
builder.newRead
@@ -405,7 +405,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
// Due to previous design, read timestamp ltz type with spark 3.3 and below will cause problems,
// skip testing it
if (gteqSpark3_4) {
-val table = catalog.getTable(identifier)
+val table = paimonCatalog.getTable(identifier)
val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
val splits = builder.newScan().plan().splits()
builder.newRead
@@ -423,7 +423,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
} finally {
-catalog.dropTable(identifier, true)
+paimonCatalog.dropTable(identifier, true)
}
}

@@ -18,6 +18,7 @@

package org.apache.paimon.spark.sql

+import org.apache.paimon.hive.HiveMetastoreClient
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.table.FileStoreTable

@@ -251,6 +252,46 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}

test("Paimon DDL with hive catalog: sync partitions to HMS") {
Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
val dbName = "default"
val tblName = "t"
sql(s"USE $catalogName.$dbName")
withTable(tblName) {
sql("""
|CREATE TABLE t (id INT, pt INT)
|USING PAIMON
|TBLPROPERTIES ('metastore.partitioned-table' = 'true')
|PARTITIONED BY (pt)
|""".stripMargin)

val metastoreClient = loadTable(dbName, tblName)
.catalogEnvironment()
.metastoreClientFactory()
.create()
.asInstanceOf[HiveMetastoreClient]
.client()

sql("INSERT INTO t VALUES (1, 1), (2, 2), (3, 3)")
// check partitions in paimon
checkAnswer(sql("show partitions t"), Seq(Row("pt=1"), Row("pt=2"), Row("pt=3")))
// check partitions in HMS
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)

sql("INSERT INTO t VALUES (4, 3), (5, 4)")
checkAnswer(
sql("show partitions t"),
Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4")))
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 4)

sql("ALTER TABLE t DROP PARTITION (pt=1)")
checkAnswer(sql("show partitions t"), Seq(Row("pt=2"), Row("pt=3"), Row("pt=4")))
assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)
}
}
}

def getDatabaseLocation(dbName: String): String = {
spark
.sql(s"DESC DATABASE $dbName")
