Skip to content

Commit 30902d4

Browse files
committed
Add framework for LOAD CSV
1 parent b819026 commit 30902d4

File tree

13 files changed

+188
-1
lines changed

13 files changed

+188
-1
lines changed

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ REGRESS = new_cypher \
108108
cypher_merge \
109109
cypher_unwind \
110110
cypher_vle \
111+
load_csv \
111112
order_by \
112113
cypher_setop \
113114
aggregation \

regress/expected/load_csv.out

Whitespace-only changes.

regress/sql/load_csv.sql

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
2+
/*
3+
* Copyright (C) 2023-2025 PostGraphDB
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as
7+
* published by the Free Software Foundation, either version 3 of the
8+
* License, or (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*/
18+
19+
LOAD 'postgraph';
20+
21+
CREATE GRAPH load_csv;
22+
USE GRAPH load_csv;
23+
24+
25+
CYPHER LOAD CSV '~/postgraph/regress/test.csv'
26+
RETURN *;
27+
28+
DROP GRAPH load_csv;

regress/test.csv

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
booker12,9012,Rachel,Booker
2+
grey07,2070,Laura,Grey
3+
johnson81,4081,Craig,Johnson
4+
jenkins46,9346,Mary,Jenkins
5+
smith79,5079,Jamie,Smith

sql/postgraph.sql.in

+7
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,13 @@ PARALLEL SAFE
515515
AS 'MODULE_PATHNAME', 'edge_unnest';
516516

517517

518+
-- Set-returning C function that the LOAD CSV clause is transformed into
-- (the parser builds a call to postgraph.cypher_load_csv). Takes one gtype
-- argument and returns a set of gtype values; declared STABLE, strict
-- (RETURNS NULL ON NULL INPUT) and PARALLEL UNSAFE.
CREATE FUNCTION cypher_load_csv (gtype)
519+
RETURNS SETOF gtype
520+
LANGUAGE C
521+
STABLE
522+
RETURNS NULL ON NULL INPUT
523+
PARALLEL UNSAFE
524+
AS 'MODULE_PATHNAME', 'cypher_load_csv';
518525

519526
CREATE FUNCTION vle (IN gtype, IN vertex, IN vertex, IN gtype, IN gtype,
520527
IN gtype, IN gtype, IN gtype, OUT edges variable_edge)

src/backend/nodes/ag_nodes.c

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const char *node_names[] = {
4343
"cypher_delete",
4444
"cypher_unwind",
4545
"cypher_merge",
46+
"cypher_load_csv",
4647
"cypher_path",
4748
"cypher_node",
4849
"cypher_relationship",
@@ -111,6 +112,7 @@ const ExtensibleNodeMethods node_methods[] = {
111112
DEFINE_NODE_METHODS(cypher_delete),
112113
DEFINE_NODE_METHODS(cypher_unwind),
113114
DEFINE_NODE_METHODS(cypher_merge),
115+
DEFINE_NODE_METHODS(cypher_load_csv),
114116
DEFINE_NODE_METHODS(cypher_path),
115117
DEFINE_NODE_METHODS(cypher_node),
116118
DEFINE_NODE_METHODS(cypher_relationship),

src/backend/nodes/cypher_outfuncs.c

+8
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,14 @@ void out_cypher_merge(StringInfo str, const ExtensibleNode *node)
212212
WRITE_NODE_FIELD(path);
213213
}
214214

215+
// serialization function for the cypher_load_csv ExtensibleNode; its only
// serialized field is the `file` string.
216+
void out_cypher_load_csv(StringInfo str, const ExtensibleNode *node)
217+
{
218+
DEFINE_AG_NODE(cypher_load_csv);
219+
220+
WRITE_STRING_FIELD(file);
221+
}
222+
215223
// serialization function for the cypher_path ExtensibleNode.
216224
void out_cypher_path(StringInfo str, const ExtensibleNode *node)
217225
{

src/backend/parser/cypher_clause.c

+59
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ static TargetEntry *placeholder_edge(cypher_parsestate *cpstate, char *name);
153153
static TargetEntry *placeholder_traversal(cypher_parsestate *cpstate, char *name);
154154
// call
155155
static Query *transform_cypher_call(cypher_parsestate *cpstate, cypher_clause *clause);
156+
// load csv
157+
static Query *transform_cypher_load_csv(cypher_parsestate *cpstate, cypher_clause *clause);
156158
// transform
157159
#define PREV_CYPHER_CLAUSE_ALIAS "_"
158160
#define CYPHER_OPT_RIGHT_ALIAS "_R"
@@ -221,6 +223,8 @@ Query *transform_cypher_clause(cypher_parsestate *cpstate, cypher_clause *clause
221223
result = transform_cypher_sub_pattern(cpstate, clause);
222224
} else if (is_ag_node(self, cypher_unwind)) {
223225
result = transform_cypher_unwind(cpstate, clause);
226+
} else if (is_ag_node(self, cypher_load_csv)) {
227+
result = transform_cypher_load_csv(cpstate, clause);
224228
} else {
225229
ereport(ERROR, (errmsg_internal("unexpected Node for cypher_clause")));
226230
}
@@ -328,6 +332,61 @@ static Node *transform_srf_function(cypher_parsestate *cpstate, Node *n, RangeTb
328332
return NULL;
329333
}
330334

335+
/*
 * Transform a LOAD CSV clause into a SELECT query whose single target entry
 * is a call to postgraph.cypher_load_csv(<file>).
 *
 * This is framework code only:
 *   - a preceding clause (clause->prev) is not chained in yet;
 *   - the output column is hard-coded to "test" because the grammar does not
 *     supply an alias;
 *   - there is no WHERE support, so the join tree carries no qualifier.
 *
 * cpstate: parent parse state; a child state is created here and freed
 *          before returning.
 * clause:  clause whose self node is a cypher_load_csv.
 *
 * Returns the newly built Query.
 */
static Query *transform_cypher_load_csv(cypher_parsestate *cpstate, cypher_clause *clause) {
    cypher_parsestate *child_cpstate = make_cypher_parsestate(cpstate);
    ParseState *pstate = (ParseState *) child_cpstate;
    cypher_load_csv *load = (cypher_load_csv *) clause->self;
    FuncCall *fc;
    Node *call_expr;
    TargetEntry *te;
    Query *query;

    /*
     * NOTE(review): the file name is passed as a bare String node rather than
     * a string constant expression -- confirm transform_cypher_expr accepts
     * this node type as a function argument.
     */
    fc = makeFuncCall(list_make2(makeString("postgraph"), makeString("cypher_load_csv")),
                      list_make1(makeString(load->file)),
                      COERCE_SQL_SYNTAX,
                      -1);

    query = makeNode(Query);
    query->commandType = CMD_SELECT;

    /*
     * TODO: when clause->prev is set, transform the previous clause and
     * expand its columns into the target list before adding the call below.
     */

    call_expr = transform_cypher_expr(child_cpstate, (Node *) fc, EXPR_KIND_SELECT_TARGET);
    te = makeTargetEntry((Expr *) call_expr, (AttrNumber) pstate->p_next_resno++, "test", false);
    query->targetList = lappend(query->targetList, te);

    /*
     * TODO: once the clause supports an alias and a WHERE condition, add the
     * function as a range-table SRF and pass the transformed condition as
     * the qualifier instead of NULL.
     */
    query->rtable = pstate->p_rtable;
    query->jointree = makeFromExpr(pstate->p_joinlist, NULL);
    query->hasTargetSRFs = pstate->p_hasTargetSRFs;

    assign_query_collations(pstate, query);

    free_cypher_parsestate(child_cpstate);
    return query;
}
389+
331390
static Query *transform_cypher_call(cypher_parsestate *cpstate, cypher_clause *clause) {
332391
cypher_parsestate *child_cpstate = make_cypher_parsestate(cpstate);
333392
ParseState *pstate = (ParseState *) child_cpstate;

src/backend/parser/cypher_gram.y

+14-1
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,8 @@ makeSimpleCypherA_Expr(A_Expr_Kind kind, char *name,
581581
%type <node> call_stmt yield_item
582582
%type <list> yield_item_list
583583

584+
%type <node> load_stmt
585+
584586
/* common */
585587
%type <node> cypher_where_opt
586588

@@ -11803,7 +11805,8 @@ cypher_query_start:
1180311805
| CYPHER with { $$ = $2; }
1180411806
| merge
1180511807
| CYPHER call_stmt { $$ = $2; }
11806-
| return
11808+
| CYPHER load_stmt { $$ = $2; }
11809+
| return
1180711810
| unwind
1180811811
;
1180911812

@@ -11827,11 +11830,21 @@ clause:
1182711830
| delete
1182811831
| merge
1182911832
| call_stmt
11833+
| load_stmt
1183011834
| return
1183111835
| unwind
1183211836
;
1183311837

1183411838

11839+
/*
 * LOAD CSV '<file>': builds a cypher_load_csv node whose file field is the
 * string constant that follows the CSV keyword.
 */
load_stmt:
11840+
LOAD CSV Sconst
11841+
{
11842+
cypher_load_csv *n = make_ag_node(cypher_load_csv);
11843+
n->file = $3;
11844+
11845+
$$ = n;
11846+
};
11847+
1183511848
UseGraphStmt:
1183611849
USE GRAPH BareColLabel
1183711850
{

src/backend/utils/adt/gtype.c

+55
Original file line numberDiff line numberDiff line change
@@ -3084,3 +3084,58 @@ generate_series_step_gtype(PG_FUNCTION_ARGS)
30843084
/* do when there is no more left */
30853085
SRF_RETURN_DONE(funcctx);
30863086
}
3087+
3088+
typedef struct
3089+
{
3090+
3091+
} cypher_load_csv_ctx;
3092+
3093+
PG_FUNCTION_INFO_V1(cypher_load_csv);
3094+
3095+
Datum
3096+
cypher_load_csv(PG_FUNCTION_ARGS)
3097+
{
3098+
FuncCallContext *funcctx;
3099+
cypher_load_csv_ctx *fctx;
3100+
int64 result;
3101+
MemoryContext oldcontext;
3102+
3103+
/* stuff done only on the first call of the function */
3104+
if (SRF_IS_FIRSTCALL())
3105+
{
3106+
/* create a function context for cross-call persistence */
3107+
funcctx = SRF_FIRSTCALL_INIT();
3108+
3109+
/* allocate memory for user context */
3110+
fctx = (generate_series_fctx *) palloc(sizeof(cypher_load_csv_ctx));
3111+
3112+
3113+
funcctx->user_fctx = fctx;
3114+
/*
3115+
* switch to memory context appropriate for multiple function calls
3116+
*/
3117+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
3118+
3119+
3120+
MemoryContextSwitchTo(oldcontext);
3121+
}
3122+
3123+
/* stuff done on every call of the function */
3124+
funcctx = SRF_PERCALL_SETUP();
3125+
3126+
/*
3127+
* get the saved state and use current as the result for this iteration
3128+
*/
3129+
fctx = funcctx->user_fctx;
3130+
3131+
/*if ((fctx->step > 0 && fctx->current <= fctx->finish) ||
3132+
(fctx->step < 0 && fctx->current >= fctx->finish))
3133+
{
3134+
//gtype_value gtv = { .type = AGTV_INTEGER, .val.int_value = result };
3135+
3136+
3137+
SRF_RETURN_NEXT(funcctx, GTYPE_P_GET_DATUM(gtype_value_to_gtype(&gtv)));
3138+
}*/
3139+
//else
3140+
SRF_RETURN_DONE(funcctx);
3141+
}

src/include/nodes/ag_nodes.h

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ typedef enum ag_node_tag
4545
cypher_delete_t,
4646
cypher_unwind_t,
4747
cypher_merge_t,
48+
cypher_load_csv_t,
4849
// pattern
4950
cypher_path_t,
5051
cypher_node_t,

src/include/nodes/cypher_nodes.h

+7
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,13 @@ typedef struct cypher_call
165165
Node *query_tree;
166166
} cypher_call;
167167

168+
/*
 * Parse node for the LOAD CSV clause.
 */
typedef struct cypher_load_csv
169+
{
170+
ExtensibleNode extensible;
171+
// string constant given after LOAD CSV in the query
char *file;
172+
} cypher_load_csv;
173+
174+
168175
/*
169176
* pattern
170177
*/

src/include/nodes/cypher_outfuncs.h

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ void out_cypher_set_item(StringInfo str, const ExtensibleNode *node);
3838
void out_cypher_delete(StringInfo str, const ExtensibleNode *node);
3939
void out_cypher_unwind(StringInfo str, const ExtensibleNode *node);
4040
void out_cypher_merge(StringInfo str, const ExtensibleNode *node);
41+
void out_cypher_load_csv(StringInfo str, const ExtensibleNode *node);
4142

4243
// pattern
4344
void out_cypher_path(StringInfo str, const ExtensibleNode *node);

0 commit comments

Comments
 (0)