Skip to content

Commit b65c8e1

Browse files
committed
[feature](hive)support hive4 acid
1 parent 88a6268 commit b65c8e1

File tree

8 files changed

+320
-65
lines changed

8 files changed

+320
-65
lines changed

be/src/vec/exec/format/table/transactional_hive_reader.cpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "transactional_hive_common.h"
2222
#include "vec/data_types/data_type_factory.hpp"
2323
#include "vec/exec/format/orc/vorc_reader.h"
24+
#include <re2/re2.h>
2425

2526
namespace doris {
2627

@@ -108,15 +109,39 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
108109
int64_t num_delete_files = 0;
109110
std::filesystem::path file_path(data_file_path);
110111

112+
113+
//See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165
114+
// bucket_xxx_attemptId => bucket_xxx
115+
// bucket_xxx => bucket_xxx
116+
// Normalizes a Hive ACID bucket file name by dropping a trailing attempt id.
//
//   bucket_<digits>_<digits>  ->  bucket_<digits>
//   anything else             ->  returned unchanged
//
// @param str  file name (no directory part) to normalize
// @return     the name without the "_<attemptId>" suffix, or a copy of `str`
//             when it does not have exactly the bucket_<digits>_<digits> shape
//
// The shape is checked by hand instead of with a regular expression so that no
// pattern has to be compiled each time a file name is normalized.
auto remove_bucket_attemptId = [](const std::string& str) -> std::string {
    // Skips a run of ASCII digits starting at `pos`; returns the index just past it.
    auto skip_digits = [&str](size_t pos) {
        while (pos < str.size() && str[pos] >= '0' && str[pos] <= '9') {
            ++pos;
        }
        return pos;
    };

    constexpr char kPrefix[] = "bucket_";
    constexpr size_t kPrefixLen = sizeof(kPrefix) - 1;
    if (str.compare(0, kPrefixLen, kPrefix) != 0) {
        return str;
    }

    // Bucket id: at least one digit, followed by the '_' that starts the attempt id.
    const size_t bucket_end = skip_digits(kPrefixLen);
    if (bucket_end == kPrefixLen || bucket_end >= str.size() || str[bucket_end] != '_') {
        return str;
    }

    // Attempt id: at least one digit, and it must run to the end of the name.
    const size_t attempt_begin = bucket_end + 1;
    const size_t attempt_end = skip_digits(attempt_begin);
    if (attempt_end == attempt_begin || attempt_end != str.size()) {
        return str;
    }

    return str.substr(0, bucket_end);
};
127+
128+
111129
SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
112130
for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) {
113131
const std::string file_name = file_path.filename().string();
114-
auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
115-
file_name);
132+
133+
// TODO: optimize - the normalized file-name list below is rebuilt for every delete delta.
134+
std::vector<std::string> delete_delta_file_names;
135+
for (const auto& x : delete_delta.file_names){
136+
delete_delta_file_names.emplace_back(remove_bucket_attemptId(x));
137+
}
138+
auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(),
139+
remove_bucket_attemptId(file_name));
116140
if (iter == delete_delta.file_names.end()) {
117141
continue;
118142
}
119-
auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, file_name);
143+
auto delete_file = fmt::format("{}/{}", delete_delta.directory_location,
144+
delete_delta.file_names[iter-delete_delta_file_names.begin()]);
120145

121146
TFileRangeDesc delete_range;
122147
// must use __set() method to make sure __isset is true

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.datasource.TableMetadata;
2424
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
2525

26+
import org.apache.hadoop.hive.common.ValidTxnList;
2627
import org.apache.hadoop.hive.common.ValidWriteIdList;
2728
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
2829
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -84,6 +85,8 @@ NotificationEventResponse getNextNotification(long lastEventId,
8485

8586
ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId);
8687

88+
/**
 * Returns the list of transactions that are currently valid.
 *
 * <p>The implementations visible in this change throw {@code HMSClientException}
 * when the operation is unsupported or the underlying call fails.
 */
ValidTxnList getValidTxns();
89+
8790
void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
8891
List<String> partitionNames, long timeoutMs);
8992

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java

Lines changed: 251 additions & 60 deletions
Large diffs are not rendered by default.

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.doris.common.UserException;
2222

2323
import com.google.common.collect.Lists;
24+
import org.apache.hadoop.hive.common.ValidTxnList;
2425
import org.apache.hadoop.hive.common.ValidWriteIdList;
2526

2627
import java.util.List;
@@ -61,6 +62,10 @@ public boolean isFullAcid() {
6162
return isFullAcid;
6263
}
6364

65+
/**
 * Returns the valid-transaction list obtained from {@code client}.
 *
 * <p>This is a plain delegation: nothing is stored on this object, so every call
 * goes to the client.
 *
 * @param client metastore client to query
 * @return whatever {@code client.getValidTxns()} returns
 */
public ValidTxnList getValidTxns(HMSCachedClient client) {
66+
return client.getValidTxns();
67+
}
68+
6469
public ValidWriteIdList getValidWriteIds(HMSCachedClient client) {
6570
if (validWriteIdList == null) {
6671
TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.common.collect.ImmutableList.Builder;
3333
import com.google.common.collect.ImmutableMap;
3434
import org.apache.commons.lang3.NotImplementedException;
35+
import org.apache.hadoop.hive.common.ValidTxnList;
3536
import org.apache.hadoop.hive.common.ValidWriteIdList;
3637
import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
3738
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -527,6 +528,11 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans
527528
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
528529
}
529530

531+
/**
 * Not supported by this client.
 *
 * @throws HMSClientException always
 */
@Override
532+
public ValidTxnList getValidTxns() {
533+
throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
534+
}
535+
530536
@Override
531537
public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
532538
List<String> partitionNames, long timeoutMs) {

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,21 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans
595595
}
596596
}
597597

598+
/**
 * Fetches the valid-transaction list through a {@code ThriftHMSClient}.
 *
 * @return the result of the client's {@code getValidTxns} call, run via {@code ugiDoAs}
 * @throws HMSClientException if obtaining the client or the call itself fails
 */
@Override
599+
public ValidTxnList getValidTxns() {
600+
// try-with-resources: the client obtained here is closed when the block exits.
try (ThriftHMSClient client = getClient()) {
601+
try {
602+
return ugiDoAs(client.client::getValidTxns);
603+
} catch (Exception e) {
604+
// Record the failure on the client before rethrowing.
// NOTE(review): presumably this affects how the client is handled on close - confirm in ThriftHMSClient.
client.setThrowable(e);
605+
throw e;
606+
}
607+
} catch (Exception e) {
608+
// Wrap any failure (including the rethrown one above) in HMSClientException.
// NOTE(review): the message contains a literal "{}" - confirm it matches HMSClientException's formatting convention.
throw new HMSClientException("Catalog Get the transactions that "
609+
+ "are currently valid fail. Exception = {}", e);
610+
}
611+
}
612+
598613
private LockResponse checkLock(long lockId) {
599614
try (ThriftHMSClient client = getClient()) {
600615
try {

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.google.common.collect.Lists;
5656
import com.google.common.collect.Maps;
5757
import lombok.Setter;
58+
import org.apache.hadoop.hive.common.ValidTxnList;
5859
import org.apache.hadoop.hive.common.ValidWriteIdList;
5960
import org.apache.hadoop.hive.metastore.api.FieldSchema;
6061
import org.apache.hadoop.hive.metastore.api.Table;
@@ -345,8 +346,11 @@ private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache,
345346
}
346347
ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
347348
((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
348-
return cache.getFilesByTransaction(partitions, validWriteIds,
349-
hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName);
349+
ValidTxnList validTxnList = hiveTransaction.getValidTxns(
350+
((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
351+
352+
return cache.getFilesByTransaction(partitions, validWriteIds, validTxnList,
353+
hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName);
350354
}
351355

352356
@Override

fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
2929

3030
import com.google.common.collect.ImmutableList;
31+
import org.apache.hadoop.hive.common.ValidTxnList;
3132
import org.apache.hadoop.hive.common.ValidWriteIdList;
3233
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
3334
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -206,6 +207,11 @@ public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTrans
206207
return null;
207208
}
208209

210+
/** Test stub: provides no transaction list and always returns {@code null}. */
@Override
211+
public ValidTxnList getValidTxns() {
212+
return null;
213+
}
214+
209215
@Override
210216
public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List<String> partitionNames, long timeoutMs) {
211217

0 commit comments

Comments
 (0)