Skip to content

Commit

Permalink
Fix time comparison (#54)
Browse files Browse the repository at this point in the history
* Fix time comparison with sub-query

* Update readme.md
Make clear about test result with different version of glibc.

---------

Co-authored-by: Duong Ngoc Lam <[email protected]>
  • Loading branch information
lamdn1409 and lamduongngoc authored Sep 26, 2024
1 parent c579565 commit da0d422
Show file tree
Hide file tree
Showing 27 changed files with 38,249 additions and 457 deletions.
249 changes: 248 additions & 1 deletion README.md

Large diffs are not rendered by default.

366 changes: 335 additions & 31 deletions deparse.c

Large diffs are not rendered by default.

3,122 changes: 3,046 additions & 76 deletions expected/12.16/influxdb_fdw.out

Large diffs are not rendered by default.

2,846 changes: 2,845 additions & 1 deletion expected/12.16/schemaless/influxdb_fdw.out

Large diffs are not rendered by default.

3,122 changes: 3,046 additions & 76 deletions expected/13.12/influxdb_fdw.out

Large diffs are not rendered by default.

2,846 changes: 2,845 additions & 1 deletion expected/13.12/schemaless/influxdb_fdw.out

Large diffs are not rendered by default.

3,122 changes: 3,046 additions & 76 deletions expected/14.9/influxdb_fdw.out

Large diffs are not rendered by default.

2,846 changes: 2,845 additions & 1 deletion expected/14.9/schemaless/influxdb_fdw.out

Large diffs are not rendered by default.

3,122 changes: 3,046 additions & 76 deletions expected/15.4/influxdb_fdw.out

Large diffs are not rendered by default.

2,845 changes: 2,844 additions & 1 deletion expected/15.4/schemaless/influxdb_fdw.out

Large diffs are not rendered by default.

3,120 changes: 3,044 additions & 76 deletions expected/16.0/influxdb_fdw.out

Large diffs are not rendered by default.

2,845 changes: 2,844 additions & 1 deletion expected/16.0/schemaless/influxdb_fdw.out

Large diffs are not rendered by default.

116 changes: 108 additions & 8 deletions influxdb_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,16 @@ static void influxdb_to_pg_type(StringInfo str, char *typname);

static void prepare_query_params(PlanState *node,
List *fdw_exprs,
List *remote_exprs,
Oid foreigntableid,
int numParams,
FmgrInfo **param_flinfo,
List **param_exprs,
const char ***param_values,
Oid **param_types,
InfluxDBType * *param_influxdb_types,
InfluxDBValue * *param_influxdb_values);
InfluxDBValue * *param_influxdb_values,
InfluxDBColumnInfo * *param_column_info);

static void process_query_params(ExprContext *econtext,
FmgrInfo *param_flinfo,
Expand Down Expand Up @@ -247,6 +250,7 @@ static int influxdb_get_batch_size_option(Relation rel);
#endif
static void influxdb_extract_slcols(InfluxDBFdwRelationInfo *fpinfo, PlannerInfo *root, RelOptInfo *baserel, List *tlist);
static bool influxdb_is_existed_measurement(Oid serverOid, char* tbl_name, influxdb_opt *options);
static bool influxdb_param_belong_to_qual(Node *qual, Node *param);

#ifdef CXX_CLIENT
#define free(x) pfree(x)
Expand Down Expand Up @@ -300,7 +304,9 @@ enum FdwDirectModifyPrivateIndex
/* Integer list of attribute numbers retrieved by RETURNING */
FdwDirectModifyPrivateRetrievedAttrs,
/* set-processed flag (as an Boolean node) */
FdwDirectModifyPrivateSetProcessed
FdwDirectModifyPrivateSetProcessed,
/* remote conditions */
FdwDirectModifyRemoteExprs
};

/*
Expand Down Expand Up @@ -862,14 +868,12 @@ influxdbGetForeignPlan(PlannerInfo *root,

if (list_member_ptr(fpinfo->remote_conds, rinfo))
{
remote_conds = lappend(remote_conds, rinfo);
remote_exprs = lappend(remote_exprs, rinfo->clause);
}
else if (list_member_ptr(fpinfo->local_conds, rinfo))
local_exprs = lappend(local_exprs, rinfo->clause);
else if (influxdb_is_foreign_expr(root, baserel, rinfo->clause, false))
{
remote_conds = lappend(remote_conds, rinfo);
remote_exprs = lappend(remote_exprs, rinfo->clause);
}
else
Expand Down Expand Up @@ -1016,6 +1020,17 @@ influxdbGetForeignPlan(PlannerInfo *root,
for_update = true;
}

/* Get remote condition */
if (baserel->reloptkind == RELOPT_UPPER_REL)
{
InfluxDBFdwRelationInfo *ofpinfo;

ofpinfo = (InfluxDBFdwRelationInfo *) fpinfo->outerrel->fdw_private;
remote_conds = ofpinfo->remote_conds;
}
else
remote_conds = remote_exprs;

/*
* Build the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwScanPrivateIndex, above.
Expand All @@ -1024,6 +1039,7 @@ influxdbGetForeignPlan(PlannerInfo *root,
fdw_private = lappend(fdw_private, fdw_scan_tlist);
fdw_private = lappend(fdw_private, makeInteger(fpinfo->is_tlist_func_pushdown));
fdw_private = lappend(fdw_private, makeInteger(fpinfo->slinfo.schemaless));
fdw_private = lappend(fdw_private, remote_conds);

/*
* Create the ForeignScan node from target list, local filtering
Expand Down Expand Up @@ -1060,6 +1076,7 @@ influxdbBeginForeignScan(ForeignScanState *node, int eflags)
#ifdef CXX_CLIENT
ForeignTable *ftable;
#endif
List *remote_exprs;

elog(DEBUG1, "influxdb_fdw : %s", __func__);

Expand All @@ -1077,6 +1094,7 @@ influxdbBeginForeignScan(ForeignScanState *node, int eflags)
festate->tlist = (List *) list_nth(fsplan->fdw_private, 3);
festate->is_tlist_func_pushdown = intVal(list_nth(fsplan->fdw_private, 4)) ? true : false;
schemaless = intVal(list_nth(fsplan->fdw_private, 5)) ? true : false;
remote_exprs = (List *) list_nth(fsplan->fdw_private, 6);

festate->cursor_exists = false;

Expand Down Expand Up @@ -1116,15 +1134,20 @@ influxdbBeginForeignScan(ForeignScanState *node, int eflags)
numParams = list_length(fsplan->fdw_exprs);
festate->numParams = numParams;
if (numParams > 0)
{
prepare_query_params((PlanState *) node,
fsplan->fdw_exprs,
remote_exprs,
rte->relid,
numParams,
&festate->param_flinfo,
&festate->param_exprs,
&festate->param_values,
&festate->param_types,
&festate->param_influxdb_types,
&festate->param_influxdb_values);
&festate->param_influxdb_values,
&festate->param_column_info);
}
}

/*
Expand Down Expand Up @@ -1986,7 +2009,7 @@ influxdbBeginForeignModify(ModifyTableState *mtstate,
fmstate->query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql));
fmstate->retrieved_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums);

if (mtstate->operation == CMD_INSERT)
if (mtstate->operation == CMD_INSERT || mtstate->operation == CMD_DELETE)
{
fmstate->column_list = NIL;

Expand Down Expand Up @@ -2226,8 +2249,14 @@ bindJunkColumnValue(InfluxDBFdwExecState * fmstate,
fmstate->param_influxdb_values[bindnum].i = 0;
}
else
{
struct InfluxDBColumnInfo *col = list_nth(fmstate->column_list, (int) bindnum);

fmstate->param_column_info[bindnum].column_type = col->column_type;

influxdb_bind_sql_var(type, bindnum, value, &is_null, fmstate->param_column_info,
fmstate->param_influxdb_types, fmstate->param_influxdb_values);
}
bindnum++;
}
}
Expand Down Expand Up @@ -2521,6 +2550,9 @@ influxdbPlanDirectModify(PlannerInfo *root,
retrieved_attrs,
makeInteger(plan->canSetTag));
#endif

fscan->fdw_private = lappend(fscan->fdw_private, remote_exprs);

/*
* Update the foreign-join-related fields.
*/
Expand Down Expand Up @@ -2550,6 +2582,7 @@ influxdbBeginDirectModify(ForeignScanState *node, int eflags)
#ifdef CXX_CLIENT
ForeignTable *ftable;
#endif
List *remote_exprs;

elog(DEBUG1, "influxdb_fdw : %s", __func__);

Expand Down Expand Up @@ -2635,6 +2668,9 @@ influxdbBeginDirectModify(ForeignScanState *node, int eflags)
FdwDirectModifyPrivateSetProcessed));
#endif

remote_exprs = (List *) list_nth(fsplan->fdw_private,
FdwDirectModifyRemoteExprs);

/*
* Prepare for processing of parameters used in remote query, if any.
*/
Expand All @@ -2643,13 +2679,16 @@ influxdbBeginDirectModify(ForeignScanState *node, int eflags)
if (numParams > 0)
prepare_query_params((PlanState *) node,
fsplan->fdw_exprs,
remote_exprs,
rte->relid,
numParams,
&dmstate->param_flinfo,
&dmstate->param_exprs,
&dmstate->param_values,
&dmstate->param_types,
&dmstate->param_influxdb_types,
&dmstate->param_influxdb_values);
&dmstate->param_influxdb_values,
&dmstate->param_column_info);
}

/*
Expand Down Expand Up @@ -3808,13 +3847,16 @@ influxdb_reset_transmission_modes(int nestlevel)
static void
prepare_query_params(PlanState *node,
List *fdw_exprs,
List *remote_exprs,
Oid foreigntableid,
int numParams,
FmgrInfo **param_flinfo,
List **param_exprs,
const char ***param_values,
Oid **param_types,
InfluxDBType * *param_influxdb_types,
InfluxDBValue * *param_influxdb_values)
InfluxDBValue * *param_influxdb_values,
InfluxDBColumnInfo * *param_column_info)
{
int i;
ListCell *lc;
Expand All @@ -3826,6 +3868,7 @@ prepare_query_params(PlanState *node,
*param_types = (Oid *) palloc0(sizeof(Oid) * numParams);
*param_influxdb_types = (InfluxDBType *) palloc0(sizeof(InfluxDBType) * numParams);
*param_influxdb_values = (InfluxDBValue *) palloc0(sizeof(InfluxDBValue) * numParams);
*param_column_info = (InfluxDBColumnInfo *)palloc0(sizeof(InfluxDBColumnInfo) * numParams);

i = 0;
foreach(lc, fdw_exprs)
Expand All @@ -3837,6 +3880,49 @@ prepare_query_params(PlanState *node,
(*param_types)[i] = exprType(param_expr);
getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
fmgr_info(typefnoid, &(*param_flinfo)[i]);

/*
* In case param type is kind of time, if column is time key, influxdb handles the comparison as time comparison.
* If column is tag/field key, influxdb handles the comparison as string comparison.
*/
if (INFLUXDB_IS_TIME_TYPE((*param_types)[i]))
{
ListCell *expr_cell;

foreach (expr_cell, remote_exprs)
{
Node *qual = (Node *)lfirst(expr_cell);

/* Check if param is in the qual */
if (influxdb_param_belong_to_qual(qual, param_expr))
{
Var *col;
char *column_name;
List *column_list = pull_var_clause(qual, PVC_RECURSE_PLACEHOLDERS);

/*
* Cases for time comparison with Parameter InfluxDB FDW supports pushdown.
* (1) time type column (both time key and tags/fields) = Param
* (2) time key column > Param
* (3) time key column < Param
* (4) time key column >= Param
* (5) time key column <= Param
*
* In each case, there is only one time column, so column_list has one item.
*/
col = linitial(column_list);

column_name = influxdb_get_column_name(foreigntableid, col->varattno);

if (INFLUXDB_IS_TIME_COLUMN(column_name))
(*param_column_info)[i].column_type = INFLUXDB_TIME_KEY;
else if (influxdb_is_tag_key(column_name, foreigntableid))
(*param_column_info)[i].column_type = INFLUXDB_TAG_KEY;
else
(*param_column_info)[i].column_type = INFLUXDB_FIELD_KEY;
}
}
}
i++;
}

Expand All @@ -3853,6 +3939,20 @@ prepare_query_params(PlanState *node,
*param_values = (const char **) palloc0(numParams * sizeof(char *));
}

/*
* Check if parameter is in the condition
*/
static bool influxdb_param_belong_to_qual(Node *qual, Node *param)
{
if (qual == NULL)
return false;

if (equal(qual, param))
return true;

return expression_tree_walker(qual, influxdb_param_belong_to_qual, param);
}

/*
* Construct array of query parameter values and bind parameters
*
Expand Down
7 changes: 5 additions & 2 deletions influxdb_fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
#define INFLUXDB_FIELDS_COLUMN "fields"
#define INFLUXDB_TAGS_PGTYPE "jsonb"
#define INFLUXDB_FIELDS_PGTYPE "jsonb"
#define INFLUXDB_IS_TIME_COLUMN(X) (strcmp(X,INFLUXDB_TIME_COLUMN) == 0 || \
strcmp(X,INFLUXDB_TIME_TEXT_COLUMN) == 0)
#define INFLUXDB_IS_TIME_COLUMN(X) (strcmp(X, INFLUXDB_TIME_COLUMN) == 0 || \
strcmp(X, INFLUXDB_TIME_TEXT_COLUMN) == 0)
#define INFLUXDB_IS_TIME_TYPE(typeoid) ((typeoid == TIMESTAMPTZOID) || \
(typeoid == TIMEOID) || \
(typeoid == TIMESTAMPOID))
#define CR_NO_ERROR 0

/* Define some typeArray for low version */
Expand Down
2 changes: 2 additions & 0 deletions query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ makeRecord(influxdb::Point& record, struct InfluxDBColumnInfo* pColInfo, InfluxD
case INFLUXDB_FIELD_KEY:
record.addField(pColInfo->column_name, std::string{value.s});
break;
default:
elog(ERROR, "Unexpected column type: %d", pColInfo->column_type);
}
}
case INFLUXDB_NULL:
Expand Down
1 change: 1 addition & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef struct InfluxDBResult {
} InfluxDBResult;
typedef enum InfluxDBColumnType{
INFLUXDB_UNKNOWN_KEY,
INFLUXDB_TIME_KEY,
INFLUXDB_TAG_KEY,
INFLUXDB_FIELD_KEY
Expand Down
1 change: 1 addition & 0 deletions query_cxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ typedef struct InfluxDBResult {

/* Represents information of a table's column type */
typedef enum InfluxDBColumnType{
INFLUXDB_UNKNOWN_KEY,
INFLUXDB_TIME_KEY,
INFLUXDB_TAG_KEY,
INFLUXDB_FIELD_KEY
Expand Down
Loading

0 comments on commit da0d422

Please sign in to comment.