Skip to content

RD-15410: DAS function support #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions python/multicorn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,39 @@ def rollback(self):
self._init_transaction_state()


class ForeignFunction(object):
"""
Class that represents an RPC-like function call from DAS, exposed as a Postgres function.
It can call the remote function via gRPC and return the result.
"""
def execute(
self,
named_args=None,
env=None
):
"""
Execute the remote function.
"""
pass

@classmethod
def execute_static(cls, options, named_args=None, env=None):
"""
One-shot static helper to create a temporary ForeignFunction instance,
call execute(), and return the result.
"""

# TODO instances could be cached by the options they were configured with, to avoid
# recreating a new gRPC channel

ephemeral = cls(options)

return ephemeral.execute(
named_args=named_args or {},
env=env
)


"""Code from python2.7 importlib.import_module."""
"""Backport of importlib.import_module from 3.x."""
# While not critical (and in no way guaranteed!), it would be nice to keep this
Expand Down
191 changes: 191 additions & 0 deletions src/multicorn.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ PG_MODULE_MAGIC;

extern Datum multicorn_handler(PG_FUNCTION_ARGS);
extern Datum multicorn_validator(PG_FUNCTION_ARGS);
extern Datum multicorn_function_execute(PG_FUNCTION_ARGS);

Datum
foreign_function_execute(List *options_list, int nActualArgs, char **argNames, Oid *argTypes, Datum *argDatums, bool *argNulls, Oid rettype);

PG_FUNCTION_INFO_V1(multicorn_handler);
PG_FUNCTION_INFO_V1(multicorn_validator);
Expand Down Expand Up @@ -1430,3 +1433,191 @@ multicornInitializeExecState(void *internalstate)

return execstate;
}

PG_FUNCTION_INFO_V1(multicorn_function_execute);

/*
* multicorn_function_execute
*
* This function is invoked from Postgres with N arguments:
* 1) A text[] of "options".
* 2) A text[] of "parameter names".
* 3..N) The actual function parameters for the DAS function.
*
* The code then looks up the pg_proc row to confirm argument types and
* obtains the return type. Finally, it calls a helper function
* foreign_function_execute(...) which bridges to Python to do the real
* function invocation, and returns the result as a Postgres Datum.
*/
Datum
multicorn_function_execute(PG_FUNCTION_ARGS)
{
/* The total number of arguments passed to this function. */
int nargs = PG_NARGS();
/* The OID (object identifier) of this C function itself. */
Oid fn_oid = fcinfo->flinfo->fn_oid;
/* Arrays to store argument OIDs, Datums, and null flags once we retrieve them. */
Oid *argTypes = NULL;
Datum *argDatums = NULL;
bool *argNulls = NULL;
/* We will also retrieve the function’s declared return type from pg_proc. */
Oid rettype;

/*
* We expect at least two parameters: the "options" array and the
* "parameter names" array. If fewer than 2 are provided, throw an error.
*/
if (nargs < 2)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("At least two arguments (options and parameter names) must be provided")));

/*
* Parse the first argument, which is the array of "options" (text[]).
* We fetch it, Then we call untransformRelOptions(...) to get them
* as a List.
*/
ArrayType *optionsArray = PG_GETARG_ARRAYTYPE_P(0);
Datum optionsDatum = PointerGetDatum(optionsArray);
List *options_list = untransformRelOptions(optionsDatum);

/*
* Parse the second argument: parameter names (a text[]).
* We deconstruct the array into Datums and null flags, then
* build a C array of char* for them.
*/
ArrayType *paramNamesArray = PG_GETARG_ARRAYTYPE_P(1);
int nParamNames;
Datum *paramNameDatums;
bool *paramNameNulls;
/*
* deconstruct_array(...) unpacks the array into paramNameDatums[] and paramNameNulls[].
* We also get nParamNames = how many elements are in the array.
*/
deconstruct_array(paramNamesArray,
TEXTOID,
-1, false, 'i',
&paramNameDatums,
&paramNameNulls,
&nParamNames);

/*
* Build a C array of strings (argNames) from the text Datums.
* We call TextDatumGetCString(...) to convert the text datum to a C string.
*/
char **argNames = palloc(nParamNames * sizeof(char *));
for (int i = 0; i < nParamNames; i++)
{
if (paramNameNulls[i]) {
elog(ERROR, "Unexpected NULL parameter name");
} else
argNames[i] = TextDatumGetCString(paramNameDatums[i]);
}
pfree(paramNameDatums);
pfree(paramNameNulls);

/*
* The actual function arguments start from index 2.
* That is, arguments 0 and 1 are the "options" and "param names."
* The rest (2..nargs-1) are the real input parameters for the function call.
*/
int nActualArgs = nargs - 2;

/*
* Allocate arrays to store all arguments (including the first two).
* We'll store them in argDatums and argNulls.
*/
argDatums = palloc(nargs * sizeof(Datum));
argNulls = palloc(nargs * sizeof(bool));
for (int i = 0; i < nargs; i++)
{
argDatums[i] = PG_GETARG_DATUM(i);
argNulls[i] = PG_ARGISNULL(i);
}

/*
* Retrieve the function argument types from the pg_proc catalog
* for this function OID. We do a syscache lookup of PROCOID.
*/
{
HeapTuple procTup = SearchSysCache1(PROCOID, ObjectIdGetDatum(fn_oid));
if (!HeapTupleIsValid(procTup))
ereport(ERROR, (errmsg("cache lookup failed for function %u", fn_oid)));

/*
* Compare the pronargs field from pg_proc to our runtime PG_NARGS().
* If they differ, there's a mismatch. Typically they should match.
*/
Form_pg_proc procStruct = (Form_pg_proc) GETSTRUCT(procTup);
if (procStruct->pronargs != nargs)
ereport(ERROR,
(errmsg("argument mismatch: PG_NARGS() (%d) != pronargs (%d)",
nargs, procStruct->pronargs)));

bool isNull;
Datum proargtypes = SysCacheGetAttr(PROCOID, procTup,
Anum_pg_proc_proargtypes,
&isNull);
if (isNull)
ereport(ERROR, (errmsg("proargtypes is null")));

/*
* proargtypes is an OID array listing each argument's type.
* We confirm it has the correct shape (1D array, correct length).
*/
{
ArrayType *arr = (ArrayType *) DatumGetPointer(proargtypes);
if (ARR_HASNULL(arr) ||
ARR_NDIM(arr) != 1 ||
ARR_DIMS(arr)[0] != nargs ||
ARR_ELEMTYPE(arr) != OIDOID)
ereport(ERROR, (errmsg("unexpected proargtypes array layout")));

/*
* Copy out the actual OIDs into argTypes[].
*/
argTypes = palloc(nargs * sizeof(Oid));
memcpy(argTypes, ARR_DATA_PTR(arr), nargs * sizeof(Oid));
}

/*
* Retrieve the function's declared return type from prorettype.
*/
rettype = procStruct->prorettype;

ReleaseSysCache(procTup);
}

/*
* Now call a bridging function (foreign_function_execute) in C that delegates
* to Python. We pass:
* - options_list (from argument 0)
* - nActualArgs and argNames for the second argument
* - the "actual" arguments (the function’s real parameters) from index 2..(nargs-1)
* along with their types (argTypes[2..]) and null flags (argNulls[2..]).
* - The function’s declared rettype.
*
* The bridging function returns a Datum we can hand back to Postgres.
*
* Note: we do an assertion that nParamNames == nActualArgs, meaning the second
* argument's array length matches the number of "actual" parameters.
*/
assert(nParamNames == nActualArgs);
Datum result = foreign_function_execute(options_list,
nActualArgs,
argNames,
&argTypes[2],
&argDatums[2],
&argNulls[2],
rettype);

/* Cleanup allocated memory for argNames and the arrays we allocated. */
for (int i = 0; i < nParamNames; i++)
pfree(argNames[i]);
pfree(argNames);
pfree(argDatums);
pfree(argNulls);
pfree(argTypes);

return result;
}
Loading