Skip to content

Commit

Permalink
[refactor](hms) Refactor HiveTransactionMgr; refactor the logic of reading Hive ACID tables.
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter committed Jan 9, 2025
1 parent 744691a commit cfdc438
Show file tree
Hide file tree
Showing 12 changed files with 390 additions and 176 deletions.
13 changes: 6 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@
import org.apache.doris.datasource.SplitSourceManager;
import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.datasource.hive.HiveTransactionMgr;
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.deploy.DeployManager;
Expand Down Expand Up @@ -556,7 +555,7 @@ public class Env {

private FollowerColumnSender followerColumnSender;

private HiveTransactionMgr hiveTransactionMgr;
private QueryCallBackMgr queryCallBackMgr;

private TopicPublisherThread topicPublisherThread;

Expand Down Expand Up @@ -810,7 +809,7 @@ public Env(boolean isCheckpointCatalog) {
this.admissionControl = new AdmissionControl(systemInfo);
this.queryStats = new QueryStats();
this.loadManagerAdapter = new LoadManagerAdapter();
this.hiveTransactionMgr = new HiveTransactionMgr();
this.queryCallBackMgr = new QueryCallBackMgr();
this.plsqlManager = new PlsqlManager();
this.binlogManager = new BinlogManager();
this.binlogGcer = new BinlogGcer();
Expand Down Expand Up @@ -990,12 +989,12 @@ public Checkpoint getCheckpointer() {
return checkpointer;
}

public HiveTransactionMgr getHiveTransactionMgr() {
return hiveTransactionMgr;
public QueryCallBackMgr getQueryCallBackMgr() {
return queryCallBackMgr;
}

public static HiveTransactionMgr getCurrentHiveTransactionMgr() {
return getCurrentEnv().getHiveTransactionMgr();
public static QueryCallBackMgr getCurrentQueryCallBackMgr() {
return getCurrentEnv().getQueryCallBackMgr();
}

public DNSCache getDnsCache() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* This class is suitable for queries, especially for releasing resources after the query is completed.
* The resources generally need to be held from the plan to the end of the query.
*
* For example, when reading a Hive ACID table, you should first acquire a lock from the HMS.
* Releasing the lock on the HMS is divided into two parts:
* 1. The plan fails.
* 2. The plan is sent to the BE (backend), and the BE finishes reading.
*/
public class QueryCallBackMgr {
    // query id => callbacks to run when the plan fails.
    // In the same query, different plan nodes may register different callbacks, so keep a List<Runnable>.
    private Map<TUniqueId, List<Runnable>> planFailCallBackMap = Maps.newConcurrentMap();
    // query id => callbacks to run when the query ends (i.e. the plan succeeded).
    private Map<TUniqueId, List<Runnable>> queryEndCallBackMap = Maps.newConcurrentMap();

    /**
     * Registers a callback to run if the plan for {@code queryId} fails.
     */
    public void registerPlanFailFunc(TUniqueId queryId, Runnable callback) {
        planFailCallBackMap.computeIfAbsent(queryId,
                k -> Collections.synchronizedList(new ArrayList<>())).add(callback);
    }

    /**
     * Registers a callback to run when the query {@code queryId} ends after a successful plan.
     */
    public void registerQueryEndFunc(TUniqueId queryId, Runnable callback) {
        queryEndCallBackMap.computeIfAbsent(queryId,
                k -> Collections.synchronizedList(new ArrayList<>())).add(callback);
    }

    /**
     * Runs and discards all plan-fail callbacks of {@code queryId}, then discards its
     * query-end callbacks (they must not fire for a failed plan).
     */
    public void planFailCallback(TUniqueId queryId) {
        try {
            runAll(planFailCallBackMap.remove(queryId));
        } finally {
            // Always drop the query-end entry, even if a fail callback threw; otherwise
            // the entry (and whatever its callbacks hold) would leak in the map forever.
            queryEndCallBackMap.remove(queryId);
        }
    }

    /**
     * Runs and discards all query-end callbacks of {@code queryId}, then discards its
     * plan-fail callbacks (the plan succeeded, so they must not fire).
     */
    public void queryEndCallback(TUniqueId queryId) {
        try {
            runAll(queryEndCallBackMap.remove(queryId));
        } finally {
            planFailCallBackMap.remove(queryId);
        }
    }

    // Runs every callback even if one of them throws: a single failing callback must not
    // prevent the others from releasing their resources. The first failure is rethrown
    // after all callbacks have run, with later failures attached as suppressed exceptions.
    private static void runAll(List<Runnable> callbacks) {
        if (callbacks == null) {
            return;
        }
        RuntimeException first = null;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public class HMSExternalCatalog extends ExternalCatalog {
//for "type" = "hms" , but is iceberg table.
private IcebergMetadataOps icebergMetadataOps;

// For reading HMS ACID tables.
private HiveAcidTransactionMgr hiveAcidTransactionMgr;

@VisibleForTesting
public HMSExternalCatalog() {
catalogProperty = new CatalogProperty(null, null);
Expand Down Expand Up @@ -187,6 +190,7 @@ protected void initLocalObjectsImpl() {
transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider,
fileSystemExecutor);
metadataOps = hiveOps;
hiveAcidTransactionMgr = new HiveAcidTransactionMgr();
}

@Override
Expand Down Expand Up @@ -399,5 +403,9 @@ public IcebergMetadataOps getIcebergMetadataOps() {
}
return icebergMetadataOps;
}

public HiveAcidTransactionMgr getHiveAcidTransactionMgr() {
return hiveAcidTransactionMgr;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.TableName;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TUniqueId;

import java.util.List;
import java.util.Map;

/**
* HiveAcidTransaction is used to maintain the information of a transaction table during the query process.
* <p>
* Each HiveAcidTransaction is bound to a specific scanNode during the plan process.
* A plan may contain multiple HiveAcidTransaction, such as join.
*/
public class HiveAcidTransaction {
    // How long (in milliseconds) to wait for the HMS shared lock before giving up.
    private static final int ACQUIRE_LOCK_TIMEOUT_MS = 5000;

    private final TUniqueId queryId;
    private final long hmsTxnId;
    private final String user;
    private final HMSExternalTable hiveTable;

    // Lazily fetched on the first getValidWriteIds() call; null until then.
    private Map<String, String> txnValidIds = null;

    public HiveAcidTransaction(TUniqueId queryId, long hmsTxnId, String user, HMSExternalTable hiveTable) {
        this.queryId = queryId;
        this.hmsTxnId = hmsTxnId;
        this.user = user;
        this.hiveTable = hiveTable;
    }

    /**
     * Returns the valid write ids of the bound table for this transaction,
     * acquiring a shared HMS lock on the given partitions first.
     *
     * The result is cached: the lock is taken and the write ids are fetched only on the
     * first call; subsequent calls return the cached map regardless of the arguments.
     * NOTE(review): not thread-safe — assumes each instance is used by a single scan node.
     *
     * @param client         the HMS client used to lock and to fetch write ids
     * @param partitionNames partitions (after pruning) that need to be locked
     * @return map of transaction-validity information for {@code db.table}
     */
    public Map<String, String> getValidWriteIds(HMSCachedClient client, List<String> partitionNames) {
        if (txnValidIds == null) {
            TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),
                    hiveTable.getName());
            // Lock before reading so the write-id list stays stable for this transaction.
            client.acquireSharedLock(DebugUtil.printId(queryId), hmsTxnId, user, tableName, partitionNames,
                    ACQUIRE_LOCK_TIMEOUT_MS);
            txnValidIds = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), hmsTxnId);
        }
        return txnValidIds;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Env;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

/**
* HiveAcidTransactionMgr is responsible for starting and automatically committing HMS transactions.
* HiveAcidTransactionMgr should be at the catalog level.
* <p>
* beginQueryTransaction() starts the transaction and return HiveAcidTransaction.
* HiveAcidTransaction.getValidWriteIds() requests table locks.
* The scanNode needs to call beginQueryTransaction to obtain a HiveAcidTransaction.
* The scanNode calls the getValidWriteIds method from HiveAcidTransaction to retrieve transaction information.
* <p>
* Example expected behavior as follows:
* SQL1: catalog1.db.a JOIN catalog1.db.b
* scanNode1 (catalog1.db.a): beginQueryTransaction()
* scanNode2 (catalog1.db.b): beginQueryTransaction()
* Only one transaction will be started on HMS, table 'a' needs one lock, table 'b' needs one lock.
* <p>
* SQL2: catalog1.db.a JOIN catalog1.db.a
* scanNode1 (catalog1.db.a): beginQueryTransaction()
* scanNode2 (catalog1.db.a): beginQueryTransaction()
* Only one transaction will be started on HMS, table 'a' needs two locks (because locking needs to consider
* the partitioning of the query, after partition pruning).
* <p>
* SQL3: catalog1.db.a JOIN catalog2.db.a
* scanNode1 (catalog1.db.a): beginQueryTransaction()
* scanNode2 (catalog2.db.a): beginQueryTransaction()
* For two different catalogs, even if they end up connecting to the same HMS, we still consider table 'a'
* as two different tables. So two transactions will be started on HMS, and table 'a' needs two locks.
*/
public class HiveAcidTransactionMgr {
    private static final Logger LOG = LogManager.getLogger(HiveAcidTransactionMgr.class);
    // doris query id => hms transaction id.
    private Map<TUniqueId, Long> queryIdToHmsTxnIdMap = Maps.newConcurrentMap();

    /**
     * Returns a {@link HiveAcidTransaction} bound to {@code hiveTable} for this query,
     * opening at most one HMS transaction per query id for this catalog.
     *
     * The transaction is opened lazily inside computeIfAbsent so concurrent scan nodes of the
     * same query share a single HMS transaction. Commit callbacks are registered at open time:
     * whichever happens first — plan failure or query end — commits the transaction and
     * releases the HMS locks acquired during planning.
     * NOTE(review): the openTxn RPC runs inside computeIfAbsent, so a concurrent caller with
     * the same query id may block on the map entry until the RPC finishes.
     *
     * @param queryId   doris query id used as the per-query transaction key
     * @param user      user name passed to HMS when opening the transaction
     * @param catalog   catalog whose HMS client opens/commits the transaction
     * @param hiveTable the ACID table this transaction handle is bound to
     */
    public HiveAcidTransaction beginQueryTransaction(TUniqueId queryId, String user,
            HMSExternalCatalog catalog, HMSExternalTable hiveTable) {
        long hmsTxnId = queryIdToHmsTxnIdMap.computeIfAbsent(
                queryId,
                k -> {
                    long newHmsTxnId = catalog.getClient().openTxn(user);

                    // After the plan fails or the query ends, we need to commit the transaction to release the locks
                    // that were acquired during the plan execution.
                    Env.getCurrentQueryCallBackMgr().registerPlanFailFunc(queryId, () -> {
                        commitQueryTransaction(queryId, newHmsTxnId, catalog);
                    });

                    Env.getCurrentQueryCallBackMgr().registerQueryEndFunc(queryId, () -> {
                        commitQueryTransaction(queryId, newHmsTxnId, catalog);
                    });

                    return newHmsTxnId;
                }
        );
        return new HiveAcidTransaction(queryId, hmsTxnId, user, hiveTable);
    }

    /**
     * Commits {@code hmsTxnId} on HMS and forgets the query's mapping.
     *
     * The map entry is removed BEFORE the commit RPC so a failed commit does not leave a
     * stale query-id mapping; commit failures are logged and swallowed on purpose.
     */
    private void commitQueryTransaction(TUniqueId queryId, long hmsTxnId, HMSExternalCatalog catalog) {
        queryIdToHmsTxnIdMap.remove(queryId);
        try {
            catalog.getClient().commitTxn(hmsTxnId);
        } catch (Exception e) {
            //The transaction may still exist on HMS. After a while, HMS will clear the transaction.
            LOG.warn("catalog ={},queryId = {},hmsTxnId = {},commit hms transaction failed",
                    catalog.getName(), queryId, hmsTxnId, e);
        }
    }
}

This file was deleted.

Loading

0 comments on commit cfdc438

Please sign in to comment.