Skip to content

Commit baff6f4

Browse files
committed
RD-15410: DAS function support
1 parent d546cc8 commit baff6f4

File tree

3 files changed

+404
-59
lines changed

3 files changed

+404
-59
lines changed

Diff for: python/multicorn/__init__.py

+33
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,39 @@ def rollback(self):
512512
self._init_transaction_state()
513513

514514

515+
class ForeignFunction(object):
    """
    Class that represents an RPC-like function call from DAS, exposed as a Postgres function.
    It can call the remote function via gRPC and return the result.

    Subclasses are expected to override execute(). They may also override
    __init__, as long as it accepts the options as its first positional
    argument, because execute_static() instantiates the class as cls(options).
    """

    def __init__(self, options=None):
        """
        Store the options this function was configured with.

        Args:
            options: the options passed to execute_static() (or directly by
                the caller). Defaults to None so that argument-less
                construction keeps working.
        """
        # Without this constructor, cls(options) in execute_static() would
        # fall through to object.__init__ and raise TypeError on any class
        # that does not define its own __init__.
        self.options = options

    def execute(
        self,
        named_args=None,
        env=None
    ):
        """
        Execute the remote function.

        Args:
            named_args: the named arguments of the call.
            env: passed through unchanged from the caller.

        Returns:
            The base implementation does nothing and returns None.
        """
        pass

    @classmethod
    def execute_static(cls, options, named_args=None, env=None):
        """
        One-shot static helper to create a temporary ForeignFunction instance,
        call execute(), and return the result.

        Args:
            options: forwarded to the class constructor.
            named_args: forwarded to execute(); a falsy value (None or an
                empty mapping) is replaced by a fresh empty dict.
            env: forwarded to execute() unchanged.

        Returns:
            Whatever execute() returns.
        """

        # TODO instances could be cached by the options they were configured with, to avoid
        # recreating a new gRPC channel

        ephemeral = cls(options)

        return ephemeral.execute(
            named_args=named_args or {},
            env=env
        )
546+
547+
515548
"""Code from python2.7 importlib.import_module."""
516549
"""Backport of importlib.import_module from 3.x."""
517550
# While not critical (and in no way guaranteed!), it would be nice to keep this

Diff for: src/multicorn.c

+191
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ PG_MODULE_MAGIC;
4141

4242
extern Datum multicorn_handler(PG_FUNCTION_ARGS);
4343
extern Datum multicorn_validator(PG_FUNCTION_ARGS);
44+
extern Datum multicorn_function_execute(PG_FUNCTION_ARGS);
4445

46+
Datum
47+
foreign_function_execute(List *options_list, int nActualArgs, char **argNames, Oid *argTypes, Datum *argDatums, bool *argNulls, Oid rettype);
4548

4649
PG_FUNCTION_INFO_V1(multicorn_handler);
4750
PG_FUNCTION_INFO_V1(multicorn_validator);
@@ -1430,3 +1433,191 @@ multicornInitializeExecState(void *internalstate)
14301433

14311434
return execstate;
14321435
}
1436+
1437+
PG_FUNCTION_INFO_V1(multicorn_function_execute);
1438+
1439+
/*
1440+
* multicorn_function_execute
1441+
*
1442+
* This function is invoked from Postgres with N arguments:
1443+
* 1) A text[] of "options".
1444+
* 2) A text[] of "parameter names".
1445+
* 3..N) The actual function parameters for the DAS function.
1446+
*
1447+
* The code then looks up the pg_proc row to confirm argument types and
1448+
* obtains the return type. Finally, it calls a helper function
1449+
* foreign_function_execute(...) which bridges to Python to do the real
1450+
* function invocation, and returns the result as a Postgres Datum.
1451+
*/
1452+
Datum
1453+
multicorn_function_execute(PG_FUNCTION_ARGS)
1454+
{
1455+
/* The total number of arguments passed to this function. */
1456+
int nargs = PG_NARGS();
1457+
/* The OID (object identifier) of this C function itself. */
1458+
Oid fn_oid = fcinfo->flinfo->fn_oid;
1459+
/* Arrays to store argument OIDs, Datums, and null flags once we retrieve them. */
1460+
Oid *argTypes = NULL;
1461+
Datum *argDatums = NULL;
1462+
bool *argNulls = NULL;
1463+
/* We will also retrieve the function’s declared return type from pg_proc. */
1464+
Oid rettype;
1465+
1466+
/*
1467+
* We expect at least two parameters: the "options" array and the
1468+
* "parameter names" array. If fewer than 2 are provided, throw an error.
1469+
*/
1470+
if (nargs < 2)
1471+
ereport(ERROR,
1472+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1473+
errmsg("At least two arguments (options and parameter names) must be provided")));
1474+
1475+
/*
1476+
* Parse the first argument, which is the array of "options" (text[]).
1477+
* We fetch it, Then we call untransformRelOptions(...) to get them
1478+
* as a List.
1479+
*/
1480+
ArrayType *optionsArray = PG_GETARG_ARRAYTYPE_P(0);
1481+
Datum optionsDatum = PointerGetDatum(optionsArray);
1482+
List *options_list = untransformRelOptions(optionsDatum);
1483+
1484+
/*
1485+
* Parse the second argument: parameter names (a text[]).
1486+
* We deconstruct the array into Datums and null flags, then
1487+
* build a C array of char* for them.
1488+
*/
1489+
ArrayType *paramNamesArray = PG_GETARG_ARRAYTYPE_P(1);
1490+
int nParamNames;
1491+
Datum *paramNameDatums;
1492+
bool *paramNameNulls;
1493+
/*
1494+
* deconstruct_array(...) unpacks the array into paramNameDatums[] and paramNameNulls[].
1495+
* We also get nParamNames = how many elements are in the array.
1496+
*/
1497+
deconstruct_array(paramNamesArray,
1498+
TEXTOID,
1499+
-1, false, 'i',
1500+
&paramNameDatums,
1501+
&paramNameNulls,
1502+
&nParamNames);
1503+
1504+
/*
1505+
* Build a C array of strings (argNames) from the text Datums.
1506+
* We call TextDatumGetCString(...) to convert the text datum to a C string.
1507+
*/
1508+
char **argNames = palloc(nParamNames * sizeof(char *));
1509+
for (int i = 0; i < nParamNames; i++)
1510+
{
1511+
if (paramNameNulls[i]) {
1512+
elog(ERROR, "Unexpected NULL parameter name");
1513+
} else
1514+
argNames[i] = TextDatumGetCString(paramNameDatums[i]);
1515+
}
1516+
pfree(paramNameDatums);
1517+
pfree(paramNameNulls);
1518+
1519+
/*
1520+
* The actual function arguments start from index 2.
1521+
* That is, arguments 0 and 1 are the "options" and "param names."
1522+
* The rest (2..nargs-1) are the real input parameters for the function call.
1523+
*/
1524+
int nActualArgs = nargs - 2;
1525+
1526+
/*
1527+
* Allocate arrays to store all arguments (including the first two).
1528+
* We'll store them in argDatums and argNulls.
1529+
*/
1530+
argDatums = palloc(nargs * sizeof(Datum));
1531+
argNulls = palloc(nargs * sizeof(bool));
1532+
for (int i = 0; i < nargs; i++)
1533+
{
1534+
argDatums[i] = PG_GETARG_DATUM(i);
1535+
argNulls[i] = PG_ARGISNULL(i);
1536+
}
1537+
1538+
/*
1539+
* Retrieve the function argument types from the pg_proc catalog
1540+
* for this function OID. We do a syscache lookup of PROCOID.
1541+
*/
1542+
{
1543+
HeapTuple procTup = SearchSysCache1(PROCOID, ObjectIdGetDatum(fn_oid));
1544+
if (!HeapTupleIsValid(procTup))
1545+
ereport(ERROR, (errmsg("cache lookup failed for function %u", fn_oid)));
1546+
1547+
/*
1548+
* Compare the pronargs field from pg_proc to our runtime PG_NARGS().
1549+
* If they differ, there's a mismatch. Typically they should match.
1550+
*/
1551+
Form_pg_proc procStruct = (Form_pg_proc) GETSTRUCT(procTup);
1552+
if (procStruct->pronargs != nargs)
1553+
ereport(ERROR,
1554+
(errmsg("argument mismatch: PG_NARGS() (%d) != pronargs (%d)",
1555+
nargs, procStruct->pronargs)));
1556+
1557+
bool isNull;
1558+
Datum proargtypes = SysCacheGetAttr(PROCOID, procTup,
1559+
Anum_pg_proc_proargtypes,
1560+
&isNull);
1561+
if (isNull)
1562+
ereport(ERROR, (errmsg("proargtypes is null")));
1563+
1564+
/*
1565+
* proargtypes is an OID array listing each argument's type.
1566+
* We confirm it has the correct shape (1D array, correct length).
1567+
*/
1568+
{
1569+
ArrayType *arr = (ArrayType *) DatumGetPointer(proargtypes);
1570+
if (ARR_HASNULL(arr) ||
1571+
ARR_NDIM(arr) != 1 ||
1572+
ARR_DIMS(arr)[0] != nargs ||
1573+
ARR_ELEMTYPE(arr) != OIDOID)
1574+
ereport(ERROR, (errmsg("unexpected proargtypes array layout")));
1575+
1576+
/*
1577+
* Copy out the actual OIDs into argTypes[].
1578+
*/
1579+
argTypes = palloc(nargs * sizeof(Oid));
1580+
memcpy(argTypes, ARR_DATA_PTR(arr), nargs * sizeof(Oid));
1581+
}
1582+
1583+
/*
1584+
* Retrieve the function's declared return type from prorettype.
1585+
*/
1586+
rettype = procStruct->prorettype;
1587+
1588+
ReleaseSysCache(procTup);
1589+
}
1590+
1591+
/*
1592+
* Now call a bridging function (foreign_function_execute) in C that delegates
1593+
* to Python. We pass:
1594+
* - options_list (from argument 0)
1595+
* - nActualArgs and argNames for the second argument
1596+
* - the "actual" arguments (the function’s real parameters) from index 2..(nargs-1)
1597+
* along with their types (argTypes[2..]) and null flags (argNulls[2..]).
1598+
* - The function’s declared rettype.
1599+
*
1600+
* The bridging function returns a Datum we can hand back to Postgres.
1601+
*
1602+
* Note: we do an assertion that nParamNames == nActualArgs, meaning the second
1603+
* argument's array length matches the number of "actual" parameters.
1604+
*/
1605+
assert(nParamNames == nActualArgs);
1606+
Datum result = foreign_function_execute(options_list,
1607+
nActualArgs,
1608+
argNames,
1609+
&argTypes[2],
1610+
&argDatums[2],
1611+
&argNulls[2],
1612+
rettype);
1613+
1614+
/* Cleanup allocated memory for argNames and the arrays we allocated. */
1615+
for (int i = 0; i < nParamNames; i++)
1616+
pfree(argNames[i]);
1617+
pfree(argNames);
1618+
pfree(argDatums);
1619+
pfree(argNulls);
1620+
pfree(argTypes);
1621+
1622+
return result;
1623+
}

0 commit comments

Comments
 (0)