diff --git a/plugin/trino-functions-python/pom.xml b/plugin/trino-functions-python/pom.xml
new file mode 100644
index 000000000000..2b3ad56f4ba0
--- /dev/null
+++ b/plugin/trino-functions-python/pom.xml
@@ -0,0 +1,164 @@
+
+
+ 4.0.0
+
+ io.trino
+ trino-root
+ 468-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-functions-python
+ trino-plugin
+ Trino - Python language functions
+
+
+
+
+ com.dylibso.chicory
+ bom
+ 1.0.0-M2
+ pom
+ import
+
+
+
+
+
+
+ com.dylibso.chicory
+ log
+
+
+
+ com.dylibso.chicory
+ runtime
+
+
+
+ com.dylibso.chicory
+ wasi
+
+
+
+ com.dylibso.chicory
+ wasm
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.jimfs
+ jimfs
+ 1.3.0
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ units
+
+
+
+ io.trino
+ trino-plugin-toolkit
+
+
+
+ io.trino
+ trino-wasm-python
+ 3.13-1
+
+
+
+ joda-time
+ joda-time
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-context
+ provided
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+
+ io.airlift
+ junit-extensions
+ test
+
+
+
+ io.trino
+ trino-main
+ test
+
+
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-testing
+ test
+
+
+
+ io.trino
+ trino-tpch
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
diff --git a/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/JdkLogger.java b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/JdkLogger.java
new file mode 100644
index 000000000000..f76c839763e3
--- /dev/null
+++ b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/JdkLogger.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.functions.python;
+
+import com.dylibso.chicory.log.Logger;
+
+import static java.util.Objects.requireNonNull;
+
+final class JdkLogger
+ implements Logger
+{
+ private final java.util.logging.Logger logger;
+
+ public static Logger get(Class> clazz)
+ {
+ return new JdkLogger(java.util.logging.Logger.getLogger(clazz.getName()));
+ }
+
+ public JdkLogger(java.util.logging.Logger logger)
+ {
+ this.logger = requireNonNull(logger, "logger is null");
+ }
+
+ @Override
+ public void log(Level level, String msg, Throwable throwable)
+ {
+ logger.log(toJdkLevel(level), msg, throwable);
+ }
+
+ @Override
+ public boolean isLoggable(Level level)
+ {
+ return logger.isLoggable(toJdkLevel(level));
+ }
+
+ private static java.util.logging.Level toJdkLevel(Level level)
+ {
+ return switch (level) {
+ case ALL -> java.util.logging.Level.ALL;
+ case TRACE -> java.util.logging.Level.FINEST;
+ case DEBUG -> java.util.logging.Level.FINE;
+ case INFO -> java.util.logging.Level.INFO;
+ case WARNING -> java.util.logging.Level.WARNING;
+ case ERROR -> java.util.logging.Level.SEVERE;
+ case OFF -> java.util.logging.Level.OFF;
+ };
+ }
+}
diff --git a/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/LoggingOutputStream.java b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/LoggingOutputStream.java
new file mode 100644
index 000000000000..62b8be7a6559
--- /dev/null
+++ b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/LoggingOutputStream.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.functions.python;
+
+import io.airlift.log.Logger;
+
+import java.io.ByteArrayOutputStream;
+
+import static com.google.common.base.CharMatcher.javaIsoControl;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+@SuppressWarnings("UnsynchronizedOverridesSynchronized")
+final class LoggingOutputStream
+ extends ByteArrayOutputStream
+{
+ private final Logger logger;
+
+ public LoggingOutputStream(Logger logger)
+ {
+ this.logger = requireNonNull(logger, "logger is null");
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ {
+ if (logger.isDebugEnabled()) {
+ super.write(b, off, len);
+ flush();
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ if (count > 4096) {
+ log(toString(UTF_8));
+ reset();
+ return;
+ }
+
+ int index;
+ for (index = count - 1; index >= 0; index--) {
+ if (buf[index] == '\n') {
+ break;
+ }
+ }
+ if (index == -1) {
+ return;
+ }
+
+ String data = new String(buf, 0, index, UTF_8);
+ data.lines().forEach(this::log);
+
+ int remaining = count - index - 1;
+ System.arraycopy(buf, index + 1, buf, 0, remaining);
+ count = remaining;
+ }
+
+ @Override
+ public void close()
+ {
+ log(toString(UTF_8));
+ reset();
+ }
+
+ private void log(String message)
+ {
+ String value = javaIsoControl().removeFrom(message).strip();
+ if (!value.isEmpty()) {
+ logger.debug(value);
+ }
+ }
+}
diff --git a/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/PythonEngine.java b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/PythonEngine.java
new file mode 100644
index 000000000000..a53c7dbad74b
--- /dev/null
+++ b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/PythonEngine.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.functions.python;
+
+import com.dylibso.chicory.runtime.ExportFunction;
+import com.dylibso.chicory.runtime.HostFunction;
+import com.dylibso.chicory.runtime.ImportValues;
+import com.dylibso.chicory.runtime.Instance;
+import com.dylibso.chicory.runtime.Memory;
+import com.dylibso.chicory.wasi.WasiOptions;
+import com.dylibso.chicory.wasi.WasiPreview1;
+import com.dylibso.chicory.wasm.ChicoryException;
+import com.dylibso.chicory.wasm.WasmModule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closer;
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import io.airlift.log.Logger;
+import io.airlift.slice.BasicSliceInput;
+import io.airlift.slice.Slice;
+import io.airlift.slice.SliceInput;
+import io.airlift.slice.Slices;
+import io.airlift.units.DataSize;
+import io.trino.spi.ErrorCodeSupplier;
+import io.trino.spi.StandardErrorCode;
+import io.trino.spi.TrinoException;
+import io.trino.spi.type.Type;
+import io.trino.wasm.python.PythonModule;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.FileSystem;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static com.dylibso.chicory.wasm.types.ValueType.I32;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static io.trino.plugin.functions.python.TrinoTypes.binaryToJava;
+import static io.trino.plugin.functions.python.TrinoTypes.javaToBinary;
+import static io.trino.plugin.functions.python.TrinoTypes.toRowTypeDescriptor;
+import static io.trino.plugin.functions.python.TrinoTypes.toTypeDescriptor;
+import static io.trino.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR;
+import static java.lang.Math.min;
+import static java.lang.Math.toIntExact;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+
+final class PythonEngine
+ implements Closeable
+{
+ private static final Logger log = Logger.get(PythonEngine.class);
+ private static final com.dylibso.chicory.log.Logger logger = JdkLogger.get(PythonEngine.class);
+
+ private static final Configuration FS_CONFIG = Configuration.unix().toBuilder()
+ .setAttributeViews("unix")
+ .setMaxSize(DataSize.of(8, MEGABYTE).toBytes())
+ .build();
+
+ private static final Map ERROR_CODES = Stream.of(StandardErrorCode.values())
+ .collect(toImmutableMap(error -> error.toErrorCode().getCode(), identity()));
+
+ private static final WasmModule PYTHON_MODULE = PythonModule.load();
+
+ private final Closer closer = Closer.create();
+ private final LimitedOutputStream stderr = new LimitedOutputStream();
+ private final ExportFunction allocate;
+ private final ExportFunction deallocate;
+ private final ExportFunction setup;
+ private final ExportFunction execute;
+ private final Memory memory;
+ private Type returnType;
+ private List argumentTypes;
+ private TrinoException error;
+
+ public PythonEngine(String guestCode)
+ {
+ FileSystem fileSystem = closer.register(Jimfs.newFileSystem(FS_CONFIG));
+ Path guestRoot = fileSystem.getPath("/guest");
+
+ try {
+ Files.createDirectories(guestRoot);
+ Files.writeString(guestRoot.resolve("guest.py"), guestCode);
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ OutputStream stdout = closer.register(new LoggingOutputStream(log));
+
+ WasiOptions wasiOptions = WasiOptions.builder()
+ .withStdout(stdout)
+ .withStderr(stderr)
+ .withDirectory(guestRoot.toString(), guestRoot)
+ .build();
+
+ WasiPreview1 wasi = closer.register(new WasiPreview1(logger, wasiOptions));
+
+ ImportValues importValues = ImportValues.builder()
+ .addFunction(wasi.toHostFunctions())
+ .addFunction(returnErrorHostFunction())
+ .build();
+
+ Instance instance = Instance.builder(PYTHON_MODULE)
+ .withMachineFactory(PythonModule::create)
+ .withImportValues(importValues)
+ .build();
+
+ allocate = instance.export("allocate");
+ deallocate = instance.export("deallocate");
+ setup = instance.export("setup");
+ execute = instance.export("execute");
+ memory = instance.memory();
+ }
+
+ public void setup(Type returnType, List argumentTypes, String handlerName)
+ {
+ try {
+ doSetup(returnType, argumentTypes, handlerName);
+ }
+ catch (ChicoryException e) {
+ throw fatalError("Python error", e);
+ }
+ }
+
+ private void doSetup(Type returnType, List argumentTypes, String handlerName)
+ {
+ byte[] nameBytes = handlerName.getBytes(UTF_8);
+ int nameAddress = allocate(nameBytes.length + 1);
+ memory.write(nameAddress, nameBytes);
+ memory.writeByte(nameAddress + nameBytes.length, (byte) 0);
+
+ Slice argumentTypeSlice = toRowTypeDescriptor(argumentTypes);
+ int argTypeAddress = allocate(argumentTypeSlice.length());
+ writeSliceTo(argumentTypeSlice, argTypeAddress);
+
+ Slice returnTypeSlice = toTypeDescriptor(returnType);
+ int returnTypeAddress = allocate(returnTypeSlice.length());
+ writeSliceTo(returnTypeSlice, returnTypeAddress);
+
+ setup.apply(nameAddress, argTypeAddress, returnTypeAddress);
+
+ deallocate(nameAddress);
+
+ this.returnType = requireNonNull(returnType, "returnType is null");
+ this.argumentTypes = ImmutableList.copyOf(requireNonNull(argumentTypes, "argumentTypes is null"));
+ }
+
+ private void writeSliceTo(Slice slice, int address)
+ {
+ memory.write(address, slice.byteArray(), slice.byteArrayOffset(), slice.length());
+ }
+
+ private int allocate(int size)
+ {
+ return toIntExact(allocate.apply(size)[0]);
+ }
+
+ private void deallocate(int address)
+ {
+ deallocate.apply(address);
+ }
+
+ private int execute(int address)
+ {
+ return toIntExact(execute.apply(address)[0]);
+ }
+
+ public Object execute(Object[] arguments)
+ {
+ Slice slice = javaToBinary(argumentTypes, arguments);
+ int argAddress = allocate(slice.length());
+ writeSliceTo(slice, argAddress);
+
+ error = null;
+
+ int resultAddress;
+ try {
+ resultAddress = execute(argAddress);
+ }
+ catch (ChicoryException e) {
+ throw fatalError("Failed to invoke Python function", e);
+ }
+
+ deallocate(argAddress);
+
+ if (error != null) {
+ throw new TrinoException(error::getErrorCode, error.getMessage(), error.getCause());
+ }
+
+ if (resultAddress == 0) {
+ throw new TrinoException(FUNCTION_IMPLEMENTATION_ERROR, "Python function did not return a result");
+ }
+
+ int resultSize = memory.readInt(resultAddress);
+ byte[] bytes = memory.readBytes(resultAddress + 4, resultSize);
+ deallocate(resultAddress);
+
+ SliceInput input = new BasicSliceInput(Slices.wrappedBuffer(bytes));
+ return binaryToJava(returnType, input);
+ }
+
+ public TrinoException fatalError(String message, ChicoryException e)
+ {
+ String error = stderr.toString(UTF_8).strip();
+ if (!error.isEmpty()) {
+ message += ":";
+ message += error.contains("\n") ? "\n" : " ";
+ message += error;
+ }
+ return new TrinoException(FUNCTION_IMPLEMENTATION_ERROR, message, e);
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ closer.close();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private long[] returnError(Instance instance, long... args)
+ {
+ int code = toIntExact(args[0]);
+ int messageAddress = toIntExact(args[1]);
+ int messageSize = toIntExact(args[2]);
+ int tracebackAddress = toIntExact(args[3]);
+ int tracebackSize = toIntExact(args[4]);
+
+ Memory memory = instance.memory();
+ String message = memory.readString(messageAddress, messageSize);
+
+ Throwable traceback = null;
+ if (tracebackAddress != 0) {
+ String value = memory.readString(tracebackAddress, tracebackSize);
+ traceback = new RuntimeException("Python traceback:\n" + value.stripTrailing());
+ }
+
+ ErrorCodeSupplier errorCode = ERROR_CODES.get(code);
+ if (errorCode == null) {
+ errorCode = FUNCTION_IMPLEMENTATION_ERROR;
+ message = "Unknown error code (%s): %s".formatted(code, message);
+ }
+
+ error = new TrinoException(errorCode, message, traceback);
+
+ return null;
+ }
+
+ private HostFunction returnErrorHostFunction()
+ {
+ return new HostFunction(
+ "trino",
+ "return_error",
+ List.of(I32, I32, I32, I32, I32),
+ List.of(),
+ this::returnError);
+ }
+
+ @SuppressWarnings("UnsynchronizedOverridesSynchronized")
+ private static class LimitedOutputStream
+ extends ByteArrayOutputStream
+ {
+ private static final int LIMIT = 4096;
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ {
+ if (count < LIMIT) {
+ super.write(b, off, min(len, LIMIT - count));
+ }
+ }
+ }
+}
diff --git a/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/PythonFunctionEngine.java b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/PythonFunctionEngine.java
new file mode 100644
index 000000000000..e639c6859e20
--- /dev/null
+++ b/plugin/trino-functions-python/src/main/java/io/trino/plugin/functions/python/PythonFunctionEngine.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.functions.python;
+
+import io.trino.spi.TrinoException;
+import io.trino.spi.function.InvocationConvention;
+import io.trino.spi.function.LanguageFunctionEngine;
+import io.trino.spi.function.ScalarFunctionAdapter;
+import io.trino.spi.function.ScalarFunctionImplementation;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.type.Type;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodType;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static io.trino.plugin.functions.python.TrinoTypes.validateReturnType;
+import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_PROPERTY;
+import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.BOXED_NULLABLE;
+import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.NULLABLE_RETURN;
+import static io.trino.spi.session.PropertyMetadata.stringProperty;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.lang.invoke.MethodType.methodType;
+import static java.util.Collections.nCopies;
+
+final class PythonFunctionEngine
+ implements LanguageFunctionEngine
+{
+ private static final MethodHandle FACTORY_METHOD;
+ private static final MethodHandle EXECUTE_METHOD;
+
+ static {
+ try {
+ FACTORY_METHOD = lookup().findVirtual(Supplier.class, "get", methodType(Object.class));
+ EXECUTE_METHOD = lookup().findVirtual(PythonEngine.class, "execute", methodType(Object.class, Object[].class));
+ }
+ catch (NoSuchMethodException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getLanguage()
+ {
+ return "PYTHON";
+ }
+
+ @Override
+ public List> getFunctionProperties()
+ {
+ return List.of(stringProperty("handler", "Name of the Python method to call", "", false));
+ }
+
+ @Override
+ public void validateScalarFunction(Type returnType, List argumentTypes, String definition, Map properties)
+ {
+ validateReturnType(returnType);
+
+ String code = definition.stripIndent();
+
+ String handler = (String) properties.get("handler");
+ if (handler.isEmpty()) {
+ throw new TrinoException(INVALID_FUNCTION_PROPERTY, "Property 'handler' is required");
+ }
+
+ try (PythonEngine engine = new PythonEngine(code)) {
+ engine.setup(returnType, argumentTypes, handler);
+ }
+ }
+
+ @Override
+ public ScalarFunctionImplementation getScalarFunctionImplementation(
+ Type returnType,
+ List argumentTypes,
+ String definition,
+ Map properties,
+ InvocationConvention invocationConvention)
+ {
+ String code = definition.stripIndent();
+ String handler = (String) properties.get("handler");
+
+ Supplier