Skip to content

Commit

Permalink
Release v2.1.1 (#47)
Browse files Browse the repository at this point in the history
Support Insert/Update with generated column
Support check invalid options
Bug fixings:
- Fix issue #44 on GitHub
- Fix memory leak
  • Loading branch information
t-kataym authored Dec 22, 2021
2 parents 56fb787 + f14eed3 commit 9cffd29
Show file tree
Hide file tree
Showing 201 changed files with 2,703 additions and 1,086 deletions.
4 changes: 2 additions & 2 deletions META.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
"name": "sqlite_fdw",
"abstract": "Foreign Data Wrapper for SQLite databases",
"description": "PostgreSQL extension which implements a Foreign Data Wrapper (FDW) for SQLite databases.",
"version": "2.1.0",
"version": "2.1.1",
"maintainer": "pgspider",
"license": "postgresql",
"provides": {
"sqlite_fdw": {
"abstract": "Foreign Data Wrapper for SQLite databases",
"file": "sqlite_fdw.c",
"docfile": "README.md",
"version": "2.1.0"
"version": "2.1.1"
}
},
"prereqs": {
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ SELECT * FROM t1;
- Support list cached connections to foreign servers by using function sqlite_fdw_get_connections()
- Support discard cached connections to foreign servers by using function sqlite_fdw_disconnect(), sqlite_fdw_disconnect_all().
- Support Bulk Insert by using batch_size option
- Support Insert/Update with generated column

## Limitations
- `COPY` command for foreign tables is not supported
- IMPORT of generated column is not supported
- Insert into a partitioned table which has foreign partitions is not supported
- TRUNCATE in sqlite_fdw always delete data of both parent and child tables (no matter user inputs `TRUNCATE table CASCADE` or `TRUNCATE table RESTRICT`) if there are foreign-keys references with "ON DELETE CASCADE" clause.
## Contributing
Expand Down
85 changes: 68 additions & 17 deletions connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ typedef struct ConnCacheEntry
bool truncatable; /* check table can truncate or not */
bool invalidated; /* true if reconnect is pending */
Oid serverid; /* foreign server OID used to get server name */
List *stmtList; /* list stmt associated with conn */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
} ConnCacheEntry;
Expand Down Expand Up @@ -76,6 +77,8 @@ static void sqlitefdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
#if PG_VERSION_NUM >= 140000
static bool sqlite_disconnect_cached_connections(Oid serverid);
#endif
static void sqlite_finalize_list_stmt(List **list);
static List *sqlite_append_stmt_to_list(List *list, sqlite3_stmt * stmt);

/*
* sqlite_get_connection:
Expand Down Expand Up @@ -187,6 +190,7 @@ sqlite_make_new_connection(ConnCacheEntry *entry, ForeignServer *server)
entry->serverid = server->serverid;
entry->xact_depth = 0;
entry->invalidated = false;
entry->stmtList = NULL;
entry->keep_connections = true;
entry->server_hashvalue =
GetSysCacheHashValue1(FOREIGNSERVEROID,
Expand Down Expand Up @@ -240,16 +244,11 @@ sqlite_cleanup_connection(void)
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
sqlite3_stmt *cur = NULL;

if (entry->conn == NULL)
continue;

while ((cur = sqlite3_next_stmt(entry->conn, cur)) != NULL)
{
elog(DEBUG1, "finalize %s", sqlite3_sql(cur));
sqlite3_finalize(cur);
}
sqlite_finalize_list_stmt(&entry->stmtList);

elog(DEBUG1, "disconnecting sqlite_fdw connection %p", entry->conn);
rc = sqlite3_close(entry->conn);
entry->conn = NULL;
Expand Down Expand Up @@ -359,10 +358,6 @@ sqlitefdw_report_error(int elevel, sqlite3_stmt * stmt, sqlite3 * conn,
if (sql)
sql = pstrdup(sqlite3_sql(stmt));
}

if (stmt)
sqlite3_finalize(stmt);

ereport(ERROR,
(errcode(sqlstate),
errmsg("failed to execute remote SQL: rc=%d %s \n sql=%s",
Expand Down Expand Up @@ -412,6 +407,8 @@ sqlitefdw_xact_callback(XactEvent event, void *arg)
/* Commit all remote transactions during pre-commit */
if (!sqlite3_get_autocommit(entry->conn))
sqlite_do_sql_command(entry->conn, "COMMIT", ERROR);
/* Finalize all prepared statements */
sqlite_finalize_list_stmt(&entry->stmtList);
break;
case XACT_EVENT_PRE_PREPARE:

Expand All @@ -437,15 +434,10 @@ sqlitefdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
{
sqlite3_stmt *cur = NULL;

elog(DEBUG3, "abort transaction");

/* Finalize all prepared statements */
while ((cur = sqlite3_next_stmt(entry->conn, NULL)) != NULL)
{
sqlite3_finalize(cur);
}
sqlite_finalize_list_stmt(&entry->stmtList);

/*
* rollback if in transaction because SQLite may
Expand Down Expand Up @@ -880,6 +872,7 @@ sqlite_disconnect_cached_connections(Oid serverid)
else
{
elog(DEBUG3, "discarding sqlite_fdw connection %p", entry->conn);
sqlite_finalize_list_stmt(&entry->stmtList);
sqlite3_close(entry->conn);
entry->conn = NULL;
result = true;
Expand All @@ -889,3 +882,61 @@ sqlite_disconnect_cached_connections(Oid serverid)
return result;
}
#endif

/*
* cache sqlite3 statement to finalize at the end of transaction
*/
void
sqlite_cache_stmt(ForeignServer *server, sqlite3_stmt * *stmt)
{
bool found;
ConnCacheEntry *entry;
ConnCacheKey key = server->serverid;

/*
* Find cached entry for requested connection.
*/
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);

/* We must always have found the entry */
Assert(found);

entry->stmtList = sqlite_append_stmt_to_list(entry->stmtList, *stmt);
}

/*
* finalize all sqlite statement
*/
static void
sqlite_finalize_list_stmt(List **list)
{
ListCell *lc;

foreach(lc, *list)
{
sqlite3_stmt *stmt = (sqlite3_stmt *) lfirst(lc);

elog(DEBUG1, "sqlite_fdw: finalize %s", sqlite3_sql(stmt));
sqlite3_finalize(stmt);
}

list_free(*list);
*list = NULL;
}

/*
* append sqlite3 stmt to the head of linked list
*/
static List *
sqlite_append_stmt_to_list(List *list, sqlite3_stmt * stmt)
{
/*
* CurrentMemoryContext is released before cleanup transaction (when the
* list is called), so, use TopMemoryContext instead.
*/
MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext);

list = lappend(list, stmt);
MemoryContextSwitchTo(oldcontext);
return list;
}
115 changes: 90 additions & 25 deletions deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -1545,27 +1545,66 @@ sqlite_deparse_insert(StringInfo buf, PlannerInfo *root,
List *targetAttrs, bool doNothing,
int *values_end_len)
{
#if PG_VERSION_NUM >= 140000
TupleDesc tupdesc = RelationGetDescr(rel);
bool all_columns_generated = true;
#endif
AttrNumber pindex;
bool first;
ListCell *lc;

appendStringInfo(buf, "INSERT %sINTO ", doNothing ? "OR IGNORE " : "");
sqlite_deparse_relation(buf, rel);

#if PG_VERSION_NUM >= 140000

/*
* Check all columns in table that they are all generated column or not.
* If true, we will skip all columns and just add 'DEFAULT VALUES'. If
* not, we still push down other columns which are not generated column.
*/
if (targetAttrs)
{
foreach(lc, targetAttrs)
{
int attnum = linitial_int(targetAttrs);
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

if (!attr->attgenerated)
{
all_columns_generated = false;
break;
}
}
}
#endif

#if (PG_VERSION_NUM >= 140000)
if (targetAttrs && !all_columns_generated)
#else
if (targetAttrs)
#endif
{
appendStringInfoChar(buf, '(');

first = true;
foreach(lc, targetAttrs)
{
int attnum = lfirst_int(lc);
#if PG_VERSION_NUM >= 140000
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

if (!first)
appendStringInfoString(buf, ", ");
first = false;
if (!attr->attgenerated)
{
#endif
if (!first)
appendStringInfoString(buf, ", ");
first = false;

sqlite_deparse_column_ref(buf, rtindex, attnum, root, false);
sqlite_deparse_column_ref(buf, rtindex, attnum, root, false);
#if PG_VERSION_NUM >= 140000
}
#endif
}

appendStringInfoString(buf, ") VALUES (");
Expand All @@ -1574,12 +1613,21 @@ sqlite_deparse_insert(StringInfo buf, PlannerInfo *root,
first = true;
foreach(lc, targetAttrs)
{
if (!first)
appendStringInfoString(buf, ", ");
first = false;
#if PG_VERSION_NUM >= 140000
int attnum = lfirst_int(lc);
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

appendStringInfo(buf, "?");
pindex++;
if (!attr->attgenerated)
{
#endif
if (!first)
appendStringInfoString(buf, ", ");
first = false;
appendStringInfo(buf, "?");
pindex++;
#if PG_VERSION_NUM >= 140000
}
#endif
}

appendStringInfoChar(buf, ')');
Expand All @@ -1597,13 +1645,14 @@ sqlite_deparse_insert(StringInfo buf, PlannerInfo *root,
* right number of parameters.
*/
void
sqlite_rebuild_insert(StringInfo buf, char *orig_query,
int values_end_len, int num_cols,
sqlite_rebuild_insert(StringInfo buf, Relation rel, char *orig_query,
List *target_attrs, int values_end_len, int num_params,
int num_rows)
{
int i,
j;
TupleDesc tupdesc = RelationGetDescr(rel);
int i;
bool first;
ListCell *lc;

/* Make sure the values_end_len is sensible */
Assert((values_end_len > 0) && (values_end_len <= strlen(orig_query)));
Expand All @@ -1620,13 +1669,19 @@ sqlite_rebuild_insert(StringInfo buf, char *orig_query,
appendStringInfoString(buf, ", (");

first = true;
for (j = 0; j < num_cols; j++)
foreach(lc, target_attrs)
{
if (!first)
appendStringInfoString(buf, ", ");
first = false;
int attnum = lfirst_int(lc);
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

if (!attr->attgenerated)
{
if (!first)
appendStringInfoString(buf, ", ");
first = false;

appendStringInfo(buf, "?");
appendStringInfo(buf, "?");
}
}

appendStringInfoChar(buf, ')');
Expand Down Expand Up @@ -2028,6 +2083,9 @@ sqlite_deparse_update(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *targetAttrs, List *attnums)
{
#if PG_VERSION_NUM >= 140000
TupleDesc tupdesc = RelationGetDescr(rel);
#endif
AttrNumber pindex;
bool first;
ListCell *lc;
Expand All @@ -2042,14 +2100,21 @@ sqlite_deparse_update(StringInfo buf, PlannerInfo *root,
foreach(lc, targetAttrs)
{
int attnum = lfirst_int(lc);
#if PG_VERSION_NUM >= 140000
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);

if (!first)
appendStringInfoString(buf, ", ");
first = false;

sqlite_deparse_column_ref(buf, rtindex, attnum, root, false);
appendStringInfo(buf, " = ?");
pindex++;
if (!attr->attgenerated)
{
#endif
if (!first)
appendStringInfoString(buf, ", ");
first = false;
sqlite_deparse_column_ref(buf, rtindex, attnum, root, false);
appendStringInfo(buf, " = ?");
pindex++;
#if PG_VERSION_NUM >= 140000
}
#endif
}
i = 0;
foreach(lc, attnums)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 9cffd29

Please sign in to comment.