diff --git a/src/Processors/Sources/PythonSource.cpp b/src/Processors/Sources/PythonSource.cpp index ff65a76dcf6..119f7d0539d 100644 --- a/src/Processors/Sources/PythonSource.cpp +++ b/src/Processors/Sources/PythonSource.cpp @@ -1,13 +1,20 @@ +#include #include #include +#include #include +#include #include +#include #include #include +#include #include #include #include #include +#include +#include namespace DB { @@ -18,57 +25,37 @@ PythonSource::PythonSource(std::shared_ptr reader_, const Block & samp } template -ColumnPtr convert_and_insert(py::object obj) +ColumnPtr convert_and_insert(py::object obj, UInt32 scale = 0) { - auto column = ColumnVector::create(); - // if obj is a list - if (py::isinstance(obj)) - { - py::list list = obj.cast(); - for (auto && i : list) - column->insert(i.cast()); - // free the list - list.dec_ref(); - } - else if (py::isinstance(obj)) // if obj is a numpy array - { - py::array array = obj.cast(); - //chdb: array is a numpy array, so we can directly cast it to a vector? - for (auto && i : array) - column->insert(i.cast()); - // free the array, until we implement with zero copy - array.dec_ref(); - } + MutableColumnPtr column; + if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) + column = ColumnDecimal::create(0, scale); + else if constexpr (std::is_same_v) + column = ColumnString::create(); else - { - throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported type {}", obj.get_type().attr("__name__").cast()); - } - return column; -} + column = ColumnVector::create(); -template <> -ColumnPtr convert_and_insert(py::object obj) -{ - auto column = ColumnString::create(); if (py::isinstance(obj)) { py::list list = obj.cast(); for (auto && i : list) - column->insert(i.cast()); - // free the list + column->insert(i.cast()); list.dec_ref(); } else if (py::isinstance(obj)) { py::array array = obj.cast(); for (auto && i : array) - column->insert(i.cast()); - // free the array, until we implement with zero copy + column->insert(i.cast()); array.dec_ref(); } else { - throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported type {}", obj.get_type().attr("__name__").cast()); + throw Exception( + ErrorCodes::BAD_TYPE_OF_FIELD, + "Unsupported type {} for value {}", + obj.get_type().attr("__name__").cast(), + py::str(obj).cast()); } return column; } @@ -76,12 +63,12 @@ ColumnPtr convert_and_insert(py::object obj) Chunk PythonSource::generate() { size_t num_rows = 0; - + std::vector data; try { // GIL is held when called from Python code. Release it to avoid deadlock py::gil_scoped_release release; - std::vector data = reader->read(description.sample_block.getNames(), max_block_size); + data = reader->read(description.sample_block.getNames(), max_block_size); LOG_DEBUG(logger, "Read {} columns", data.size()); LOG_DEBUG(logger, "Need {} columns", description.sample_block.columns()); @@ -122,31 +109,58 @@ Chunk PythonSource::generate() num_rows = py::len(data[i]); const auto & column = data[i]; const auto & type = description.sample_block.getByPosition(i).type; + WhichDataType which(type); - if (type->equals(*std::make_shared())) + if (which.isUInt8()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isUInt16()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isUInt32()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isUInt64()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isUInt128()) + columns[i] = convert_and_insert(column); + else if (which.isUInt256()) + columns[i] = convert_and_insert(column); + else if (which.isInt8()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isInt16()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isInt32()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isInt64()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isInt128()) + columns[i] = convert_and_insert(column); + else if (which.isInt256()) + columns[i] = convert_and_insert(column); + else if (which.isFloat32()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isFloat64()) columns[i] = convert_and_insert(column); - else if (type->equals(*std::make_shared())) + else if (which.isDecimal128()) + { + const auto & dtype = typeid_cast *>(type.get()); + columns[i] = convert_and_insert(column, dtype->getScale()); + } + else if (which.isDecimal256()) + { + const auto & dtype = typeid_cast *>(type.get()); + columns[i] = convert_and_insert(column, dtype->getScale()); + } + else if (which.isDateTime()) + columns[i] = convert_and_insert(column); + else if (which.isDateTime64()) + columns[i] = convert_and_insert(column); + else if (which.isString()) columns[i] = convert_and_insert(column); else - throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported type {}", type->getName()); + throw Exception( + ErrorCodes::BAD_TYPE_OF_FIELD, + "Unsupported type {} for column {}", + type->getName(), + description.sample_block.getByPosition(i).name); } // Set data vector to empty to avoid trigger py::object destructor without GIL // Note: we have already manually decremented the reference count of the list or array in `convert_and_insert` function diff --git a/src/Storages/StoragePython.cpp b/src/Storages/StoragePython.cpp index bd56bd34231..2e214f48c84 100644 --- a/src/Storages/StoragePython.cpp +++ b/src/Storages/StoragePython.cpp @@ -1,14 +1,26 @@ #include +#include +#include +#include +#include +#include #include #include #include +#include #include #include #include +#include #include +#include #include +#include #include +#include +#include #include +#include #include @@ -22,8 +34,6 @@ extern const int LOGICAL_ERROR; extern const int BAD_TYPE_OF_FIELD; } -namespace py = pybind11; - StoragePython::StoragePython( const StorageID & table_id_, @@ -66,6 +76,126 @@ Block StoragePython::prepareSampleBlock(const Names & column_names, const Storag return sample_block; } +ColumnsDescription StoragePython::getTableStructureFromData(std::shared_ptr reader) +{ + if (!reader) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Python reader not initialized"); + auto schema = reader->getSchema(); + + auto * logger = &Poco::Logger::get("StoragePython"); + if (logger->debug()) + { + LOG_DEBUG(logger, "Schema content:"); + for (const auto & item : schema) + LOG_DEBUG(logger, "Column: {}, Type: {}", String(item.first), String(item.second)); + } + + NamesAndTypesList names_and_types; + + // Define regular expressions for different data types + RE2 pattern_int(R"(\bint(\d+))"); + RE2 pattern_generic_int(R"(\bint\b|)"); // Matches generic 'int' + RE2 pattern_uint(R"(\buint(\d+))"); + RE2 pattern_float(R"(\b(float|double)(\d+))"); + RE2 pattern_decimal128(R"(decimal128\((\d+),\s*(\d+)\))"); + RE2 pattern_decimal256(R"(decimal256\((\d+),\s*(\d+)\))"); + RE2 pattern_date32(R"(\bdate32\b)"); + RE2 pattern_date64(R"(\bdate64\b)"); + RE2 pattern_time32(R"(\btime32\b)"); + RE2 pattern_time64_us(R"(\btime64\[us\]\b)"); + RE2 pattern_time64_ns(R"(\btime64\[ns\]\b)"); + RE2 pattern_string_binary(R"(\bstring\b||str|DataType\(string\)|DataType\(binary\)|dtype\[object_\]|dtype\('O'\))"); + + // Iterate through each pair of name and type string in the schema + for (const auto & [name, typeStr] : schema) + { + std::shared_ptr data_type; + + std::string bits, precision, scale; + if (RE2::PartialMatch(typeStr, pattern_int, &bits)) + { + if (bits == "8") + data_type = std::make_shared(); + else if (bits == "16") + data_type = std::make_shared(); + else if (bits == "32") + data_type = std::make_shared(); + else if (bits == "64") + data_type = std::make_shared(); + else if (bits == "128") + data_type = std::make_shared(); + else if (bits == "256") + data_type = std::make_shared(); + } + else if (RE2::PartialMatch(typeStr, pattern_uint, &bits)) + { + if (bits == "8") + data_type = std::make_shared(); + else if (bits == "16") + data_type = std::make_shared(); + else if (bits == "32") + data_type = std::make_shared(); + else if (bits == "64") + data_type = std::make_shared(); + else if (bits == "128") + data_type = std::make_shared(); + else if (bits == "256") + data_type = std::make_shared(); + } + else if (RE2::PartialMatch(typeStr, pattern_generic_int)) + { + data_type = std::make_shared(); // Default to 64-bit integers for generic 'int' + } + else if (RE2::PartialMatch(typeStr, pattern_float, &bits)) + { + if (bits == "32") + data_type = std::make_shared(); + else if (bits == "64") + data_type = std::make_shared(); + } + else if (RE2::PartialMatch(typeStr, pattern_decimal128, &precision, &scale)) + { + data_type = std::make_shared(std::stoi(precision), std::stoi(scale)); + } + else if (RE2::PartialMatch(typeStr, pattern_decimal256, &precision, &scale)) + { + data_type = std::make_shared(std::stoi(precision), std::stoi(scale)); + } + else if (RE2::PartialMatch(typeStr, pattern_date32)) + { + data_type = std::make_shared(); + } + else if (RE2::PartialMatch(typeStr, pattern_date64)) + { + data_type = std::make_shared(3); // date64 corresponds to DateTime64(3) + } + else if (RE2::PartialMatch(typeStr, pattern_time32)) + { + data_type = std::make_shared(); + } + else if (RE2::PartialMatch(typeStr, pattern_time64_us)) + { + data_type = std::make_shared(6); // time64[us] corresponds to DateTime64(6) + } + else if (RE2::PartialMatch(typeStr, pattern_time64_ns)) + { + data_type = std::make_shared(9); // time64[ns] corresponds to DateTime64(9) + } + else if (RE2::PartialMatch(typeStr, pattern_string_binary)) + { + data_type = std::make_shared(); + } + else + { + throw Exception(ErrorCodes::TYPE_MISMATCH, "Unrecognized data type: {}", typeStr); + } + + names_and_types.push_back({name, data_type}); + } + + return ColumnsDescription(names_and_types); +} + void registerStoragePython(StorageFactory & factory) { factory.registerStorage( diff --git a/src/Storages/StoragePython.h b/src/Storages/StoragePython.h index bbfd0e1d99a..12b835137b8 100644 --- a/src/Storages/StoragePython.h +++ b/src/Storages/StoragePython.h @@ -3,11 +3,16 @@ #include #include #include +#include #include #include +#include +#include #include #include +#include #include +#include #include @@ -21,26 +26,111 @@ class PyReader explicit PyReader(const py::object & data) : data(data) { } virtual ~PyReader() = default; + // Read `count` rows from the data, and return a list of columns + // chdb todo: maybe return py::list is better, but this is just a shallow copy virtual std::vector read(const std::vector & col_names, int count) = 0; + // Return a vector of column names and their types, as a list of pairs. + // The order is important, and should match the order of the data. + // This is the default implementation, which trys to infer the schema from the every first row + // of this.data column. + // The logic is: + // 1. If the data is a map with column names as keys and column data as values, then we use + // the key and type of every first element in the value list. + // eg: + // d = {'a': [1, 2, 3], 'b': ['x', 'y', 'z'], 'c': [1.0, 1e10, 1.2e100]} + // schema = {name: repr(type(value[0])) for name, value in d.items()} + // out: + // schema = {'a': "", 'b': "", 'c': ""} + // 2. If the data is a Pandas DataFrame, then we use the column names and dtypes. + // We use the repr of the dtype, which is a string representation of the dtype. + // eg: + // df = pd.DataFrame(d) + // schema = {name: repr(dtype) for name, dtype in zip(df.columns, df.dtypes)} + // out: + // schema = {'a': "dtype('int64')", 'b': "dtype('O')", 'c': "dtype('float64')"} + // Note: + // 1. dtype('O') means object type, which is a catch-all for any types. we just treat it as string. + // 2. the dtype of a Pandas DataFrame is a numpy.dtype object, which is not a Python type object. + // + // When using Pandas >= 2.0, we can use the pyarrow as dtype_backend: + // eg: + // df_arr = pd.read_json('{"a": [1, 2, 3], "b": ["x", "y", "z"], "c": [1.0, 1.111, 2.222]}', dtype_backend="pyarrow") + // schema = {name: repr(dtype) for name, dtype in zip(df_arr.columns, df_arr.dtypes)} + // out: + // schema = {'a': 'int64[pyarrow]', 'b': 'string[pyarrow]', 'c': 'double[pyarrow]'} + // 3. if the data is a Pyarrow Table, then we use the column names and types. + // eg: + // tbl = pa.Table.from_pandas(df) + // schema = {field.name: repr(field.type) for field in tbl.schema} + // out: + // schema = {'a': 'DataType(int64)', 'b': 'DataType(string)', 'c': 'DataType(double)'} + // 4. User can override this function to provide a more accurate schema. + // eg: "DataTypeUInt8", "DataTypeUInt16", "DataTypeUInt32", "DataTypeUInt64", "DataTypeUInt128", "DataTypeUInt256", + // "DataTypeInt8", "DataTypeInt16", "DataTypeInt32", "DataTypeInt64", "DataTypeInt128", "DataTypeInt256", + // "DataTypeFloat32", "DataTypeFloat64", "DataTypeString", + + std::vector> getSchema() + { + std::vector> schema; + + if (py::isinstance(data)) + { + // If the data is a Python dictionary + for (auto item : data.cast()) + { + std::string key = py::str(item.first).cast(); + py::list values = py::cast(item.second); + std::string dtype = py::str(values[0].attr("__class__").attr("__name__")).cast(); + if (!values.empty()) + schema.emplace_back(key, dtype); + } + } + else if (py::hasattr(data, "dtypes")) + { + // If the data is a Pandas DataFrame + py::object dtypes = data.attr("dtypes"); + py::list columns = data.attr("columns"); + for (size_t i = 0; i < py::len(columns); ++i) + { + std::string name = py::str(columns[i]).cast(); + std::string dtype = py::str(py::repr(dtypes[columns[i]])).cast(); + schema.emplace_back(name, dtype); + } + } + else if (py::hasattr(data, "schema")) + { + // If the data is a Pyarrow Table + py::object schema_fields = data.attr("schema").attr("fields"); + for (auto field : schema_fields) + { + std::string name = py::str(field.attr("name")).cast(); + std::string dtype = py::str(py::repr(field.attr("type"))).cast(); + schema.emplace_back(name, dtype); + } + } + return schema; + } + protected: py::object data; }; // Trampoline class -// Zsee: https://pybind11.readthedocs.io/en/stable/advanced/classes.html#trampolines +// see: https://pybind11.readthedocs.io/en/stable/advanced/classes.html#trampolines class PyReaderTrampoline : public PyReader { public: using PyReader::PyReader; // Inherit constructors + // Just forward the virtual function call to Python std::vector read(const std::vector & col_names, int count) override { PYBIND11_OVERRIDE_PURE( std::vector, // Return type List[object] - PyReader, // Parent class - read, // Name of the function in C++ (must match Python name) - col_names, // Argument(s) + PyReader, // Parent class + read, // Name of the function in C++ (must match Python name) + col_names, // Argument(s) count); } }; @@ -69,6 +159,11 @@ class StoragePython : public IStorage, public WithContext size_t num_streams) override; Block prepareSampleBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot); + + static ColumnsDescription getTableStructureFromData(std::shared_ptr reader); + +private: + Poco::Logger * logger = &Poco::Logger::get("StoragePython"); }; void registerStoragePython(StorageFactory & factory); diff --git a/src/TableFunctions/TableFunctionPython.cpp b/src/TableFunctions/TableFunctionPython.cpp index 55bafaefc20..4504f3a2204 100644 --- a/src/TableFunctions/TableFunctionPython.cpp +++ b/src/TableFunctions/TableFunctionPython.cpp @@ -86,8 +86,7 @@ StoragePtr TableFunctionPython::executeImpl( ColumnsDescription TableFunctionPython::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const { - //chdb todo - return ColumnsDescription({{"a", std::make_shared()}, {"b", std::make_shared()}}); + return StoragePython::getTableStructureFromData(reader); } void registerTableFunctionPython(TableFunctionFactory & factory)