From 9017881e91b89b10c28593aa9a69fcd91171e723 Mon Sep 17 00:00:00 2001 From: shhnwz Date: Thu, 30 May 2024 20:12:16 +0530 Subject: [PATCH 1/2] Added feature to fetch s3 credential from aws provider chain --- parquet_s3_fdw.h | 3 ++ parquet_s3_fdw.hpp | 4 +-- parquet_s3_fdw_connection.cpp | 54 +++++++++++++++++++++------------- parquet_s3_fdw_server_option.c | 7 ++++- src/parquet_impl.cpp | 9 +++--- 5 files changed, 50 insertions(+), 27 deletions(-) diff --git a/parquet_s3_fdw.h b/parquet_s3_fdw.h index f788342..817726a 100644 --- a/parquet_s3_fdw.h +++ b/parquet_s3_fdw.h @@ -51,6 +51,8 @@ typedef struct parquet_s3_server_opt bool use_minio; /* Connect to MinIO instead of Amazon S3. */ bool keep_connections; /* setting value of keep_connections * server option */ + bool use_credential_provider; /* Retrieve AWS credentials using + * AWS Credential providers */ char *region; /* AWS region to connect to */ char *endpoint; /* Address and port to connect to */ } parquet_s3_server_opt; @@ -71,6 +73,7 @@ int ExecForeignDDL(Oid serverOid, /* Option name for CREATE FOREIGN SERVER. */ #define SERVER_OPTION_USE_MINIO "use_minio" #define SERVER_OPTION_KEEP_CONNECTIONS "keep_connections" +#define SERVER_OPTION_USE_CREDENTIAL_PROVIDER "use_credential_provider" #define SERVER_OPTION_REGION "region" #define SERVER_OPTION_ENDPOINT "endpoint" diff --git a/parquet_s3_fdw.hpp b/parquet_s3_fdw.hpp index 902cc46..448be54 100644 --- a/parquet_s3_fdw.hpp +++ b/parquet_s3_fdw.hpp @@ -58,7 +58,7 @@ typedef enum FileLocation_t /* * We would like to cache FileReader. When creating new hash entry, * the memory of entry is allocated by PostgreSQL core. But FileReader is - * a unique_ptr. In order to initialize it in parquet_s3_fdw, we define + * a unique_ptr. In order to initialize it in parquet_s3_fdw, we define * FileReaderCache class and the cache entry has the pointer of this class. 
*/ class FileReaderCache @@ -84,7 +84,7 @@ extern List *extract_parquet_fields(const char *path, const char *dirname, Aws:: extern char *create_foreign_table_query(const char *tablename, const char *schemaname, const char *servername, char **paths, int npaths, List *fields, List *options); -extern Aws::S3::S3Client *parquetGetConnection(UserMapping *user, bool use_minio); +extern Aws::S3::S3Client *parquetGetConnection(UserMapping *user, parquet_s3_server_opt* option); extern Aws::S3::S3Client *parquetGetConnectionByTableid(Oid foreigntableid, Oid userid); extern void parquetReleaseConnection(Aws::S3::S3Client *conn); extern List* parquetGetS3ObjectList(Aws::S3::S3Client *s3_cli, const char *s3path); diff --git a/parquet_s3_fdw_connection.cpp b/parquet_s3_fdw_connection.cpp index ba10c48..b1432b6 100644 --- a/parquet_s3_fdw_connection.cpp +++ b/parquet_s3_fdw_connection.cpp @@ -11,6 +11,8 @@ *------------------------------------------------------------------------- */ #include +#include +#include #include #include #include @@ -119,13 +121,13 @@ PG_FUNCTION_INFO_V1(parquet_s3_fdw_disconnect_all); } /* prototypes of private functions */ -static void make_new_connection(ConnCacheEntry *entry, UserMapping *user, bool use_minio); +static void make_new_connection(ConnCacheEntry *entry, UserMapping *user, parquet_s3_server_opt* option); static bool disconnect_cached_connections(Oid serverid); -static Aws::S3::S3Client *create_s3_connection(ForeignServer *server, UserMapping *user, bool use_minio); +static Aws::S3::S3Client *create_s3_connection(ForeignServer *server, UserMapping *user, parquet_s3_server_opt* option); static void close_s3_connection(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void parquet_fdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); -static Aws::S3::S3Client* s3_client_open(const char *user, const char *password, bool use_minio, const char *endpoint, const 
char *awsRegion); +static Aws::S3::S3Client* s3_client_open(const char *user, const char *password, bool use_minio, bool use_credential_provider, const char *endpoint, const char *awsRegion); static void s3_client_close(Aws::S3::S3Client *s3_client); extern "C" void @@ -148,7 +150,7 @@ parquet_s3_shutdown() * if we don't already have a suitable one. */ Aws::S3::S3Client * -parquetGetConnection(UserMapping *user, bool use_minio) +parquetGetConnection(UserMapping *user, parquet_s3_server_opt* option) { bool found; ConnCacheEntry *entry; @@ -215,7 +217,7 @@ parquetGetConnection(UserMapping *user, bool use_minio) * will remain in a valid empty state, ie conn == NULL.) */ if (entry->conn == NULL) - make_new_connection(entry, user, use_minio); + make_new_connection(entry, user, option); return entry->conn; } @@ -225,7 +227,7 @@ parquetGetConnection(UserMapping *user, bool use_minio) * establish new connection to the remote server. */ static void -make_new_connection(ConnCacheEntry *entry, UserMapping *user, bool use_minio) +make_new_connection(ConnCacheEntry *entry, UserMapping *user, parquet_s3_server_opt* option) { ForeignServer *server = GetForeignServer(user->serverid); @@ -242,7 +244,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user, bool use_minio) ObjectIdGetDatum(user->umid)); /* Now try to make the handle */ - entry->conn = create_s3_connection(server, user, use_minio); + entry->conn = create_s3_connection(server, user, option); elog(DEBUG3, "parquet_s3_fdw: new parquet_fdw handle %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); @@ -330,7 +332,7 @@ ExtractConnectionOptions(List *defelems, const char **keywords, * Connect to remote server using specified server and user mapping properties. 
*/ static Aws::S3::S3Client * -create_s3_connection(ForeignServer *server, UserMapping *user, bool use_minio) +create_s3_connection(ForeignServer *server, UserMapping *user, parquet_s3_server_opt* option) { Aws::S3::S3Client *volatile conn = NULL; @@ -354,7 +356,7 @@ create_s3_connection(ForeignServer *server, UserMapping *user, bool use_minio) n = list_length(lst_options) + 1; keywords = (const char **) palloc(n * sizeof(char *)); values = (const char **) palloc(n * sizeof(char *)); - + n = ExtractConnectionOptions( lst_options, keywords, values); keywords[n] = values[n] = NULL; @@ -380,7 +382,7 @@ create_s3_connection(ForeignServer *server, UserMapping *user, bool use_minio) endpoint = defGetString(def); } - conn = s3_client_open(id, password, use_minio, endpoint, awsRegion); + conn = s3_client_open(id, password, option->use_minio, option->use_credential_provider, endpoint, awsRegion); if (!conn) ereport(ERROR, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), @@ -494,13 +496,25 @@ parquet_fdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue) * Create S3 handle. 
*/ static Aws::S3::S3Client* -s3_client_open(const char *user, const char *password, bool use_minio, const char *endpoint, const char * awsRegion) +s3_client_open(const char *user, const char *password, bool use_minio, bool use_credential_provider, const char *endpoint, const char * awsRegion) { const Aws::String access_key_id = user; const Aws::String secret_access_key = password; - Aws::Auth::AWSCredentials cred = Aws::Auth::AWSCredentials(access_key_id, secret_access_key); + Aws::Auth::AWSCredentials cred; Aws::S3::S3Client *s3_client; - + if (use_credential_provider) + { + Aws::Auth::DefaultAWSCredentialsProviderChain cred_provider; + cred = cred_provider.GetAWSCredentials(); + if (awsRegion == NULL) + { + awsRegion = Aws::Environment::GetEnv("AWS_REGION").c_str(); + } + elog(DEBUG1, "parquet_s3_fdw: AWSREGION %s", awsRegion); + }else + { + cred = Aws::Auth::AWSCredentials(access_key_id, secret_access_key); + } pthread_mutex_lock(&cred_mtx); Aws::Client::ClientConfiguration clientConfig; pthread_mutex_unlock(&cred_mtx); @@ -551,7 +565,7 @@ parquetGetConnectionByTableid(Oid foreigntableid, Oid userid) Assert(userid != InvalidOid); user = GetUserMapping(userid, fserver->serverid); options = parquet_s3_get_options(foreigntableid); - s3client = parquetGetConnection(user, options->use_minio); + s3client = parquetGetConnection(user, options); } return s3client; } @@ -621,7 +635,7 @@ parquetGetS3ObjectList(Aws::S3::S3Client *s3_cli, const char *s3path) /* * If the keep_connections option of its server is disabled, - * then discard it to recover. Next parquetGetConnection + * then discard it to recover. Next parquetGetConnection * will open a new connection. */ void @@ -703,7 +717,7 @@ parquetIsS3Filenames(List *filenames) /* * Split s3 path into bucket name and file path. - * If foreign table option 'dirname' is specified, dirname starts by + * If foreign table option 'dirname' is specified, dirname starts by * "s3://". 
And filename is already set by get_filenames_in_dir(). * On the other hand, if foreign table option 'filename' is specified, * dirname is NULL (Or empty string when ANALYZE was executed) @@ -791,14 +805,14 @@ List * parquetImportForeignSchemaS3(ImportForeignSchemaStmt *stmt, Oid serverOid) { List *cmds = NIL; - Aws::S3::S3Client *s3client; + Aws::S3::S3Client *s3client; List *objects; ListCell *cell; ForeignServer *fserver = GetForeignServer(serverOid); UserMapping *user = GetUserMapping(GetUserId(), fserver->serverid); parquet_s3_server_opt *options = parquet_s3_get_server_options(serverOid); - s3client = parquetGetConnection(user, options->use_minio); + s3client = parquetGetConnection(user, options); objects = parquetGetS3ObjectList(s3client, stmt->remote_schema); @@ -878,11 +892,11 @@ parquetExtractParquetFields(List *fields, char **paths, const char *servername) if (!fields) { if (IS_S3_PATH(paths[0])) - { + { ForeignServer *fserver = GetForeignServerByName(servername, false); UserMapping *user = GetUserMapping(GetUserId(), fserver->serverid); parquet_s3_server_opt *options = parquet_s3_get_server_options(fserver->serverid); - Aws::S3::S3Client *s3client = parquetGetConnection(user, options->use_minio); + Aws::S3::S3Client *s3client = parquetGetConnection(user, options); fields = extract_parquet_fields(paths[0], NULL, s3client); } diff --git a/parquet_s3_fdw_server_option.c b/parquet_s3_fdw_server_option.c index d836fd9..fa9e488 100644 --- a/parquet_s3_fdw_server_option.c +++ b/parquet_s3_fdw_server_option.c @@ -33,7 +33,8 @@ bool parquet_s3_is_valid_server_option(DefElem *def) { if (strcmp(def->defname, SERVER_OPTION_USE_MINIO) == 0 || - strcmp(def->defname, SERVER_OPTION_KEEP_CONNECTIONS) == 0) + strcmp(def->defname, SERVER_OPTION_KEEP_CONNECTIONS) == 0 || + strcmp(def->defname, SERVER_OPTION_USE_CREDENTIAL_PROVIDER) == 0) { /* Check that bool value is valid */ bool check_bool_valid; @@ -71,6 +72,8 @@ parquet_s3_extract_options(List *options, 
parquet_s3_server_opt * opt) opt->use_minio = defGetBoolean(def); else if (strcmp(def->defname, SERVER_OPTION_KEEP_CONNECTIONS) == 0) opt->keep_connections = defGetBoolean(def); + else if (strcmp(def->defname, SERVER_OPTION_USE_CREDENTIAL_PROVIDER) == 0) + opt->use_credential_provider = defGetBoolean(def); else if (strcmp(def->defname, SERVER_OPTION_REGION) == 0) opt->region = defGetString(def); else if (strcmp(def->defname, SERVER_OPTION_ENDPOINT) == 0) @@ -98,6 +101,7 @@ parquet_s3_get_options(Oid foreignoid) opt->use_minio = false; /* By default, all the connections to any foreign servers are kept open. */ opt->keep_connections = true; + opt->use_credential_provider = false; opt->region = "ap-northeast-1"; opt->endpoint = "127.0.0.1:9000"; @@ -147,6 +151,7 @@ parquet_s3_get_server_options(Oid serverid) opt->use_minio = false; /* By default, all the connections to any foreign servers are kept open. */ opt->keep_connections = true; + opt->use_credential_provider = false; opt->region = "ap-northeast-1"; opt->endpoint = "127.0.0.1:9000"; diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index d1ee672..be232ef 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -989,7 +989,7 @@ extract_rowgroups_list(const char *filename, } /* loop over rowgroups */ } catch(const std::exception& e) { - error = e.what(); + error = e.what(); } if (!error.empty()) { if (reader_entry) @@ -2540,7 +2540,7 @@ parquetAcquireSampleRowsFunc(Relation relation, int /* elevel */, slcols.insert(std::string(strVal(rcol))); } - festate = create_parquet_execution_state(RT_MULTI, reader_cxt, + festate = create_parquet_execution_state(RT_MULTI, reader_cxt, fdw_private.dirname, fdw_private.s3client, tupleDesc, @@ -2753,7 +2753,7 @@ parquetIsForeignScanParallelSafe(PlannerInfo * /* root */, RelOptInfo *rel, RangeTblEntry * /* rte */) { - /* Plan nodes that reference a correlated SubPlan is always parallel restricted. 
+ /* Plan nodes that reference a correlated SubPlan is always parallel restricted. * Therefore, return false when there is lateral join. */ if (rel->lateral_relids) @@ -4329,6 +4329,7 @@ int ExecForeignDDL(Oid serverOid, table = GetForeignTable(RelationGetRelid(rel)); user = GetUserMapping(GetUserId(), serverOid); + parquet_s3_server_opt *options = parquet_s3_get_options(serverOid); foreach(lc, server->options) { DefElem *def = (DefElem *) lfirst(lc); @@ -4351,7 +4352,7 @@ int ExecForeignDDL(Oid serverOid, } if (IS_S3_PATH(dirname) || parquetIsS3Filenames(filenames)) - s3_client = parquetGetConnection(user, use_minio); + s3_client = parquetGetConnection(user, options); else s3_client = NULL; From 15ac0090ad180bb4ee7c6147f9b7dd755f6821d4 Mon Sep 17 00:00:00 2001 From: shhnwz Date: Tue, 25 Jun 2024 20:32:51 +0530 Subject: [PATCH 2/2] Added feature details in README --- README.md | 60 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index d36cd89..71f42cc 100644 --- a/README.md +++ b/README.md @@ -15,13 +15,13 @@ Read-only Apache Parquet foreign data wrapper supporting S3 access for PostgreSQ * libuuid-devel * pulseaudio-libs-devel ### 2. Install dependent libraries -* `libarrow` and `libparquet`: Confirmed version is 12.0.0 (required). +* `libarrow` and `libparquet`: Confirmed version is 12.0.0 (required). Please refer to [building guide](https://github.com/apache/arrow/blob/master/docs/source/developers/cpp/building.rst). -* `AWS SDK for C++ (libaws-cpp-sdk-core libaws-cpp-sdk-s3)`: Confirmed version is 1.11.91 (required). +* `AWS SDK for C++ (libaws-cpp-sdk-core libaws-cpp-sdk-s3)`: Confirmed version is 1.11.91 (required). Please refer to [bulding guide](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/setup-linux.html) -Attention! +Attention! We reccomend to build `libarrow`, `libparquet` and `AWS SDK for C++` from the source code. 
We failed to link if using pre-compiled binaries because gcc version is different between arrow and AWS SDK. ### 3. Build and install parquet_s3_fdw @@ -45,6 +45,12 @@ CREATE EXTENSION parquet_s3_fdw; ```sql CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw; ``` +If you want AWS credentials to be fetched through the DefaultAWSCredentialsProviderChain, +enable the `use_credential_provider` server option. + +```sql +CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw OPTIONS (use_credential_provider 'true'); +``` If using [MinIO][3] instead of AWS S3, please use use_minio option for create server. ```sql CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw OPTIONS (use_minio 'true'); @@ -56,6 +62,12 @@ You have to specify user name and password if accessing Amazon S3. CREATE USER MAPPING FOR public SERVER parquet_s3_srv OPTIONS (user 's3user', password 's3password'); ``` +If `use_credential_provider` is enabled, there is no need to pass `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` as *User* and *Password*. + +```sql +CREATE USER MAPPING FOR public SERVER parquet_s3_srv OPTIONS (user '_', password '_'); +``` + ### Create foreign table Now you should be able to create foreign table from Parquet files. 
Currently `parquet_s3_fdw` supports the following column [types](https://github.com/apache/arrow/blob/master/cpp/src/arrow/type.h) (to be extended shortly): @@ -218,7 +230,7 @@ SELECT import_parquet_s3_explicit( v jsonb ) OPTIONS (filename '/path/to/parquet_file', schemaless 'true'); SELECT * FROM example_schemaless; - id | v + id | v ----+--------------------------------------------------------------------------------------------------------------------------------- | {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"} | {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"} @@ -235,10 +247,10 @@ SELECT import_parquet_s3_explicit( ```sql -- non-schemaless mode SELECT * FROM example; - one | two | three | four | five | six | seven + one | two | three | four | five | six | seven -----+------------+-------+---------------------+------------+-----+------- 1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5 - 2 | {NULL,5,6} | bar | 2018-01-02 00:00:00 | 2018-01-02 | f | + 2 | {NULL,5,6} | bar | 2018-01-02 00:00:00 | 2018-01-02 | f | (2 rows) -- schemaless mode SELECT * FROM example_schemaless; @@ -249,12 +261,12 @@ SELECT import_parquet_s3_explicit( (2 rows) ``` -- Fetch values in jsonb expression: - - Use `->>` jsonb arrow operator which return text type. User may cast type the jsonb expression to get corresponding data representation. - - For example, `v->>'col'` expression of fetch value `col` will be column name `col` in parquet file and we call it `schemaless variable` or `slvar`. +- Fetch values in jsonb expression: + - Use `->>` jsonb arrow operator which return text type. User may cast type the jsonb expression to get corresponding data representation. + - For example, `v->>'col'` expression of fetch value `col` will be column name `col` in parquet file and we call it `schemaless variable` or `slvar`. 
```sql SELECT v->>'two', sqrt((v->>'one')::int) FROM example_schemaless; - ?column? | sqrt + ?column? | sqrt --------------+-------------------- [1, 2, 3] | 1 [null, 5, 6] | 1.4142135623730951 @@ -279,16 +291,16 @@ SELECT import_parquet_s3_explicit( SERVER parquet_s3_srv OPTIONS (filename '/path/to/example1.parquet /path/to/example2.parquet', sorted 'int64_col', schemaless 'true'); EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY (v->>'int64_col')::int8; - QUERY PLAN + QUERY PLAN -------------------------------- Foreign Scan on example_sorted Reader: Multifile Merge - Row groups: + Row groups: example1.parquet: 1, 2 example2.parquet: 1 (5 rows) ``` - - Support for arrow Nested List and Map: these type will be treated as nested jsonb value which can access by `->` operator. + - Support for arrow Nested List and Map: these type will be treated as nested jsonb value which can access by `->` operator. For example: ```sql SELECT * FROM example_schemaless; @@ -299,10 +311,10 @@ SELECT import_parquet_s3_explicit( (2 rows) SELECT v->'array_col'->1, v->'jsonb_col'->'1' FROM example3; - ?column? | ?column? + ?column? | ?column? 
----------+---------- 20 | "foo" - 22 | + 22 | (2 rows) ``` @@ -438,7 +450,7 @@ INSERT INTO example_insert VALUES (1, 'text1', true), (2, DEFAULT, false), ((sel INSERT 0 3 SELECT * FROM example_insert; - c1 | c2 | c3 + c1 | c2 | c3 ----+-----------------+---- 1 | text1 | t 2 | | f @@ -453,7 +465,7 @@ CREATE FOREIGN TABLE example_insert_schemaless ( INSERT INTO example_insert_schemaless VALUES ('{"c1": 1, "c2": "text1", "c3": true}'), ('{"c1": 2, "c2": null, "c3": false}'), ('{"c1": 3, "c2": "values are fun!", "c3": true}'); SELECT * FROM example_insert_schemaless; - v + v ----------------------------------------------- {"c1": 1, "c2": "text1", "c3": "t"} {"c1": 2, "c2": null, "c3": "f"} @@ -486,7 +498,7 @@ CREATE FOREIGN TABLE example ( ) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet'); SELECT * FROM example; - c1 | c2 | c3 + c1 | c2 | c3 ----+-----------------+---- 1 | text1 | t 2 | | f @@ -497,7 +509,7 @@ UPDATE example SET c3 = false WHERE c2 = 'text1'; UPDATE 1 SELECT * FROM example; - c1 | c2 | c3 + c1 | c2 | c3 ----+-----------------+---- 1 | text1 | f 2 | | f @@ -508,7 +520,7 @@ DELETE FROM example WHERE c1 = 2; DELETE 1 SELECT * FROM example; - c1 | c2 | c3 + c1 | c2 | c3 ----+-----------------+---- 1 | text1 | f 3 | values are fun! 
| t @@ -520,7 +532,7 @@ CREATE FOREIGN TABLE example_schemaless ( ) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet', schemaless 'true', key_columns 'c1'); SELECT * FROM example_schemaless; - v + v ----------------------------------------------- {"c1": 1, "c2": "text1", "c3": "t"} {"c1": 2, "c2": null, "c3": "f"} @@ -531,7 +543,7 @@ UPDATE example_schemaless SET v='{"c3":false}' WHERE v->>'c2' = 'text1'; UPDATE 1 SELECT * FROM example_schemaless; - v + v ----------------------------------------------- {"c1": 1, "c2": "text1", "c3": "f"} {"c1": 2, "c2": null, "c3": "f"} @@ -542,7 +554,7 @@ DELETE FROM example_schemaless WHERE (v->>'c1')::int = 2; DELETE 1 SELECT * FROM example_schemaless; - v + v ----------------------------------------------- {"c1": 1, "c2": "text1", "c3": "f"} {"c1": 3, "c2": "values are fun!", "c3": "t"} @@ -573,7 +585,7 @@ SELECT * FROM example_schemaless; Opening issues and pull requests on GitHub are welcome. ## License -Copyright (c) 2021, TOSHIBA Corporation +Copyright (c) 2021, TOSHIBA Corporation Copyright (c) 2018 - 2019, adjust GmbH Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies.