Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jcipar/core 5087 changes to support iceberg #248

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Allow custom metadata in data files.
The Iceberg spec requires custom metadata fields for Avro files. It
expects information such as the table schema, the partition information,
and the manifest type to be in the Manifest File's metadata.

This change makes the `setMetadata` method of `DataFileWriterBase`
public instead of private, and adds a similar public method to
`DataFileWriter`.
  • Loading branch information
jcipar committed Jul 5, 2024
commit 98d18c398cc9736645de302052e66fd91f20b7ce
23 changes: 18 additions & 5 deletions lang/c++/impl/DataFile.cc
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
#include "Compiler.hh"
#include "Exception.hh"

#include <map>
#include <optional>
#include <sstream>

#include <boost/crc.hpp> // for boost::crc_32_type
@@ -64,7 +66,7 @@ boost::iostreams::zlib_params get_zlib_params() {
} // namespace

DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval,
Codec codec) : filename_(filename),
Codec codec, const std::map<std::string, std::string> &customMetadata) : filename_(filename),
schema_(schema),
encoderPtr_(binaryEncoder()),
syncInterval_(syncInterval),
Comment on lines -67 to 72
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! much easier to read

@@ -74,11 +76,11 @@ DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &
sync_(makeSync()),
objectCount_(0),
lastSync_(0) {
init(schema, syncInterval, codec);
init(schema, syncInterval, codec, customMetadata);
}

DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
const ValidSchema &schema, size_t syncInterval, Codec codec) : filename_(),
const ValidSchema &schema, size_t syncInterval, Codec codec, const std::map<std::string, std::string> &customMetadata) : filename_(),
schema_(schema),
encoderPtr_(binaryEncoder()),
syncInterval_(syncInterval),
@@ -88,10 +90,10 @@ DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStrea
sync_(makeSync()),
objectCount_(0),
lastSync_(0) {
init(schema, syncInterval, codec);
init(schema, syncInterval, codec, customMetadata);
}

void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, const Codec &codec) {
void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, const Codec &codec, const std::map<std::string, std::string> &customMetadata) {
if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) {
throw Exception(boost::format("Invalid sync interval: %1%. "
"Should be between %2% and %3%")
@@ -111,6 +113,9 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
throw Exception(boost::format("Unknown codec: %1%") % codec);
}
setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
for (const auto &kv : customMetadata) {
setMetadata(kv.first, kv.second);
}

writeHeader();
encoderPtr_->init(*buffer_);
@@ -562,4 +567,12 @@ int64_t DataFileReaderBase::previousSync() const {
return blockStart_;
}

/**
 * Looks up a single entry in this reader's metadata map.
 *
 * \param key name of the metadata entry to look up.
 * \return the stored bytes converted to a std::string, or std::nullopt when
 *         no entry exists under \p key.
 */
std::optional<std::string> DataFileReaderBase::getMetadata(const std::string& key) {
    const auto entry = metadata_.find(key);
    if (entry != metadata_.cend()) {
        // Values are kept as raw bytes; expose them to callers as a string.
        const auto &bytes = entry->second;
        return std::string(bytes.cbegin(), bytes.cend());
    }
    return std::nullopt;
}

} // namespace avro
25 changes: 19 additions & 6 deletions lang/c++/include/avro/DataFile.hh
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
#include "buffer/Buffer.hh"

#include <map>
#include <optional>
#include <string>
#include <vector>

@@ -89,7 +90,7 @@ class AVRO_DECL DataFileWriterBase : boost::noncopyable {
/**
* Initialization shared by the constructors (a helper is used instead of C++11 delegating constructors).
*/
void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec);
void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec, const std::map<std::string, std::string> &customMetadata);

public:
/**
@@ -118,9 +119,9 @@ public:
* Constructs a data file writer with the given sync interval and name.
*/
DataFileWriterBase(const char *filename, const ValidSchema &schema,
size_t syncInterval, Codec codec = NULL_CODEC);
size_t syncInterval, Codec codec = NULL_CODEC, const std::map<std::string, std::string> &customMetadata = {});
DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
const ValidSchema &schema, size_t syncInterval, Codec codec);
const ValidSchema &schema, size_t syncInterval, Codec codec, const std::map<std::string, std::string> &customMetadata = {});

~DataFileWriterBase();
/**
@@ -152,10 +153,10 @@ public:
* Constructs a new data file.
*/
DataFileWriter(const char *filename, const ValidSchema &schema,
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) {}
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC, const std::map<std::string, std::string> &customMetadata = {}) : base_(new DataFileWriterBase(filename, schema, syncInterval, codec, customMetadata)) {}

DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) {}
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC, const std::map<std::string, std::string> &customMetadata = {}) : base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec, customMetadata)) {}

/**
* Writes the given piece of data into the file.
@@ -206,7 +207,6 @@ class AVRO_DECL DataFileReaderBase : boost::noncopyable {
DecoderPtr dataDecoder_;
std::unique_ptr<InputStream> dataStream_;
typedef std::map<std::string, std::vector<uint8_t>> Metadata;

Metadata metadata_;
DataFileSync sync_{};

@@ -270,6 +270,12 @@ public:
*/
const ValidSchema &dataSchema() { return dataSchema_; }


/**
 * Looks up the value stored under \p key in the metadata of the file.
 *
 * \param key name of the metadata entry to look up.
 * \return the value as a string, or std::nullopt if there is no entry
 *         for \p key.
 */
std::optional<std::string> getMetadata(const std::string& key);

/**
* Closes the reader. No further operation is possible on this reader.
*/
@@ -382,6 +388,13 @@ public:
*/
const ValidSchema &dataSchema() { return base_->dataSchema(); }

/**
 * Looks up the metadata value stored under \p key.
 *
 * This simply forwards to the underlying reader held in base_ and returns
 * its result unchanged.
 */
std::optional<std::string> getMetadata(const std::string& key) {
    auto &reader = *base_;
    return reader.getMetadata(key);
}

/**
* Closes the reader. No further operation is possible on this reader.
*/
29 changes: 29 additions & 0 deletions lang/c++/test/DataFileTests.cc
Original file line number Diff line number Diff line change
@@ -658,6 +658,26 @@ class DataFileTest {
BOOST_CHECK_EQUAL(root->leafAt(5)->getDoc(), "extra slashes\\\\");
}
}

void testWriteCustomMetadata() {
uint32_t a = 42;
{
avro::DataFileWriter<uint32_t> df(filename, writerSchema, 16 * 1024, avro::NULL_CODEC, {{"test_key_1", "test_value_1"}, {"test_key_2", "test_value_2"}});
df.write(a);
}

{

avro::DataFileReader<uint32_t> df(filename);
auto val_1 = df.getMetadata("test_key_1");
BOOST_CHECK(val_1.has_value());
BOOST_CHECK_EQUAL(val_1.value(), "test_value_1");

auto val_2 = df.getMetadata("test_key_2");
BOOST_CHECK(val_2.has_value());
BOOST_CHECK_EQUAL(val_2.value(), "test_value_2");
}
}
};

void addReaderTests(test_suite *ts, const shared_ptr<DataFileTest> &t) {
@@ -1126,6 +1146,15 @@ init_unit_test_suite(int argc, char *argv[]) {
boost::unit_test::framework::master_test_suite().add(ts);
}

{
auto *ts = BOOST_TEST_SUITE("DataFile tests: test13.df");
shared_ptr<DataFileTest> t(new DataFileTest("test13.df", ischWithDoc, ischWithDoc));
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t));
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteCustomMetadata, t));
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
boost::unit_test::framework::master_test_suite().add(ts);
}

boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringNullCodec));
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringDeflateCodec));
#ifdef SNAPPY_CODEC_AVAILABLE