Skip to content

Commit

Permalink
hive metastore client has been added (ydb-platform#4593)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored May 21, 2024
1 parent 99ca7e7 commit c71c5b0
Show file tree
Hide file tree
Showing 36 changed files with 249,810 additions and 0 deletions.
1 change: 1 addition & 0 deletions ydb/core/external_sources/hive_metastore/events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "events.h"
86 changes: 86 additions & 0 deletions ydb/core/external_sources/hive_metastore/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#pragma once

#include <ydb/core/external_sources/hive_metastore/hive_metastore_native/gen-cpp/ThriftHiveMetastore.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/public/api/protos/ydb_value.pb.h>

#include <library/cpp/threading/future/core/future.h>

namespace NKikimr::NExternalSource {

// Holder for the local actor events and payload types used to talk to Hive Metastore.
//
// The TEvGet* events are requests; each carries a promise through which the result is
// delivered. The TEvHive*Result events carry the raw Thrift-generated Hive structures
// together with a list of issues.
struct TEvHiveMetastore {
    // Event ids. They are allocated sequentially from the beginning of the ES_PRIVATE
    // event space, so the declaration order defines the numeric values.
    enum EEv : ui32 {
        EvGetTable = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
        EvGetStatistics,
        EvGetPartitions,

        EvHiveGetTableResult,
        EvHiveGetTableStatisticsResult,
        EvHiveGetPartitionsResult,
        EvEnd
    };

    // Guards against the ids above running past the end of the ES_PRIVATE event space.
    static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");

    // Table description delivered through TEvGetTable::Promise.
    struct TTable {
        std::vector<Ydb::Column> Columns;
        TString Location;
        TString Format;
        TString Compression;
        std::vector<TString> PartitionedBy;
    };

    // Request for a table description; the answer is delivered through Promise.
    // NOTE(review): "DatbaseName" looks like a typo of "DatabaseName". It is kept as is
    // because the field name is part of the public interface of this event.
    struct TEvGetTable: NActors::TEventLocal<TEvGetTable, EvGetTable> {
        TString DatbaseName;
        TString TableName;
        NThreading::TPromise<TTable> Promise;
    };

    // Table statistics delivered through TEvGetStatistics::Promise.
    // Both values are optional and may be left unset.
    struct TStatistics {
        TMaybe<int64_t> Rows;
        TMaybe<int64_t> Size;
    };

    // Request for table statistics over the listed columns; the answer is delivered
    // through Promise.
    // NOTE(review): "DatbaseName" — same typo as in TEvGetTable, kept for compatibility.
    struct TEvGetStatistics: NActors::TEventLocal<TEvGetStatistics, EvGetStatistics> {
        TString DatbaseName;
        TString TableName;
        std::vector<std::string> Columns;
        NThreading::TPromise<TStatistics> Promise;
    };

    // List of partitions delivered through TEvGetPartitions::Promise.
    struct TPartitions {
        struct TPartition {
            TString Location;
            std::vector<TString> Values;
        };
        std::vector<TPartition> Partitions;
    };

    // Request for table partitions, with a connector predicate attached; the answer is
    // delivered through Promise.
    // NOTE(review): "DatbaseName" — same typo as in TEvGetTable, kept for compatibility.
    struct TEvGetPartitions: NActors::TEventLocal<TEvGetPartitions, EvGetPartitions> {
        TString DatbaseName;
        TString TableName;
        NYql::NConnector::NApi::TPredicate Predicate;
        NThreading::TPromise<TPartitions> Promise;
    };

    // Result event carrying a Thrift-generated Hive table and the accompanying issues.
    struct TEvHiveGetTableResult: NActors::TEventLocal<TEvHiveGetTableResult, EvHiveGetTableResult> {
        Apache::Hadoop::Hive::Table Table;
        NYql::TIssues Issues;
    };

    // Result event carrying Thrift-generated Hive table statistics and the accompanying issues.
    struct TEvHiveGetTableStatisticsResult: NActors::TEventLocal<TEvHiveGetTableStatisticsResult, EvHiveGetTableStatisticsResult> {
        Apache::Hadoop::Hive::TableStatsResult Statistics;
        NYql::TIssues Issues;
    };

    // Result event carrying Thrift-generated Hive partitions and the accompanying issues.
    struct TEvHiveGetPartitionsResult: NActors::TEventLocal<TEvHiveGetPartitionsResult, EvHiveGetPartitionsResult> {
        std::vector<Apache::Hadoop::Hive::Partition> Partitions;
        NYql::TIssues Issues;
    };
};

}
130 changes: 130 additions & 0 deletions ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include "hive_metastore_client.h"

namespace NKikimr::NExternalSource {

// Builds the Thrift stack (socket -> buffered transport -> binary protocol -> client) for
// the Hive Metastore at host:port, opens the connection and starts the worker thread.
//
// An exception thrown while opening the transport propagates out of the constructor.
THiveMetastoreClient::THiveMetastoreClient(const TString& host, int32_t port)
    : Socket(std::make_shared<apache::thrift::transport::TSocket>(host, port))
    , Transport(std::make_shared<apache::thrift::transport::TBufferedTransport>(Socket))
    , Protocol(std::make_shared<apache::thrift::protocol::TBinaryProtocol>(Transport))
    , Client(std::make_shared<Apache::Hadoop::Hive::ThriftHiveMetastoreClient>(Protocol))
{
    Transport->open();
    // It is very important to keep exactly one worker thread here: every operation is
    // executed on this pool and shares the same Client/Transport, so a single thread
    // serializes all access to them (presumably because the Thrift client is not safe
    // for concurrent use — confirm before changing).
    ThreadPool.Start(1);
}

// Schedules `function` on the client's thread pool and returns a future for its outcome.
//
// The scheduled task fulfils the promise with the value returned by `function` (or with
// no value when TResultValue is void). Any exception thrown by `function` is forwarded
// to the future. On a Thrift transport error the transport is additionally closed and
// reopened before the error is reported, so that later operations may get a fresh
// connection.
//
// The process is aborted if the task cannot be added to the thread pool.
template<typename TResultValue>
NThreading::TFuture<TResultValue> THiveMetastoreClient::RunOperation(const std::function<TResultValue()>& function) {
    NThreading::TPromise<TResultValue> promise = NThreading::NewPromise<TResultValue>();
    Y_ABORT_UNLESS(ThreadPool.Add(MakeThrFuncObj([promise, function, transport=Transport]() mutable {
        try {
            if constexpr (std::is_void_v<TResultValue>) {
                function();
                promise.SetValue();
            } else {
                promise.SetValue(function());
            }
        } catch (const apache::thrift::transport::TTransportException&) {
            // Remember the original failure first: the reconnect below may throw as well.
            auto error = std::current_exception();
            try {
                transport->close();
                transport->open();
            } catch (...) {
                // The reconnect failed. Do not let this exception escape the handler:
                // it would leave the promise unfulfilled and leak out of the worker task.
                // The caller still gets the original error; the next operation will hit
                // a transport error again and retry the reconnect.
            }
            promise.SetException(error);
        } catch (...) {
            promise.SetException(std::current_exception());
        }
    })));
    return promise.GetFuture();
}

// Asynchronously calls create_database on the Thrift client with a copy of `database`.
// The returned future completes when the call has finished on the worker thread.
NThreading::TFuture<void> THiveMetastoreClient::CreateDatabase(const Apache::Hadoop::Hive::Database& database) {
    return RunOperation<void>(
        [hive = Client, spec = database]() -> void {
            hive->create_database(spec);
        });
}

// Asynchronously calls get_database on the Thrift client for the database called `name`
// and delivers the filled-in description through the returned future.
NThreading::TFuture<Apache::Hadoop::Hive::Database> THiveMetastoreClient::GetDatabase(const TString& name) {
    using TResult = Apache::Hadoop::Hive::Database;
    return RunOperation<TResult>(
        [hive = Client, dbName = name]() -> TResult {
            TResult fetched;
            hive->get_database(fetched, dbName);
            return fetched;
        });
}

// Asynchronously calls create_table on the Thrift client with a copy of `table`.
// The returned future completes when the call has finished on the worker thread.
NThreading::TFuture<void> THiveMetastoreClient::CreateTable(const Apache::Hadoop::Hive::Table& table) {
    return RunOperation<void>(
        [hive = Client, spec = table]() -> void {
            hive->create_table(spec);
        });
}

// Asynchronously calls get_all_databases on the Thrift client and delivers the resulting
// list of names through the returned future.
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllDatabases() {
    using TNames = std::vector<std::string>;
    return RunOperation<TNames>(
        [hive = Client]() -> TNames {
            TNames names;
            hive->get_all_databases(names);
            return names;
        });
}

// Asynchronously calls get_table on the Thrift client for `databaseName`.`tableName`
// and delivers the filled-in table description through the returned future.
NThreading::TFuture<Apache::Hadoop::Hive::Table> THiveMetastoreClient::GetTable(const TString& databaseName, const TString& tableName) {
    using TResult = Apache::Hadoop::Hive::Table;
    return RunOperation<TResult>(
        [hive = Client, dbName = databaseName, tblName = tableName]() -> TResult {
            TResult fetched;
            hive->get_table(fetched, dbName, tblName);
            return fetched;
        });
}

// Asynchronously calls get_all_tables on the Thrift client for `databaseName` and
// delivers the resulting list of names through the returned future.
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllTables(const TString& databaseName) {
    using TNames = std::vector<std::string>;
    return RunOperation<TNames>(
        [hive = Client, dbName = databaseName]() -> TNames {
            TNames names;
            hive->get_all_tables(names, dbName);
            return names;
        });
}

// Asynchronously calls update_table_column_statistics on the Thrift client with a copy
// of `columnStatistics`. The value returned by the Thrift call, if any, is not propagated.
NThreading::TFuture<void> THiveMetastoreClient::UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics) {
    return RunOperation<void>(
        [hive = Client, stats = columnStatistics]() -> void {
            hive->update_table_column_statistics(stats);
        });
}

// Asynchronously calls get_table_statistics_req on the Thrift client with a copy of
// `request` and delivers the filled-in result through the returned future.
NThreading::TFuture<Apache::Hadoop::Hive::TableStatsResult> THiveMetastoreClient::GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request) {
    using TResult = Apache::Hadoop::Hive::TableStatsResult;
    return RunOperation<TResult>(
        [hive = Client, query = request]() -> TResult {
            TResult stats;
            hive->get_table_statistics_req(stats, query);
            return stats;
        });
}

// Asynchronously calls add_partition on the Thrift client with a copy of `partition`
// and delivers the partition filled in by the call through the returned future.
NThreading::TFuture<Apache::Hadoop::Hive::Partition> THiveMetastoreClient::AddPartition(const Apache::Hadoop::Hive::Partition& partition) {
    using TResult = Apache::Hadoop::Hive::Partition;
    return RunOperation<TResult>(
        [hive = Client, spec = partition]() -> TResult {
            TResult added;
            hive->add_partition(added, spec);
            return added;
        });
}

// Asynchronously calls drop_partition on the Thrift client for the partition of
// `databaseName`.`tableName` identified by `partitionValues`, forwarding `deleteData`.
// The value returned by the Thrift call, if any, is not propagated.
NThreading::TFuture<void> THiveMetastoreClient::DropPartition(const TString& databaseName, const TString& tableName, const std::vector<std::string>& partitionValues, bool deleteData) {
    return RunOperation<void>(
        [hive = Client, dbName = databaseName, tblName = tableName, values = partitionValues, purge = deleteData]() -> void {
            hive->drop_partition(dbName, tblName, values, purge);
        });
}

// Asynchronously calls get_partitions_by_filter on the Thrift client for
// `databaseName`.`tableName`, forwarding `filter` and `maxPartitions` unchanged, and
// delivers the resulting partitions through the returned future.
NThreading::TFuture<std::vector<Apache::Hadoop::Hive::Partition>> THiveMetastoreClient::GetPartitionsByFilter(const TString& databaseName, const TString& tableName, const TString& filter, int16_t maxPartitions) {
    using TResult = std::vector<Apache::Hadoop::Hive::Partition>;
    return RunOperation<TResult>(
        [hive = Client, dbName = databaseName, tblName = tableName, condition = filter, limit = maxPartitions]() -> TResult {
            TResult matched;
            hive->get_partitions_by_filter(matched, dbName, tblName, condition, limit);
            return matched;
        });
}

// Asynchronously calls get_config_value on the Thrift client, forwarding `name` and
// `defaultValue`, and delivers the resulting string through the returned future.
NThreading::TFuture<std::string> THiveMetastoreClient::GetConfigValue(const std::string& name, const std::string& defaultValue) {
    return RunOperation<std::string>(
        [hive = Client, key = name, fallback = defaultValue]() -> std::string {
            std::string value;
            hive->get_config_value(value, key, fallback);
            return value;
        });
}

// Stops the worker pool and then closes the transport.
THiveMetastoreClient::~THiveMetastoreClient() {
    // The pool is stopped first so that the worker thread is no longer using the
    // transport when it gets closed.
    ThreadPool.Stop();
    try {
        Transport->close();
    } catch (...) {
        // close() is allowed to throw on a transport error. A destructor is implicitly
        // noexcept, so an escaping exception would terminate the process; there is
        // nothing useful to do with a failed close here, so it is ignored.
    }
}

}
49 changes: 49 additions & 0 deletions ydb/core/external_sources/hive_metastore/hive_metastore_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <ydb/core/external_sources/hive_metastore/hive_metastore_native/gen-cpp/ThriftHiveMetastore.h>

#include <contrib/restricted/thrift/thrift/protocol/TBinaryProtocol.h>
#include <contrib/restricted/thrift/thrift/transport/TSocket.h>
#include <contrib/restricted/thrift/thrift/transport/TTransportUtils.h>

#include <library/cpp/threading/future/core/future.h>

#include <util/generic/string.h>
#include <util/thread/pool.h>

#include <functional>
#include <memory>
#include <string>
#include <vector>

namespace NKikimr::NExternalSource {

// Asynchronous client for Hive Metastore built on top of the Thrift-generated
// ThriftHiveMetastoreClient.
//
// Every public operation returns a future; the operation itself is executed through the
// private RunOperation helper, and a failure is reported as an exception stored in the
// future.
struct THiveMetastoreClient : public TThrRefBase {
public:
    // Lifetime.
    THiveMetastoreClient(const TString& host, int32_t port);
    ~THiveMetastoreClient();

    // Databases.
    NThreading::TFuture<void> CreateDatabase(const Apache::Hadoop::Hive::Database& database);
    NThreading::TFuture<Apache::Hadoop::Hive::Database> GetDatabase(const TString& name);
    NThreading::TFuture<std::vector<std::string>> GetAllDatabases();

    // Tables.
    NThreading::TFuture<void> CreateTable(const Apache::Hadoop::Hive::Table& table);
    NThreading::TFuture<Apache::Hadoop::Hive::Table> GetTable(const TString& databaseName, const TString& tableName);
    NThreading::TFuture<std::vector<std::string>> GetAllTables(const TString& databaseName);

    // Statistics.
    NThreading::TFuture<void> UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics);
    NThreading::TFuture<Apache::Hadoop::Hive::TableStatsResult> GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request);

    // Partitions. maxPartitions is passed to the metastore unchanged; the default is -1.
    NThreading::TFuture<Apache::Hadoop::Hive::Partition> AddPartition(const Apache::Hadoop::Hive::Partition& partition);
    NThreading::TFuture<void> DropPartition(const TString& databaseName, const TString& tableName, const std::vector<std::string>& partitionValues, bool deleteData = false);
    NThreading::TFuture<std::vector<Apache::Hadoop::Hive::Partition>> GetPartitionsByFilter(const TString& databaseName, const TString& tableName, const TString& filter, int16_t maxPartitions = -1);

    // Configuration.
    NThreading::TFuture<std::string> GetConfigValue(const std::string& name, const std::string& defaultValue = {});

private:
    // Wraps `function` into a task and returns a future for its result.
    template<typename TResultValue>
    NThreading::TFuture<TResultValue> RunOperation(const std::function<TResultValue()>& function);

    // Thrift stack. The declaration order matters: members are initialized in this order
    // and each one is constructed from the previous one.
    std::shared_ptr<apache::thrift::protocol::TTransport> Socket;
    std::shared_ptr<apache::thrift::protocol::TTransport> Transport;
    std::shared_ptr<apache::thrift::protocol::TProtocol> Protocol;
    std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> Client;

    // Pool used to execute the operations.
    TThreadPool ThreadPool;
};

}
Loading

0 comments on commit c71c5b0

Please sign in to comment.