diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index a7037d8..16c3b6c 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -512,6 +512,39 @@ def rollback(self): self._init_transaction_state() +class ForeignFunction(object): + """ + Class that represents an RPC-like function call from DAS, exposed as a Postgres function. + It can call the remote function via gRPC and return the result. + """ + def execute( + self, + named_args=None, + env=None + ): + """ + Execute the remote function. + """ + pass + + @classmethod + def execute_static(cls, options, named_args=None, env=None): + """ + One-shot static helper to create a temporary ForeignFunction instance, + call execute(), and return the result. + """ + + # TODO instances could be cached by the options they were configured with, to avoid + # recreating a new gRPC channel + + ephemeral = cls(options) + + return ephemeral.execute( + named_args=named_args or {}, + env=env + ) + + """Code from python2.7 importlib.import_module.""" """Backport of importlib.import_module from 3.x.""" # While not critical (and in no way guaranteed!), it would be nice to keep this diff --git a/src/multicorn.c b/src/multicorn.c index d5b6d15..c6a582e 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -41,7 +41,10 @@ PG_MODULE_MAGIC; extern Datum multicorn_handler(PG_FUNCTION_ARGS); extern Datum multicorn_validator(PG_FUNCTION_ARGS); +extern Datum multicorn_function_execute(PG_FUNCTION_ARGS); +Datum +foreign_function_execute(List *options_list, int nActualArgs, char **argNames, Oid *argTypes, Datum *argDatums, bool *argNulls, Oid rettype); PG_FUNCTION_INFO_V1(multicorn_handler); PG_FUNCTION_INFO_V1(multicorn_validator); @@ -1430,3 +1433,191 @@ multicornInitializeExecState(void *internalstate) return execstate; } + +PG_FUNCTION_INFO_V1(multicorn_function_execute); + +/* + * multicorn_function_execute + * + * This function is invoked from Postgres with N arguments: + * 1) A text[] of "options". + * 2) A text[] of "parameter names". + * 3..N) The actual function parameters for the DAS function. + * + * The code then looks up the pg_proc row to confirm argument types and + * obtains the return type. Finally, it calls a helper function + * foreign_function_execute(...) which bridges to Python to do the real + * function invocation, and returns the result as a Postgres Datum. + */ +Datum +multicorn_function_execute(PG_FUNCTION_ARGS) +{ + /* The total number of arguments passed to this function. */ + int nargs = PG_NARGS(); + /* The OID (object identifier) of this C function itself. */ + Oid fn_oid = fcinfo->flinfo->fn_oid; + /* Arrays to store argument OIDs, Datums, and null flags once we retrieve them. */ + Oid *argTypes = NULL; + Datum *argDatums = NULL; + bool *argNulls = NULL; + /* We will also retrieve the function’s declared return type from pg_proc. */ + Oid rettype; + + /* + * We expect at least two parameters: the "options" array and the + * "parameter names" array. If fewer than 2 are provided, throw an error. + */ + if (nargs < 2) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("At least two arguments (options and parameter names) must be provided"))); + + /* + * Parse the first argument, which is the array of "options" (text[]). + * We fetch it, Then we call untransformRelOptions(...) to get them + * as a List. + */ + ArrayType *optionsArray = PG_GETARG_ARRAYTYPE_P(0); + Datum optionsDatum = PointerGetDatum(optionsArray); + List *options_list = untransformRelOptions(optionsDatum); + + /* + * Parse the second argument: parameter names (a text[]). + * We deconstruct the array into Datums and null flags, then + * build a C array of char* for them. + */ + ArrayType *paramNamesArray = PG_GETARG_ARRAYTYPE_P(1); + int nParamNames; + Datum *paramNameDatums; + bool *paramNameNulls; + /* + * deconstruct_array(...) unpacks the array into paramNameDatums[] and paramNameNulls[]. + * We also get nParamNames = how many elements are in the array. + */ + deconstruct_array(paramNamesArray, + TEXTOID, + -1, false, 'i', + ¶mNameDatums, + ¶mNameNulls, + &nParamNames); + + /* + * Build a C array of strings (argNames) from the text Datums. + * We call TextDatumGetCString(...) to convert the text datum to a C string. + */ + char **argNames = palloc(nParamNames * sizeof(char *)); + for (int i = 0; i < nParamNames; i++) + { + if (paramNameNulls[i]) { + elog(ERROR, "Unexpected NULL parameter name"); + } else + argNames[i] = TextDatumGetCString(paramNameDatums[i]); + } + pfree(paramNameDatums); + pfree(paramNameNulls); + + /* + * The actual function arguments start from index 2. + * That is, arguments 0 and 1 are the "options" and "param names." + * The rest (2..nargs-1) are the real input parameters for the function call. + */ + int nActualArgs = nargs - 2; + + /* + * Allocate arrays to store all arguments (including the first two). + * We'll store them in argDatums and argNulls. + */ + argDatums = palloc(nargs * sizeof(Datum)); + argNulls = palloc(nargs * sizeof(bool)); + for (int i = 0; i < nargs; i++) + { + argDatums[i] = PG_GETARG_DATUM(i); + argNulls[i] = PG_ARGISNULL(i); + } + + /* + * Retrieve the function argument types from the pg_proc catalog + * for this function OID. We do a syscache lookup of PROCOID. + */ + { + HeapTuple procTup = SearchSysCache1(PROCOID, ObjectIdGetDatum(fn_oid)); + if (!HeapTupleIsValid(procTup)) + ereport(ERROR, (errmsg("cache lookup failed for function %u", fn_oid))); + + /* + * Compare the pronargs field from pg_proc to our runtime PG_NARGS(). + * If they differ, there's a mismatch. Typically they should match. + */ + Form_pg_proc procStruct = (Form_pg_proc) GETSTRUCT(procTup); + if (procStruct->pronargs != nargs) + ereport(ERROR, + (errmsg("argument mismatch: PG_NARGS() (%d) != pronargs (%d)", + nargs, procStruct->pronargs))); + + bool isNull; + Datum proargtypes = SysCacheGetAttr(PROCOID, procTup, + Anum_pg_proc_proargtypes, + &isNull); + if (isNull) + ereport(ERROR, (errmsg("proargtypes is null"))); + + /* + * proargtypes is an OID array listing each argument's type. + * We confirm it has the correct shape (1D array, correct length). + */ + { + ArrayType *arr = (ArrayType *) DatumGetPointer(proargtypes); + if (ARR_HASNULL(arr) || + ARR_NDIM(arr) != 1 || + ARR_DIMS(arr)[0] != nargs || + ARR_ELEMTYPE(arr) != OIDOID) + ereport(ERROR, (errmsg("unexpected proargtypes array layout"))); + + /* + * Copy out the actual OIDs into argTypes[]. + */ + argTypes = palloc(nargs * sizeof(Oid)); + memcpy(argTypes, ARR_DATA_PTR(arr), nargs * sizeof(Oid)); + } + + /* + * Retrieve the function's declared return type from prorettype. + */ + rettype = procStruct->prorettype; + + ReleaseSysCache(procTup); + } + + /* + * Now call a bridging function (foreign_function_execute) in C that delegates + * to Python. We pass: + * - options_list (from argument 0) + * - nActualArgs and argNames for the second argument + * - the "actual" arguments (the function’s real parameters) from index 2..(nargs-1) + * along with their types (argTypes[2..]) and null flags (argNulls[2..]). + * - The function’s declared rettype. + * + * The bridging function returns a Datum we can hand back to Postgres. + * + * Note: we do an assertion that nParamNames == nActualArgs, meaning the second + * argument's array length matches the number of "actual" parameters. + */ + assert(nParamNames == nActualArgs); + Datum result = foreign_function_execute(options_list, + nActualArgs, + argNames, + &argTypes[2], + &argDatums[2], + &argNulls[2], + rettype); + + /* Cleanup allocated memory for argNames and the arrays we allocated. */ + for (int i = 0; i < nParamNames; i++) + pfree(argNames[i]); + pfree(argNames); + pfree(argDatums); + pfree(argNulls); + pfree(argTypes); + + return result; +} diff --git a/src/python.c b/src/python.c index 46db552..edede1e 100644 --- a/src/python.c +++ b/src/python.c @@ -1509,65 +1509,7 @@ pyobjectToCString(PyObject *pyobject, StringInfo buffer, } if (cinfo->atttypoid == INTERVALOID) { elog(DEBUG1, "Importing Python interval (OID=%d)", cinfo->atttypoid); - // It's a dictionary. All its values are integers. - PyObject *years = PyDict_GetItemString(pyobject, "years"); - PyObject *months = PyDict_GetItemString(pyobject, "months"); - PyObject *days = PyDict_GetItemString(pyobject, "days"); - PyObject *hours = PyDict_GetItemString(pyobject, "hours"); - PyObject *minutes = PyDict_GetItemString(pyobject, "minutes"); - PyObject *seconds = PyDict_GetItemString(pyobject, "seconds"); - PyObject *micros = PyDict_GetItemString(pyobject, "micros"); - appendBinaryStringInfo(buffer, "P", 1); - pynumberToCString(years, buffer, cinfo); - appendBinaryStringInfo(buffer, "Y", 1); - pynumberToCString(months, buffer, cinfo); - appendBinaryStringInfo(buffer, "M", 1); - pynumberToCString(days, buffer, cinfo); - appendBinaryStringInfo(buffer, "DT", 2); // D + T to separate time - pynumberToCString(hours, buffer, cinfo); - appendBinaryStringInfo(buffer, "H", 1); - pynumberToCString(minutes, buffer, cinfo); - appendBinaryStringInfo(buffer, "M", 1); - - // Combine seconds and micros into a single value - if (seconds || micros) { - char seconds_str[64] = {0}; - double total_seconds = 0.0; - - // Convert seconds and micros to numeric values - if (seconds) { - total_seconds += PyFloat_AsDouble(seconds); - } - if (micros) { - total_seconds += PyFloat_AsDouble(micros) / 1e6; - } - - // Format the combined seconds value as a string - snprintf(seconds_str, sizeof(seconds_str), "%.6f", total_seconds); - - // Remove trailing zeros from fractional part for compact representation - char *dot = strchr(seconds_str, '.'); - if (dot) { - char *end = seconds_str + strlen(seconds_str) - 1; - while (end > dot && *end == '0') { - *end-- = '\0'; - } - if (end == dot) { - *end = '\0'; // Remove the dot if no fractional part remains - } - } - - appendBinaryStringInfo(buffer, seconds_str, strlen(seconds_str)); - appendBinaryStringInfo(buffer, "S", 1); // Append the 'S' suffix - } - - Py_DECREF(years); - Py_DECREF(months); - Py_DECREF(days); - Py_DECREF(hours); - Py_DECREF(minutes); - Py_DECREF(seconds); - Py_DECREF(micros); + pyunicodeToCString(pyobject, buffer, cinfo); return; } if (PyNumber_Check(pyobject)) @@ -2340,3 +2282,182 @@ int getModifyBatchSize(PyObject *fdw_instance) return result; } + +/* + * This function constructs a dummy AttInMetadata structure for a given return type. + * It creates a tuple descriptor with a single attribute and initializes it with the + * specified return type. If the return type is an array, it adjusts the dimension + * count accordingly. + */ +AttInMetadata * +build_dummy_attinmeta(Oid retType) +{ + TupleDesc tupDesc; + AttInMetadata *attinmeta; + int attndims = 0; + Oid elemType = get_element_type(retType); + + /* If retType is an array type, set attndims to 1 */ + if (OidIsValid(elemType)) { + attndims = 1; + } + + /* Create a tuple descriptor with 1 column */ + tupDesc = CreateTemplateTupleDesc(1); + + /* + * Initialize the single attribute. + * Name: "dummy" + * Type: retType + * typmod: -1 (unknown) + * attndims: set as above (0 for scalar, 1 for array) + */ + TupleDescInitEntry(tupDesc, + (AttrNumber) 1, + "dummy", + retType, + -1, + attndims); + + /* Get the conversion info from the tuple descriptor */ + attinmeta = TupleDescGetAttInMetadata(tupDesc); + + return attinmeta; +} + + +/* + * This function executes the Python function defined in multicorn to execute functions and returns + * the result as a Datum. It prepares the options and arguments as Python dictionaries, + * calls the Python function, and converts the result back to a PostgreSQL Datum. + */ +Datum +foreign_function_execute(List *options_list, int nArgs, char **argNames, Oid *argTypes, Datum *argDatums, bool *argNulls, Oid retType) +{ + /* 1) Build a Python dictionary from the FDW-like options list. */ + PyObject *option_dict = optionsListToPyDict(options_list); + errorCheck(); /* check for Python errors */ + + /* 2) Build a Python dictionary for the function arguments, keyed by name. */ + PyObject *py_argdict = PyDict_New(); + if (!py_argdict) + { + Py_DECREF(option_dict); + elog(ERROR, "Failed to create Python dictionary for function arguments"); + } + for (int i = 0; i < nArgs; i++) + { + PyObject *pyVal = NULL; + if (argNulls[i]) + { + Py_INCREF(Py_None); + pyVal = Py_None; + } + else + { + ConversionInfo localCinfo; + memset(&localCinfo, 0, sizeof(localCinfo)); + localCinfo.atttypoid = argTypes[i]; + pyVal = datumToPython(argDatums[i], argTypes[i], &localCinfo); + } + if (!pyVal) + { + Py_DECREF(py_argdict); + Py_DECREF(option_dict); + elog(ERROR, "Failed to convert argument %d to Python object", i); + } + if (PyDict_SetItemString(py_argdict, argNames[i], pyVal) != 0) + { + Py_DECREF(pyVal); + Py_DECREF(py_argdict); + Py_DECREF(option_dict); + elog(ERROR, "Failed to set argument '%s' in Python dictionary", argNames[i]); + } + Py_DECREF(pyVal); /* PyDict_SetItemString increases reference count */ + } + errorCheck(); + + /* Check if the "wrapper" key exists in the options dictionary */ + PyObject *p_wrapper = PyDict_GetItemString(option_dict, "wrapper"); + if (p_wrapper == NULL) + { + Py_DECREF(py_argdict); + Py_DECREF(option_dict); + elog(ERROR, "Missing 'wrapper' key in options"); + } + + /* Get the class from the wrapper value (which is a borrowed reference) */ + PyObject *p_class = getClass(p_wrapper); + if (p_class == NULL || !PyCallable_Check(p_class)) + { + Py_XDECREF(p_class); + Py_DECREF(py_argdict); + Py_DECREF(option_dict); + elog(ERROR, "Could not get callable wrapper class from 'wrapper' option"); + } + + /* Remove the "wrapper" key from the dictionary; check for errors */ + if (PyDict_DelItemString(option_dict, "wrapper") < 0) + { + Py_DECREF(p_class); + Py_DECREF(py_argdict); + Py_DECREF(option_dict); + elog(ERROR, "Failed to remove 'wrapper' key from options"); + } + + /* 5) Then get the class method "execute_static". */ + PyObject *p_func = PyObject_GetAttrString(p_class, "execute_static"); + Py_DECREF(p_class); + if (!p_func || !PyCallable_Check(p_func)) + { + Py_XDECREF(p_func); + Py_DECREF(py_argdict); + Py_DECREF(option_dict); + elog(ERROR, "No callable 'execute_static' found in DASFunction"); + } + + PyObject* p_env = get_environment_dict(); + if (!p_env) { + // The environment wasn't collected (an error), send None. + p_env = Py_None; + } + + /* 6) Build final arguments for p_func. + * Assume execute_static expects two parameters: + * - a dict of options, + * - a dict of arguments keyed by name. + */ + PyObject *call_args = PyTuple_New(3); + PyTuple_SetItem(call_args, 0, option_dict); /* tuple takes ownership */ + PyTuple_SetItem(call_args, 1, py_argdict); /* tuple takes ownership */ + PyTuple_SetItem(call_args, 2, p_env); /* tuple takes ownership */ + + /* 7) Call the Python method. */ + PyObject *p_result = PyObject_CallObject(p_func, call_args); + Py_DECREF(call_args); + Py_DECREF(p_func); + errorCheck(); + + if (!p_result || p_result == Py_None) + { + Py_XDECREF(p_result); + elog(ERROR, "execute_static returned non-string or None"); + } + + /* 8) Convert the Python result to a Postgres Datum using pyobjectToDatum. */ + StringInfo buffer = makeStringInfo(); + ConversionInfo* retCinfo; + + /* Retrieve the type’s output function and related info */ + AttInMetadata* meta = build_dummy_attinmeta(retType); + initConversioninfo(&retCinfo, meta); + + /* Optionally set other retCinfo fields if needed */ + Datum resultDatum = pyobjectToDatum(p_result, buffer, retCinfo); + Py_DECREF(p_result); + pfree(buffer->data); + pfree(buffer); + /* Return the converted Datum. */ + return resultDatum; + +}