forked from pgspider/dynamodb_fdw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.cpp
386 lines (341 loc) · 10.7 KB
/
connection.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
/*-------------------------------------------------------------------------
*
* connection.cpp
* Connection management functions for dynamodb_fdw
*
* Portions Copyright (c) 2021, TOSHIBA CORPORATION
*
* IDENTIFICATION
* contrib/dynamodb_fdw/connection.cpp
*
*-------------------------------------------------------------------------
*/
extern "C"
{
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "dynamodb_fdw.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
}
#include <aws/dynamodb/DynamoDBClient.h>
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
static Aws::SDKOptions *aws_sdk_options;
extern "C" void
dynamodb_init()
{
aws_sdk_options = new Aws::SDKOptions();
Aws::InitAPI(*aws_sdk_options);
}
extern "C" void
dynamodb_shutdown()
{
Aws::ShutdownAPI(*aws_sdk_options);
aws_sdk_options = NULL;
}
/*
 * Key of the connection cache.  dynamodb_get_connection() fills it with the
 * OID of the user mapping (user->umid), so there is one cache entry per
 * user mapping.
 */
typedef Oid ConnCacheKey;
/*
 * One entry of the connection cache: the DynamoDB client created for a user
 * mapping plus the bookkeeping needed to invalidate it when the underlying
 * catalog rows change.
 */
typedef struct ConnCacheEntry
{
ConnCacheKey key; /* hash key (must be first) */
Aws::DynamoDB::DynamoDBClient *conn; /* connection to foreign server, or NULL */
/* Remaining fields are invalid when conn is NULL: */
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
} ConnCacheEntry;
/*
 * Connection cache (initialized on first use).
 *
 * Created lazily by dynamodb_get_connection(); stays NULL until then.
 */
static HTAB *ConnectionHash = NULL;
/* prototypes of private functions */
/* Fill an empty cache entry with a freshly created client. */
static void dynamodb_make_new_connection(ConnCacheEntry *entry, UserMapping *user);
/* Validate options and create a client; raises ERROR on failure. */
static Aws::DynamoDB::DynamoDBClient *dynamodb_create_connection(ForeignServer *server, UserMapping *user);
/* Raise ERROR if the user name or password option is missing. */
static void dynamodb_check_conn_params(dynamodb_opt *opt);
/* Syscache invalidation callback: flags affected cache entries. */
static void dynamodb_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
/* Allocate a DynamoDB client for the given credentials and endpoint. */
static Aws::DynamoDB::DynamoDBClient *dynamodb_client_open(const char *user,
const char *password,
const char *endpoint);
/* Destroy a client created by dynamodb_client_open(). */
static void dynamodb_delete_client(Aws::DynamoDB::DynamoDBClient *dynamoDB_client);
/* prototypes of public functions */
/* Destroy the client held by a cache entry, if any, and clear the slot. */
extern void dynamodb_close_connection(ConnCacheEntry *entry);
/*
 * dynamodb_get_connection
 *
 * Return the DynamoDB client associated with the given user mapping.
 *
 * Clients are cached per user mapping OID.  On the first call in a backend
 * the cache itself is created and the invalidation callbacks are registered.
 * A cached client that has been flagged as invalidated is destroyed here and
 * replaced by a new one, so that changed options take effect.
 *
 * user: user mapping whose credentials are used for the connection.
 *
 * Returns the (possibly newly created) client.  Errors raised while creating
 * the client propagate to the caller.
 */
Aws::DynamoDB::DynamoDBClient *
dynamodb_get_connection(UserMapping *user)
{
	ConnCacheEntry *cached;
	ConnCacheKey lookup_key;
	bool		already_cached;

	/* Lazily build the connection cache the first time we get here */
	if (ConnectionHash == NULL)
	{
		HASHCTL		hash_info;
		int			hash_flags = HASH_ELEM | HASH_BLOBS;

#if PG_VERSION_NUM < 140000
		MemSet(&hash_info, 0, sizeof(hash_info));
		/* allocate ConnectionHash in the cache context */
		hash_info.hcxt = CacheMemoryContext;
		hash_flags |= HASH_CONTEXT;
#endif
		hash_info.keysize = sizeof(ConnCacheKey);
		hash_info.entrysize = sizeof(ConnCacheEntry);

		ConnectionHash = hash_create("dynamoDB_fdw connections", 8,
									 &hash_info, hash_flags);

		/*
		 * Hook into syscache invalidation for foreign servers and user
		 * mappings.  Because this sits inside the "cache not yet created"
		 * branch, it happens only once per backend.
		 */
		CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
									  dynamodb_inval_callback, (Datum) 0);
		CacheRegisterSyscacheCallback(USERMAPPINGOID,
									  dynamodb_inval_callback, (Datum) 0);
	}

	/* The cache is keyed by the user mapping OID */
	lookup_key = user->umid;

	/* Look the entry up, creating an empty slot if there is none yet */
	cached = (ConnCacheEntry *) hash_search(ConnectionHash, &lookup_key,
											HASH_ENTER, &already_cached);

	/*
	 * A brand-new slot only needs "conn" cleared; the other fields are
	 * filled in when a connection is actually made.
	 */
	if (!already_cached)
		cached->conn = NULL;

	/*
	 * A server or user mapping change flagged this connection as stale:
	 * drop it now so that it gets rebuilt below.
	 */
	if (cached->conn != NULL && cached->invalidated)
	{
		elog(DEBUG3, "dynamodb_fdw: closing connection %p for option changes to take effect",
			 cached->conn);
		dynamodb_close_connection(cached);
	}

	/*
	 * No usable connection in the slot: establish one.  Should that raise
	 * an error, the slot is left with conn == NULL, i.e. in a valid empty
	 * state.
	 */
	if (cached->conn == NULL)
		dynamodb_make_new_connection(cached, user);

	return cached->conn;
}
/*
 * dynamodb_make_new_connection
 *
 * Populate an empty cache entry: clear its invalidation flag, record the
 * syscache hash values of the foreign server and the user mapping it depends
 * on, and create the DynamoDB client.
 *
 * entry: cache entry to fill; its conn field must be NULL on entry.
 * user:  user mapping the connection is created for.
 */
static void
dynamodb_make_new_connection(ConnCacheEntry *entry, UserMapping *user)
{
	ForeignServer *srv = GetForeignServer(user->serverid);
	uint32		srv_hash;
	uint32		mapping_hash;

	Assert(entry->conn == NULL);

	/* Hash values let the invalidation callback recognise this entry */
	srv_hash = GetSysCacheHashValue1(FOREIGNSERVEROID,
									 ObjectIdGetDatum(srv->serverid));
	mapping_hash = GetSysCacheHashValue1(USERMAPPINGOID,
										 ObjectIdGetDatum(user->umid));

	/* Start from a clean slate for all transient state */
	entry->invalidated = false;
	entry->server_hashvalue = srv_hash;
	entry->mapping_hashvalue = mapping_hash;

	/* Finally, try to create the client itself */
	entry->conn = dynamodb_create_connection(srv, user);

	elog(DEBUG3, "dynamodb_fdw: new dynamoDB_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
		 entry->conn, srv->servername, user->umid, user->userid);
}
/*
 * dynamodb_create_connection
 *
 * Create a DynamoDB client from the options returned by
 * dynamodb_get_options() for the given foreign server.
 *
 * server: foreign server whose options (user name, password, endpoint)
 *         are used.
 * user:   user mapping; not referenced by this function.
 *
 * Returns the new client.  Raises ERROR if the connection parameters are
 * incomplete or the client cannot be created; a client created before the
 * error is destroyed before the error is re-thrown.
 */
static Aws::DynamoDB::DynamoDBClient *
dynamodb_create_connection(ForeignServer *server, UserMapping *user)
{
	/* volatile because it is assigned inside PG_TRY and read in PG_CATCH */
	Aws::DynamoDB::DynamoDBClient *volatile client = NULL;
	dynamodb_opt *options = dynamodb_get_options(server->serverid);

	PG_TRY();
	{
		/* Refuse to go on without credentials, then open the client */
		dynamodb_check_conn_params(options);
		client = dynamodb_client_open(options->svr_username,
									  options->svr_password,
									  options->svr_endpoint);
		if (client == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
					 errmsg("dynamodb_fdw: could not connect to DynamoDB \"%s\"",
							server->servername)));
	}
	PG_CATCH();
	{
		/* Do not leak a client that was created before the failure */
		if (client != NULL)
			dynamodb_delete_client(client);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return client;
}
/*
 * dynamodb_check_conn_params
 *
 * Both a user name and a password are required to connect to DynamoDB;
 * dynamodb_create_connection() passes them on to dynamodb_client_open().
 *
 * opt: options to validate.
 *
 * Raises ERROR naming the option that is actually missing.  (Previously the
 * two messages were swapped: a missing user name was reported as a missing
 * password and vice versa.)
 */
static void
dynamodb_check_conn_params(dynamodb_opt *opt)
{
	if (opt->svr_username == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
				 errmsg("dynamodb_fdw: user is required"),
				 errdetail("Non-superusers must provide an user name in the user mapping.")));

	if (opt->svr_password == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
				 errmsg("dynamodb_fdw: password is required"),
				 errdetail("Non-superusers must provide a password in the user mapping.")));
}
/*
 * dynamodb_inval_callback
 *
 * Syscache invalidation callback, registered by dynamodb_get_connection()
 * for both the foreign server and the user mapping caches.
 *
 * Every cached connection that depends on the changed catalog entry is
 * flagged as invalidated.  Nothing is closed here; dynamodb_get_connection()
 * destroys and recreates a flagged connection the next time it is requested.
 *
 * Only entries whose recorded hash value matches are flagged, so unrelated
 * connections are kept.  A hashvalue of 0 means a cache reset, in which case
 * every open connection is flagged.
 *
 * arg:       callback argument (unused).
 * cacheid:   FOREIGNSERVEROID or USERMAPPINGOID.
 * hashvalue: hash value of the invalidated catalog entry, or 0.
 */
static void
dynamodb_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
{
	HASH_SEQ_STATUS iter;
	ConnCacheEntry *cur;

	Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);

	/* We are only registered once ConnectionHash exists */
	hash_seq_init(&iter, ConnectionHash);
	while ((cur = (ConnCacheEntry *) hash_seq_search(&iter)) != NULL)
	{
		bool		affected;

		/* Slots without a connection carry no valid state to invalidate */
		if (cur->conn == NULL)
			continue;

		if (hashvalue == 0)
			affected = true;	/* cache reset: everything is stale */
		else if (cacheid == FOREIGNSERVEROID)
			affected = (cur->server_hashvalue == hashvalue);
		else if (cacheid == USERMAPPINGOID)
			affected = (cur->mapping_hashvalue == hashvalue);
		else
			affected = false;

		if (affected)
			cur->invalidated = true;
	}
}
/*
 * dynamodb_client_open
 *
 * Create a DynamoDB client handle.
 *
 * user:     AWS access key id.
 * password: AWS secret access key.
 * endpoint: endpoint override, or NULL to keep the SDK's default endpoint.
 *
 * Returns a client allocated with new (release it with
 * dynamodb_delete_client()), or NULL if the client could not be created.
 *
 * NULL is returned, instead of invoking undefined behavior, when user or
 * password is NULL: an Aws::String must not be constructed from a NULL
 * pointer.  C++ exceptions (for example std::bad_alloc) are caught here and
 * turned into a NULL result as well, because they must not propagate into
 * PostgreSQL's C code; the caller reports NULL as a regular ERROR.
 */
static Aws::DynamoDB::DynamoDBClient*
dynamodb_client_open(const char *user, const char *password, const char *endpoint)
{
	if (user == NULL || password == NULL)
		return NULL;

	try
	{
		const Aws::String access_key_id = user;
		const Aws::String secret_access_key = password;
		Aws::Client::ClientConfiguration clientConfig;
		Aws::Auth::AWSCredentials cred(access_key_id, secret_access_key);

		/* Only override the endpoint when one was actually configured */
		if (endpoint != NULL)
			clientConfig.endpointOverride = endpoint;

		return new Aws::DynamoDB::DynamoDBClient(cred, clientConfig);
	}
	catch (...)
	{
		return NULL;
	}
}
/*
 * dynamodb_delete_client
 *
 * Delete DynamoDB client handle.
 *
 * dynamoDB_client: client previously allocated with new by
 *                  dynamodb_client_open().  The pointer must not be used
 *                  after this call.
 */
static void
dynamodb_delete_client(Aws::DynamoDB::DynamoDBClient *dynamoDB_client)
{
delete dynamoDB_client;
}
/*
 * dynamodb_close_connection
 *
 * Destroy the DynamoDB client held by a connection cache entry, if there is
 * one, and mark the entry as having no connection.
 *
 * entry: cache entry to close; an entry without a connection is left as is.
 */
extern void
dynamodb_close_connection(ConnCacheEntry *entry)
{
	Aws::DynamoDB::DynamoDBClient *client = entry->conn;

	/* Already closed: nothing to release */
	if (client == NULL)
		return;

	dynamodb_delete_client(client);
	entry->conn = NULL;
}
/*
 * dynamodb_report_error
 *
 * Report an error received from the remote server through ereport(), using
 * the error code ERRCODE_FDW_ERROR.
 *
 * elevel:  error level to use (typically ERROR, but might be less)
 * message: error message obtained from the remote side
 * query:   the query that caused the error
 *
 * Note: when elevel is below ERROR, ereport() returns and the caller is
 * responsible for any further handling of the failed request.
 */
void
dynamodb_report_error(int elevel, const Aws::String message, char* query)
{
	const char *remote_msg = message.c_str();

	ereport(elevel,
			(errcode(ERRCODE_FDW_ERROR),
			 errmsg("dynamodb_fdw: failed to execute remote SQL: %s \n sql=%s",
					remote_msg, query)));
}
/*
 * dynamodb_release_connection
 *
 * Counterpart of dynamodb_get_connection(): called when a caller is done
 * with a client it obtained from the connection cache.
 *
 * dynamoDB_client: the client being released (currently unused).
 *
 * The client stays in the connection cache; it is neither closed nor
 * deleted here.
 */
void
dynamodb_release_connection(Aws::DynamoDB::DynamoDBClient *dynamoDB_client)
{
/*
 * Connection references are not tracked, so there is nothing to do here.
 * Cached clients are destroyed through dynamodb_close_connection().
 */
}