Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi committed Nov 22, 2024
1 parent db02ca1 commit ed34249
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 8 deletions.
6 changes: 3 additions & 3 deletions be/src/vec/exec/format/table/paimon_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";

PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const TFileRangeDesc& range)
const TFileRangeDesc& range, const TFileScanRangeParams* range_params)
: JniReader(file_slot_descs, state, profile) {
std::vector<std::string> column_names;
std::vector<std::string> column_types;
Expand All @@ -61,8 +61,8 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
std::to_string(range.table_format_params.paimon_params.last_update_time);
params["required_fields"] = join(column_names, ",");
params["columns_types"] = join(column_types, "#");
if (range.table_format_params.paimon_params.__isset.paimon_table) {
params["paimon_table"] = range.table_format_params.paimon_params.paimon_table;
if (range_params->__isset.serialized_table) {
params["serialized_table"] = range_params->serialized_table;
}

// Used to create paimon option
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/table/paimon_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PaimonJniReader : public JniReader {
static const std::string PAIMON_OPTION_PREFIX;
static const std::string HADOOP_OPTION_PREFIX;
PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile, const TFileRangeDesc& range);
RuntimeProfile* profile, const TFileRangeDesc& range, const TFileScanRangeParams* range_params);

~PaimonJniReader() override = default;

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ Status VFileScanner::_get_next_reader() {
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
_cur_reader =
PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range);
PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range, _params);
init_status = ((PaimonJniReader*)(_cur_reader.get()))
->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ protected TableSchema parseTableSchema() throws UnsupportedOperationException {
}

private void initTable() {
if (params.containsKey("paimon_table")) {
table = PaimonUtils.deserialize(params.get("paimon_table"));
if (params.containsKey("serialized_table")) {
table = PaimonUtils.deserialize(params.get("serialized_table"));
} else {
PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
paimonOptionParams, hadoopOptionParams, dbName, tblName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* FileQueryScanNode for querying the file access type of catalog, now only support
Expand Down Expand Up @@ -261,6 +262,11 @@ public TFileScanRangeParams getFileScanRangeParams() {
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
}

/**
 * Returns the serialized form of the table to be scanned, intended for BE's JNI reader.
 * This default implementation returns an empty Optional; subclasses may override it to
 * supply a value. When a value is present, createScanRangeLocations() stores it on the
 * scan range params via params::setSerializedTable.
 */
protected Optional<String> getSerializedTable() {
return Optional.empty();
}

@Override
public void createScanRangeLocations() throws UserException {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -369,6 +375,8 @@ public void createScanRangeLocations() throws UserException {
}
}

getSerializedTable().ifPresent(params::setSerializedTable);

if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public String toString() {
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
private SessionVariable sessionVariable;
private String serializedTable;

public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
Expand All @@ -115,6 +116,7 @@ public PaimonScanNode(PlanNodeId id,
// Initializes this scan node: runs the parent initialization, builds the PaimonSource
// from the tuple descriptor, and caches the encoded Paimon table string in
// serializedTable so it can later be handed out by getSerializedTable().
protected void doInitialize() throws UserException {
    super.doInitialize();
    source = new PaimonSource(desc);
    // Validate the source before it is dereferenced below; checking after the
    // getPaimonTable() call could never catch anything.
    Preconditions.checkNotNull(source);
    serializedTable = encodeObjectToString(source.getPaimonTable());
}

Expand Down Expand Up @@ -144,6 +146,11 @@ protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
}
}

/**
 * Supplies the encoded Paimon table string that doInitialize() stored in serializedTable.
 * NOTE(review): Optional.of throws NullPointerException for a null argument, so this
 * presumably relies on doInitialize() having run before it is called; confirm the call order.
 */
@Override
protected Optional<String> getSerializedTable() {
return Optional.of(serializedTable);
}

private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
Expand Down
7 changes: 6 additions & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ struct TPaimonFileDesc {
11: optional string file_format
12: optional TPaimonDeletionFileDesc deletion_file;
13: optional map<string, string> hadoop_conf // deprecated
14: optional string paimon_table
14: optional string paimon_table // deprecated
}

struct TTrinoConnectorFileDesc {
Expand Down Expand Up @@ -448,6 +448,11 @@ struct TFileScanRangeParams {
22: optional TTextSerdeType text_serde_type
// used by flexible partial update
23: optional string sequence_map_col
// Serialized table passed from FE, used by the JNI scanner.
// BE can use this table directly, which:
// 1. Reduces access to HMS and HDFS on the JNI side.
// 2. Avoids inconsistency between the FE and BE views of the table.
24: optional string serialized_table
}

struct TFileRangeDesc {
Expand Down

0 comments on commit ed34249

Please sign in to comment.