diff --git a/CMakeLists.txt b/CMakeLists.txt
index e205975..f623702 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -72,6 +72,13 @@ if(THRIFT_FOUND)
     list(APPEND HUSKY_EXTERNAL_DEFINITION ${THRIFT_DEFINITION})
 endif(THRIFT_FOUND)
 
+# ORC
+if(ORC_FOUND)
+    list(APPEND HUSKY_EXTERNAL_INCLUDE ${ORC_INCLUDE_DIR})
+    list(APPEND HUSKY_EXTERNAL_LIB ${ORC_LIBRARY})
+    list(APPEND HUSKY_EXTERNAL_DEFINITION ${ORC_DEFINITION})
+endif(ORC_FOUND)
+
 if(WIN32)
     list(APPEND HUSKY_EXTERNAL_LIB wsock32 ws2_32)
 endif()
diff --git a/cmake/dep.cmake b/cmake/dep.cmake
index 1321d9b..a65d2db 100644
--- a/cmake/dep.cmake
+++ b/cmake/dep.cmake
@@ -135,3 +135,36 @@ if(WITHOUT_THRIFT)
     unset(THRIFT_FOUND)
     message(STATUS "Not using Thrift due to WITHOUT_THRIFT option")
 endif(WITHOUT_THRIFT)
+
+### ORC ###
+
+# NAMES liblz4.a liborc.a libprotobuf.a libsnappy.a libz.a
+# NAMES ColumnPrinter.hh Int128.hh MemoryPool.hh orc-config.hh OrcFile.hh Reader.hh Type.hh Vector.hh
+find_path(ORC_INCLUDE_DIR NAMES orc/OrcFile.hh)
+find_library(ORC_L0 NAMES protobuf NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L1 NAMES z NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L2 NAMES lz4 NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L3 NAMES snappy NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L4 NAMES orc)
+
+if(ORC_INCLUDE_DIR AND ORC_L0 AND ORC_L1 AND ORC_L2 AND ORC_L3 AND ORC_L4)
+    set(ORC_FOUND true)
+endif(ORC_INCLUDE_DIR AND ORC_L0 AND ORC_L1 AND ORC_L2 AND ORC_L3 AND ORC_L4)
+if(ORC_FOUND)
+    set(ORC_DEFINITION "-DWITH_ORC")
+    # The order is important for dependencies.
+    set(ORC_LIBRARY ${ORC_L4} ${ORC_L3} ${ORC_L2} ${ORC_L1} ${ORC_L0})
+    message(STATUS "Found ORC:")
+    message(STATUS "  (Headers)  ${ORC_INCLUDE_DIR}")
+    message(STATUS "  (Library)  ${ORC_L0}")
+    message(STATUS "  (Library)  ${ORC_L1}")
+    message(STATUS "  (Library)  ${ORC_L2}")
+    message(STATUS "  (Library)  ${ORC_L3}")
+    message(STATUS "  (Library)  ${ORC_L4}")
+else(ORC_FOUND)
+    message(STATUS "Could NOT find ORC")
+endif(ORC_FOUND)
+if(WITHOUT_ORC)
+    unset(ORC_FOUND)
+    message(STATUS "Not using ORC due to WITHOUT_ORC option")
+endif(WITHOUT_ORC)
diff --git a/core/constants.hpp b/core/constants.hpp
index 78714d8..a6e81f3 100644
--- a/core/constants.hpp
+++ b/core/constants.hpp
@@ -73,6 +73,7 @@ const uint32_t TYPE_KAFKA_END_REQ = 0xfa091344;
 const uint32_t TYPE_MONGODB_REQ = 0xfa091388;
 const uint32_t TYPE_MONGODB_END_REQ = 0xfa091389;
 const uint32_t TYPE_LOCAL_BLK_REQ = 0xfa0e12a2;
+const uint32_t TYPE_ORC_BLK_REQ = 0xfa2e32a1;
 const uint32_t TYPE_STOP_ASYNC_REQ = 0xf89d74b4;
 const uint32_t TYPE_STOP_ASYNC_YES = 0x09b8ab2b;
 const uint32_t TYPE_STOP_ASYNC_NO = 0x192a241a;
@@ -87,5 +88,6 @@ const uint32_t TYPE_EXIT = 0x47d79fd5;
 const uint32_t TYPE_GET_HASH_RING = 0x48d693d5;
 const uint32_t TYPE_NFS_FILE_REQ = 0x4E465251;
 const uint32_t TYPE_HDFS_FILE_REQ = 0x48465251;
+const uint32_t TYPE_ELASTICSEARCH_REQ = 0xfa081098;
 
 }  // namespace husky
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index d091d67..a5d46bc 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -49,6 +49,12 @@ target_link_libraries(WordCountMR ${husky})
 target_link_libraries(WordCountMR ${HUSKY_EXTERNAL_LIB})
 husky_default_properties(WordCountMR)
 
+# WordCountES
+add_executable(WordCountES wc_mr_elasticsearch.cpp)
+target_link_libraries(WordCountES ${husky})
+target_link_libraries(WordCountES ${HUSKY_EXTERNAL_LIB})
+husky_default_properties(WordCountES)
+
 # WordCountMongo
 add_executable(WordCountMongo wc_mr_mongo.cpp)
 target_link_libraries(WordCountMongo ${husky})
@@ -61,6 +67,12 @@ target_link_libraries(WordCountFlume ${husky})
 target_link_libraries(WordCountFlume ${HUSKY_EXTERNAL_LIB})
 husky_default_properties(WordCountFlume)
 
+# WordCountORC
+add_executable(WordCountORC wc_mr_orc.cpp)
+target_link_libraries(WordCountORC ${husky})
+target_link_libraries(WordCountORC ${HUSKY_EXTERNAL_LIB})
+husky_default_properties(WordCountORC)
+
 # PI
 add_executable(PI pi.cpp)
 target_link_libraries(PI ${husky})
@@ -132,3 +144,9 @@ add_executable(NN_MNIST nn_mnist.cpp)
 target_link_libraries(NN_MNIST ${husky})
 target_link_libraries(NN_MNIST ${HUSKY_EXTERNAL_LIB})
 husky_default_properties(NN_MNIST)
+
+# Cube
+add_executable(Cube cube.cpp)
+target_link_libraries(Cube ${husky})
+target_link_libraries(Cube ${HUSKY_EXTERNAL_LIB})
+husky_default_properties(Cube)
diff --git a/examples/cube.cpp b/examples/cube.cpp
new file mode 100644
index 0000000..d02e59f
--- /dev/null
+++ b/examples/cube.cpp
@@ -0,0 +1,627 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <algorithm>
+#include <climits>
+#include <map>
+#include <memory>
+#include <stack>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "boost/algorithm/string.hpp"
+#include "boost/tokenizer.hpp"
+
+#include "core/engine.hpp"
+#include "io/hdfs_manager.hpp"
+#include "io/input/inputformat_store.hpp"
+#include "lib/aggregator_factory.hpp"
+
+typedef boost::tokenizer<boost::char_separator<char>> Tokenizer;
+typedef std::map<int, int> DimMap;
+typedef std::pair<int, int> Pair;
+typedef std::pair<std::string, std::string> Filter;
+typedef std::map<int, Filter> FilterMap;
+typedef std::vector<int> AttrIdx;
+typedef std::vector<std::string> Tuple;
+typedef std::vector<Tuple> TupleVector;
+typedef TupleVector::iterator TVIterator;
+
+thread_local std::string ghost;
+thread_local std::string gport;
+thread_local std::string ghdfs_dest;
+thread_local int gpart_factor;
+
+using husky::PushCombinedChannel;
+using husky::lib::Aggregator;
+
+/**
+    Basic object type for the main list_execute,
+    an instance of which is a node in the cube
+*/
+class Group {
+   public:
+    using KeyT = std::string;
+
+    Group() = default;
+    explicit Group(const KeyT& t) : key(t) {}
+
+    const KeyT& id() { return key; }
+    KeyT key;
+};
+
+/**
+    A node in the cube lattice,
+    or BUC processing tree
+*/
+class TreeNode {
+   public:
+    TreeNode() = default;
+    explicit TreeNode(AttrIdx&& key) : key_(std::move(key)) { visit = false; }
+
+    explicit TreeNode(const AttrIdx& key) : key_(key) { visit = false; }
+
+    ~TreeNode() = default;
+
+    bool visit;
+
+    const AttrIdx& Key() { return key_; }
+
+    std::vector<std::shared_ptr<TreeNode>>& Children() { return children_; }
+
+    void add_child(std::shared_ptr<TreeNode> child) { children_.push_back(child); }
+
+   private:
+    AttrIdx key_;
+    std::vector<std::shared_ptr<TreeNode>> children_;
+};
+
+class AttrSet {
+   public:
+    AttrSet() = default;
+
+    AttrSet(AttrIdx&& key, DimMap&& mapping) : key_(std::move(key)), map_(std::move(mapping)) {}
+
+    bool has(int attr) const { return (std::find(key_.begin(), key_.end(), attr) != key_.end()); }
+
+    size_t size() const { return key_.size(); }
+
+    const int operator[](int attr) const { return map_.at(attr); }
+
+    const AttrIdx& get_attridx() const { return key_; }
+
+    const DimMap& get_map() const { return map_; }
+
+   private:
+    AttrIdx key_;
+    DimMap map_;
+};
+
+struct PairSumCombiner {
+    static void combine(Pair& val, Pair const& inc) {
+        val.first += inc.first;
+        val.second += inc.second;
+    }
+};
+
+bool is_parent(std::shared_ptr<TreeNode> parent, std::shared_ptr<TreeNode> child) {
+    auto child_key = child->Key();
+    for (auto& col : parent->Key()) {
+        if (std::find(child_key.begin(), child_key.end(), col) == child_key.end()) {
+            return false;
+        }
+    }
+    return true;
+}
+
+std::string print_key(const AttrIdx& key) {
+    std::string out;
+    for (auto& i : key) {
+        out = out + std::to_string(i) + " ";
+    }
+    return out;
+}
+
+void measure(const Tuple& key_value, const AttrIdx& group_set, const AttrIdx& select, const AttrSet& key_attributes,
+             const AttrSet& msg_attributes, const int uid_dim, TVIterator begin, TVIterator end,
+             PushCombinedChannel<Pair, Group, PairSumCombiner>& post_ch, Aggregator<int>& num_write) {
+    int count = end - begin;
+    std::sort(begin, end, [uid_dim](const Tuple& a, const Tuple& b) { return a[uid_dim] < b[uid_dim]; });
+    int unique = 1;
+    for (TVIterator it = begin; it != end; ++it) {
+        TVIterator next_it = it + 1;
+        if (next_it != end && (*it)[uid_dim] != (*next_it)[uid_dim]) {
+            ++unique;
+        }
+    }
+
+    // Output
+    std::string out;
+    for (auto& attr : select) {
+        // If attribute is in key,
+        //     output key value.
+        // Else,
+        //     If attribute is in group,
+        //         output attribute in the tuple.
+        //     Else,
+        //         output *
+        if (key_attributes.has(attr)) {
+            out = out + key_value[key_attributes[attr]] + "\t";
+        } else {
+            if (std::find(group_set.begin(), group_set.end(), attr) != group_set.end()) {
+                out = out + (*begin)[msg_attributes[attr]] + "\t";
+            } else {
+                out += "*\t";
+            }
+        }
+    }
+
+    if (gpart_factor == 1) {
+        out = out + std::to_string(count) + "\t" + std::to_string(unique) + "\n";
+        num_write.update(1);
+        std::string hdfs_dest = ghdfs_dest + "/" + key_value.back();
+        husky::io::HDFS::Write(ghost, gport, out, hdfs_dest, husky::Context::get_global_tid());
+    } else {
+        out += key_value.back();
+        post_ch.push(Pair(count, unique), out);
+    }
+}
+
+int next_partition_dim(const AttrIdx& parent_key, const AttrIdx& child_key, const DimMap& dim_map) {
+    for (auto& attr : child_key) {
+        if (std::find(parent_key.begin(), parent_key.end(), attr) == parent_key.end()) {
+            return dim_map.at(attr);
+        }
+    }
+    // error
+    return -1;
+}
+
+// Partition the table according to the value at the 'dim'-th column
+void partition(TVIterator begin, TVIterator end, const int dim, std::vector<int>& out_partition_result) {
+    std::sort(begin, end, [dim](const Tuple& a, const Tuple& b) { return a[dim] < b[dim]; });
+    int i = 0;
+    // Store the size of each partition
+    out_partition_result.resize(1);
+    TVIterator next_tuple;
+    for (TVIterator it = begin; it != end; ++it) {
+        out_partition_result[i]++;
+        next_tuple = it + 1;
+        // If the value of the next row differs at the dim-th column,
+        // partition the table
+        if (next_tuple != end && (*it)[dim] != (*next_tuple)[dim]) {
+            ++i;
+            out_partition_result.resize(i + 1);
+        }
+    }
+}
+
+void BUC(std::shared_ptr<TreeNode> cur_node, TupleVector& table, const Tuple& key_value, const AttrIdx& select,
+         const AttrSet& key_attributes, const AttrSet& msg_attributes, const int uid_dim, const int dim,
+         TVIterator begin, TVIterator end, PushCombinedChannel<Pair, Group, PairSumCombiner>& post_ch,
+         Aggregator<int>& num_write) {
+    // Measure current group
+    measure(key_value, cur_node->Key(), select, key_attributes, msg_attributes, uid_dim, begin, end, post_ch,
+            num_write);
+
+    // Process children if it is not visited
+    for (auto& child : cur_node->Children()) {
+        // Partition table by next column
+        int next_dim = next_partition_dim(cur_node->Key(), child->Key(), msg_attributes.get_map());
+        if (next_dim == -1) {
+            throw husky::base::HuskyException("Cannot find next partition dimension from " +
+                                              print_key(cur_node->Key()) + " to " + print_key(child->Key()));
+        }
+        std::vector<int> next_partition_result = {};
+        partition(begin, end, next_dim, next_partition_result);
+        // Perform BUC on each partition
+        TVIterator k = begin;
+        for (int i = 0; i < next_partition_result.size(); ++i) {
+            int count = next_partition_result[i];
+            BUC(child, table, key_value, select, key_attributes, msg_attributes, uid_dim, next_dim, k, k + count,
+                post_ch, num_write);
+            k += count;
+        }
+    }
+}
+
+bool is_operator(const char& c) { return (c == '<' || c == '>' || c == '='); }
+
+void parse_group_set(const std::string& group_filter, const Tokenizer& schema_tok,
+                     std::vector<std::shared_ptr<TreeNode>>& out_roots, std::vector<FilterMap>& out_filters) {
+    boost::char_separator<char> vbar_sep("|");
+    boost::char_separator<char> comma_sep(",");
+    boost::char_separator<char> colon_sep(":");
+
+    Tokenizer group_filter_tok(group_filter, vbar_sep);
+    Tokenizer::iterator gf_it = group_filter_tok.begin();
+
+    /**
+     * Process group sets
+     */
+    Tokenizer group_set_tok(*gf_it, colon_sep);
+    std::shared_ptr<TreeNode> root;
+    int min_lv = INT_MAX;
+    int max_lv = INT_MIN;
+
+    std::unordered_map<int, std::vector<std::shared_ptr<TreeNode>>> tree_map;
+    size_t group_set_size = std::distance(group_set_tok.begin(), group_set_tok.end());
+    for (auto& group : group_set_tok) {
+        // Encode and construct the key of the node
+        Tokenizer column_tok(group, comma_sep);
+        AttrIdx tree_key = {};
+        for (auto column : column_tok) {
+            auto it = std::find(schema_tok.begin(), schema_tok.end(), column);
+            if (it != schema_tok.end()) {
+                tree_key.push_back(std::distance(schema_tok.begin(), it));
+            } else {
+                throw husky::base::HuskyException("Invalid schema or group sets");
+            }
+        }
+        int level = tree_key.size();
+        std::shared_ptr<TreeNode> node(new TreeNode(std::move(tree_key)));
+        tree_map[level].push_back(node);
+        if (level < min_lv) {
+            min_lv = level;
+            root = node;
+        }
+        if (level > max_lv) {
+            max_lv = level;
+        }
+    }
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Min level: " << min_lv << "\tMax level: " << max_lv;
+    }
+
+    // Build group set lattice
+    bool has_parent = false;
+    for (int i = min_lv; i < max_lv; ++i) {
+        if (tree_map[i].empty()) {
+            throw husky::base::HuskyException("Level " + std::to_string(i) + " is empty");
+        }
+        for (auto& next_tn : tree_map[i + 1]) {
+            for (auto& tn : tree_map[i]) {
+                if (is_parent(tn, next_tn)) {
+                    tn->add_child(next_tn);
+                    has_parent = true;
+                }
+            }
+            if (!has_parent) {
+                throw husky::base::HuskyException("Cannot find the parent of " + print_key(next_tn->Key()));
+            }
+            has_parent = false;
+        }
+    }
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Finished constructing lattice.";
+    }
+
+    // Construct BUC processing tree
+    std::shared_ptr<TreeNode> buc_root(new TreeNode(root->Key()));
+    std::stack<std::shared_ptr<TreeNode>> tmp_stack;
+    std::stack<std::shared_ptr<TreeNode>> buc_stack;
+    tmp_stack.push(root);
+    buc_stack.push(buc_root);
+    while (!tmp_stack.empty()) {
+        std::shared_ptr<TreeNode> cur_node = tmp_stack.top();
+        tmp_stack.pop();
+        std::shared_ptr<TreeNode> cur_buc_node = buc_stack.top();
+        buc_stack.pop();
+        cur_node->visit = true;
+        for (auto& child : cur_node->Children()) {
+            if (!child->visit) {
+                tmp_stack.push(child);
+                std::shared_ptr<TreeNode> new_buc_node(new TreeNode(child->Key()));
+                cur_buc_node->add_child(new_buc_node);
+                buc_stack.push(new_buc_node);
+            }
+        }
+    }
+    out_roots.push_back(buc_root);
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Finished constructing buc processing tree.";
+    }
+
+    /**
+     * Process WHERE
+     * Format: AttrOperatorValue, e.g., fuid<>1:fcard_type='123'
+     */
+    FilterMap filter;
+    if (std::distance(group_filter_tok.begin(), group_filter_tok.end()) == 2) {
+        gf_it++;
+        Tokenizer where_tok(*gf_it, colon_sep);
+        for (auto& where : where_tok) {
+            int pos[2] = {};
+            std::string where_str = where;
+            for (int i = 0; i < where_str.length(); ++i) {
+                if (pos[0] == 0) {
+                    if (is_operator(where_str[i]))
+                        pos[0] = i;
+                } else {
+                    if (!is_operator(where_str[i])) {
+                        pos[1] = i;
+                        break;
+                    }
+                }
+            }
+            if (pos[0] == 0 || pos[1] == 0) {
+                throw husky::base::HuskyException("Invalid syntax in WHERE");
+            }
+            std::string attr = where_str.substr(0, pos[0]);
+            std::string op = where_str.substr(pos[0], pos[1] - pos[0]);
+            std::string value = where_str.substr(pos[1], where_str.length() - pos[1]);
+
+            auto it = std::find(schema_tok.begin(), schema_tok.end(), attr);
+            if (it == schema_tok.end()) {
+                throw husky::base::HuskyException("Invalid attribute in WHERE");
+            }
+            int attr_idx = std::distance(schema_tok.begin(), it);
+            filter[attr_idx] = Filter(op, value);
+        }
+    }
+    out_filters.push_back(filter);
+}
+
+bool pass_filter(const std::string& value, const Filter& filter) {
+    if (boost::iequals(filter.second, std::string("null")))
+        return true;  // Always return true if compared against null,
+                      // consistent with SQL
+
+    if (filter.first == "<>")
+        return value != filter.second;
+    if (filter.first == ">")
+        return value > filter.second;
+    if (filter.first == "<")
+        return value < filter.second;
+    if (filter.first == ">=")
+        return value >= filter.second;
+    if (filter.first == "<=")
+        return value <= filter.second;
+    if (filter.first == "=")
+        return value == filter.second;
+    // Unknown operator: reject the tuple rather than falling off
+    // the end with no return value (undefined behavior).
+    return false;
+}
+
+void print_buc_tree(const std::shared_ptr<TreeNode>& root) {
+    husky::LOG_I << print_key(root->Key());
+    for (auto& child : root->Children()) {
+        print_buc_tree(child);
+    }
+}
+
+void print_filter_map(const FilterMap& fmap) {
+    for (auto& kv : fmap) {
+        husky::LOG_I << kv.first << " " << kv.second.first << " " << kv.second.second;
+    }
+}
+
+void cube_buc() {
+    gpart_factor = std::stoi(husky::Context::get_param("partition_factor"));
+    ghost = husky::Context::get_param("hdfs_namenode");
+    gport = husky::Context::get_param("hdfs_namenode_port");
+    ghdfs_dest = husky::Context::get_param("output");
+
+    /**
+     * Format of 'schema' and 'select':
+     * attr1,attr2,attr3,...
+     */
+    std::string schema_conf = husky::Context::get_param("schema");
+    std::string select_conf = husky::Context::get_param("select");
+
+    /**
+     * Format of 'group_sets':
+     * {GROUP_SET_1|WHERE_1}{GROUP_SET_2|WHERE_2}{...}{...}
+     * Format of GROUP_SET:
+     * attr1,attr2,attr3:attr2,attr3,attr4:...:...
+     * Format of WHERE:
+     * attr1<>value:attr2=value:...:...
+     */
+    std::string group_conf = husky::Context::get_param("group_sets");
+
+    boost::char_separator<char> comma_sep(",");
+    boost::char_separator<char> colon_sep(":");
+    boost::char_separator<char> brace_sep("{}");
+
+    Tokenizer schema_tok(schema_conf, comma_sep);
+    Tokenizer select_tok(select_conf, comma_sep);
+    Tokenizer group_filter(group_conf, brace_sep);
+
+    AttrIdx select;
+    for (auto& s : select_tok) {
+        auto it = std::find(schema_tok.begin(), schema_tok.end(), s);
+        if (it != schema_tok.end()) {
+            select.push_back(std::distance(schema_tok.begin(), it));
+        } else {
+            throw husky::base::HuskyException("Attribute is not in the schema");
+        }
+    }
+
+    std::vector<std::shared_ptr<TreeNode>> root_vec;
+    std::vector<FilterMap> filter_vec;
+    for (auto& item : group_filter) {
+        std::string item_str = item;
+        parse_group_set(item_str, schema_tok, root_vec, filter_vec);
+    }
+
+    int uid_index = -1;
+    // TODO(Ruihao): AttrIdx to count is hard-coded as "fuid"
+    auto uid_it = std::find(schema_tok.begin(), schema_tok.end(), "fuid");
+    if (uid_it != schema_tok.end()) {
+        uid_index = std::distance(schema_tok.begin(), uid_it);
+    } else {
+        throw husky::base::HuskyException("Cannot find fuid");
+    }
+
+    std::vector<AttrSet> key_attr_vec;
+    std::vector<AttrSet> msg_attr_vec;
+
+    for (int i = 0; i < root_vec.size(); ++i) {
+        // {key} union {msg} = {select}
+        // {key} intersect {msg} = empty
+        AttrIdx key_attributes = root_vec[i]->Key();
+        AttrIdx msg_attributes;
+        for (auto& s : select) {
+            if (std::find(key_attributes.begin(), key_attributes.end(), s) == key_attributes.end()) {
+                msg_attributes.push_back(s);
+            }
+        }
+
+        // Mapping of attributes in the message table
+        // schema_idx -> msg_table_idx
+        DimMap msg_dim_map;
+        for (int i = 0; i < msg_attributes.size(); ++i) {
+            msg_dim_map[msg_attributes[i]] = i;
+        }
+
+        // Mapping of attributes in key
+        DimMap key_dim_map;
+        for (int i = 0; i < key_attributes.size(); ++i) {
+            key_dim_map[key_attributes[i]] = i;
+        }
+
+        key_attr_vec.push_back(AttrSet(std::move(key_attributes), std::move(key_dim_map)));
+        msg_attr_vec.push_back(AttrSet(std::move(msg_attributes), std::move(msg_dim_map)));
+    }
+
+    // Load input and emit key\tpid\ti -> uid
+    auto& infmt = husky::io::InputFormatStore::create_orc_inputformat();
+    infmt.set_input(husky::Context::get_param("input"));
+
+    auto& buc_list = husky::ObjListStore::create_objlist<Group>();
+    auto& buc_ch = husky::ChannelStore::create_push_channel<Tuple>(infmt, buc_list);
+    auto& post_list = husky::ObjListStore::create_objlist<Group>();
+    auto& post_ch = husky::ChannelStore::create_push_combined_channel<Pair, PairSumCombiner>(buc_list, post_list);
+
+    Aggregator<int> num_write;  // track number of records written to hdfs
+    Aggregator<int> num_tuple;  // track number of tuples read from db
+
+    auto& agg_ch = husky::lib::AggregatorFactory::get_channel();
+
+    auto parser = [&](boost::string_ref& chunk) {
+        std::vector<bool> to_send(root_vec.size(), true);
+        num_tuple.update(1);
+        if (chunk.size() == 0)
+            return;
+        boost::char_separator<char> sep("\t");
+        Tokenizer tok(chunk, sep);
+        for (int i = 0; i < root_vec.size(); ++i) {
+            auto& filter_map = filter_vec[i];
+            auto& key_attributes = key_attr_vec[i];
+            auto& msg_attributes = msg_attr_vec[i];
+            // auto& msg_dim_map = msg_dim_map_vec[i];
+            std::string key = "";
+            Tuple msg(msg_attributes.size());
+            std::string fuid;
+            int j = 0;
+            for (auto& col : tok) {
+                if (filter_map.find(j) != filter_map.end() && !pass_filter(col, filter_map[j])) {
+                    to_send[i] = false;
+                    break;
+                }
+
+                if (key_attributes.has(j)) {
+                    key = key + col + "\t";
+                } else if (msg_attributes.has(j)) {
+                    msg[msg_attributes[j]] = col;
+                } else if (j == uid_index) {
+                    fuid = col;
+                }
+                ++j;
+            }
+            if (to_send[i]) {
+                msg.push_back(fuid);
+                if (gpart_factor > 1) {
+                    int bucket = std::stoi(fuid) % gpart_factor;
+                    key = key + "p" + std::to_string(bucket);
+                }
+                key = key + "\t" + std::to_string(i);
+                buc_ch.push(msg, key);
+            }
+        }
+    };
+
+    husky::load(infmt, parser);
+    husky::lib::AggregatorFactory::sync();
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Total num of tuple: " << num_tuple.get_value();
+    }
+
+    // Receive
+    husky::list_execute(buc_list, {&buc_ch}, {&post_ch, &agg_ch}, [&](Group& g) {
+        auto& msgs = buc_ch.get(g);
+        TupleVector table(std::move(const_cast<TupleVector&>(msgs)));
+        boost::char_separator<char> sep("\t");
+        boost::tokenizer<boost::char_separator<char>> tok(g.id(), sep);
+        std::vector<std::string> key_value(tok.begin(), tok.end());
+
+        int filter_idx = std::stoi(key_value.back());
+        key_value.pop_back();
+        if (gpart_factor > 1) {
+            // Remove the hash value
+            key_value.pop_back();
+        }
+        key_value.push_back("w" + std::to_string(filter_idx));
+
+        auto& buc_root = root_vec[filter_idx];
+        auto& key_attributes = key_attr_vec[filter_idx];
+        auto& msg_attributes = msg_attr_vec[filter_idx];
+        int uid_dim = msg_attributes.size();
+
+        BUC(buc_root, table, key_value, select, key_attributes, msg_attributes, uid_dim, 0, table.begin(), table.end(),
+            post_ch, num_write);
+    });
+
+    if (gpart_factor > 1) {
+        if (husky::Context::get_global_tid() == 0) {
+            husky::LOG_I << "Finished BUC stage.\nStart post process...";
+        }
+
+        husky::ObjListStore::drop_objlist(buc_list.get_id());
+
+        husky::list_execute(post_list, {&post_ch}, {&agg_ch}, [&post_ch, &num_write](Group& g) {
+            auto& msg = post_ch.get(g);
+            size_t pos = g.id().rfind("\t");
+            std::string key = g.id().substr(0, pos);
+            std::string w_idx = g.id().substr(pos + 1, g.id().length() - pos - 1);
+            std::string hdfs_dest = ghdfs_dest + "/" + w_idx;
+            std::string out = key + "\t" + std::to_string(msg.first) + "\t" + std::to_string(msg.second) + "\n";
+            num_write.update(1);
+            husky::io::HDFS::Write(ghost, gport, out, hdfs_dest, husky::Context::get_global_tid());
+        });
+    }
+
+    int total_num_write = num_write.get_value();
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Total number of rows written to HDFS: " << total_num_write;
+    }
+}
+
+int main(int argc, char** argv) {
+    std::vector<std::string> args;
+    args.push_back("hdfs_namenode");
+    args.push_back("hdfs_namenode_port");
+    args.push_back("input");
+    args.push_back("output");
+    args.push_back("schema");
+    args.push_back("select");
+    args.push_back("group_sets");
+    args.push_back("partition_factor");
+
+    if (husky::init_with_args(argc, argv, args)) {
+        husky::run_job(cube_buc);
+        return 0;
+    }
+    return 1;
+}
diff --git a/examples/wc_mr_elasticsearch.cpp b/examples/wc_mr_elasticsearch.cpp
new file mode 100644
index 0000000..b81cefd
--- /dev/null
+++ b/examples/wc_mr_elasticsearch.cpp
@@ -0,0 +1,121 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
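+
+// How the pieces below fit together (illustrative note; the JSON shape is an
+// assumption based on read() in elasticsearch_inputformat.cpp, not output
+// captured from a real cluster): scan_fully() opens a scan/scroll cursor over
+// this worker's shards and pulls hits in batches of `scrollSize` (1000 below).
+// Each hit then reaches parse_wc() as one JSON string along the lines of
+//   {"_index":"wiki","_type":"doc","_id":"1","_source":{"content":"..."}}
+// which is why the parser drills into "_source.content" before tokenizing.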
+
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "boost/property_tree/json_parser.hpp"
+#include "boost/property_tree/ptree.hpp"
+#include "boost/tokenizer.hpp"
+
+#include "base/serialization.hpp"
+#include "core/engine.hpp"
+#include "io/input/inputformat_store.hpp"
+#include "lib/aggregator_factory.hpp"
+
+class Word {
+   public:
+    using KeyT = std::string;
+
+    Word() = default;
+    explicit Word(const KeyT& w) : word(w) {}
+    const KeyT& id() const { return word; }
+
+    KeyT word;
+    int count = 0;
+};
+
+bool operator<(const std::pair<int, std::string>& a, const std::pair<int, std::string>& b) {
+    return a.first == b.first ? a.second < b.second : a.first < b.first;
+}
+
+void wc() {
+    std::string server(husky::Context::get_param("elasticsearch_server"));
+    std::string index(husky::Context::get_param("elasticsearch_index"));
+    std::string type(husky::Context::get_param("elasticsearch_type"));
+    auto& infmt = husky::io::InputFormatStore::create_elasticsearch_inputformat();
+    infmt.set_server(server);
+    std::string query(" { \"query\": { \"match_all\":{}}}");
+    infmt.scan_fully(index, type, query, 1000);
+    auto& word_list = husky::ObjListStore::create_objlist<Word>();
+    auto& ch = husky::ChannelStore::create_push_combined_channel<int, husky::SumCombiner<int>>(infmt, word_list);
+
+    auto parse_wc = [&](std::string& chunk) {
+        if (chunk.size() == 0)
+            return;
+        boost::property_tree::ptree pt;
+        std::stringstream ss(chunk);
+        read_json(ss, pt);
+        std::string content = pt.get_child("_source").get<std::string>("content");
+        boost::char_separator<char> sep(" \t");
+        boost::tokenizer<boost::char_separator<char>> tok(content, sep);
+        for (auto& w : tok) {
+            ch.push(1, w);
+        }
+    };
+    husky::load(infmt, parse_wc);
+
+    // Show topk words.
+    const int kMaxNum = 100;
+    typedef std::set<std::pair<int, std::string>> TopKPairs;
+    auto add_to_topk = [](TopKPairs& pairs, const std::pair<int, std::string>& p) {
+        if (pairs.size() == kMaxNum && *pairs.begin() < p)
+            pairs.erase(pairs.begin());
+        if (pairs.size() < kMaxNum)
+            pairs.insert(p);
+    };
+    husky::lib::Aggregator<TopKPairs> unique_topk(
+        TopKPairs(),
+        [add_to_topk](TopKPairs& a, const TopKPairs& b) {
+            for (auto& i : b) {
+                add_to_topk(a, i);
+            }
+        },
+        [](TopKPairs& a) { a.clear(); },
+        [add_to_topk](husky::base::BinStream& in, TopKPairs& pairs) {
+            pairs.clear();
+            for (size_t n = husky::base::deser<size_t>(in); n--;)
+                add_to_topk(pairs, husky::base::deser<std::pair<int, std::string>>(in));
+        },
+        [](husky::base::BinStream& out, const TopKPairs& pairs) {
+            out << pairs.size();
+            for (auto& p : pairs)
+                out << p;
+        });
+
+    husky::list_execute(word_list, [&ch, &unique_topk, add_to_topk](Word& word) {
+        unique_topk.update(add_to_topk, std::make_pair(ch.get(word), word.id()));
+    });
+
+    husky::lib::AggregatorFactory::sync();
+
+    if (husky::Context::get_global_tid() == 0) {
+        for (auto& i : unique_topk.get_value())
+            husky::LOG_I << i.second << " " << i.first;
+    }
+}
+
+int main(int argc, char** argv) {
+    std::vector<std::string> args;
+    args.push_back("elasticsearch_server");
+    args.push_back("elasticsearch_index");
+    args.push_back("elasticsearch_type");
+    if (husky::init_with_args(argc, argv, args)) {
+        husky::run_job(wc);
+        return 0;
+    }
+    return 1;
+}
diff --git a/examples/wc_mr_orc.cpp b/examples/wc_mr_orc.cpp
new file mode 100644
index 0000000..1e79148
--- /dev/null
+++ b/examples/wc_mr_orc.cpp
@@ -0,0 +1,64 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "boost/tokenizer.hpp"
+
+#include "core/engine.hpp"
+#include "lib/aggregator_factory.hpp"
+#include "io/input/inputformat_store.hpp"
+
+class Word {
+   public:
+    using KeyT = std::string;
+
+    Word() = default;
+    explicit Word(const KeyT& w) : word(w) {}
+    const KeyT& id() const { return word; }
+
+    KeyT word;
+    int count = 0;
+};
+
+void wc() {
+    auto& infmt = husky::io::InputFormatStore::create_orc_inputformat();
+    infmt.set_input(husky::Context::get_param("input"));
+    husky::lib::Aggregator<int> num_tuple(0, [](int& a, const int& b) { a += b; });
+
+    auto parse_wc = [&](boost::string_ref& chunk) {
+        if (chunk.size() == 0)
+            return;
+        num_tuple.update(1);
+    };
+    husky::load(infmt, parse_wc);
+    husky::lib::AggregatorFactory::sync();
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Total number of tuples: " << num_tuple.get_value();
+    }
+}
+
+int main(int argc, char** argv) {
+    std::vector<std::string> args;
+    args.push_back("hdfs_namenode");
+    args.push_back("hdfs_namenode_port");
+    args.push_back("input");
+    if (husky::init_with_args(argc, argv, args)) {
+        husky::run_job(wc);
+        return 0;
+    }
+    return 1;
+}
diff --git a/io/input/CMakeLists.txt b/io/input/CMakeLists.txt
index 6676f0a..73aac10 100644
--- a/io/input/CMakeLists.txt
+++ b/io/input/CMakeLists.txt
@@ -26,8 +26,10 @@ file(GLOB io-input-src-files
     nfs_binary_inputformat.cpp
     binary_inputformat_impl.cpp
     binary_inputformat.cpp
-    inputformat_store.cpp)
-
+    inputformat_store.cpp
+    elasticsearch_inputformat.cpp
+    elasticsearch_connector/http.cpp)
+
 if(LIBHDFS3_FOUND)
     file(GLOB io-input-hdfs-src-files hdfs_file_splitter.cpp hdfs_binary_inputformat.cpp)
     list(APPEND io-input-src-files ${io-input-hdfs-src-files})
@@ -47,6 +49,11 @@ if(THRIFT_FOUND)
     list(APPEND io-input-src-files ${io-input-flume-src-files})
 endif(THRIFT_FOUND)
 
+if(ORC_FOUND)
+    file(GLOB io-input-orc-src-files orc_file_splitter.cpp orc_inputformat.cpp)
+    list(APPEND io-input-src-files ${io-input-orc-src-files})
+endif(ORC_FOUND)
+
 husky_cache_variable(io-input-src-files ${io-input-src-files})
 
 add_library(input-objs OBJECT ${io-input-src-files})
diff --git a/io/input/elasticsearch_connector/http.cpp b/io/input/elasticsearch_connector/http.cpp
new file mode 100644
index 0000000..fdc451c
--- /dev/null
+++ b/io/input/elasticsearch_connector/http.cpp
@@ -0,0 +1,795 @@
+/*
+ * Licensed to cpp-elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "io/input/elasticsearch_connector/http.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" + +/** Returns true on success, or false if there was an error */ +bool SetSocketBlockingEnabled(int fd, bool blocking) { + if (fd < 0) + return false; + + int flags = fcntl(fd, F_GETFL, 0); + + if (flags < 0) + return false; + + flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK); + + return (fcntl(fd, F_SETFL, flags) == 0) ? true : false; +} + +int to_int(const std::string& str) { + int numb; + std::istringstream(str) >> numb; + return numb; +} + +HTTP::HTTP() : _connection(0), _sockfd(-1), _keepAlive(false), _keepAliveTimeout(60), _lastRequest(0) {} + +HTTP::~HTTP() { + // Set the socket free. + disconnect(); +} + +void HTTP::set_url(std::string uri, bool keepAlive) { + _keepAlive = keepAlive; + // Remove http protocol if set. + size_t pos = uri.find("http://"); + if (pos != std::string::npos) + uri = uri.substr(pos + 7); + + // Extract the URN + pos = uri.find("/"); + if (pos != std::string::npos) { + _url = uri.substr(0, pos); + _urn = uri.substr(pos); + } else { + _url = uri; + _urn = "/"; + } + + // std::cout << "_urn " << _urn << std::endl; + // std::cout << "_url " << _url << std::endl; + + // Extract the port if it's in the domain name. + pos = _url.find(":"); + if (pos != std::string::npos) { + _port = to_int(_url.substr(pos + 1)); + _url = _url.substr(0, pos); + } else { + _port = 80; + } + + struct hostent* host = gethostbyname(_url.c_str()); + + if ((host == NULL) || (host->h_addr == NULL)) + EXCEPTION("Error retrieving DNS information."); + + // print information about this host: + struct in_addr** addr_list; + // printf("Official name is: %s", host->h_name); + // printf(", IP addresses: "); + addr_list = (struct in_addr**) host->h_addr_list; + /*for (int i = 0; addr_list[i] != NULL; i++) + printf("%s ", inet_ntoa(*addr_list[i])); + + if (_keepAlive) + printf(", session keep alive connection.\n"); + else + printf(", session is not keep alive connection.\n"); + */ + bzero(&_client, sizeof(_client)); + + _client.sin_family = AF_INET; + _client.sin_port = htons(_port); + memcpy(&_client.sin_addr, host->h_addr, host->h_length); +} + +// Returns true if managed to connect. +bool HTTP::connect() { + if (++_connection > 5) + return false; + + // If socket point already present. Close connection. 
+ if (_sockfd >= 0) + close(_sockfd); + + /* Create a socket point */ + _sockfd = socket(AF_INET, SOCK_STREAM, 0); + + if (_sockfd < 0) + EXCEPTION("Error creating socket."); + + // Set socket non-bloking + int flags = fcntl(_sockfd, F_GETFL, 0); + fcntl(_sockfd, F_SETFL, flags | O_NONBLOCK); + + /* Now connect to the server */ + int n = ::connect(_sockfd, (struct sockaddr*) &_client, sizeof(_client)); + if (n < 0 && errno != EINPROGRESS) + EXCEPTION("Failed to connect to host."); + + assert(errno == EINPROGRESS); + errno = 0; + + if (n == 0) { + _connection = 0; + return true; + } + + fd_set rset, wset; + struct timeval tval; + + FD_ZERO(&rset); + + FD_SET(_sockfd, &rset); + wset = rset; + tval.tv_sec = 5; + tval.tv_usec = 0; + + if ((n = select(_sockfd + 1, &rset, &wset, NULL, &tval)) == 0) { + close(_sockfd); /* timeout */ + errno = ETIMEDOUT; + EXCEPTION("Failed to connect to host."); + } + + if (FD_ISSET(_sockfd, &rset) || FD_ISSET(_sockfd, &wset)) { + int errorValue; + socklen_t len = sizeof(errorValue); + if (getsockopt(_sockfd, SOL_SOCKET, SO_ERROR, &errorValue, &len) < 0) + EXCEPTION("Solaris pending error"); + + errno = errorValue; + if (error()) { + close(_sockfd); /* just in case */ + EXCEPTION("error set by getsockopt."); + } + } else { + EXCEPTION("select error: sockfd not set"); + } + + if (error()) { + close(_sockfd); /* just in case */ + EXCEPTION("error set by select."); + } + + _connection = 0; + + return true; +} + +void HTTP::disconnect() { + if (_sockfd >= 0) + close(_sockfd); + + _sockfd = -1; +} + +// Check if the connection is on error state. +bool HTTP::error() { + // no error + if (errno == 0) + return false; + + if (errno == EWOULDBLOCK || errno == EAGAIN) + return false; + + // Socket is already connected + if (errno == EISCONN) + printf("Socket is already connected\n"); + + if (errno == EINVAL) + printf("Exception caught while reading socket - Invalid argument: _sfd = %i\n", _sockfd); + + if (errno == ECONNREFUSED) + printf("Couldn't connect, connection refused.\n"); + + if (errno == EINPROGRESS) + printf("This returns is often see for large ElasticSearch requests.\n"); + + printf("Exception caught while reading socket - %s.\n", strerror(errno)); + + // reset errno + errno = 0; + disconnect(); + return true; +} + +// Get Json Object on web server. +bool HTTP::request(const char* method, const char* endUrl, const char* data, boost::property_tree::ptree* jOutput, + const char* content_type) { + Result result; + request(method, endUrl, data, jOutput, result, content_type); + return (result == OK); +} + +// Get Json Object on web server. +unsigned int HTTP::request(const char* method, const char* endUrl, const char* data, + boost::property_tree::ptree* jOutput, Result& result, const char* content_type) { + unsigned int statusCode = 0; + + std::string output; + statusCode = request(method, endUrl, data, output, result, content_type); + if (result != OK) { + // Give a second chance. + disconnect(); + + statusCode = request(method, endUrl, data, output, result, content_type); + if (result != OK) + return statusCode; + } + + try { + if (jOutput && output.size()) { + std::stringstream ss(output); + boost::property_tree::ptree pt; + read_json(ss, pt); + jOutput->push_back(std::make_pair("main", pt)); + } + } catch (Exception& e) { + printf("parser() failed in Getter. Exception caught: %s\n", e.what()); + result = ERROR; + return statusCode; + } catch (std::exception& e) { + printf("parser() failed in Getter. 
std::exception caught: %s\n", e.what()); + throw std::exception(e); + } catch (...) { + printf("parser() failed in Getter.\n"); + EXCEPTION("Unknown exception."); + } + + if (jOutput) { + jOutput->put("status", statusCode); + } + + result = OK; + return statusCode; +} + +// Parse the message and split if necessary. +bool HTTP::sendMessage(const char* method, const char* endUrl, const char* data, const char* content_type) { + // Make the request type. + std::string requestString(method); + + assert(requestString == "POST" || requestString == "DELETE" || requestString == "GET" || requestString == "PUT" || + requestString == "HEAD"); + + // Concatenate the page. + requestString += std::string(" "); + requestString += _urn; + + if (endUrl != 0) { + if (_urn.back() != '/') + requestString += std::string("/"); + + requestString += std::string(endUrl); + } + + requestString += std::string(" HTTP/1.1\r\n"); + + // Concatenate the host. + requestString += std::string("Host: "); + requestString += _url; + requestString += std::string("\r\n"); + requestString += std::string("Accept: */*\r\n"); + if (_keepAlive) + requestString += std::string("Connection: Keep-Alive\r\n"); + else + requestString += std::string("Connection: close\r\n"); + + // If no data, send the header and return. + if (data == 0) { + requestString += std::string("\r\n"); + + if (!write(requestString)) + return false; + + return true; + } + + assert(data != 0); + // If data decide to split it or not. + + // Concatenate the content. + requestString += std::string("Content-Type: "); + requestString += std::string(content_type); + requestString += std::string("\r\n"); + + size_t dataSize = strlen(data); + + assert(!error()); + + // If size is small enough, send as one message with the header. + if (dataSize < 1024) { + requestString += std::string("Content-length: "); + requestString += std::to_string(dataSize); + requestString += std::string("\r\n\r\n"); + + requestString += std::string(data); + + if (!write(requestString)) + return false; + + return true; + } + + assert(dataSize >= 1024); + // If size is high then send the header and the rest as chunked message. + requestString += std::string("Transfer-Encoding: chunked\r\n\r\n"); + + if (!write(requestString)) + return false; + + size_t totalSent = 0; + while (totalSent < dataSize) { + size_t chunkSize = std::min(dataSize - totalSent, (size_t) 1024); + + std::stringstream chunkSizeString; + chunkSizeString << std::hex << chunkSize; + + std::string chunk(chunkSizeString.str()); + chunk += "\r\n"; + chunk += std::string(data + totalSent, chunkSize); + chunk += "\r\n"; + +#if !defined(NDEBUG) && VERBOSE >= 4 + show(chunk.c_str(), chunk.length(), __LINE__); +#endif + + if (!write(chunk)) + return false; + + totalSent += chunkSize; + } + + // Final chunk message + if (!write("0\r\n\r\n")) + return false; + + return true; +} + +// Write string on the socketfd. 
+bool HTTP::write(const std::string& outgoing) { + assert(!error()); + + if (!connected() && !connect()) + EXCEPTION("Cannot write, we're not connected."); + + assert(!error()); + + ssize_t writeReturn = ::write(_sockfd, outgoing.c_str(), outgoing.length()); + + if (writeReturn == 0) { + if (!connect()) + EXCEPTION("write returned 0 and we could not reconnect."); + + write(outgoing); + } + + if (writeReturn < 0) { + error(); + EXCEPTION(outgoing); + } + + if (outgoing.length() != (size_t) writeReturn) { + error(); + EXCEPTION("we did not write everything we wanted to write."); + } + + assert(!error()); + + return true; +} + +bool HTTP::request(const char* method, const char* endUrl, const char* data, std::string& output, + const char* content_type) { + Result result; + request(method, endUrl, data, output, result, content_type); + return (result == OK); +} + +unsigned int HTTP::request(const char* method, const char* endUrl, const char* data, std::string& output, + Result& result, const char* content_type) { + /// Example of request. + /// "POST /test.php HTTP/1.0\r\n" + /// "Host: www.mariequantier.com\r\n" + /// "Content-Type: application/json\r\n" + /// "Content-length: 36\r\n\r\n" + /// Then the content. + /// + /// Where /test.php is the URN and www.mariequantier.com is the URL. + + // Lock guard for every request. + std::lock_guard lock(_requestMutex); + + // If this instance does not keep-alive the connection, we must reconnect each time. + if (!connected() || (connected() && !_keepAlive) || mustReconnect()) { + if (!connect()) + EXCEPTION("Cannot reconnect."); + } + + assert(!error()); + assert(output.empty()); + + unsigned int statusCode = 0; + + if (!sendMessage(method, endUrl, data, content_type)) { + result = ERROR; + return statusCode; + } + + statusCode = readMessage(output, result); + if (result != OK) { + // Clear ouput in case we didn't get the full response. + if (!output.empty()) + output.clear(); + } + + if (_keepAlive) + _lastRequest = time(NULL); + else + // not keep-alive session. + disconnect(); + + // Format string output. + /* + if(output.size() >= 2){ + if(output[output.size() - 2] == '\r' && output[output.size() - 1] == '\n'){ + output[output.size() - 2] = '\0'; + output.resize(output.size() - 1); + } + } */ + + result = OK; + return statusCode; +} + +// Whole process to read the response from HTTP server. +unsigned int HTTP::readMessage(std::string& output, Result& result) { + unsigned int statusCode = 0; + + // Need to loop (recursion may fail because pile up over the stack for large requests. + size_t contentLength = 0; + bool isChunked = false; + do { + statusCode = readMessage(output, contentLength, isChunked, result); + } while (result == MORE_DATA); + + return statusCode; +} + +// Wait with select then start to read the message. +unsigned int HTTP::readMessage(std::string& output, size_t& contentLength, bool& isChunked, Result& result) { + unsigned int statusCode = 0; + + /// First, use select() with a timeout value to determine when the file descriptor is ready to be read. + assert(!error()); + assert(_sockfd >= 0); + + int fd = _sockfd; + + // Time value before timeout. + timeval tval = {40, 0}; + + // Declare file descriptors. + fd_set readSet, errorSet; + + FD_ZERO(&readSet); + FD_ZERO(&errorSet); + FD_SET(fd, &readSet); + FD_SET(fd, &errorSet); + + assert(fd >= 0); + + int ret = select(fd + 1, &readSet, 0, &errorSet, &tval); + + assert(!error()); + + // Is error ? 
+    if (ret < 0) {
+        disconnect();
+        result = ERROR;
+        return statusCode;
+    }
+
+    // Is timeout ?
+    if (ret == 0) {
+        result = ERROR;
+        return statusCode;
+    }
+
+    // Check error on socket
+    if (FD_ISSET(fd, &errorSet)) {
+        error();
+        result = ERROR;
+        return statusCode;
+    }
+
+    // Is read ?
+    if (FD_ISSET(fd, &readSet)) {
+        // Parse message.
+        return parseMessage(output, contentLength, isChunked, result);
+    }
+
+    result = OK;
+    return statusCode;
+}
+
+// Append char* to output.
+size_t HTTP::appendChunk(std::string& output, char* msg, size_t msgSize) {
+    assert(msgSize > 0);
+
+#if !defined(NDEBUG) && VERBOSE >= 4
+    show(msg, msgSize, __LINE__);
+#endif
+
+    char* afterSize;
+    size_t chunkSize = strtol(msg, &afterSize, 16);
+
+    if (error())
+        return 0;
+
+    if (chunkSize == 0)
+        return 0;
+
+    // Move after the \r\n characters.
+    afterSize += 2;
+
+    assert(msgSize + msg > afterSize + 2);
+
+    // Append the result to the output; remove the \r\n as line separator.
+    if (chunkSize + afterSize + 2 <= msgSize + msg) {
+        // The chunksize is smaller, we take only the given size.
+        output.append(afterSize, chunkSize);
+    } else {
+        // If the chunksize is higher, we take only what we got in the answer.
+        output.append(afterSize, msgSize - (afterSize - msg));
+    }
+
+    return chunkSize;
+}
+
+// Whole process to read the response from the HTTP server.
+unsigned int HTTP::parseMessage(std::string& output, size_t& contentLength, bool& isChunked, Result& result) {
+    unsigned int statusCode = 0;
+
+    // Socket is ready for reading.
+    char recvline[4096];
+    ssize_t readSize = read(_sockfd, recvline, 4095);
+
+    if (readSize <= 0) {
+        if (!connect()) {
+            result = ERROR;
+            return statusCode;
+        }
+
+        if (readSize < 0) {
+            if (errno != EWOULDBLOCK && errno != EAGAIN)
+                EXCEPTION("read error on socket");
+            errno = 0;
+        }
+
+        result = MORE_DATA;
+        return statusCode;
+    }
+
+    assert(readSize <= 4095);
+
+    // If we already got the header from a non chunked message.
+    if (contentLength > 0) {
+        // If we have the content length, append the result to the output.
+        output.append(recvline, readSize);
+    }
+
+    // If we already got the header from a chunked message.
+    if (contentLength == 0 && isChunked) {
+        // We already tested that the readSize is not 0.
+        contentLength = appendChunk(output, recvline, readSize);
+
+        // Append the message to the output.
+        if (contentLength == 0) {
+            result = OK;
+            return statusCode;
+        }
+    }
+
+    // If this method is called with no contentLength, parse the header.
+    if (contentLength == 0 && !isChunked) {
+        char* endStatus = strstr(recvline, "\r\n");
+
+        if (endStatus == NULL) {
+            disconnect();
+            result = ERROR;
+            return statusCode;
+        }
+
+        // Extract and interpret status line.
+        std::stringstream status(std::string(recvline, endStatus));
+
+        if (!status) {
+            disconnect();
+            result = ERROR;
+            return statusCode;
+        }
+
+        std::string httpVersion;
+        status >> httpVersion;
+
+        if (httpVersion.substr(0, 5) != "HTTP/") {
+            disconnect();
+            result = ERROR;
+            return statusCode;
+        }
+
+        status >> statusCode;
+
+        // Extract status message.
+        std::stringstream statusMessage;
+        status >> statusMessage.rdbuf();
+
+        // Handle the different status' response.
+        switch (statusCode) {
+            // If created, then continue.
+            case 201:
+                break;
+
+            // If ok, then continue.
+            case 200:
+                break;
+
+            // If found, then continue.
+            case 302:
+                break;
+
+            // Bad Request
+            case 400:
+                std::cout << "Status: Bad Request, you must reconsider your request." << std::endl;
+                disconnect();
+                result = ERROR;
+                return statusCode;
+
+            // If forbidden, it's over.
+            case 403:
+                std::cout << "Status: Forbidden, you must reconsider your request." << std::endl;
+                disconnect();
+                result = ERROR;
+                return statusCode;
+
+            // If 404 then check the message and continue to get the complete response.
+            case 404:
+                std::cout << " 404 but statusMessage is not \"Not Found\"." << std::endl;
+                disconnect();
+                break;
+
+            // If 500 then print the message and break.
+            case 500:
+                std::cout << " 500 but statusMessage is not \"Internal Server Error\"." << std::endl;
+                disconnect();
+                result = ERROR;
+                return statusCode;
+
+            // If unhandled state, return false.
+            default:
+                std::cout << "Weird status code: " << statusCode << std::endl;
+                disconnect();
+                result = ERROR;
+                return statusCode;
+        }
+
+        // Extract and interpret the header.
+        char* endHeader = strstr(endStatus + 2, "\r\n\r\n");
+        if (endHeader == NULL) {
+            disconnect();
+            result = ERROR;
+            return statusCode;
+        }
+        size_t headerSize = endHeader + 4 - recvline;
+
+        // Extract and interpret the header.
+        char* contentLengthPos = strstr(endStatus + 2, "Content-Length:");
+
+        // If we did not get the content-length.
+        if (contentLengthPos == NULL) {
+            // If the message is chunked, restart the method with the flag on.
+            if (strstr(endStatus + 2, "Transfer-Encoding: chunked") != NULL) {
+                // Set the transfer as chunked.
+                isChunked = true;
+
+                // If we did not get the size of the chunk, read the message as a chunk and go on reading.
+                if (readSize - headerSize <= 0) {
+                    result = MORE_DATA;
+                    return statusCode;
+                }
+
+                // If the message contains the length, parse the size.
+                contentLength = appendChunk(output, endHeader + 4, readSize - headerSize);
+
+                if (contentLength == 0) {
+                    result = OK;
+                    return statusCode;
+                }
+
+            } else {
+                disconnect();
+                result = ERROR;
+                return statusCode;
+            }
+        } else {
+            // We received the content-length.
+            contentLength = atoi(contentLengthPos + 15);
+
+            // Error due to conversion may set errno.
+            if (error()) {
+                result = ERROR;
+                return statusCode;
+            }
+
+            // Copy content.
+            assert(endHeader - recvline + contentLength + 4 >= (unsigned int) readSize);
+            output.append(endHeader + 4, readSize - headerSize);
+        }
+    }
+
+    while (output.length() < contentLength) {
+        char nextContent[4096];
+        readSize = read(_sockfd, nextContent, 4095);
+
+        // When we did not receive the data yet, wait with select.
+        if (readSize == 0) {
+            result = MORE_DATA;
+            return statusCode;
+        }
+
+        // When there is nothing more to read but the output is incomplete, wait with select.
+        if (readSize < 0) {
+            if (errno != EWOULDBLOCK && errno != EAGAIN)
+                EXCEPTION("read error on socket");
+            errno = 0;
+            result = MORE_DATA;
+            return statusCode;
+        }
+
+        output.append(nextContent, readSize);
+    }
+
+    // If chunked but no size, we return until we receive the answer.
+    if (isChunked && contentLength == 0) {
+        result = MORE_DATA;
+        return statusCode;
+    }
+
+    result = OK;
+    return statusCode;
+}
diff --git a/io/input/elasticsearch_connector/http.h b/io/input/elasticsearch_connector/http.h
new file mode 100644
index 0000000..27b8d5b
--- /dev/null
+++ b/io/input/elasticsearch_connector/http.h
@@ -0,0 +1,166 @@
+/*
+ * Licensed to cpp-elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership.
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" + +#define _TEXT_PLAIN "text/plain" +#define _APPLICATION_JSON "application/json" +#define _APPLICATION_URLENCODED "application/x-www-form-urlencoded" + +#define EXCEPTION(...) throw Exception(__FILE__, __LINE__, __VA_ARGS__) + +class Exception : std::exception { + public: + template + Exception(const char* fil, int lin, T const& msg); + virtual ~Exception() throw() {} + + const char* what() const throw() { return _msg.c_str(); } + + private: + Exception() {} + std::string _msg; +}; + +template +Exception::Exception(const char* fil, int lin, T const& msg) { + _msg = msg; + std::cerr << "Exception in " << fil << " l. " << lin << " ->\n"; + std::cerr << msg << std::endl; +} + +enum Result { OK, ERROR, MORE_DATA }; + +class HTTP { + public: + HTTP(); + ~HTTP(); + void set_url(std::string uri, bool keepAlive = false); + + /// DEPRECATED + /// Generic request that parses the result in Json::Object. + bool request(const char* method, const char* endUrl, const char* data, boost::property_tree::ptree* root, + const char* content_type = _APPLICATION_JSON); + + /// Generic request that parses the result in Json::Object. + unsigned int request(const char* method, const char* endUrl, const char* data, boost::property_tree::ptree* root, + Result& result, const char* content_type = _APPLICATION_JSON); + + /// DEPRECATED + /// Generic request that stores result in the string. + bool request(const char* method, const char* endUrl, const char* data, std::string& output, + const char* content_type = _APPLICATION_JSON); + + /// Generic request that stores result in the string. + unsigned int request(const char* method, const char* endUrl, const char* data, std::string& output, Result& result, + const char* content_type = _APPLICATION_JSON); + + /// Generic get request to node. + inline unsigned int get(const char* endUrl, const char* data, boost::property_tree::ptree* root) { + Result result; + return request("GET", endUrl, data, root, result); + } + + /// Generic head request to node. + inline unsigned int head(const char* endUrl, const char* data, boost::property_tree::ptree* root) { + Result result; + return request("HEAD", endUrl, data, root, result); + } + + /// Generic put request to node. + inline unsigned int put(const char* endUrl, const char* data, boost::property_tree::ptree* root) { + Result result; + return request("PUT", endUrl, data, root, result); + } + + /// Generic post request to node. + inline unsigned int post(const char* endUrl, const char* data, boost::property_tree::ptree* root) { + Result result; + return request("POST", endUrl, data, root, result); + } + + /// Generic delete request to node. 
+ inline unsigned int remove(const char* endUrl, const char* data, boost::property_tree::ptree* root) { + Result result; + return request("DELETE", endUrl, data, root, result); + } + + /// Generic post request to node. + inline unsigned int rawpost(const char* endUrl, const char* data, boost::property_tree::ptree* root) { + Result result; + return request("POST", endUrl, data, root, result, _APPLICATION_URLENCODED); + } + + private: + /// Returns true if managed to connect. + bool connect(); + + /// Parse the message and split if necessary. + bool sendMessage(const char* method, const char* endUrl, const char* data, const char* content_type); + + /// Write string on the socketfd. + bool write(const std::string& outgoing); + + /// Test socket point. + inline bool connected() const { return (_sockfd >= 0); } + + /// Close the socket. + void disconnect(); + + /// Whole process to read the response from HTTP server. + unsigned int readMessage(std::string& output, Result& result); + + /// Wait with select then start to read the message. + unsigned int readMessage(std::string& output, size_t& contentLength, bool& isChunked, Result& result); + + /// Methods to read chunked messages. + unsigned int parseMessage(std::string& output, size_t& contentLength, bool& isChunked, Result& result); + + /// Append the chunk message to the stream. + size_t appendChunk(std::string& output, char* msg, size_t msgSize); + + /// Check if the connection is on error state. + bool error(); + + /// Determine if we must reconnect. + inline bool mustReconnect() const { return (_keepAliveTimeout <= time(NULL) - _lastRequest); } + + std::string _url; + std::string _urn; + int _port; + unsigned int _connection; + int _sockfd; + struct sockaddr_in _client; + bool _keepAlive; + time_t _keepAliveTimeout; + time_t _lastRequest; + + /// Mutex for every request. + std::mutex _requestMutex; +}; diff --git a/io/input/elasticsearch_inputformat.cpp b/io/input/elasticsearch_inputformat.cpp new file mode 100644 index 0000000..5163765 --- /dev/null +++ b/io/input/elasticsearch_inputformat.cpp @@ -0,0 +1,282 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
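+
+// Shard assignment, sketched from the code below: in local_only mode each
+// worker asks its own node for the primary shards it hosts (find_shard())
+// and claims shard ids in strides of the local worker count. Otherwise the
+// worker packs (server, index, node_id) into a BinStream and asks the master
+// via husky::TYPE_ELASTICSEARCH_REQ; the reply is either a shard list to
+// scan or the literal string "No shard" when nothing remains. The
+// master-side handler for that request lives outside this file.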
+ +#include +#include +#include +#include +#include +#include + +#include "base/exception.hpp" +#include "base/serialization.hpp" +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" +#include "core/constants.hpp" +#include "core/context.hpp" +#include "core/coordinator.hpp" +#include "io/input/elasticsearch_connector/http.h" +#include "io/input/elasticsearch_inputformat.hpp" + +namespace husky { +namespace io { + +enum ElasticsearchInputFormatSetUp { + NotSetUp = 0, + ServerSetUp = 1 << 1, + AllSetUp = ServerSetUp, +}; + +ElasticsearchInputFormat::ElasticsearchInputFormat() { + records_vector_.clear(); + is_setup_ = ElasticsearchInputFormatSetUp::NotSetUp; +} + +bool ElasticsearchInputFormat::set_server(const std::string& server, bool local_prefer, bool local_only) { + is_local_prefer_ = local_prefer; + server_ = server; + std::size_t found = server_.find(":"); + std::string port = server.substr(found + 1); + is_local_only_ = local_only; + if (local_only) { + server_ = husky::Context::get_worker_info().get_hostname(husky::Context::get_worker_info().get_process_id()) + + ":" + port; + http_conn_.set_url(server_, true); + if (!is_active()) + throw base::HuskyException("local only mode failed, please set local_only = false"); + is_setup_ = ElasticsearchInputFormatSetUp::AllSetUp; + } else { + if (is_local_prefer_) { + server_ = + husky::Context::get_worker_info().get_hostname(husky::Context::get_worker_info().get_process_id()) + + ":" + port; + http_conn_.set_url(server_, true); + if (!is_active()) { + is_local_prefer_ = false; + server_ = server; + http_conn_.set_url(server_, true); + husky::LOG_I << "local engine fail, reconnect with remote engine"; + if (!is_active()) + throw base::HuskyException( + "Cannot create local engine, database is not active and try the remote engine"); + husky::LOG_I << "reconnect successfully"; + } + is_setup_ = ElasticsearchInputFormatSetUp::AllSetUp; + } else { + server_ = server; + http_conn_.set_url(server_, true); + if (!is_active()) + throw base::HuskyException("Cannot connect to server"); + is_setup_ = ElasticsearchInputFormatSetUp::AllSetUp; + } + } + // geting the local node_id from the elasticsearch + std::ostringstream oss; + oss << "_nodes/_local"; + boost::property_tree::ptree msg; + http_conn_.get(oss.str().c_str(), 0, &msg); + node_id = msg.get_child("main").get_child("nodes").begin()->first; + + return true; +} + +ElasticsearchInputFormat::~ElasticsearchInputFormat() { records_vector_.clear(); } + +bool ElasticsearchInputFormat::is_setup() const { return !(is_setup_ ^ ElasticsearchInputFormatSetUp::AllSetUp); } + +bool ElasticsearchInputFormat::is_active() { + boost::property_tree::ptree root; + try { + http_conn_.get(0, 0, &root); + } catch (Exception& e) { + husky::LOG_I << "get(0) failed in ElasticsearchInputformat::is_active(). Exception caught: %s\n" << e.what(); + return false; + } catch (std::exception& e) { + husky::LOG_I << "get(0) failed in ElasticsearchInputFormat::is_active(). std::exception caught: %s\n" + << e.what(); + return false; + } catch (...) { + husky::LOG_I << "get(0) failed in ElasticsearchInputFormat::is_active().\n"; + return false; + } + + if (root.empty()) + return false; + + if (root.get("status") != 200) { + husky::LOG_I << "Status is not 200. 
Cannot find Elasticsearch Node.\n"; + return false; + } + + return true; +} + +int ElasticsearchInputFormat::find_shard() { + std::stringstream url; + url << index_ << "/_search_shards?"; + boost::property_tree::ptree obj; + http_conn_.get(url.str().c_str(), 0, &obj); + boost::property_tree::ptree arr = obj.get_child("main").get_child("shards"); + int shard_num = 0; + for (auto it = arr.begin(); it != arr.end(); ++it) { + auto iter(it->second); + for (auto it_(iter.begin()); it_ != (iter).end(); ++it_) { + boost::property_tree::ptree obj_ = (it_->second); + std::string _node = obj_.get("node"); + std::string _pri = obj_.get("primary"); + if (_node == node_id && _pri == "true") { + records_shards_[shard_num] = obj_.get("shard"); + shard_num++; + } + } + } + return shard_num; +} + +void ElasticsearchInputFormat::set_query(const std::string& index, const std::string& type, const std::string& query) { + index_ = index; + type_ = type; + query_ = query; + records_vector_.clear(); + while (true) { + std::string shard_; + if (is_local_only_) { + int shard_num = find_shard(); + int worker_num = husky::Context::get_num_local_workers(); + int id = husky::Context::get_local_tid(); + if (id >= shard_num) + return; + shard_ = records_shards_[id]; + while (id + worker_num < shard_num) { + id = id + worker_num; + shard_ = shard_ + "," + records_shards_[id]; + } + } else { + BinStream question; + question << server_ << index_ << node_id; + BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_ELASTICSEARCH_REQ); + answer >> shard_; + } + if (shard_ == "No shard") + return; + std::stringstream url; + url << index_ << "/" << type_ << "/_search?preference=_shards:" << shard_; + http_conn_.post(url.str().c_str(), query_.c_str(), &result); + if (result.get_child("main").empty()) { + std::cout << url.str() << " -d " << query << std::endl; + throw base::HuskyException("Search failed."); + } + if (result.get_child("main").get("timed_out")) { + throw base::HuskyException("Search timed out."); + } + ElasticsearchInputFormat::read(result, false); + if (is_local_only_) break; + } +} + +bool ElasticsearchInputFormat::get_document(const std::string& index, const std::string& type, const std::string& id) { + index_ = index; + type_ = type; + id_ = id; + std::stringstream url; + url << index_ << "/" << type_ << "/" << id_; + http_conn_.get(url.str().c_str(), 0, &result); + boost::property_tree::ptree pt = result.get_child("main"); + std::stringstream ss; + write_json(ss, pt); + records_vector_.clear(); + records_vector_.push_back(ss.str()); + return pt.get("found"); +} + +int ElasticsearchInputFormat::scan_fully(const std::string& index, const std::string& type, const std::string& query, + int scrollSize) { + index_ = index; + type_ = type; + query_ = query; + records_vector_.clear(); + bool is_first = true; + while (true) { + std::string shard_; + + if (is_local_only_) { + int shard_num = find_shard(); + int worker_num = husky::Context::get_num_local_workers(); + int id = husky::Context::get_local_tid(); + if (id >= shard_num) + return 0; + shard_ = records_shards_[id]; + while (id + worker_num < shard_num) { + id = id + worker_num; + shard_ = shard_ + "," + records_shards_[id]; + } + } else { + BinStream question; + question << server_ << index_ << node_id; + BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_ELASTICSEARCH_REQ); + answer >> shard_; + } + if (shard_ == "No shard") + break; + std::stringstream scrollUrl; + scrollUrl << index << "/" << 
type << "/_search?preference=_shards:" << shard_ + << "&search_type=scan&scroll=10m&size=" << scrollSize; + boost::property_tree::ptree scrollObject; + http_conn_.post(scrollUrl.str().c_str(), query_.c_str(), &scrollObject); + if (scrollObject.get_child("main").get_child("hits").empty()) + EXCEPTION("Result corrupted, no member \"hits\"."); + if (!scrollObject.get_child("main").get_child("hits").get("total")) + EXCEPTION("Result corrupted, no member \"total\" nested in \"hits\"."); + int total = scrollObject.get_child("main").get_child("hits").get("total"); + std::string scrollId = scrollObject.get_child("main").get("_scroll_id"); + int count = 0; + while (count < total) { + boost::property_tree::ptree result; + http_conn_.rawpost("_search/scroll?scroll=10m", scrollId.c_str(), &result); + scrollId = result.get_child("main").get("_scroll_id"); + read(result, false); + for (auto it = result.get_child("main").get_child("hits").get_child("hits").begin(); + it != result.get_child("main").get_child("hits").get_child("hits").end(); ++it) + ++count; + } + if (count != total) + throw base::HuskyException("Result corrupted, total is different from count."); + if (is_local_only_) + break; + } + return 0; +} + +void ElasticsearchInputFormat::read(boost::property_tree::ptree jresult, bool is_clear) { + if (!records_vector_.empty() && is_clear) + records_vector_.clear(); + boost::property_tree::ptree pt = jresult.get_child("main").get_child("hits").get_child("hits"); + for (auto it = pt.begin(); it != pt.end(); ++it) { + std::stringstream ss; + write_json(ss, it->second); + records_vector_.push_back(ss.str()); + } +} + +bool ElasticsearchInputFormat::next(RecordT& ref) { + if (!records_vector_.empty()) { + ref = records_vector_.back(); + records_vector_.pop_back(); + return true; + } + return false; +} + +} // namespace io +} // namespace husky diff --git a/io/input/elasticsearch_inputformat.hpp b/io/input/elasticsearch_inputformat.hpp new file mode 100644 index 0000000..7e05c3e --- /dev/null +++ b/io/input/elasticsearch_inputformat.hpp @@ -0,0 +1,68 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include + +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" +#include "io/input/elasticsearch_connector/http.h" +#include "io/input/inputformat_base.hpp" + +namespace husky { +namespace io { + +class ElasticsearchInputFormat final : public InputFormatBase { + public: + typedef std::string RecordT; + ElasticsearchInputFormat(); + virtual ~ElasticsearchInputFormat(); + virtual bool is_setup() const; + bool is_active(); + int find_shard(); + + bool set_server(const std::string& server, bool local_prefer = true, + bool local_only = false); + + void set_query(const std::string& index, const std::string& type, const std::string& query); + + bool get_document(const std::string& index, const std::string& type, const std::string& id); + + int scan_fully(const std::string& index, const std::string& type, const std::string& query, int scrollSize = 100); + + virtual bool next(RecordT& ref); + + void read(boost::property_tree::ptree jresult, bool is_clear = true); + + protected: + bool is_local_prefer_; + bool is_local_only_; + boost::property_tree::ptree result; + std::string node_; + std::string node_id; + std::string server_; + std::string index_; + std::string type_; + std::string id_; + std::string query_; + std::string router_; + std::vector records_vector_; + std::string records_shards_[100]; + HTTP http_conn_; +}; + +} // namespace io +} // namespace husky diff --git a/io/input/inputformat_store.cpp b/io/input/inputformat_store.cpp index bfa5400..93ee914 100644 --- a/io/input/inputformat_store.cpp +++ b/io/input/inputformat_store.cpp @@ -78,6 +78,17 @@ BinaryInputFormat& InputFormatStore::create_binary_inputformat(const std::string return *binary_input_format; } +#ifdef WITH_ORC +ORCInputFormat& InputFormatStore::create_orc_inputformat() { + InputFormatMap& inputformat_map = get_inputformat_map(); + int id = g_gen_inputformat_id++; + ASSERT_MSG(inputformat_map.find(id) == inputformat_map.end(), "Should not be reached"); + auto* orc_input_format = new ORCInputFormat(); + inputformat_map.insert({id, orc_input_format}); + return *orc_input_format; +} +#endif + #ifdef WITH_THRIFT FlumeInputFormat& InputFormatStore::create_flume_inputformat(std::string rcv_host, int rcv_port) { InputFormatMap& inputformat_map = get_inputformat_map(); @@ -100,6 +111,15 @@ MongoDBInputFormat& InputFormatStore::create_mongodb_inputformat() { } #endif +ElasticsearchInputFormat& InputFormatStore::create_elasticsearch_inputformat() { + InputFormatMap& inputformat_map = get_inputformat_map(); + int id = g_gen_inputformat_id++; + ASSERT_MSG(inputformat_map.find(id) == inputformat_map.end(), "Should not be reached"); + auto* elasticsearch_input_format = new ElasticsearchInputFormat(); + inputformat_map.insert({id, elasticsearch_input_format}); + return *elasticsearch_input_format; +} + void InputFormatStore::drop_all_inputformats() { if (s_inputformat_map == nullptr) return; diff --git a/io/input/inputformat_store.hpp b/io/input/inputformat_store.hpp index a227f2d..2d6d35a 100644 --- a/io/input/inputformat_store.hpp +++ b/io/input/inputformat_store.hpp @@ -26,8 +26,12 @@ #ifdef WITH_MONGODB #include "io/input/mongodb_inputformat.hpp" #endif +#include "io/input/elasticsearch_inputformat.hpp" #include "io/input/separator_inputformat.hpp" #include "io/input/xml_inputformat.hpp" +#ifdef WITH_ORC +#include "io/input/orc_inputformat.hpp" +#endif namespace husky { namespace io { @@ -41,12 +45,16 @@ class InputFormatStore { static SeparatorInputFormat& 
create_separator_inputformat(const std::string& pattern); static XMLInputFormat& create_xml_inputformat(const std::string& start_pattern, const std::string& end_pattern); static BinaryInputFormat& create_binary_inputformat(const std::string& url, const std::string& filter = ""); +#ifdef WITH_ORC + static ORCInputFormat& create_orc_inputformat(); +#endif #ifdef WITH_THRIFT static FlumeInputFormat& create_flume_inputformat(std::string rcv_host, int rcv_port); #endif #ifdef WITH_MONGODB static MongoDBInputFormat& create_mongodb_inputformat(); #endif + static ElasticsearchInputFormat& create_elasticsearch_inputformat(); static void drop_all_inputformats(); static void init_inputformat_map(); diff --git a/io/input/inputformat_store_unittest.cpp b/io/input/inputformat_store_unittest.cpp index df3dfee..66d4c5e 100644 --- a/io/input/inputformat_store_unittest.cpp +++ b/io/input/inputformat_store_unittest.cpp @@ -82,6 +82,14 @@ TEST_F(TestInputFormatStore, MongoDBInputFormat) { } #endif +TEST_F(TestInputFormatStore, ElasticsearchInputFormat) { + auto& infmt1 = InputFormatStore::create_elasticsearch_inputformat(); + EXPECT_EQ(InputFormatStore::size(), 1); + auto& infmt2 = InputFormatStore::create_elasticsearch_inputformat(); + EXPECT_EQ(InputFormatStore::size(), 2); + InputFormatStore::drop_all_inputformats(); +} + } // namespace } // namespace io } // namespace husky diff --git a/io/input/orc_file_splitter.cpp b/io/input/orc_file_splitter.cpp new file mode 100644 index 0000000..c42e127 --- /dev/null +++ b/io/input/orc_file_splitter.cpp @@ -0,0 +1,164 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
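The splitter defined below is the worker-side half of the ORC pipeline; its public entry point is the create_orc_inputformat() factory added to InputFormatStore above. A usage sketch, assuming a WITH_ORC build and a placeholder HDFS path:

    auto& infmt = husky::io::InputFormatStore::create_orc_inputformat();
    infmt.set_input("hdfs:///user/data/table.orc");    // a file or a directory of ORC files

    boost::string_ref line;
    while (infmt.next(line)) {
        // one row per line; columns are tab-separated by SQLColumnPrinter below
    }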
+ +#ifdef WITH_ORC + +#include "io/input/orc_file_splitter.hpp" + +#include +#include + +#include "boost/utility/string_ref.hpp" +#ifdef WITH_HDFS +#include "hdfs/hdfs.h" +#endif +#include "orc/ColumnPrinter.hh" +#include "orc/OrcFile.hh" +#include "orc/orc-config.hh" + +#include "base/log.hpp" +#include "base/serialization.hpp" +#include "core/constants.hpp" +#include "core/context.hpp" +#include "core/coordinator.hpp" +#include "io/input/orc_hdfs_inputstream.hpp" + +namespace orc { + +class SQLColumnPrinter : public ColumnPrinter { + public: + SQLColumnPrinter(std::string& buffer, const Type& type) : ColumnPrinter(buffer) { + for (unsigned int i = 0; i < type.getSubtypeCount(); ++i) { + // fieldNames.push_back(type.getFieldName(i)); + fieldPrinter.push_back(createColumnPrinter(buffer, type.getSubtype(i)).release()); + } + } + + virtual ~SQLColumnPrinter() { + for (size_t i = 0; i < fieldPrinter.size(); ++i) { + delete fieldPrinter[i]; + } + } + + void printRow(uint64_t rowId) override { + if (hasNulls && !notNull[rowId]) { + writeString(buffer, "null"); + } else { + // writeChar(buffer, '{'); + for (unsigned int i = 0; i < fieldPrinter.size(); ++i) { + if (i != 0) { + writeString(buffer, "\t"); + } + // writeChar(buffer, '"'); + // writeString(buffer, fieldNames[i].c_str()); + // writeString(buffer, "\": "); + fieldPrinter[i]->printRow(rowId); + } + // writeChar(buffer, '}'); + } + } + void reset(const ColumnVectorBatch& batch) override { + ColumnPrinter::reset(batch); + const StructVectorBatch& structBatch = dynamic_cast(batch); + for (size_t i = 0; i < fieldPrinter.size(); ++i) { + fieldPrinter[i]->reset(*(structBatch.fields[i])); + } + } + + private: + void writeChar(std::string& file, char ch) { file += ch; } + + void writeString(std::string& file, const char* ptr) { + size_t len = strlen(ptr); + file.append(ptr, len); + } + + std::vector fieldPrinter; +}; + +} // namespace orc + +namespace husky { +namespace io { + +using orc::ColumnVectorBatch; +using orc::createReader; +using orc::ReaderOptions; +using orc::readLocalFile; +using orc::SQLColumnPrinter; + +// default number of lines in one read operation +// size_t ORCFileSplitter::row_batch_size = 8 * 1024; + +ORCFileSplitter::ORCFileSplitter() {} + +ORCFileSplitter::~ORCFileSplitter() { hdfsDisconnect(fs_); } +// initialize reader with the file url +void ORCFileSplitter::load(std::string url) { + cur_fn_ = ""; + url_ = url; + + struct hdfsBuilder* builder = hdfsNewBuilder(); + hdfsBuilderSetNameNode(builder, husky::Context::get_param("hdfs_namenode").c_str()); + hdfsBuilderSetNameNodePort(builder, std::stoi(husky::Context::get_param("hdfs_namenode_port"))); + fs_ = hdfsBuilderConnect(builder); + hdfsFreeBuilder(builder); +} + +// ask master for offset and url +boost::string_ref ORCFileSplitter::fetch_batch() { + BinStream question; + question << url_; + BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_ORC_BLK_REQ); + std::string fn; + size_t offset; + answer >> fn; + answer >> offset; + if (fn == "") { + return ""; + } else if (fn != cur_fn_) { + cur_fn_ = fn; + ReaderOptions opts; + reader_ = createReader(read_hdfs_file(fs_, cur_fn_), opts); + } + return read_by_batch(offset); +} + +boost::string_ref ORCFileSplitter::read_by_batch(size_t offset) { + buffer_.clear(); + try { + std::string line = ""; + reader_->seekToRow(offset); + std::unique_ptr printer(new SQLColumnPrinter(line, reader_->getSelectedType())); + std::unique_ptr batch = reader_->createRowBatch(kOrcRowBatchSize); + + if 
(reader_->next(*batch)) { + printer->reset(*batch); + for (unsigned int i = 0; i < batch->numElements; ++i) { + line.clear(); + printer->printRow(i); + line += "\n"; + buffer_ += line; + } + } + } catch (const std::exception& e) { + LOG_I << e.what(); + } + return boost::string_ref(buffer_); +} + +} // namespace io +} // namespace husky + +#endif diff --git a/io/input/orc_file_splitter.hpp b/io/input/orc_file_splitter.hpp new file mode 100644 index 0000000..648463f --- /dev/null +++ b/io/input/orc_file_splitter.hpp @@ -0,0 +1,55 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifdef WITH_ORC + +#include + +#include "boost/utility/string_ref.hpp" +#include "hdfs/hdfs.h" +#include "orc/OrcFile.hh" + +#include "io/input/file_splitter_base.hpp" + +namespace husky { +namespace io { + +class ORCFileSplitter { + public: + ORCFileSplitter(); + virtual ~ORCFileSplitter(); + + // intialize the url of the orc file + virtual void load(std::string url); + boost::string_ref fetch_batch(); + + protected: + boost::string_ref read_by_batch(size_t offset); + std::string buffer_; + // url may be a directory or a file + std::string url_; + // current filename + std::string cur_fn_; + // orc reader to help to read orc files + std::unique_ptr reader_; + + hdfsFS fs_; +}; + +} // namespace io +} // namespace husky + +#endif diff --git a/io/input/orc_hdfs_inputstream.hpp b/io/input/orc_hdfs_inputstream.hpp new file mode 100644 index 0000000..fcd11f3 --- /dev/null +++ b/io/input/orc_hdfs_inputstream.hpp @@ -0,0 +1,92 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
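fetch_batch() above is the worker half of a small request/response protocol; the master half is the ORC block assigner added later in this patch. The wire format is a plain BinStream exchange (BinStream comes from base/serialization.hpp):

    // worker -> master: the URL this worker wants rows from
    BinStream question;
    question << url_;
    BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_ORC_BLK_REQ);

    // master -> worker: (file name, row offset); an empty file name means no rows are left
    std::string fn;
    size_t offset;
    answer >> fn >> offset;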
+
+#pragma once
+
+#ifdef WITH_ORC
+
+#include <cassert>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "hdfs/hdfs.h"
+#include "orc/OrcFile.hh"
+
+#include "base/exception.hpp"
+#include "base/log.hpp"
+#include "base/thread_support.hpp"
+
+namespace husky {
+namespace io {
+
+using husky::base::HuskyException;
+
+const int kOrcRowBatchSize = 5000;
+
+class HDFSFileInputStream final : public orc::InputStream {
+ public:
+    HDFSFileInputStream(hdfsFS hdfs_fs, const std::string& file) {
+        hdfs_fs_ = hdfs_fs;
+        file_name_ = file;
+        hdfs_file_ = hdfsOpenFile(hdfs_fs_, file_name_.c_str(), O_RDONLY, 0, 0, 0);
+        assert(hdfs_file_ != NULL);
+        hdfsFileInfo* file_info = hdfsGetPathInfo(hdfs_fs_, file_name_.c_str());
+        length_ = file_info->mSize;
+        hdfsFreeFileInfo(file_info, 1);
+    }
+
+    ~HDFSFileInputStream() { hdfsCloseFile(hdfs_fs_, hdfs_file_); }
+
+    uint64_t getLength() const override { return static_cast<uint64_t>(length_); }
+
+    uint64_t getNaturalReadSize() const override { return 128 * 1024; }
+
+    void read(void* buf, uint64_t length, uint64_t offset) override {
+        if (!buf)
+            throw HuskyException("Buffer is null");
+
+        hdfsSeek(hdfs_fs_, hdfs_file_, offset);
+        int32_t remain = static_cast<int32_t>(length);
+        int32_t start = 0;
+        int32_t nbytes = 0;
+        while (remain > 0) {
+            // hdfsRead returns at most ~128KB per call, so keep reading until done
+            nbytes = hdfsRead(hdfs_fs_, hdfs_file_, static_cast<char*>(buf) + start, remain);
+            if (nbytes == -1)
+                throw HuskyException("Bad read of " + file_name_);
+            if (nbytes == 0)
+                break;  // unexpected EOF; reported as a short read below
+            start += nbytes;
+            remain -= nbytes;
+        }
+
+        if (static_cast<uint64_t>(start) != length)
+            throw HuskyException("Short read of " + file_name_);
+    }
+
+    const std::string& getName() const override { return file_name_; }
+
+ private:
+    std::string file_name_;
+    hdfsFile hdfs_file_;
+    hdfsFS hdfs_fs_;
+    int64_t length_;
+    std::mutex read_mutex;  // reserved for concurrent reads; currently unused
+};
+
+inline std::unique_ptr<orc::InputStream> read_hdfs_file(hdfsFS hdfs_fs, const std::string& path) {
+    // inline: this header is included by both the splitter and the master-side assigner
+    return std::unique_ptr<orc::InputStream>(new HDFSFileInputStream(hdfs_fs, path));
+}
+
+} // namespace io
+} // namespace husky
+
+#endif
diff --git a/io/input/orc_inputformat.cpp b/io/input/orc_inputformat.cpp
new file mode 100644
index 0000000..dcf8451
--- /dev/null
+++ b/io/input/orc_inputformat.cpp
@@ -0,0 +1,81 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "io/input/orc_inputformat.hpp"
+
+#include <string>
+#include <vector>
+
+#include "boost/utility/string_ref.hpp"
+
+#include "base/log.hpp"
+#include "io/input/inputformat_helper.hpp"
+
+namespace husky {
+namespace io {
+
+enum ORCInputFormatSetUp {
+    NotSetUp = 0,
+    InputSetUp = 1 << 1,
+    AllSetUp = InputSetUp,
+};
+
+ORCInputFormat::ORCInputFormat() { is_setup_ = ORCInputFormatSetUp::NotSetUp; }
+
+bool ORCInputFormat::is_setup() const { return !(is_setup_ ^ ORCInputFormatSetUp::AllSetUp); }
+
+void ORCInputFormat::set_input(const std::string& url) {
+    if (!url_.empty() && url_ == url)
+        // Setting the same url as last time does nothing.
+ return; + url_ = url; + int prefix = url_.find("://"); + ASSERT_MSG(prefix != std::string::npos, ("Cannot analyze protocol from " + url_).c_str()); + std::string protocol = url_.substr(0, prefix); + splitter_.load(url_.substr(prefix + 3)); + is_setup_ |= ORCInputFormatSetUp::InputSetUp; +} + +// buffer_ got from the orc_splitter must be '\n' seperated lines +// this saves us a lot of block handling +bool ORCInputFormat::next(boost::string_ref& ref) { + if (buffer_.empty() || r == buffer_.size() - 1) { + clear_buffer(); + bool success = fetch_new_batch(); + if (success == false) { + return false; + } + } + r = helper::find_next(buffer_, l, '\n'); + ref = buffer_.substr(l, r - l); + l = helper::find_next(buffer_, r, '\n') + 1; + return true; +} + +bool ORCInputFormat::fetch_new_batch() { + buffer_ = splitter_.fetch_batch(); + if (buffer_.empty()) { + return false; + } + return true; +} + +void ORCInputFormat::clear_buffer() { + buffer_.clear(); + l = r = 0; +} + +} // namespace io +} // namespace husky diff --git a/io/input/orc_inputformat.hpp b/io/input/orc_inputformat.hpp new file mode 100644 index 0000000..34009af --- /dev/null +++ b/io/input/orc_inputformat.hpp @@ -0,0 +1,52 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "boost/utility/string_ref.hpp" + +#include "io/input/inputformat_base.hpp" +#include "io/input/orc_file_splitter.hpp" + +namespace husky { +namespace io { + +class ORCInputFormat : public InputFormatBase { + public: + typedef boost::string_ref RecordT; + + ORCInputFormat(); + virtual ~ORCInputFormat() = default; + + virtual void set_input(const std::string& url); + virtual bool next(boost::string_ref& ref); + virtual bool is_setup() const; + + protected: + bool fetch_new_batch(); + void clear_buffer(); + std::string url_; + + int l = 0; + int r = 0; + boost::string_ref buffer_; + ORCFileSplitter splitter_; +}; + +} // namespace io +} // namespace husky diff --git a/io/output/CMakeLists.txt b/io/output/CMakeLists.txt index a681962..d8ac31c 100644 --- a/io/output/CMakeLists.txt +++ b/io/output/CMakeLists.txt @@ -15,7 +15,7 @@ include_directories(${PROJECT_SOURCE_DIR} ${EXTERNAL_INCLUDE}) -file(GLOB io-output-src-file outputformat_base.cpp) +file(GLOB io-output-src-file outputformat_base.cpp elasticsearch_outputformat.cpp) if(MONGOCLIENT_FOUND) file(GLOB io-output-mongo-src-file mongodb_outputformat.cpp) diff --git a/io/output/elasticsearch_outputformat.cpp b/io/output/elasticsearch_outputformat.cpp new file mode 100644 index 0000000..49b17d5 --- /dev/null +++ b/io/output/elasticsearch_outputformat.cpp @@ -0,0 +1,164 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" +#include "core/context.hpp" +#include "io/output/elasticsearch_outputformat.hpp" + +namespace husky { +namespace io { + +enum ElasticsearchOutputFormatSetUp { + NotSetUp = 0, + ServerSetUp = 1 << 1, + AllSetUp = ServerSetUp, +}; + +ElasticsearchOutputFormat::ElasticsearchOutputFormat() { + records_vector_.clear(); + is_setup_ = ElasticsearchOutputFormatSetUp::NotSetUp; + bound_ = 1024; +} + +bool ElasticsearchOutputFormat::set_server(const std::string& server, const bool& local_prefer) { + is_local_prefer_ = local_prefer; + server_ = server; + std::string port = server_.substr(server_.find(":") + 1); + if (is_local_prefer_) { + server_ = husky::Context::get_worker_info().get_hostname(husky::Context::get_worker_info().get_process_id()) + + ":" + port; + http_conn_.set_url(server_, true); + if (!is_active()) { + server_ = server; + http_conn_.set_url(server_, true); + husky::LOG_I << "local engine fail, reconnect with remote engine"; + if (!is_active()) + throw base::HuskyException( + "Cannot create local engine, database is not active and try the remote engine"); + husky::LOG_I << "reconnect successfully"; + } + } else { + server_ = server; + http_conn_.set_url(server_, true); + if (!is_active()) + throw base::HuskyException("Cannot connect to server"); + is_setup_ = ElasticsearchOutputFormatSetUp::AllSetUp; + } + // geting the local node_id from the elasticsearch + std::ostringstream oss; + oss << "_nodes/_local"; + boost::property_tree::ptree msg; + http_conn_.get(oss.str().c_str(), 0, &msg); + node_id = msg.get_child("main").get_child("nodes").begin()->first; + + return true; +} + +ElasticsearchOutputFormat::~ElasticsearchOutputFormat() {} + +bool ElasticsearchOutputFormat::is_setup() const { return !(is_setup_ ^ ElasticsearchOutputFormatSetUp::AllSetUp); } + +bool ElasticsearchOutputFormat::is_active() { + boost::property_tree::ptree root; + try { + http_conn_.get(0, 0, &root); + } catch (Exception& e) { + husky::LOG_I << "get(0) failed in ElasticSearch::isActive(). Exception caught: %s\n" << e.what(); + return false; + } catch (std::exception& e) { + husky::LOG_I << "get(0) failed in ElasticSearch::isActive(). std::exception caught: %s\n" << e.what(); + return false; + } catch (...) { + husky::LOG_I << "get(0) failed in ElasticSearch::isActive().\n"; + return false; + } + + if (root.empty()) + return false; + + if (root.get("status") != 200) { + husky::LOG_I << "Status is not 200. 
Cannot find Elasticsearch Node.\n"; + return false; + } + + return true; +} + +bool ElasticsearchOutputFormat::set_index(const std::string& index, const std::string& type, + const boost::property_tree::ptree& content) { + index_ = index; + type_ = type; + std::stringstream url; + std::stringstream data; + write_json(data, content); + boost::property_tree::ptree result; + url << index_ << "/" << type_ << "/"; + http_conn_.post(url.str().c_str(), data.str().c_str(), &result); + return true; +} + +bool ElasticsearchOutputFormat::set_index(const std::string& index, const std::string& type, const std::string& id, + const boost::property_tree::ptree& content) { + index_ = index; + type_ = type; + id_ = id; + std::stringstream url; + std::stringstream data; + write_json(data, content); + boost::property_tree::ptree result; + url << index_ << "/" << type_ << "/" << id_; + http_conn_.put(url.str().c_str(), data.str().c_str(), &result); + return true; +} + +bool ElasticsearchOutputFormat::bulk_add(const std::string& opt, const std::string& index, const std::string& type, + const std::string& id, const std::string& content) { + index_ = index; + type_ = type; + id_ = id; + opt_ = opt; + data << "{\"" << opt_ << "\":{\"_index\":\"" << index_ + "\",\"_type\":\"" << type_ << "\",\"_id\":\"" << id_ + << "\"}}" << std::endl; + records_vector_.push_back(content); + data << content << std::endl; + if (bulk_is_full()) + bulk_flush(); + return true; +} + +bool ElasticsearchOutputFormat::bulk_is_full() { + if (records_vector_.size() >= bound_) + return true; + return false; +} + +void ElasticsearchOutputFormat::bulk_flush() { + if (records_vector_.empty()) + return; + records_vector_.clear(); + http_conn_.post("/_bulk", data.str().c_str(), &result); + data.clear(); + data.str(""); +} + +} // namespace io +} // namespace husky diff --git a/io/output/elasticsearch_outputformat.hpp b/io/output/elasticsearch_outputformat.hpp new file mode 100644 index 0000000..fcad88e --- /dev/null +++ b/io/output/elasticsearch_outputformat.hpp @@ -0,0 +1,72 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
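bulk_add() above accumulates Elasticsearch's standard bulk format, one action line plus one source line per document, and bulk_flush() POSTs the buffer to /_bulk once bound_ documents are queued. A caller-side sketch; server, index, type, and document content are placeholders:

    husky::io::ElasticsearchOutputFormat outfmt;
    outfmt.set_server("localhost:9200", false);
    outfmt.bulk_setbound(1000);                  // flush every 1000 documents
    outfmt.bulk_add("index", "myindex", "mytype", "42", "{\"word\":\"husky\",\"count\":1}");
    // queued as two lines of the _bulk body:
    //   {"index":{"_index":"myindex","_type":"mytype","_id":"42"}}
    //   {"word":"husky","count":1}
    outfmt.bulk_flush();                         // send whatever remains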
+ +#pragma once + +#include +#include + +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" +#include "io/input/elasticsearch_connector/http.h" +#include "io/output/outputformat_base.hpp" + +namespace husky { +namespace io { + +class ElasticsearchOutputFormat final : public OutputFormatBase { + public: + typedef std::string RecordT; + ElasticsearchOutputFormat(); + virtual ~ElasticsearchOutputFormat(); + virtual bool is_setup() const; + bool is_active(); + + bool set_server(const std::string& server, const bool& local_prefer = true); + + bool set_index(const std::string& index, const std::string& type, const std::string& id, + const boost::property_tree::ptree& content); + + bool set_index(const std::string& index, const std::string& type, const boost::property_tree::ptree& content); + + bool bulk_add(const std::string& opt, const std::string& index, const std::string& type, const std::string& id, + const std::string& content); + + bool bulk_is_full(); + + void bulk_flush(); + + void bulk_setbound(const int bound) { bound_ = bound; } + + protected: + bool is_local_prefer_; + boost::property_tree::ptree result; + std::string node_; + std::string node_id; + std::string server_; + std::string index_; + std::string type_; + std::string id_; + std::string opt_; + int bound_; + boost::property_tree::ptree content_; + std::string shard_; + std::string router_; + std::vector records_vector_; + std::stringstream data; + HTTP http_conn_; +}; + +} // namespace io +} // namespace husky diff --git a/master/CMakeLists.txt b/master/CMakeLists.txt index ebf0da8..899101a 100644 --- a/master/CMakeLists.txt +++ b/master/CMakeLists.txt @@ -25,6 +25,12 @@ if(MONGOCLIENT_FOUND) list(APPEND master_plugins mongodb_assigner.cpp) endif(MONGOCLIENT_FOUND) +if(ORC_FOUND) + list(APPEND master_plugins orc_assigner.cpp) +endif(ORC_FOUND) + +list(APPEND master_plugins elasticsearch_assigner.cpp) + add_library(husky-master-objs OBJECT master.cpp ${master_plugins}) husky_default_properties(husky-master-objs) diff --git a/master/elasticsearch_assigner.cpp b/master/elasticsearch_assigner.cpp new file mode 100644 index 0000000..326d01f --- /dev/null +++ b/master/elasticsearch_assigner.cpp @@ -0,0 +1,113 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
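The assigner below services the TYPE_ELASTICSEARCH_REQ messages sent by ElasticsearchInputFormat. For reference, the worker side of the exchange, as it already appears in set_query() and scan_fully():

    BinStream question;
    question << server_ << index_ << node_id;    // who is asking, and for which index
    BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_ELASTICSEARCH_REQ);
    std::string shard;
    answer >> shard;                             // a shard id, or "No shard" when all are taken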
+ +#include "master/elasticsearch_assigner.hpp" + +#include +#include + +#include "base/log.hpp" +#include "boost/property_tree/ptree.hpp" +#include "core/constants.hpp" +#include "core/zmq_helpers.hpp" +#include "io/input/elasticsearch_connector/http.h" +#include "master/master.hpp" + +namespace husky { + +static ESShardAssigner es_shard_assigner; + +ESShardAssigner::ESShardAssigner() { + Master::get_instance().register_main_handler(TYPE_ELASTICSEARCH_REQ, + std::bind(&ESShardAssigner::master_elasticsearch_req_handler, this)); +} + +void ESShardAssigner::master_elasticsearch_req_handler() { + auto& master = Master::get_instance(); + auto master_socket = master.get_socket(); + std::string server, index, node; + BinStream stream = zmq_recv_binstream(master_socket.get()); + stream >> server >> index >> node; + std::string ret = answer(server, index, node); + stream.clear(); + stream << ret; + + zmq_sendmore_string(master_socket.get(), master.get_cur_client()); + zmq_sendmore_dummy(master_socket.get()); + zmq_send_binstream(master_socket.get(), stream); + LOG_I << server << " => " << ret; +} + +ESShardAssigner::~ESShardAssigner() { + shards_.clear(); + nodes_.clear(); +} + +void ESShardAssigner::create_shards() { + shards_.clear(); + nodes_.clear(); + HTTP http_conn_; + http_conn_.set_url(server_, false); + std::stringstream url; + url << index_ << "/_search_shards?"; + boost::property_tree::ptree obj; + http_conn_.get(url.str().c_str(), 0, &obj); + boost::property_tree::ptree arr = obj.get_child("main").get_child("shards"); + for (auto it = arr.begin(); it != arr.end(); ++it) { + auto iter(it->second); + for (auto it_(iter.begin()); it_ != (iter).end(); ++it_) { + boost::property_tree::ptree obj_ = (it_->second); + std::string _node = obj_.get("node"); + std::string _pri = obj_.get("primary"); + if (_pri == "true") { + nodes_.push_back(_node); + shards_.push_back(obj_.get("shard")); + } + shard_num_ = shards_.size(); + } + } +} + +std::string ESShardAssigner::answer(const std::string& server, const std::string& index, std::string node) { + if (server_ != server && index_ != index) { + server_ = server; + index_ = index; + if (server_ == "reset server") + return ""; + create_shards(); + } + + if (shards_.empty()) { + return "No shard"; + } + + int size = shards_.size(); + for (int i = 1; i <= size; i++) { + std::string nid = nodes_.front(); + nodes_.pop_front(); + std::string ret = shards_.front(); + shards_.pop_front(); + if (nid == node || i == size) { + LOG_I << i << " "; + return ret; + + break; + } else { + nodes_.push_back(nid); + shards_.push_back(ret); + } + } +} + +} // namespace husky diff --git a/master/elasticsearch_assigner.hpp b/master/elasticsearch_assigner.hpp new file mode 100644 index 0000000..aafa75b --- /dev/null +++ b/master/elasticsearch_assigner.hpp @@ -0,0 +1,44 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +namespace husky { + +class ESShardAssigner { + public: + ESShardAssigner(); + void master_elasticsearch_req_handler(); + // void master_elasticsearch_req_end_handler(); + // void master_setup_handler(); + virtual ~ESShardAssigner(); + + void create_shards(); + std::string answer(const std::string&, const std::string&, std::string node); + // void recieve_end(MongoDBSplit& split); + + private: + int shard_num_; + + std::string server_; + std::string index_; + std::deque nodes_; + std::deque shards_; +}; + +} // namespace husky diff --git a/master/orc_assigner.cpp b/master/orc_assigner.cpp new file mode 100644 index 0000000..1be9755 --- /dev/null +++ b/master/orc_assigner.cpp @@ -0,0 +1,152 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifdef WITH_ORC + +#include "master/orc_assigner.hpp" + +#include +#include +#include + +#include "boost/filesystem.hpp" +#include "orc/OrcFile.hh" + +#include "base/log.hpp" +#include "core/constants.hpp" +#include "core/context.hpp" +#include "io/input/orc_hdfs_inputstream.hpp" +#include "master/master.hpp" + +namespace husky { + +using orc::Reader; +using orc::ReaderOptions; +using orc::createReader; +using orc::readLocalFile; +using orc::ColumnVectorBatch; + +static ORCBlockAssigner orc_block_assigner; + +ORCBlockAssigner::ORCBlockAssigner() { + Master::get_instance().register_main_handler(TYPE_ORC_BLK_REQ, + std::bind(&ORCBlockAssigner::master_main_handler, this)); + Master::get_instance().register_setup_handler(std::bind(&ORCBlockAssigner::master_setup_handler, this)); +} + +void ORCBlockAssigner::master_main_handler() { + auto& master = Master::get_instance(); + auto resp_socket = master.get_socket(); + std::string url; + BinStream stream = zmq_recv_binstream(resp_socket.get()); + stream >> url; + + std::pair ret = answer(url); + stream.clear(); + stream << ret.first << ret.second; + + zmq_sendmore_string(resp_socket.get(), master.get_cur_client()); + zmq_sendmore_dummy(resp_socket.get()); + zmq_send_binstream(resp_socket.get(), stream); + + LOG_I << " => " << ret.first << "@" << ret.second; +} + +void ORCBlockAssigner::master_setup_handler() { + init_hdfs(Context::get_param("hdfs_namenode"), Context::get_param("hdfs_namenode_port")); + num_workers_alive = Context::get_num_workers(); +} + +void ORCBlockAssigner::init_hdfs(const std::string& node, const std::string& port) { + int num_retries = 3; + while (num_retries--) { + struct hdfsBuilder* builder = hdfsNewBuilder(); + hdfsBuilderSetNameNode(builder, node.c_str()); + hdfsBuilderSetNameNodePort(builder, std::stoi(port)); + fs_ = hdfsBuilderConnect(builder); + hdfsFreeBuilder(builder); + if (fs_) + break; + } + if (fs_) { + return; + } + LOG_I << "Failed to connect to HDFS " << node << ":" << port; +} + +void ORCBlockAssigner::browse_hdfs(const std::string& url) { + if (!fs_) + return; + + try { + int num_files; + size_t total = 0; + auto& files = files_dict[url]; + hdfsFileInfo* file_info = 
hdfsListDirectory(fs_, url.c_str(), &num_files);
+        for (int i = 0; i < num_files; ++i) {
+            // consider only regular files under the directory
+            if (file_info[i].mKind != kObjectKindFile)
+                continue;
+            ReaderOptions opts;
+            reader = createReader(io::read_hdfs_file(fs_, file_info[i].mName), opts);
+            size_t num_rows = reader->getNumberOfRows();
+            files.push_back(OrcFileDesc{std::string(file_info[i].mName), num_rows, 0});
+            total += num_rows;
+        }
+        LOG_I << "Total num of rows: " << total;
+        hdfsFreeFileInfo(file_info, num_files);
+    } catch (const std::exception& ex) {
+        LOG_I << "Exception caught: " << ex.what();
+    }
+}
+
+/**
+ * @return selected_file, offset
+ */
+std::pair<std::string, size_t> ORCBlockAssigner::answer(std::string& url) {
+    if (!fs_) {
+        return {"", 0};
+    }
+    // Directory or file status initialization.
+    // This branch is taken at the beginning of a job, and again after
+    // all workers have finished reading this file or directory.
+    if (files_dict.find(url) == files_dict.end()) {
+        browse_hdfs(url);
+        finish_dict[url] = 0;
+    }
+
+    // nothing left under this url
+    auto& files = files_dict[url];
+    if (files.empty()) {
+        finish_dict[url] += 1;
+        if (finish_dict[url] == num_workers_alive) {
+            files_dict.erase(url);
+            finish_dict.erase(url);
+        }
+        return {"", 0};
+    }
+
+    auto& file = files.back();
+    std::pair<std::string, size_t> ret = {file.filename, file.offset};
+    file.offset += io::kOrcRowBatchSize;
+
+    // this file is exhausted; drop it
+    if (file.offset > file.num_of_rows)
+        files.pop_back();
+
+    return ret;
+}
+
+} // namespace husky
+
+#endif
diff --git a/master/orc_assigner.hpp b/master/orc_assigner.hpp
new file mode 100644
index 0000000..9a6a234
--- /dev/null
+++ b/master/orc_assigner.hpp
@@ -0,0 +1,59 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
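The assignment granularity in answer() above is kOrcRowBatchSize rows (5000, defined in orc_hdfs_inputstream.hpp), so each file is walked in fixed strides. For a hypothetical file of 12,000 rows, successive answers to the same URL are:

    // {file, 0}        worker reads rows [0, 5000)
    // {file, 5000}     worker reads rows [5000, 10000)
    // {file, 10000}    worker reads rows [10000, 12000); offset now exceeds num_of_rows, file is popped
    // {"", 0}          returned once every file under the URL is exhausted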
+#pragma once
+
+#ifdef WITH_ORC
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "orc/OrcFile.hh"
+
+#include "io/input/orc_hdfs_inputstream.hpp"
+#include "master/master.hpp"
+
+namespace husky {
+
+struct OrcFileDesc {
+    std::string filename;
+    size_t num_of_rows;
+    size_t offset;
+};
+
+class ORCBlockAssigner {
+ public:
+    ORCBlockAssigner();
+    ~ORCBlockAssigner() = default;
+
+    void master_main_handler();
+    void master_setup_handler();
+    void init_hdfs(const std::string& node, const std::string& port);
+    void browse_hdfs(const std::string& url);
+    std::pair<std::string, size_t> answer(std::string& url);
+
+ private:
+    hdfsFS fs_ = NULL;
+    int num_workers_alive;
+    // per-URL file list and per-URL count of workers that have drained it
+    std::map<std::string, std::vector<OrcFileDesc>> files_dict;
+    std::map<std::string, int> finish_dict;
+    std::unique_ptr<orc::Reader> reader;
+};
+} // namespace husky
+
+#endif
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 786ae55..5798331 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -56,6 +56,13 @@ target_link_libraries(TestMongoDBOutputFormat ${husky})
 target_link_libraries(TestMongoDBOutputFormat ${HUSKY_EXTERNAL_LIB})
 husky_default_properties(TestMongoDBOutputFormat)
+
+# TestMongoToES
+add_executable(TestMongoToES test-mongo-to-es.cpp)
+target_link_libraries(TestMongoToES ${husky})
+target_link_libraries(TestMongoToES ${HUSKY_EXTERNAL_LIB})
+husky_default_properties(TestMongoToES)
+
 # TestMemoryChecker
 add_executable(TestMemoryChecker test-memory-checker.cpp)
 target_link_libraries(TestMemoryChecker ${husky})
diff --git a/test/test-mongo-to-es.cpp b/test/test-mongo-to-es.cpp
new file mode 100644
index 0000000..e659bd5
--- /dev/null
+++ b/test/test-mongo-to-es.cpp
@@ -0,0 +1,70 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
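The test below expects its connection parameters through the usual Husky configuration mechanism; the exact keys are registered in main(). A sample configuration fragment, assuming the key=value config-file convention; hosts, ports, and names are placeholders:

    mongo_server=localhost:27017
    mongo_db=testdb
    mongo_collection=testcollection
    elasticsearch_server=localhost:9200
    elasticsearch_index=testindex
    elasticsearch_type=testtype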
+
+#include <string>
+#include <vector>
+
+#include "boost/tokenizer.hpp"
+#include "mongo/bson/bson.h"
+#include "mongo/client/dbclient.h"
+
+#include "base/serialization.hpp"
+#include "boost/property_tree/json_parser.hpp"
+#include "boost/property_tree/ptree.hpp"
+#include "core/engine.hpp"
+#include "io/input/inputformat_store.hpp"
+#include "io/output/elasticsearch_outputformat.hpp"
+#include "lib/aggregator_factory.hpp"
+
+void mongo_to_es() {
+    auto& infmt = husky::io::InputFormatStore::create_mongodb_inputformat();
+    infmt.set_server(husky::Context::get_param("mongo_server"));
+    infmt.set_ns(husky::Context::get_param("mongo_db"), husky::Context::get_param("mongo_collection"));
+    infmt.set_query("");
+    std::string server = husky::Context::get_param("elasticsearch_server");
+    std::string index = husky::Context::get_param("elasticsearch_index");
+    std::string type = husky::Context::get_param("elasticsearch_type");
+
+    husky::io::ElasticsearchOutputFormat outfmt;
+    outfmt.set_server(server, true);
+    outfmt.bulk_setbound(1000);
+    auto work = [&](std::string& chunk) {
+        // skip empty records before parsing
+        if (chunk.size() == 0)
+            return;
+        mongo::BSONObj o = mongo::fromjson(chunk);
+        o = o.removeField("_id");
+        std::string id = o.getStringField("id");
+        outfmt.bulk_add("index", index, type, id, o.jsonString());
+    };
+
+    husky::load(infmt, work);
+    outfmt.bulk_flush();
+}
+
+int main(int argc, char** argv) {
+    std::vector<std::string> args;
+    args.push_back("mongo_server");
+    args.push_back("mongo_db");
+    args.push_back("mongo_collection");
+    args.push_back("elasticsearch_server");
+    args.push_back("elasticsearch_index");
+    args.push_back("elasticsearch_type");
+    if (husky::init_with_args(argc, argv, args)) {
+        husky::run_job(mongo_to_es);
+        return 0;
+    }
+    return 1;
+}
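A natural follow-up check is to read the migrated documents back through the new input format, reusing the same parameters. A minimal sketch; the count is per worker, not global:

    auto& infmt = husky::io::InputFormatStore::create_elasticsearch_inputformat();
    infmt.set_server(husky::Context::get_param("elasticsearch_server"));
    infmt.set_query(husky::Context::get_param("elasticsearch_index"),
                    husky::Context::get_param("elasticsearch_type"),
                    "{\"query\":{\"match_all\":{}}}");
    int count = 0;
    husky::load(infmt, [&](std::string& chunk) { ++count; });
    husky::LOG_I << "documents seen by this worker: " << count;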