Skip to content

Commit

Permalink
C++ Client: fix FillChunk and missing types for ticking callbacks (#5286
Browse files Browse the repository at this point in the history
)

* C++ Client: fix FillChunk and missing types for ticking callbacks

* Respond to review feedback
  • Loading branch information
kosak authored Mar 22, 2024
1 parent eb6ba03 commit 1b728f0
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class GenericBackingStore {
*dest = ElementType();
is_null = true;
}
++dest;
if (optional_null_flags != nullptr) {
*optional_null_flags++ = is_null;
}
Expand Down Expand Up @@ -187,6 +188,7 @@ class GenericArrowColumnSource final : public deephaven::dhcore::column::Generic
internal::GenericBackingStore<ArrayType, ElementType> backingStore_;
};

using ArrowCharColumnSource = NumericArrowColumnSource<arrow::UInt16Array, char16_t>;
using ArrowInt8ColumnSource = NumericArrowColumnSource<arrow::Int8Array, int8_t>;
using ArrowInt16ColumnSource = NumericArrowColumnSource<arrow::Int16Array, int16_t>;
using ArrowInt32ColumnSource = NumericArrowColumnSource<arrow::Int32Array, int32_t>;
Expand Down
22 changes: 20 additions & 2 deletions cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ using deephaven::dhcore::ticking::TickingCallback;
using deephaven::dhcore::utility::MakeReservedVector;
using deephaven::dhcore::utility::separatedList;
using deephaven::dhcore::utility::VerboseCast;
using deephaven::client::arrowutil::ArrowBooleanColumnSource;
using deephaven::client::arrowutil::ArrowCharColumnSource;
using deephaven::client::arrowutil::ArrowDateTimeColumnSource;
using deephaven::client::arrowutil::ArrowFloatColumnSource;
using deephaven::client::arrowutil::ArrowDoubleColumnSource;
using deephaven::client::arrowutil::ArrowInt8ColumnSource;
using deephaven::client::arrowutil::ArrowInt16ColumnSource;
using deephaven::client::arrowutil::ArrowInt32ColumnSource;
using deephaven::client::arrowutil::ArrowInt64ColumnSource;
using deephaven::client::arrowutil::ArrowBooleanColumnSource;
using deephaven::client::arrowutil::ArrowDateTimeColumnSource;
using deephaven::client::arrowutil::ArrowStringColumnSource;
using deephaven::client::utility::Executor;
using deephaven::client::utility::OkOrThrow;
Expand Down Expand Up @@ -275,11 +278,26 @@ struct ArrayToColumnSourceVisitor final : public arrow::ArrayVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::FloatArray &array) final {
result_ = ArrowFloatColumnSource::Create(std::move(storage_), &array);
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::DoubleArray &array) final {
result_ = ArrowDoubleColumnSource::Create(std::move(storage_), &array);
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::BooleanArray &array) final {
result_ = ArrowBooleanColumnSource::Create(std::move(storage_), &array);
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::UInt16Array &array) final {
result_ = ArrowCharColumnSource::Create(std::move(storage_), &array);
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::StringArray &array) final {
result_ = ArrowStringColumnSource::Create(std::move(storage_), &array);
return arrow::Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class Chunk {
template<typename T>
class GenericChunk final : public Chunk {
public:
using value_type = T;

/**
* Factory method. Create a Chunk having the specified size, with a privately allocated buffer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,14 @@ class DateTime {
int64_t nanos_ = 0;

friend std::ostream &operator<<(std::ostream &s, const DateTime &o);

friend bool operator==(const DateTime &lhs, const DateTime &rhs) {
return lhs.nanos_ == rhs.nanos_;
}

friend bool operator!=(const DateTime &lhs, const DateTime &rhs) {
return !(lhs == rhs);
}
};
} // namespace deephaven::dhcore

Expand Down
5 changes: 5 additions & 0 deletions cpp-client/deephaven/dhcore/src/ticking/immer_table_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class MyTable final : public ClientTable {

[[nodiscard]]
std::shared_ptr<ColumnSource> GetColumn(size_t column_index) const final {
if (column_index >= sources_.size()) {
auto message = fmt::format("Requested column index {} >= num columns {}", column_index,
sources_.size());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
return sources_[column_index];
}

Expand Down
146 changes: 144 additions & 2 deletions cpp-client/deephaven/tests/ticking_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,26 @@
#include "tests/third_party/catch.hpp"
#include "tests/test_util.h"
#include "deephaven/client/client.h"
#include "deephaven/dhcore/chunk/chunk_maker.h"
#include "deephaven/dhcore/utility/utility.h"

using deephaven::client::Client;
using deephaven::client::TableHandle;
using deephaven::client::utility::TableMaker;
using deephaven::dhcore::DateTime;
using deephaven::dhcore::chunk::ChunkMaker;
using deephaven::dhcore::chunk::BooleanChunk;
using deephaven::dhcore::chunk::DateTimeChunk;
using deephaven::dhcore::chunk::DoubleChunk;
using deephaven::dhcore::chunk::FloatChunk;
using deephaven::dhcore::chunk::Int8Chunk;
using deephaven::dhcore::chunk::Int16Chunk;
using deephaven::dhcore::chunk::Int32Chunk;
using deephaven::dhcore::chunk::Int64Chunk;
using deephaven::dhcore::chunk::StringChunk;
using deephaven::dhcore::clienttable::ClientTable;
using deephaven::dhcore::container::RowSequence;
using deephaven::dhcore::utility::MakeReservedVector;

namespace deephaven::client::tests {
class CommonBase : public deephaven::dhcore::ticking::TickingCallback {
Expand Down Expand Up @@ -44,7 +58,6 @@ class CommonBase : public deephaven::dhcore::ticking::TickingCallback {
std::condition_variable cond_var_;
bool done_ = false;
std::exception_ptr exception_ptr_;

};

class ReachesNRowsCallback final : public CommonBase {
Expand Down Expand Up @@ -127,7 +140,6 @@ TEST_CASE("Ticking Table modified rows are eventually all greater than 10", "[ti
auto table = tm.TimeTable(std::chrono::milliseconds(500))
.View({"Key = (long)(ii % 10)", "Value = ii"})
.LastBy("Key");
table.BindToVariable("ticking");
auto callback = std::make_shared<AllValuesGreaterThanNCallback>(target);
auto cookie = table.Subscribe(callback);

Expand All @@ -143,4 +155,134 @@ TEST_CASE("Ticking Table modified rows are eventually all greater than 10", "[ti

table.Unsubscribe(std::move(cookie));
}

class WaitForPopulatedTableCallback final : public CommonBase {
public:
explicit WaitForPopulatedTableCallback(int64_t target) : target_(target) {}

void OnTick(deephaven::dhcore::ticking::TickingUpdate update) final {
const auto &current = update.Current();
std::cout << "=== The Full Table ===\n"
<< current->Stream(true, true)
<< '\n';

if (current->NumRows() < static_cast<size_t>(target_)) {
// table not yet fully populated.
return;
}

if (current->NumRows() > static_cast<size_t>(target_)) {
// table has more rows than expected.
auto message = fmt::format("Expected table to have {} rows, got {}",
target_, current->NumRows());
throw std::runtime_error(message);
}

auto int8s = MakeReservedVector<int8_t>(target_);
auto int16s = MakeReservedVector<int16_t>(target_);
auto int32s = MakeReservedVector<int32_t>(target_);
auto int64s = MakeReservedVector<int64_t>(target_);
auto floats = MakeReservedVector<float>(target_);
auto doubles = MakeReservedVector<double>(target_);
auto bools = MakeReservedVector<bool>(target_);
auto strings = MakeReservedVector<std::string>(target_);
auto date_times = MakeReservedVector<DateTime>(target_);

auto date_time_start = DateTime::Parse("2001-03-01T12:34:56Z");

for (int64_t i = 0; i != target_; ++i) {
int8s.push_back(i);
int16s.push_back(i);
int32s.push_back(i);
int64s.push_back(i);
floats.push_back(i);
doubles.push_back(i);
bools.push_back((i % 2) == 0);
strings.push_back(fmt::format("hello {}", i));
date_times.push_back(DateTime::FromNanos(date_time_start.Nanos() + i));
}

CompareColumn<Int8Chunk>(*current, "Bytes", int8s);
CompareColumn<Int16Chunk>(*current, "Shorts", int16s);
CompareColumn<Int32Chunk>(*current, "Ints", int32s);
CompareColumn<Int64Chunk>(*current, "Longs", int64s);
CompareColumn<FloatChunk>(*current, "Floats", floats);
CompareColumn<DoubleChunk>(*current, "Doubles", doubles);
CompareColumn<StringChunk>(*current, "Strings", strings);
CompareColumn<BooleanChunk>(*current, "Bools", bools);
CompareColumn<DateTimeChunk>(*current, "DateTimes", date_times);

NotifyDone();
}

template<typename ChunkType, typename T>
void CompareColumn(const ClientTable &table, std::string_view column_name,
const std::vector<T> &expected) {
static_assert(std::is_same_v<typename ChunkType::value_type, T>);
if (expected.size() != table.NumRows()) {
auto message = fmt::format("Expected 'expected' to have size {}, have {}",
table.NumRows(), expected.size());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
auto cs = table.GetColumn(column_name, true);
auto rs = RowSequence::CreateSequential(0, table.NumRows());
auto chunk = ChunkType::Create(table.NumRows());
cs->FillChunk(*rs, &chunk, nullptr);

for (size_t row_num = 0; row_num != expected.size(); ++row_num) {
const auto &expected_elt = expected[row_num];
const auto &actual_elt = chunk.data()[row_num];
if (expected_elt != actual_elt) {
auto message = fmt::format(R"(In column "{}", row {}, expected={}, actual={})",
column_name, row_num, expected_elt, actual_elt);
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
}
}

private:
int64_t target_ = 0;
};

TEST_CASE("Ticking Table all the data is eventually present", "[ticking]") {
const int64_t target = 10;
auto client = TableMakerForTests::CreateClient();
auto tm = client.GetManager();

tm.RunScript(R"(from deephaven.time import dh_now
Zamboni = dh_now()
)");
// would like the above to say Zamboni = DateTimeUtils.fromEpochSecond(0)

auto table = tm.TimeTable("PT0:00:0.5")
.Update({"II = (int)((ii * 7) % 10)",
"Bytes = (byte)II",
"Chars = (char)(II + 'a')",
"Shorts = (short)II",
"Ints = (int)II",
"Longs = (long)II",
"Floats = (float)II",
"Doubles = (double)II",
"Strings = `hello ` + II",
"Bools = (II % 2) == 0",
"DateTimes = '2001-03-01T12:34:56Z' + II"
})
.LastBy("II")
.Sort(SortPair::Ascending("II"));

auto callback = std::make_shared<WaitForPopulatedTableCallback>(target);
auto cookie = table.Subscribe(callback);

while (true) {
auto [done, eptr] = callback->WaitForUpdate();
if (done) {
break;
}
if (eptr != nullptr) {
std::rethrow_exception(eptr);
}
}

table.Unsubscribe(std::move(cookie));
}
} // namespace deephaven::client::tests

0 comments on commit 1b728f0

Please sign in to comment.