diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6f61ca64794..176913f5b54 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -193,7 +193,7 @@ if (TARGET ch_contrib::jemalloc) target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::jemalloc) endif() -target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::sparsehash ch_contrib::incbin) +target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::sparsehash ch_contrib::incbin ch_contrib::icu) add_subdirectory(Access/Common) add_subdirectory(Common/ZooKeeper) @@ -284,6 +284,7 @@ endforeach() set_source_files_properties(Storages/StoragePython.cpp PROPERTIES INCLUDE_DIRECTORIES "${PYTHON_INCLUDE_DIRS}") set_source_files_properties(Processors/Sources/PythonSource.cpp PROPERTIES INCLUDE_DIRECTORIES "${PYTHON_INCLUDE_DIRS}") set_source_files_properties(Columns/ColumnPyObject.cpp PROPERTIES INCLUDE_DIRECTORIES "${PYTHON_INCLUDE_DIRS}") +set_source_files_properties(Common/PythonUtils.cpp PROPERTIES INCLUDE_DIRECTORIES "${PYTHON_INCLUDE_DIRS}") # get python version, something like python3.x execute_process(COMMAND python3 -c "import sys; print('python3.'+str(sys.version_info[1]))" @@ -308,6 +309,9 @@ if (OS_LINUX) set_source_files_properties(Columns/ColumnPyObject.cpp PROPERTIES COMPILE_FLAGS "-w -idirafter /usr/include -include crypt.h" ) + set_source_files_properties(Common/PythonUtils.cpp PROPERTIES COMPILE_FLAGS + "-w -idirafter /usr/include -include crypt.h" + ) else() set_source_files_properties(Storages/StoragePython.cpp PROPERTIES COMPILE_FLAGS "-w" @@ -318,6 +322,9 @@ if (OS_LINUX) set_source_files_properties(Columns/ColumnPyObject.cpp PROPERTIES COMPILE_FLAGS "-w" ) + set_source_files_properties(Common/PythonUtils.cpp PROPERTIES COMPILE_FLAGS + "-w" + ) endif() elseif (OS_DARWIN) set_source_files_properties(Storages/StoragePython.cpp PROPERTIES COMPILE_FLAGS @@ -329,6 +336,9 @@ elseif (OS_DARWIN) set_source_files_properties(Columns/ColumnPyObject.cpp PROPERTIES COMPILE_FLAGS "-w" ) + set_source_files_properties(Common/PythonUtils.cpp PROPERTIES COMPILE_FLAGS + "-w" + ) endif() set (all_modules dbms) diff --git a/src/Processors/Sources/PythonSource.cpp b/src/Processors/Sources/PythonSource.cpp index 686a465f0ae..2b5a2673ac9 100644 --- a/src/Processors/Sources/PythonSource.cpp +++ b/src/Processors/Sources/PythonSource.cpp @@ -1,5 +1,8 @@ +#define PYBIND11_NO_ASSERT_GIL_HELD_INCREF_DECREF + #include #include +#include #include // #include #include @@ -17,81 +20,87 @@ #include #include #include +#include #include #include #include #include +#include #include + namespace DB { -using namespace pybind11::literals; +namespace py = pybind11; + +namespace ErrorCodes +{ +extern const int PY_OBJECT_NOT_FOUND; +extern const int PY_EXCEPTION_OCCURED; +} -PythonSource::PythonSource(py::object reader_, const Block & sample_block_, const UInt64 max_block_size_) - : ISource(sample_block_.cloneEmpty()), reader(reader_), max_block_size(max_block_size_) +PythonSource::PythonSource( + py::object & data_source_, + const Block & sample_block_, + const UInt64 max_block_size_, + const size_t stream_index, + const size_t num_streams) + : ISource(sample_block_.cloneEmpty()) + , data_source(data_source_) + , max_block_size(max_block_size_) + , stream_index(stream_index) + , num_streams(num_streams) + , cursor(0) { description.init(sample_block_); } template -void insert_from_pyobject(py::object obj, const MutableColumnPtr & column) +void insert_from_pyobject(const py::object & obj, const MutableColumnPtr & column) { - if (py::isinstance(obj)) + auto type_name = getPyType(obj); + if (type_name == "list") { - py::list list = obj.cast(); - for (auto && item : list) - column->insert(item.cast()); + py::list list = castToPyList(obj); + { + py::gil_scoped_acquire acquire; + for (auto && item : list) + column->insert(item.cast()); + } return; } - if (py::isinstance(obj)) + if (type_name == "ndarray") { // if column type is ColumnString, we need to handle it like list if constexpr (std::is_same_v) { - // Typically numpy string array is a array of pointers, convert it to ColumnString: - // 1. get the total size of the strings - // 2. reserve the size in the column - // 3. copy the strings into the column - py::array array = obj.cast(); + py::array array = castToPyArray(obj); + py::gil_scoped_acquire acquire; for (auto && item : array) { - UInt64 str_len; - // auto kind = PyUnicode_KIND(item.ptr()); - // LOG_DEBUG(&Poco::Logger::get("TableFunctionPython"), "PyObjects Kind: {}", kind); - const char * ptr = PythonSource::getPyUtf8StrData(item, str_len); + size_t str_len; + const char * ptr = GetPyUtf8StrData(item, str_len); column->insertData(ptr, str_len); } - // if (py::isinstance(obj.attr("__getitem__")(0))) - // { - // py::array array = obj.cast(); - // ColumnVectorHelper * helper = static_cast(column.get()); - // helper->appendRawData(static_cast(array.data()), array.size()); - // LOG_DEBUG(&Poco::Logger::get("TableFunctionPython"), "PyObjects Read {} bytes", array.size() * array.itemsize()); - // } return; } - py::array array = obj.cast(); + py::array array = castToPyArray(obj); column->reserve(size_t(array.size())); // get the raw data from the array and memcpy it into the column ColumnVectorHelper * helper = static_cast(column.get()); helper->appendRawData(static_cast(array.data()), array.size()); - LOG_DEBUG(&Poco::Logger::get("TableFunctionPython"), "Read {} bytes", array.size() * array.itemsize()); return; } else { - throw Exception( - ErrorCodes::BAD_TYPE_OF_FIELD, - "Unsupported type {} for value {}", - obj.get_type().attr("__name__").cast(), - py::str(obj).cast()); + throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported type {} for value {}", getPyType(obj), castToStr(obj)); } } template -ColumnPtr convert_and_insert(py::object obj, UInt32 scale = 0) +ColumnPtr convert_and_insert(const py::object & obj, UInt32 scale = 0) { MutableColumnPtr column; if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) @@ -101,38 +110,25 @@ ColumnPtr convert_and_insert(py::object obj, UInt32 scale = 0) else column = ColumnVector::create(); - if (py::isinstance(obj) || py::isinstance(obj)) + std::string type_name = getPyType(obj); + // if (isInstanceOf(obj) || isInstanceOf(obj)) + if (type_name == "list" || type_name == "ndarray") { //reserve the size of the column - column->reserve(py::len(obj)); + column->reserve(getObjectLength(obj)); insert_from_pyobject(obj, column); return column; } - std::string type_name = obj.attr("__class__").attr("__name__").cast(); if (type_name == "Series") { - py::object values = obj.attr("values"); - if (py::isinstance(values)) + py::object values; + { + py::gil_scoped_acquire acquire; + values = obj.attr("values"); + } + if (isInstanceOf(values)) { - // if constexpr (std::is_same_v) - // { - // // call obj.memory_usage(deep=True) if possible - // if (py::hasattr(obj, "memory_usage")) - // { - // size_t mem_usage = obj.attr("memory_usage")("deep"_a = true).cast(); - // LOG_DEBUG(&Poco::Logger::get("TableFunctionPython"), "Memory usage: {}", mem_usage); - // //reserve the size of the column - // auto col = ColumnString::create(); - // col->getOffsets().reserve(py::len(values)); - // col->getChars().reserve(mem_usage); - // column = std::move(col); - // } - - // // // If the values first element is a Python str object - // // if (py::isinstance(values.attr("__getitem__")(0))) - // // column = ColumnPyObject::create(); - // } insert_from_pyobject(values, column); return column; } @@ -152,70 +148,96 @@ ColumnPtr convert_and_insert(py::object obj, UInt32 scale = 0) // Run with new chDB on dataframe. Time cost: 0.09642386436462402 s // Run with new chDB on dataframe(arrow). Time cost: 0.11595273017883301 s // chdb todo: maybe we can use the ArrowExtensionArray directly - if (py::hasattr(values, "to_numpy")) + if (hasAttribute(values, "to_numpy")) { - py::array numpy_array = values.attr("to_numpy")(); + py::array numpy_array = callMethod(values, "to_numpy"); column->reserve(numpy_array.size()); insert_from_pyobject(numpy_array, column); return column; } } - throw Exception( - ErrorCodes::BAD_TYPE_OF_FIELD, - "Unsupported type {} for value {}", - obj.get_type().attr("__name__").cast(), - py::str(obj).cast()); + throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported type {} for value {}", getPyType(obj), castToStr(obj)); +} + +void PythonSource::destory(std::shared_ptr> & data) +{ + // manually destory std::shared_ptr> and trigger the py::object dec_ref with GIL holded + py::gil_scoped_acquire acquire; + data->clear(); + data.reset(); } Chunk PythonSource::generate() { size_t num_rows = 0; - py::gil_scoped_acquire acquire; - try + auto names = description.sample_block.getNames(); + if (names.empty()) + return {}; + + std::shared_ptr> data; + if (isInheritsFromPyReader(data_source)) + { + py::gil_scoped_acquire acquire; + data = std::move(castToSharedPtrVector(data_source.attr("read")(names, max_block_size))); + } + else { - auto names = description.sample_block.getNames(); - auto data = reader.attr("read")(names, max_block_size).cast>(); + auto total_rows = getLengthOfValueByKey(data_source, names.front()); + auto rows_per_stream = total_rows / num_streams; + auto start = stream_index * rows_per_stream; + auto end = (stream_index + 1) * rows_per_stream; + if (stream_index == num_streams - 1) + end = total_rows; + if (cursor == 0) + cursor = start; + auto count = std::min(max_block_size, end - cursor); + if (count == 0) + return {}; + LOG_DEBUG(logger, "Stream index {} Reading {} rows from {}", stream_index, count, cursor); + data = PyReader::readData(data_source, names, cursor, count); + } - LOG_DEBUG(logger, "Read {} columns", data.size()); - LOG_DEBUG(logger, "Need {} columns", description.sample_block.columns()); - LOG_DEBUG(logger, "Max block size: {}", max_block_size); + if (data->empty()) + return {}; - // if log level is debug, print all the data - if (logger->debug()) - { - // print all the data - for (auto && col : data) - { - if (py::isinstance(col)) - { - py::list list = col.cast(); - for (auto && i : list) - LOG_DEBUG(logger, "Data: {}", py::str(i).cast()); - } - else if (py::isinstance(col)) - { - py::array array = col.cast(); - for (auto && i : array) - LOG_DEBUG(logger, "Data: {}", py::str(i).cast()); - } - else - { - LOG_DEBUG(logger, "Data: {}", py::str(col).cast()); - } - } - } + // // if log level is debug, print all the data + // if (logger->debug()) + // { + // // print all the data + // for (auto && col : data) + // { + // if (isInstanceOf(col)) + // { + // py::list list = col.cast(); + // for (auto && i : list) + // LOG_DEBUG(logger, "Data: {}", py::str(i).cast()); + // } + // else if (isInstanceOf(col)) + // { + // py::array array = col.cast(); + // for (auto && i : array) + // LOG_DEBUG(logger, "Data: {}", py::str(i).cast()); + // } + // else + // { + // LOG_DEBUG(logger, "Data: {}", py::str(col).cast()); + // } + // } + // } - Columns columns(description.sample_block.columns()); - // fill in the columns - for (size_t i = 0; i < data.size(); ++i) - { - if (i == 0) - num_rows = py::len(data[i]); - const auto & column = data[i]; - const auto & type = description.sample_block.getByPosition(i).type; - WhichDataType which(type); + Columns columns(description.sample_block.columns()); + for (size_t i = 0; i < data->size(); ++i) + { + if (i == 0) + num_rows = getObjectLength((*data)[i]); + const auto & column = (*data)[i]; + const auto & type = description.sample_block.getByPosition(i).type; + WhichDataType which(type); + try + { + // Dispatch to the appropriate conversion function based on data type if (which.isUInt8()) columns[i] = convert_and_insert(column); else if (which.isUInt16()) @@ -267,18 +289,19 @@ Chunk PythonSource::generate() type->getName(), description.sample_block.getByPosition(i).name); } - - if (num_rows == 0) - return {}; - - return Chunk(std::move(columns), num_rows); - } - catch (const std::exception & e) - { - // py::gil_scoped_release release; - throw Exception(ErrorCodes::LOGICAL_ERROR, e.what()); + catch (const Exception & e) + { + destory(data); + LOG_ERROR(logger, "Error processing column {}: {}", i, e.what()); + throw; + } } -} + destory(data); + if (num_rows == 0) + return {}; + + return Chunk(std::move(columns), num_rows); +} } diff --git a/src/Processors/Sources/PythonSource.h b/src/Processors/Sources/PythonSource.h index 9fdfee143ba..2d0a6ba776c 100644 --- a/src/Processors/Sources/PythonSource.h +++ b/src/Processors/Sources/PythonSource.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -16,71 +17,26 @@ namespace py = pybind11; class PythonSource : public ISource { public: - PythonSource(py::object reader_, const Block & sample_block_, UInt64 max_block_size_); - ~PythonSource() override - { - // Acquire the GIL before destroying the reader object - py::gil_scoped_acquire acquire; - reader.dec_ref(); - reader.release(); - } + PythonSource(py::object & data_source_, const Block & sample_block_, UInt64 max_block_size_, size_t stream_index, size_t num_streams); + + ~PythonSource() override = default; String getName() const override { return "Python"; } Chunk generate() override; - static const char * getPyUtf8StrData(const py::handle & obj, size_t & buf_len) - { - // See: https://github.com/python/cpython/blob/3.9/Include/cpython/unicodeobject.h#L81 - if (PyUnicode_IS_COMPACT_ASCII(obj.ptr())) - { - const char * data = reinterpret_cast(PyUnicode_1BYTE_DATA(obj.ptr())); - buf_len = PyUnicode_GET_LENGTH(obj.ptr()); - return data; - } - else - { - PyCompactUnicodeObject * unicode = reinterpret_cast(obj.ptr()); - if (unicode->utf8) - { - // It's utf8 string, treat it like ASCII - const char * data = reinterpret_cast(unicode->utf8); - buf_len = unicode->utf8_length; - return data; - } - else if (PyUnicode_IS_COMPACT(obj.ptr())) - { - auto kind = PyUnicode_KIND(obj.ptr()); - if (kind == PyUnicode_1BYTE_KIND || kind == PyUnicode_2BYTE_KIND || kind == PyUnicode_4BYTE_KIND) - { - // always convert it to utf8 - const char * data = PyUnicode_AsUTF8AndSize(obj.ptr(), &unicode->utf8_length); - buf_len = unicode->utf8_length; - // set the utf8 buffer back - unicode->utf8 = const_cast(data); - return data; - } - else - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported unicode kind {}", kind); - } - } - else - { - // always convert it to utf8 - const char * data = PyUnicode_AsUTF8AndSize(obj.ptr(), &unicode->utf8_length); - buf_len = unicode->utf8_length; - // set the utf8 buffer back - unicode->utf8 = const_cast(data); - return data; - } - } - } private: - py::object reader; + py::object & data_source; // Do not own the reference Block sample_block; const UInt64 max_block_size; + // Caller will only pass stream index and total stream count + // to the constructor, we need to calculate the start offset and end offset. + const size_t stream_index; + const size_t num_streams; + size_t cursor; Poco::Logger * logger = &Poco::Logger::get("TableFunctionPython"); ExternalResultDescription description; + + void destory(std::shared_ptr> & data); }; } diff --git a/src/Storages/StoragePython.cpp b/src/Storages/StoragePython.cpp index b47b76d6910..6450cbb3e2a 100644 --- a/src/Storages/StoragePython.cpp +++ b/src/Storages/StoragePython.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -42,7 +43,7 @@ StoragePython::StoragePython( const ConstraintsDescription & constraints_, py::object reader_, ContextPtr context_) - : IStorage(table_id_), reader(reader_), WithContext(context_->getGlobalContext()) + : IStorage(table_id_), data_source(reader_), WithContext(context_->getGlobalContext()) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -57,14 +58,35 @@ Pipe StoragePython::read( ContextPtr /*context_*/, QueryProcessingStage::Enum /*processed_stage*/, size_t max_block_size, - size_t /*num_streams*/) + size_t num_streams) { py::gil_scoped_acquire acquire; storage_snapshot->check(column_names); Block sample_block = prepareSampleBlock(column_names, storage_snapshot); + // check if string type column involved + bool has_string_column = false; + for (const auto & column_name : column_names) + { + if (sample_block.getByName(column_name).type->getName() == "String") + { + has_string_column = true; + break; + } + } + + num_streams = 10; // for testing - return Pipe(std::make_shared(reader, sample_block, max_block_size)); + // Converting Python str to ClickHouse String type will cost a lot of time. + // so if string column involved, return multiple streams. + if (has_string_column) + { + Pipes pipes; + for (size_t stream = 0; stream < num_streams; ++stream) + pipes.emplace_back(std::make_shared(data_source, sample_block, max_block_size, stream, num_streams)); + return Pipe::unitePipes(std::move(pipes)); + } + return Pipe(std::make_shared(data_source, sample_block, max_block_size, 0, 1)); } Block StoragePython::prepareSampleBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) @@ -78,12 +100,16 @@ Block StoragePython::prepareSampleBlock(const Names & column_names, const Storag return sample_block; } -ColumnsDescription StoragePython::getTableStructureFromData(py::object reader) +ColumnsDescription StoragePython::getTableStructureFromData(py::object data_source) { - if (!reader) + if (!data_source) throw Exception(ErrorCodes::LOGICAL_ERROR, "Python reader not initialized"); py::gil_scoped_acquire acquire; - auto schema = reader.attr("get_schema")().cast>>(); + std::vector> schema; + if (isInheritsFromPyReader(data_source)) + schema = data_source.attr("get_schema")().cast>>(); + else + schema = PyReader::getSchemaFromPyObj(data_source); auto * logger = &Poco::Logger::get("StoragePython"); if (logger->debug()) @@ -108,7 +134,7 @@ ColumnsDescription StoragePython::getTableStructureFromData(py::object reader) RE2 pattern_time64_us(R"(\btime64\[us\]\b)"); RE2 pattern_time64_ns(R"(\btime64\[ns\]\b||str|DataType\(string\)|DataType\(binary\)|binary\[pyarrow\]|dtype\[object_\]|dtype\('O'\))"); + R"(\bstring\b||str|DataType\(string\)|DataType\(binary\)|binary\[pyarrow\]|dtype\[object_\]|dtype\('S|dtype\('O'\))"); // Iterate through each pair of name and type string in the schema for (const auto & [name, typeStr] : schema) @@ -200,6 +226,86 @@ ColumnsDescription StoragePython::getTableStructureFromData(py::object reader) return ColumnsDescription(names_and_types); } +std::vector> PyReader::getSchemaFromPyObj(const py::object data) +{ + std::vector> schema; + if (!py::hasattr(data, "__class__")) + { + throw Exception( + ErrorCodes::UNKNOWN_FORMAT, "Unknown data type for schema inference. Consider inheriting PyReader and overriding getSchema()."); + } + + auto type_name = data.attr("__class__").attr("__name__").cast(); + + if (py::isinstance(data)) + { + // If the data is a Python dictionary + for (auto item : data.cast()) + { + std::string key = py::str(item.first).cast(); + py::list values = py::cast(item.second); + std::string dtype = py::str(values[0].attr("__class__").attr("__name__")).cast(); + if (!values.empty()) + schema.emplace_back(key, dtype); + } + return schema; + } + + if (py::hasattr(data, "dtypes")) + { + // If the data is a Pandas DataFrame + py::object dtypes = data.attr("dtypes"); + py::list columns = data.attr("columns"); + for (size_t i = 0; i < py::len(columns); ++i) + { + std::string name = py::str(columns[i]).cast(); + std::string dtype = py::str(py::repr(dtypes[columns[i]])).cast(); + schema.emplace_back(name, dtype); + } + return schema; + } + + if (py::hasattr(data, "schema")) + { + // If the data is a Pyarrow Table + py::object tbl_schema = data.attr("schema"); + auto names = tbl_schema.attr("names").cast(); + auto types = tbl_schema.attr("types").cast(); + for (size_t i = 0; i < py::len(names); ++i) + { + std::string name = py::str(names[i]).cast(); + std::string dtype = py::str(types[i]).cast(); + schema.emplace_back(name, dtype); + } + return schema; + } + + if (type_name == "recarray") + { + // if it's numpy.recarray + py::object dtype = data.attr("dtype"); + py::list fields = dtype.attr("fields"); + py::dict fields_dict = fields.cast(); + // fields_dict looks like: + // {'TIME': (dtype('int64'), 0), + // 'FX' : (dtype('int64'), 8), + // 'FY' : (dtype('int64'), 16), + // 'FZ' : (dtype('S68'), 24)} + for (auto field : fields_dict) + { + std::string name = field.first.cast(); + std::string dtype_str = py::str(field.second).cast(); + schema.emplace_back(name, dtype_str); + } + return schema; + } + + throw Exception( + ErrorCodes::UNKNOWN_FORMAT, + "Unknown data type {} for schema inference. Consider inheriting PyReader and overriding getSchema().", + py::str(data.attr("__class__")).cast()); +} + void registerStoragePython(StorageFactory & factory) { factory.registerStorage( diff --git a/src/Storages/StoragePython.h b/src/Storages/StoragePython.h index 93b979620c4..c42b4c92f50 100644 --- a/src/Storages/StoragePython.h +++ b/src/Storages/StoragePython.h @@ -20,11 +20,23 @@ namespace DB { namespace py = pybind11; + +namespace ErrorCodes +{ +extern const int UNKNOWN_FORMAT; +extern const int NOT_IMPLEMENTED; +} class PyReader { public: explicit PyReader(const py::object & data) : data(data) { } - ~PyReader() = default; + ~PyReader() + { + py::gil_scoped_acquire acquire; + if (data.is_none()) + return; + data.release(); + } // Read `count` rows from the data, and return a list of columns // chdb todo: maybe return py::list is better, but this is just a shallow copy @@ -33,6 +45,23 @@ class PyReader throw Exception(ErrorCodes::NOT_IMPLEMENTED, "read() method is not implemented"); } + static std::shared_ptr> + readData(const py::object & data, const std::vector & col_names, size_t & cursor, size_t count) + { + py::gil_scoped_acquire acquire; + auto block = std::make_shared>(); + // Access columns directly by name and slice + for (const auto & col : col_names) + { + py::object col_data = data[py::str(col)]; // Use dictionary-style access + block->push_back(col_data.attr("__getitem__")(py::slice(cursor, cursor + count, 1))); + } + + if (!block->empty()) + cursor += py::len((*block)[0]); // Update cursor based on the length of the first column slice + + return block; + } // Return a vector of column names and their types, as a list of pairs. // The order is important, and should match the order of the data. // This is the default implementation, which trys to infer the schema from the every first row @@ -73,47 +102,9 @@ class PyReader // "DataTypeInt8", "DataTypeInt16", "DataTypeInt32", "DataTypeInt64", "DataTypeInt128", "DataTypeInt256", // "DataTypeFloat32", "DataTypeFloat64", "DataTypeString", - std::vector> getSchema() - { - std::vector> schema; + static std::vector> getSchemaFromPyObj(py::object data); - if (py::isinstance(data)) - { - // If the data is a Python dictionary - for (auto item : data.cast()) - { - std::string key = py::str(item.first).cast(); - py::list values = py::cast(item.second); - std::string dtype = py::str(values[0].attr("__class__").attr("__name__")).cast(); - if (!values.empty()) - schema.emplace_back(key, dtype); - } - } - else if (py::hasattr(data, "dtypes")) - { - // If the data is a Pandas DataFrame - py::object dtypes = data.attr("dtypes"); - py::list columns = data.attr("columns"); - for (size_t i = 0; i < py::len(columns); ++i) - { - std::string name = py::str(columns[i]).cast(); - std::string dtype = py::str(py::repr(dtypes[columns[i]])).cast(); - schema.emplace_back(name, dtype); - } - } - else if (py::hasattr(data, "schema")) - { - // If the data is a Pyarrow Table - py::object schema_fields = data.attr("schema").attr("fields"); - for (auto field : schema_fields) - { - std::string name = py::str(field.attr("name")).cast(); - std::string dtype = py::str(py::repr(field.attr("type"))).cast(); - schema.emplace_back(name, dtype); - } - } - return schema; - } + std::vector> getSchema() { return getSchemaFromPyObj(data); } protected: py::object data; @@ -152,8 +143,8 @@ class StoragePython : public IStorage, public WithContext { // Destroy the reader with the GIL py::gil_scoped_acquire acquire; - reader.dec_ref(); - reader.release(); + data_source.dec_ref(); + data_source.release(); } std::string getName() const override { return "Python"; } @@ -169,10 +160,10 @@ class StoragePython : public IStorage, public WithContext Block prepareSampleBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot); - static ColumnsDescription getTableStructureFromData(py::object reader); + static ColumnsDescription getTableStructureFromData(py::object data_source); private: - py::object reader; + py::object data_source; Poco::Logger * logger = &Poco::Logger::get("StoragePython"); }; diff --git a/src/TableFunctions/TableFunctionPython.cpp b/src/TableFunctions/TableFunctionPython.cpp index 6ce3f5c1d0c..a0acdcf4b97 100644 --- a/src/TableFunctions/TableFunctionPython.cpp +++ b/src/TableFunctions/TableFunctionPython.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include namespace DB @@ -23,42 +24,6 @@ extern const int PY_OBJECT_NOT_FOUND; extern const int PY_EXCEPTION_OCCURED; } -// Helper function to check if an object's class is or inherits from PyReader with a maximum depth -bool is_or_inherits_from_pyreader(const py::handle & obj, int depth = 3) -{ - // Base case: if depth limit reached, stop the recursion - if (depth == 0) - return false; - - // Check directly if obj is an instance of PyReader - if (py::isinstance(obj, py::module_::import("chdb").attr("PyReader"))) - return true; - - // Check if obj's class or any of its bases is PyReader - py::object cls = obj.attr("__class__"); - if (py::hasattr(cls, "__bases__")) - { - for (auto base : cls.attr("__bases__")) - if (py::str(base.attr("__name__")).cast() == "PyReader" || is_or_inherits_from_pyreader(base, depth - 1)) - return true; - } - return false; -} - -// Helper function to check if object is a pandas DataFrame -bool is_pandas_dataframe(const py::object & obj) -{ - auto pd_data_frame_type = py::module_::import("pandas").attr("DataFrame"); - return py::isinstance(obj, pd_data_frame_type); -} - -// Helper function to check if object is a PyArrow Table -bool is_pyarrow_table(const py::object & obj) -{ - auto table_type = py::module_::import("pyarrow").attr("Table"); - return py::isinstance(obj, table_type); -} - // Function to find instance of PyReader, pandas DataFrame, or PyArrow Table, filtered by variable name py::object find_instances_of_pyreader(const std::string & var_name) { @@ -75,7 +40,7 @@ py::object find_instances_of_pyreader(const std::string & var_name) if (dict.contains(var_name)) { py::object obj = dict[var_name.data()]; - if (is_or_inherits_from_pyreader(obj) || is_pandas_dataframe(obj) || is_pyarrow_table(obj)) + if (isInheritsFromPyReader(obj) || isPandasDf(obj) || isPyarrowTable(obj)) return obj; } } @@ -143,7 +108,7 @@ StoragePtr TableFunctionPython::executeImpl( { py::gil_scoped_acquire acquire; if (!reader) - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Python reader not initialized"); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Python data source not initialized"); auto columns = getActualTableStructure(context, is_insert_query); @@ -163,9 +128,11 @@ void registerTableFunctionPython(TableFunctionFactory & factory) factory.registerFunction( {.documentation = {.description = R"( - Creates a table interface to a Python data source and reads data from a PyReader object. - This table function requires a single argument which is a PyReader object used to read data from Python. - )", +Passing Pandas DataFrame or Pyarrow Table to ClickHouse engine. +For any other data structure, you can also create a table interface to a Python data source and reads data +from a PyReader object. +This table function requires a single argument which is a PyReader object used to read data from Python. +)", .examples = {{"1", "SELECT * FROM Python(PyReader)", ""}}}}, TableFunctionFactory::CaseInsensitive); }