Skip to content

Commit

Permalink
[IO] Add ElasticSearch Connector (#267)
Browse files Browse the repository at this point in the history
Resolve #155.
  • Loading branch information
OwenDeng1993 authored and kygx-legend committed Jun 6, 2017
1 parent c641cd6 commit d0566fa
Show file tree
Hide file tree
Showing 19 changed files with 1,935 additions and 3 deletions.
1 change: 1 addition & 0 deletions core/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ const uint32_t TYPE_EXIT = 0x47d79fd5;
const uint32_t TYPE_GET_HASH_RING = 0x48d693d5;
const uint32_t TYPE_NFS_FILE_REQ = 0x4E465251;
const uint32_t TYPE_HDFS_FILE_REQ = 0x48465251;
const uint32_t TYPE_ELASTICSEARCH_REQ = 0xfa081098;

} // namespace husky
6 changes: 6 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ target_link_libraries(WordCountMR ${husky})
target_link_libraries(WordCountMR ${HUSKY_EXTERNAL_LIB})
husky_default_properties(WordCountMR)

# WordCountES: word-count example that reads its input from an
# ElasticSearch index (source: wc_mr_elasticsearch.cpp).
add_executable(WordCountES wc_mr_elasticsearch.cpp)
target_link_libraries(WordCountES ${husky})
target_link_libraries(WordCountES ${HUSKY_EXTERNAL_LIB})
husky_default_properties(WordCountES)

# WordCountMongo
add_executable(WordCountMongo wc_mr_mongo.cpp)
target_link_libraries(WordCountMongo ${husky})
Expand Down
121 changes: 121 additions & 0 deletions examples/wc_mr_elasticsearch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <set>
#include <string>
#include <utility>
#include <vector>

#include "boost/property_tree/json_parser.hpp"
#include "boost/property_tree/ptree.hpp"
#include "boost/tokenizer.hpp"

#include "base/serialization.hpp"
#include "core/engine.hpp"
#include "io/input/inputformat_store.hpp"
#include "lib/aggregator_factory.hpp"

// A word paired with its occurrence count. The word text doubles as the
// object key, which is what the husky object-list machinery requires.
class Word {
 public:
    using KeyT = std::string;

    Word() = default;
    explicit Word(const KeyT& w) : word(w) {}

    // Key accessor used by the object list to identify this object.
    const KeyT& id() const { return word; }

    KeyT word;      // the word itself (also serves as the key)
    int count = 0;  // running count; not read anywhere in this file
};

// Orders (count, word) pairs by count first, breaking ties alphabetically
// on the word — the same ordering std::pair's lexicographic operator< gives.
bool operator<(const std::pair<int, std::string>& a, const std::pair<int, std::string>& b) {
    if (a.first != b.first)
        return a.first < b.first;
    return a.second < b.second;
}

// Word count over an ElasticSearch index: each worker scans documents,
// tokenizes the "content" field of every hit, aggregates per-word counts
// through a combined push channel, and worker 0 prints the 100 most
// frequent words.
void wc() {
    std::string server(husky::Context::get_param("elasticsearch_server"));
    std::string index(husky::Context::get_param("elasticsearch_index"));
    // BUG FIX: the parameter is registered as "elasticsearch_type" in main();
    // the original code read the misspelled key "elaticsearch_type", which
    // was never registered.
    std::string type(husky::Context::get_param("elasticsearch_type"));
    auto& infmt = husky::io::InputFormatStore::create_elasticsearch_inputformat();
    infmt.set_server(server);
    // Match-all query: fetch every document of the given index/type.
    std::string query(" { \"query\": { \"match_all\":{}}}");
    // 1000 is presumably the per-scan batch size — confirm against the
    // ElasticSearch connector's scan_fully() signature.
    infmt.scan_fully(index, type, query, 1000);
    auto& word_list = husky::ObjListStore::create_objlist<Word>();
    auto& ch = husky::ChannelStore::create_push_combined_channel<int, husky::SumCombiner<int>>(infmt, word_list);

    // Parse one ES hit (a JSON chunk), tokenize its _source.content field on
    // spaces/tabs, and push a count of 1 per token into the channel.
    auto parse_wc = [&](std::string& chunk) {
        if (chunk.size() == 0)
            return;
        boost::property_tree::ptree pt;
        std::stringstream ss(chunk);
        read_json(ss, pt);
        std::string content = pt.get_child("_source").get<std::string>("content");
        boost::char_separator<char> sep(" \t");
        boost::tokenizer<boost::char_separator<char>> tok(content, sep);
        for (auto& w : tok) {
            ch.push(1, w);
        }
    };
    husky::load(infmt, parse_wc);

    // Keep the kMaxNum most frequent (count, word) pairs. The set orders by
    // the global operator< above (count first), so begin() is always the
    // smallest retained entry.
    const int kMaxNum = 100;
    typedef std::set<std::pair<int, std::string>> TopKPairs;
    auto add_to_topk = [](TopKPairs& pairs, const std::pair<int, std::string>& p) {
        if (pairs.size() == kMaxNum && *pairs.begin() < p)
            pairs.erase(pairs.begin());
        if (pairs.size() < kMaxNum)
            pairs.insert(p);
    };
    husky::lib::Aggregator<TopKPairs> unique_topk(
        TopKPairs(),
        // Merge: fold every entry of b into a, keeping only the top k.
        [add_to_topk](TopKPairs& a, const TopKPairs& b) {
            for (auto& i : b) {
                add_to_topk(a, i);
            }
        },
        // Reset the aggregate between rounds.
        [](TopKPairs& a) { a.clear(); },
        // Deserialize: an element count followed by that many pairs.
        [add_to_topk](husky::base::BinStream& in, TopKPairs& pairs) {
            pairs.clear();
            for (size_t n = husky::base::deser<size_t>(in); n--;)
                add_to_topk(pairs, husky::base::deser<std::pair<int, std::string>>(in));
        },
        // Serialize: mirror of the deserializer above.
        [](husky::base::BinStream& out, const TopKPairs& pairs) {
            out << pairs.size();
            for (auto& p : pairs)
                out << p;
        });

    // Fold every word's final count into the top-k aggregator.
    husky::list_execute(word_list, [&ch, &unique_topk, add_to_topk](Word& word) {
        unique_topk.update(add_to_topk, std::make_pair(ch.get(word), word.id()));
    });

    husky::lib::AggregatorFactory::sync();

    // Only one worker prints, to avoid duplicated output.
    if (husky::Context::get_global_tid() == 0) {
        for (auto& i : unique_topk.get_value())
            husky::LOG_I << i.second << " " << i.first;
    }
}

// Entry point: registers the mandatory configuration parameters and hands
// the word-count job to the husky runtime. Returns 0 on success, 1 when
// argument/config initialization fails.
int main(int argc, char** argv) {
    std::vector<std::string> args{"elasticsearch_server", "elasticsearch_index", "elasticsearch_type"};
    if (!husky::init_with_args(argc, argv, args))
        return 1;
    husky::run_job(wc);
    return 0;
}
6 changes: 4 additions & 2 deletions io/input/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ file(GLOB io-input-src-files
nfs_binary_inputformat.cpp
binary_inputformat_impl.cpp
binary_inputformat.cpp
inputformat_store.cpp)

inputformat_store.cpp
elasticsearch_inputformat.cpp
elasticsearch_connector/http.cpp)

if(LIBHDFS3_FOUND)
file(GLOB io-input-hdfs-src-files hdfs_file_splitter.cpp hdfs_binary_inputformat.cpp)
list(APPEND io-input-src-files ${io-input-hdfs-src-files})
Expand Down
Loading

0 comments on commit d0566fa

Please sign in to comment.