From 1b728f0dc30b0a33439b8882c73aa9877f7d301d Mon Sep 17 00:00:00 2001 From: Corey Kosak Date: Fri, 22 Mar 2024 19:04:44 -0400 Subject: [PATCH] C++ Client: fix FillChunk and missing types for ticking callbacks (#5286) * C++ Client: fix FillChunk and missing types for ticking callbacks * Respond to review feedback --- .../client/arrowutil/arrow_column_source.h | 2 + .../src/subscription/subscribe_thread.cc | 22 ++- .../public/deephaven/dhcore/chunk/chunk.h | 2 + .../include/public/deephaven/dhcore/types.h | 8 + .../dhcore/src/ticking/immer_table_state.cc | 5 + cpp-client/deephaven/tests/ticking_test.cc | 146 +++++++++++++++++- 6 files changed, 181 insertions(+), 4 deletions(-) diff --git a/cpp-client/deephaven/dhclient/include/private/deephaven/client/arrowutil/arrow_column_source.h b/cpp-client/deephaven/dhclient/include/private/deephaven/client/arrowutil/arrow_column_source.h index 2625eed46f0..b55adea3af7 100644 --- a/cpp-client/deephaven/dhclient/include/private/deephaven/client/arrowutil/arrow_column_source.h +++ b/cpp-client/deephaven/dhclient/include/private/deephaven/client/arrowutil/arrow_column_source.h @@ -64,6 +64,7 @@ class GenericBackingStore { *dest = ElementType(); is_null = true; } + ++dest; if (optional_null_flags != nullptr) { *optional_null_flags++ = is_null; } @@ -187,6 +188,7 @@ class GenericArrowColumnSource final : public deephaven::dhcore::column::Generic internal::GenericBackingStore backingStore_; }; +using ArrowCharColumnSource = NumericArrowColumnSource; using ArrowInt8ColumnSource = NumericArrowColumnSource; using ArrowInt16ColumnSource = NumericArrowColumnSource; using ArrowInt32ColumnSource = NumericArrowColumnSource; diff --git a/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc b/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc index 7c45533023b..c21c25c454c 100644 --- a/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc +++ b/cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc @@ -24,12 +24,15 @@ using deephaven::dhcore::ticking::TickingCallback; using deephaven::dhcore::utility::MakeReservedVector; using deephaven::dhcore::utility::separatedList; using deephaven::dhcore::utility::VerboseCast; +using deephaven::client::arrowutil::ArrowBooleanColumnSource; +using deephaven::client::arrowutil::ArrowCharColumnSource; +using deephaven::client::arrowutil::ArrowDateTimeColumnSource; +using deephaven::client::arrowutil::ArrowFloatColumnSource; +using deephaven::client::arrowutil::ArrowDoubleColumnSource; using deephaven::client::arrowutil::ArrowInt8ColumnSource; using deephaven::client::arrowutil::ArrowInt16ColumnSource; using deephaven::client::arrowutil::ArrowInt32ColumnSource; using deephaven::client::arrowutil::ArrowInt64ColumnSource; -using deephaven::client::arrowutil::ArrowBooleanColumnSource; -using deephaven::client::arrowutil::ArrowDateTimeColumnSource; using deephaven::client::arrowutil::ArrowStringColumnSource; using deephaven::client::utility::Executor; using deephaven::client::utility::OkOrThrow; @@ -275,11 +278,26 @@ struct ArrayToColumnSourceVisitor final : public arrow::ArrayVisitor { return arrow::Status::OK(); } + arrow::Status Visit(const arrow::FloatArray &array) final { + result_ = ArrowFloatColumnSource::Create(std::move(storage_), &array); + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::DoubleArray &array) final { + result_ = ArrowDoubleColumnSource::Create(std::move(storage_), &array); + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::BooleanArray &array) final { result_ = ArrowBooleanColumnSource::Create(std::move(storage_), &array); return arrow::Status::OK(); } + arrow::Status Visit(const arrow::UInt16Array &array) final { + result_ = ArrowCharColumnSource::Create(std::move(storage_), &array); + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::StringArray &array) final { result_ = ArrowStringColumnSource::Create(std::move(storage_), &array); return arrow::Status::OK(); diff --git a/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/chunk/chunk.h b/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/chunk/chunk.h index 27e80f6536e..c7284f79ecb 100644 --- a/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/chunk/chunk.h +++ b/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/chunk/chunk.h @@ -51,6 +51,8 @@ class Chunk { template class GenericChunk final : public Chunk { public: + using value_type = T; + /** * Factory method. Create a Chunk having the specified size, with a privately allocated buffer. */ diff --git a/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/types.h b/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/types.h index b00dc65f807..7cae2d32b53 100644 --- a/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/types.h +++ b/cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/types.h @@ -390,6 +390,14 @@ class DateTime { int64_t nanos_ = 0; friend std::ostream &operator<<(std::ostream &s, const DateTime &o); + + friend bool operator==(const DateTime &lhs, const DateTime &rhs) { + return lhs.nanos_ == rhs.nanos_; + } + + friend bool operator!=(const DateTime &lhs, const DateTime &rhs) { + return !(lhs == rhs); + } }; } // namespace deephaven::dhcore diff --git a/cpp-client/deephaven/dhcore/src/ticking/immer_table_state.cc b/cpp-client/deephaven/dhcore/src/ticking/immer_table_state.cc index 0dae82f474c..cb1336d47fa 100644 --- a/cpp-client/deephaven/dhcore/src/ticking/immer_table_state.cc +++ b/cpp-client/deephaven/dhcore/src/ticking/immer_table_state.cc @@ -51,6 +51,11 @@ class MyTable final : public ClientTable { [[nodiscard]] std::shared_ptr GetColumn(size_t column_index) const final { + if (column_index >= sources_.size()) { + auto message = fmt::format("Requested column index {} >= num columns {}", column_index, + sources_.size()); + throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message)); + } return sources_[column_index]; } diff --git a/cpp-client/deephaven/tests/ticking_test.cc b/cpp-client/deephaven/tests/ticking_test.cc index 317697c366e..f71926ca1eb 100644 --- a/cpp-client/deephaven/tests/ticking_test.cc +++ b/cpp-client/deephaven/tests/ticking_test.cc @@ -7,12 +7,26 @@ #include "tests/third_party/catch.hpp" #include "tests/test_util.h" #include "deephaven/client/client.h" +#include "deephaven/dhcore/chunk/chunk_maker.h" #include "deephaven/dhcore/utility/utility.h" using deephaven::client::Client; using deephaven::client::TableHandle; using deephaven::client::utility::TableMaker; +using deephaven::dhcore::DateTime; +using deephaven::dhcore::chunk::ChunkMaker; +using deephaven::dhcore::chunk::BooleanChunk; +using deephaven::dhcore::chunk::DateTimeChunk; +using deephaven::dhcore::chunk::DoubleChunk; +using deephaven::dhcore::chunk::FloatChunk; +using deephaven::dhcore::chunk::Int8Chunk; +using deephaven::dhcore::chunk::Int16Chunk; +using deephaven::dhcore::chunk::Int32Chunk; using deephaven::dhcore::chunk::Int64Chunk; +using deephaven::dhcore::chunk::StringChunk; +using deephaven::dhcore::clienttable::ClientTable; +using deephaven::dhcore::container::RowSequence; +using deephaven::dhcore::utility::MakeReservedVector; namespace deephaven::client::tests { class CommonBase : public deephaven::dhcore::ticking::TickingCallback { @@ -44,7 +58,6 @@ class CommonBase : public deephaven::dhcore::ticking::TickingCallback { std::condition_variable cond_var_; bool done_ = false; std::exception_ptr exception_ptr_; - }; class ReachesNRowsCallback final : public CommonBase { @@ -127,7 +140,6 @@ TEST_CASE("Ticking Table modified rows are eventually all greater than 10", "[ti auto table = tm.TimeTable(std::chrono::milliseconds(500)) .View({"Key = (long)(ii % 10)", "Value = ii"}) .LastBy("Key"); - table.BindToVariable("ticking"); auto callback = std::make_shared(target); auto cookie = table.Subscribe(callback); @@ -143,4 +155,134 @@ TEST_CASE("Ticking Table modified rows are eventually all greater than 10", "[ti table.Unsubscribe(std::move(cookie)); } + +class WaitForPopulatedTableCallback final : public CommonBase { +public: + explicit WaitForPopulatedTableCallback(int64_t target) : target_(target) {} + + void OnTick(deephaven::dhcore::ticking::TickingUpdate update) final { + const auto ¤t = update.Current(); + std::cout << "=== The Full Table ===\n" + << current->Stream(true, true) + << '\n'; + + if (current->NumRows() < static_cast(target_)) { + // table not yet fully populated. + return; + } + + if (current->NumRows() > static_cast(target_)) { + // table has more rows than expected. + auto message = fmt::format("Expected table to have {} rows, got {}", + target_, current->NumRows()); + throw std::runtime_error(message); + } + + auto int8s = MakeReservedVector(target_); + auto int16s = MakeReservedVector(target_); + auto int32s = MakeReservedVector(target_); + auto int64s = MakeReservedVector(target_); + auto floats = MakeReservedVector(target_); + auto doubles = MakeReservedVector(target_); + auto bools = MakeReservedVector(target_); + auto strings = MakeReservedVector(target_); + auto date_times = MakeReservedVector(target_); + + auto date_time_start = DateTime::Parse("2001-03-01T12:34:56Z"); + + for (int64_t i = 0; i != target_; ++i) { + int8s.push_back(i); + int16s.push_back(i); + int32s.push_back(i); + int64s.push_back(i); + floats.push_back(i); + doubles.push_back(i); + bools.push_back((i % 2) == 0); + strings.push_back(fmt::format("hello {}", i)); + date_times.push_back(DateTime::FromNanos(date_time_start.Nanos() + i)); + } + + CompareColumn(*current, "Bytes", int8s); + CompareColumn(*current, "Shorts", int16s); + CompareColumn(*current, "Ints", int32s); + CompareColumn(*current, "Longs", int64s); + CompareColumn(*current, "Floats", floats); + CompareColumn(*current, "Doubles", doubles); + CompareColumn(*current, "Strings", strings); + CompareColumn(*current, "Bools", bools); + CompareColumn(*current, "DateTimes", date_times); + + NotifyDone(); + } + + template + void CompareColumn(const ClientTable &table, std::string_view column_name, + const std::vector &expected) { + static_assert(std::is_same_v); + if (expected.size() != table.NumRows()) { + auto message = fmt::format("Expected 'expected' to have size {}, have {}", + table.NumRows(), expected.size()); + throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message)); + } + auto cs = table.GetColumn(column_name, true); + auto rs = RowSequence::CreateSequential(0, table.NumRows()); + auto chunk = ChunkType::Create(table.NumRows()); + cs->FillChunk(*rs, &chunk, nullptr); + + for (size_t row_num = 0; row_num != expected.size(); ++row_num) { + const auto &expected_elt = expected[row_num]; + const auto &actual_elt = chunk.data()[row_num]; + if (expected_elt != actual_elt) { + auto message = fmt::format(R"(In column "{}", row {}, expected={}, actual={})", + column_name, row_num, expected_elt, actual_elt); + throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message)); + } + } + } + +private: + int64_t target_ = 0; +}; + +TEST_CASE("Ticking Table all the data is eventually present", "[ticking]") { + const int64_t target = 10; + auto client = TableMakerForTests::CreateClient(); + auto tm = client.GetManager(); + + tm.RunScript(R"(from deephaven.time import dh_now +Zamboni = dh_now() +)"); + // would like the above to say Zamboni = DateTimeUtils.fromEpochSecond(0) + + auto table = tm.TimeTable("PT0:00:0.5") + .Update({"II = (int)((ii * 7) % 10)", + "Bytes = (byte)II", + "Chars = (char)(II + 'a')", + "Shorts = (short)II", + "Ints = (int)II", + "Longs = (long)II", + "Floats = (float)II", + "Doubles = (double)II", + "Strings = `hello ` + II", + "Bools = (II % 2) == 0", + "DateTimes = '2001-03-01T12:34:56Z' + II" + }) + .LastBy("II") + .Sort(SortPair::Ascending("II")); + + auto callback = std::make_shared(target); + auto cookie = table.Subscribe(callback); + + while (true) { + auto [done, eptr] = callback->WaitForUpdate(); + if (done) { + break; + } + if (eptr != nullptr) { + std::rethrow_exception(eptr); + } + } + + table.Unsubscribe(std::move(cookie)); +} } // namespace deephaven::client::tests