Skip to content

Commit

Permalink
Move prepareColumnCache to StoragePython
Browse files Browse the repository at this point in the history
  • Loading branch information
auxten committed Jun 3, 2024
1 parent 2f2397a commit 12d41da
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 76 deletions.
19 changes: 19 additions & 0 deletions src/Common/PythonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstddef>
#include <stdexcept>
// #include <unicodeobject.h>
#include <DataTypes/Serializations/SerializationNumber.h>
#include <pybind11/gil.h>
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
Expand All @@ -14,6 +15,7 @@
#include <unicode/utypes.h>
#include <Common/Exception.h>


namespace DB
{

Expand All @@ -25,6 +27,23 @@ extern const int NOT_IMPLEMENTED;
namespace py = pybind11;


struct ColumnWrapper
{
void * buf; // we may modify the data when cast it to PyObject **, so we need a non-const pointer
size_t row_count;
py::handle data;
DataTypePtr dest_type;
std::string py_type; //py::handle type, eg. numpy.ndarray;
std::string row_format;
std::string encoding; // utf8, utf16, utf32, etc.
std::string name;
};

using PyObjectVec = std::vector<py::object>;
using PyObjectVecPtr = std::shared_ptr<PyObjectVec>;
using PyColumnVec = std::vector<ColumnWrapper>;
using PyColumnVecPtr = std::shared_ptr<PyColumnVec>;

// Template wrapper function to handle any return type
template <typename Func, typename... Args>
auto execWithGIL(Func func, Args &&... args) -> decltype(func(std::forward<Args>(args)...))
Expand Down
44 changes: 10 additions & 34 deletions src/Processors/Sources/PythonSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,16 @@ extern const int PY_EXCEPTION_OCCURED;
PythonSource::PythonSource(
py::object & data_source_,
const Block & sample_block_,
const UInt64 max_block_size_,
const size_t stream_index,
const size_t num_streams)
PyColumnVecPtr column_cache,
size_t data_source_row_count,
size_t max_block_size_,
size_t stream_index,
size_t num_streams)
: ISource(sample_block_.cloneEmpty())
, data_source(data_source_)
, sample_block(sample_block_)
, column_cache(column_cache)
, data_source_row_count(data_source_row_count)
, max_block_size(max_block_size_)
, stream_index(stream_index)
, num_streams(num_streams)
Expand Down Expand Up @@ -261,6 +266,8 @@ PythonSource::scanData(const py::object & data, const std::vector<std::string> &
return std::move(block);
}



Chunk PythonSource::scanDataToChunk()
{
auto names = description.sample_block.getNames();
Expand All @@ -276,37 +283,6 @@ Chunk PythonSource::scanDataToChunk()
if (names.size() != columns.size())
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Column cache size mismatch");

{
// check column cache with GIL holded
py::gil_scoped_acquire acquire;
if (column_cache == nullptr)
{
// fill in the cache
column_cache = std::make_shared<PyColumnVec>(columns.size());
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & col_name = names[i];
auto & col = (*column_cache)[i];
col.name = col_name;
try
{
py::object col_data = data_source[py::str(col_name)];
col.buf = const_cast<void *>(tryGetPyArray(col_data, col.data, col.py_type, col.row_count));
if (col.buf == nullptr)
throw Exception(
ErrorCodes::PY_EXCEPTION_OCCURED, "Convert to array failed for column {} type {}", col_name, col.py_type);
col.dest_type = description.sample_block.getByPosition(i).type;
data_source_row_count = col.row_count;
}
catch (const Exception & e)
{
LOG_ERROR(logger, "Error processing column {}: {}", col_name, e.what());
throw;
}
}
}
}

auto rows_per_stream = data_source_row_count / num_streams;
auto start = stream_index * rows_per_stream;
auto end = (stream_index + 1) * rows_per_stream;
Expand Down
33 changes: 12 additions & 21 deletions src/Processors/Sources/PythonSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <Poco/Logger.h>
#include "DataTypes/IDataType.h"
#include <Common/PythonUtils.h>

namespace DB
{
Expand All @@ -17,28 +17,18 @@ namespace py = pybind11;

class PyReader;

struct ColumnWrapper
{
void * buf; // we may modify the data when cast it to PyObject **, so we need a non-const pointer
size_t row_count;
py::handle data;
DataTypePtr dest_type;
std::string py_type; //py::handle type, eg. numpy.ndarray;
std::string row_format;
std::string encoding; // utf8, utf16, utf32, etc.
std::string name;
};

using PyObjectVec = std::vector<py::object>;
using PyObjectVecPtr = std::shared_ptr<PyObjectVec>;
using PyColumnVec = std::vector<ColumnWrapper>;
using PyColumnVecPtr = std::shared_ptr<PyColumnVec>;


class PythonSource : public ISource
{
public:
PythonSource(py::object & data_source_, const Block & sample_block_, UInt64 max_block_size_, size_t stream_index, size_t num_streams);
PythonSource(
py::object & data_source_,
const Block & sample_block_,
PyColumnVecPtr column_cache,
size_t data_source_row_count,
size_t max_block_size_,
size_t stream_index,
size_t num_streams);

~PythonSource() override = default;

Expand All @@ -52,18 +42,19 @@ class PythonSource : public ISource

Block sample_block;
PyColumnVecPtr column_cache;

size_t data_source_row_count;
const UInt64 max_block_size;
// Caller will only pass stream index and total stream count
// to the constructor, we need to calculate the start offset and end offset.
const size_t stream_index;
const size_t num_streams;
size_t cursor;
size_t data_source_row_count;

Poco::Logger * logger = &Poco::Logger::get("TableFunctionPython");
ExternalResultDescription description;

PyObjectVecPtr scanData(const py::object & data, const std::vector<std::string> & col_names, size_t & cursor, size_t count);
void prepareColumnCache(Names & names, Columns & columns);
Chunk scanDataToChunk();
void destory(PyObjectVecPtr & data);
};
Expand Down
62 changes: 42 additions & 20 deletions src/Storages/StoragePython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,19 @@ Pipe StoragePython::read(
storage_snapshot->check(column_names);

Block sample_block = prepareSampleBlock(column_names, storage_snapshot);
// check if string type column involved
bool has_string_column = false;
for (const auto & column_name : column_names)
{
if (sample_block.getByName(column_name).type->getName() == "String")
{
has_string_column = true;
break;
}
}

// num_streams = 3; // for testing

// Converting Python str to ClickHouse String type will cost a lot of time.
// so if string column involved and not using PyReader return multiple streams.
if (has_string_column && !isInheritsFromPyReader(data_source))
{
Pipes pipes;
for (size_t stream = 0; stream < num_streams; ++stream)
pipes.emplace_back(std::make_shared<PythonSource>(data_source, sample_block, max_block_size, stream, num_streams));
return Pipe::unitePipes(std::move(pipes));
}
return Pipe(std::make_shared<PythonSource>(data_source, sample_block, max_block_size, 0, 1));
prepareColumnCache(column_names, sample_block.getColumns(), sample_block);

if (isInheritsFromPyReader(data_source))
return Pipe(std::make_shared<PythonSource>(data_source, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));

Pipes pipes;
for (size_t stream = 0; stream < num_streams; ++stream)
pipes.emplace_back(std::make_shared<PythonSource>(
data_source, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
return Pipe::unitePipes(std::move(pipes));
}

Block StoragePython::prepareSampleBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot)
Expand All @@ -99,6 +89,38 @@ Block StoragePython::prepareSampleBlock(const Names & column_names, const Storag
return sample_block;
}

void StoragePython::prepareColumnCache(const Names & names, const Columns & columns, const Block & sample_block)
{
// check column cache with GIL holded
py::gil_scoped_acquire acquire;
if (column_cache == nullptr)
{
// fill in the cache
column_cache = std::make_shared<PyColumnVec>(columns.size());
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & col_name = names[i];
auto & col = (*column_cache)[i];
col.name = col_name;
try
{
py::object col_data = data_source[py::str(col_name)];
col.buf = const_cast<void *>(tryGetPyArray(col_data, col.data, col.py_type, col.row_count));
if (col.buf == nullptr)
throw Exception(
ErrorCodes::PY_EXCEPTION_OCCURED, "Convert to array failed for column {} type {}", col_name, col.py_type);
col.dest_type = sample_block.getByPosition(i).type;
data_source_row_count = col.row_count;
}
catch (const Exception & e)
{
LOG_ERROR(logger, "Error processing column {}: {}", col_name, e.what());
throw;
}
}
}
}

ColumnsDescription StoragePython::getTableStructureFromData(py::object data_source)
{
if (!data_source)
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/StoragePython.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include <pybind11/stl.h>
#include <pybind11/stl_bind.h>
#include <Common/Exception.h>
#include "object.h"
#include <Common/PythonUtils.h>


namespace DB
Expand All @@ -26,6 +26,7 @@ namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int NOT_IMPLEMENTED;
extern const int PY_EXCEPTION_OCCURED;
}
class PyReader
{
Expand Down Expand Up @@ -169,7 +170,10 @@ class StoragePython : public IStorage, public WithContext
static ColumnsDescription getTableStructureFromData(py::object data_source);

private:
void prepareColumnCache(const Names & names, const Columns & columns, const Block & sample_block);
py::object data_source;
PyColumnVecPtr column_cache;
size_t data_source_row_count;
Poco::Logger * logger = &Poco::Logger::get("StoragePython");
};

Expand Down

0 comments on commit 12d41da

Please sign in to comment.