From 87a444882a038c0198dc8cd70d0ad0a96a9b938b Mon Sep 17 00:00:00 2001 From: "yipeng01@baidu.com" Date: Wed, 31 May 2017 17:22:52 +0800 Subject: [PATCH 1/6] add observer --- include/observer/executor.h | 35 +++++ include/observer/observer.h | 72 +++++++++ src/observer/executor/executor_impl.cc | 203 +++++++++++++++++++++++++ src/observer/executor/executor_impl.h | 88 +++++++++++ src/observer/executor/observer_impl.cc | 53 +++++++ src/observer/executor/scanner.cc | 187 +++++++++++++++++++++++ src/observer/executor/scanner.h | 54 +++++++ src/observer/executor/tuple.h | 45 ++++++ src/observer/observer_demo.cc | 145 ++++++++++++++++++ 9 files changed, 882 insertions(+) create mode 100644 include/observer/executor.h create mode 100644 include/observer/observer.h create mode 100644 src/observer/executor/executor_impl.cc create mode 100644 src/observer/executor/executor_impl.h create mode 100644 src/observer/executor/observer_impl.cc create mode 100644 src/observer/executor/scanner.cc create mode 100644 src/observer/executor/scanner.h create mode 100644 src/observer/executor/tuple.h create mode 100644 src/observer/observer_demo.cc diff --git a/include/observer/executor.h b/include/observer/executor.h new file mode 100644 index 000000000..c806c01e3 --- /dev/null +++ b/include/observer/executor.h @@ -0,0 +1,35 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_EXECUTOR_H_ +#define OBSERVER_EXECUTOR_H_ + +#include "observer.h" + +#pragma GCC visibility push(default) +namespace observer { + +/// 执行器 +class Executor { +public: + static Executor* NewExecutor(); + + // 注册需要运行的Observer + virtual bool RegisterObserver(Observer* observer) = 0; + + // 启动接口 + virtual bool Run() = 0; + + Executor() {} + virtual ~Executor() {} + +private: + Executor(const Executor&); + void operator=(const Executor&); +}; + +} // namespace observer +#pragma GCC visibility pop + +#endif // OBSERVER_EXECUTOR_H_ diff --git a/include/observer/observer.h b/include/observer/observer.h new file mode 100644 index 000000000..fe5db73de --- /dev/null +++ b/include/observer/observer.h @@ -0,0 +1,72 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_OBSERVER_H_ +#define OBSERVER_OBSERVER_H_ + +#include +#include +#include +#include "tera.h" + +#pragma GCC visibility push(default) +namespace observer { + +struct Column { + std::string table_name; + std::string family; + std::string qualifier; + bool operator<(const Column& other) const { + std::string str1 = table_name + family + qualifier; + std::string str2 = other.table_name + other.family + other.qualifier; + return str1 < str2; + } +}; + +typedef std::vector ColumnList; +// +typedef std::map ColumnMap; + +/// 基于Tera跨行事务, 实现大规模表格上增量实时触发计算框架 +class Observer { +public: + // 传入观察者唯一标示名以及被观察列 + Observer(const std::string& observer_name, ColumnList& observed_columns); + virtual ~Observer(); + + // 用户实现此接口拿到观察列上变化数据, 完成计算 + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const Column& column, + const std::string& value, + int64_t timestamp); + + // 用户实现此接口做初始化操作 + virtual bool Init(); + + // 用户实现此接口做结束操作 + virtual bool Close(); + + // 清除通知 + bool Ack(ColumnList& columns, const std::string& row, int64_t timestamp); + + // 设置通知, 触发下游observer + bool Notify(ColumnList& columns, const std::string& row, int64_t timestamp); + + std::string GetName() const; + ColumnMap& GetColumnMap(); +private: + Observer(const Observer&); + void operator=(const Observer&); + +private: + std::string observer_name_; + ColumnMap column_map_; +}; + +} // namespace observer +#pragma GCC visibility pop + +#endif // ONSERVER_OBSERVER_H_ diff --git a/src/observer/executor/executor_impl.cc b/src/observer/executor/executor_impl.cc new file mode 100644 index 000000000..756beb8ca --- /dev/null +++ b/src/observer/executor/executor_impl.cc @@ -0,0 +1,203 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include +#include "executor_impl.h" + +DECLARE_string(observer_tera_flag_file); +DECLARE_int32(observer_proc_thread_num); +DECLARE_int64(observer_proc_pending_num_max); +DECLARE_string(observer_notify_column_name); + +namespace observer { + +static tera::Client* g_tera_client = NULL; +static Mutex g_table_mutex; +static TableMap g_table_map; + +Executor* Executor::NewExecutor() { + return new ExecutorImpl(); +} + +ExecutorImpl::ExecutorImpl() : quit_(false), + proc_thread_pool_(new common::ThreadPool(FLAGS_observer_proc_thread_num)), + scanner_(NULL) { + observer_set_.clear(); + observer_map_.clear(); + reduce_column_map_.clear(); +} + +ExecutorImpl::~ExecutorImpl() { +} + +bool ExecutorImpl::RegisterObserver(Observer* observer) { + observer_set_.insert(observer); + + // init tera client + tera::ErrorCode err; + if (NULL == g_tera_client) { + g_tera_client = tera::Client::NewClient(FLAGS_observer_tera_flag_file, &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "init tera client [" << FLAGS_observer_tera_flag_file << "] failed"; + return false; + } + } + + ColumnMap& column_map = observer->GetColumnMap(); + ColumnMap::iterator it = column_map.begin(); + for (; it != column_map.end(); ++it) { + MutexLock locker(&g_table_mutex); + if (g_table_map.end() == g_table_map.find(it->first)) { + // init table + tera::Table* new_table = g_tera_client->OpenTable(it->first, &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "open tera table [" << it->first << "] failed"; + return false; + } + LOG(INFO) << "open tera table [" << it->first << "] succ"; + + // build map: + g_table_map[it->first] = new_table; + } + for (size_t idx = 0; idx < it->second.size(); ++idx) { + // build map: + observer_map_[it->second[idx]].push_back(observer); + + // build map: + reduce_column_map_[g_table_map[it->first]].insert(it->second[idx]); + } + } + + return true; +} + +bool ExecutorImpl::Run() { + if (0 == observer_map_.size()) { + LOG(ERROR) << "no observer, please register observers first"; + return false; + } + + // init scanner + scanner_ = new Scanner(this); + if (!scanner_->Init()) { + LOG(ERROR) << "init Scanner failed"; + Quit(); + return false; + } + + // init observers (user definition) + for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { + (*it)->Init(); + } + + while (!quit_) { + ThisThread::Sleep(1); + } + + // close observers (user definition) + for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { + (*it)->Close(); + } + + // close scanner + if (scanner_ != NULL) { + scanner_->Close(); + delete scanner_; + } + + // close table + for (TableMap::iterator it = g_table_map.begin(); it != g_table_map.end(); ++it) { + if (it->second != NULL) { + delete it->second; + } + } + + // close tera client + if (g_tera_client != NULL) { + delete g_tera_client; + } + + return true; +} + +bool ExecutorImpl::Process(TuplePtr tuple) { + // find observers + ObserverMap::iterator it = observer_map_.find(tuple->observed_column); + if (observer_map_.end() == it) { + LOG(ERROR) << "no match observers, table=" << tuple->observed_column.table_name << + " cf=" << tuple->observed_column.family << " qu=" << tuple->observed_column.qualifier; + return false; + } + // notify observers + for (size_t idx = 0; idx < it->second.size(); ++idx) { + proc_thread_pool_->AddTask(std::bind(&ExecutorImpl::DoNotify, this, tuple, it->second[idx])); + } + + return true; +} + +bool ExecutorImpl::DoNotify(TuplePtr tuple, Observer* observer) { + return observer->OnNotify(tuple->t, + tuple->table, + tuple->row, + tuple->observed_column, + tuple->value, + tuple->timestamp); +} + +bool ExecutorImpl::ProcTaskPendingFull() { + return (proc_thread_pool_->PendingNum() > FLAGS_observer_proc_pending_num_max); +} + +bool ExecutorImpl::SetOrClearNotification(ColumnList& columns, + const std::string& row, + int64_t timestamp, + NotificationType type) { + bool ret = true; + tera::ErrorCode err; + // reduce columns + ColumnReduceMap reduce_map; + for (size_t idx = 0; idx < columns.size(); ++idx) { + MutexLock locker(&g_table_mutex); + TableMap::iterator it = g_table_map.find(columns[idx].table_name); + if (g_table_map.end() == it) { + tera::Table* new_table = g_tera_client->OpenTable(columns[idx].table_name, &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "open table failed, name=" << columns[idx].table_name << + " err=" << err.GetReason(); + return false; + } + g_table_map[columns[idx].table_name] = new_table; + } + reduce_map[it->second].insert(columns[idx]); + } + // set or clear notification columns + ColumnReduceMap::iterator table_it = reduce_map.begin(); + for (; table_it != reduce_map.end(); ++table_it) { + tera::RowMutation* mutation = table_it->first->NewRowMutation(row); + ColumnSet::iterator column_it = table_it->second.begin(); + for (; column_it != table_it->second.end(); ++column_it) { + std::string notify_qualifier = column_it->family + "+" + column_it->qualifier; + if (kSetNotification == type) { + mutation->Put(FLAGS_observer_notify_column_name, notify_qualifier, "1", timestamp); + } else if (kClearNotification == type) { + // 删除t->StartTimestamp之前的通知标记, 避免通知标记发生变更引起数据丢失 + mutation->DeleteColumns(FLAGS_observer_notify_column_name, notify_qualifier, timestamp); + } else { + LOG(ERROR) << "error notification type=" << type; + } + } + table_it->first->ApplyMutation(mutation); + if (mutation->GetError().GetType() != tera::ErrorCode::kOK) { + LOG(ERROR) << "set or clear notification failed, err=" << mutation->GetError().GetReason(); + ret = false; + } + delete mutation; + } + return ret; +} + +} // namespace observer diff --git a/src/observer/executor/executor_impl.h b/src/observer/executor/executor_impl.h new file mode 100644 index 000000000..135e02ce4 --- /dev/null +++ b/src/observer/executor/executor_impl.h @@ -0,0 +1,88 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_EXECUTOR_IMPL_H_ +#define OBSERVER_EXECUTOR_IMPL_H_ + +#include +#include "common/base/scoped_ptr.h" +#include "common/thread_pool.h" +#include "common/this_thread.h" +#include "tuple.h" +#include "scanner.h" +#include "observer/executor.h" + +namespace observer { + +enum NotificationType { + kSetNotification = 1, + kClearNotification = 2, +}; + +typedef std::set ObserverSet; +typedef std::vector ObserverList; + +// +typedef std::map ObserverMap; +// +typedef std::map TableMap; +// +typedef std::map ColumnReduceMap; + +class Scanner; + +class ExecutorImpl : public Executor { +public: + ExecutorImpl(); + virtual ~ExecutorImpl(); + + // 注册需要运行的Observer + virtual bool RegisterObserver(Observer* observer); + + // 启动接口 + virtual bool Run(); + + bool Process(TuplePtr tuple); + + bool ProcTaskPendingFull(); + + // 进程退出 + void Quit() { + quit_ = true; + } + + bool GetQuit() const { + return quit_; + } + + ColumnReduceMap& GetColumnReduceMap() { + return reduce_column_map_; + } + + static bool SetOrClearNotification(ColumnList& columns, + const std::string& row, + int64_t timestamp, + NotificationType type); + +private: + ExecutorImpl(const ExecutorImpl&); + void operator=(const ExecutorImpl&); + + bool DoNotify(TuplePtr tuple, Observer* observer); + +private: + volatile bool quit_; + scoped_ptr proc_thread_pool_; + Scanner* scanner_; + + ObserverSet observer_set_; + // 每个列对应多个Observer + ObserverMap observer_map_; + // 每个table对应多个被观察列 + ColumnReduceMap reduce_column_map_; +}; + +} // namespace observer + +#endif // OBSERVER_EXECUTOR_H_ diff --git a/src/observer/executor/observer_impl.cc b/src/observer/executor/observer_impl.cc new file mode 100644 index 000000000..30eab75d6 --- /dev/null +++ b/src/observer/executor/observer_impl.cc @@ -0,0 +1,53 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "observer/observer.h" +#include "executor_impl.h" + +namespace observer { + +Observer::Observer(const std::string& observer_name, + ColumnList& observed_columns): observer_name_(observer_name) { + for (size_t idx = 0; idx < observed_columns.size(); ++idx) { + column_map_[observed_columns[idx].table_name].push_back(observed_columns[idx]); + } +} + +Observer::~Observer() { +} + +bool Observer::OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const Column& column, + const std::string& value, + int64_t timestamp) { + return true; +} + +bool Observer::Init() { + return true; +} + +bool Observer::Close() { + return true; +} + +std::string Observer::GetName() const { + return observer_name_; +} + +ColumnMap& Observer::GetColumnMap() { + return column_map_; +} + +bool Observer::Ack(ColumnList& columns, const std::string& row, int64_t timestamp) { + return ExecutorImpl::SetOrClearNotification(columns, row, timestamp, kClearNotification); +} + +bool Observer::Notify(ColumnList& columns, const std::string& row, int64_t timestamp) { + return ExecutorImpl::SetOrClearNotification(columns, row, timestamp, kSetNotification); +} + +} // namespace observer diff --git a/src/observer/executor/scanner.cc b/src/observer/executor/scanner.cc new file mode 100644 index 000000000..09b6dd64b --- /dev/null +++ b/src/observer/executor/scanner.cc @@ -0,0 +1,187 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include +#include "scanner.h" + +DECLARE_int32(observer_scan_thread_num); +DECLARE_bool(observer_scan_async_switch); +DECLARE_int32(observer_read_thread_num); +DECLARE_string(observer_notify_column_name); + +namespace observer { + +Scanner::Scanner(ExecutorImpl* executor_impl) : executor_impl_(executor_impl), + read_thread_pool_(new common::ThreadPool(FLAGS_observer_read_thread_num)) { +} + +Scanner::~Scanner() { +} + +bool Scanner::Init() { + ColumnReduceMap& column_map = executor_impl_->GetColumnReduceMap(); + if (FLAGS_observer_scan_thread_num < (int)column_map.size()) { + LOG(ERROR) << "some tables can't be scaned, scanner_num=" << + FLAGS_observer_scan_thread_num << " table_num=" << column_map.size(); + } + + // 启动scan线程 + ColumnReduceMap::iterator it = column_map.begin(); + scan_thread_list_.resize(FLAGS_observer_scan_thread_num); + for (size_t idx = 0; idx < scan_thread_list_.size(); ++idx) { + scan_thread_list_[idx].Start(std::bind(&Scanner::DoScan, this, it->first, it->second)); + if (column_map.end() == ++it) { + it = column_map.begin(); + } + } + + return true; +} + +bool Scanner::Close() { + return true; +} + +bool Scanner::ParseNotifyQualifier(const std::string& notify_qualifier, + std::string* data_family, + std::string* data_qualifier) { + // = + + std::string delim = "+"; + std::vector frags; + std::size_t pos = std::string::npos; + std::size_t start_pos = 0; + std::string frag = ""; + while (std::string::npos != (pos = notify_qualifier.find(delim, start_pos))) { + frag = notify_qualifier.substr(start_pos, pos - start_pos); + frags.push_back(frag); + start_pos = pos + 1; + } + std::size_t str_len = notify_qualifier.length(); + if (start_pos <= str_len - 1) { + frag = notify_qualifier.substr(start_pos, str_len - start_pos); + frags.push_back(frag); + } + + if (2 != frags.size()) { + return false; + } + *data_family = frags[0]; + *data_qualifier = frags[1]; + return true; +} + +void Scanner::DoScan(tera::Table* table, ColumnSet& column_set) { + std::string start_key; + RandomStartKey(table, &start_key); + while (true) { + if (executor_impl_->GetQuit()) { + return; + } + if (!ScanTable(table, column_set, start_key, "")) { + // scan失败重新随机选startkey + RandomStartKey(table, &start_key); + } else { + // scan正常结束从表头继续 + start_key = ""; + } + ThisThread::Sleep(1); + } +} + +bool Scanner::ScanTable(tera::Table* table, + ColumnSet& column_set, + const std::string& start_key, + const std::string& end_key) { + bool ret = true; + tera::ScanDescriptor desc(start_key); + desc.SetAsync(FLAGS_observer_scan_async_switch); + // Notify列存储在单独lg + desc.AddColumnFamily(FLAGS_observer_notify_column_name); + tera::ErrorCode err; + tera::ResultStream* result_stream = table->Scan(desc, &err); + if (NULL == result_stream || tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "table scan init failed"; + return false; + } + while (!result_stream->Done(&err)) { + if (executor_impl_->GetQuit()) { + return true; + } + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "table scanning failed"; + ret = false; + break; + } + + // 控制scanner给observer发送数据的速度 + while (executor_impl_->ProcTaskPendingFull()) { + if (executor_impl_->GetQuit()) { + return true; + } + ThisThread::Sleep(1); + } + + std::string ob_family; + std::string ob_qualifier; + std::string rowkey = result_stream->RowName(); + // 遍历cell + while (result_stream->RowName() == rowkey) { + if (ParseNotifyQualifier(result_stream->Qualifier(), &ob_family, &ob_qualifier)) { + Column ob_column = {table->GetName(), ob_family, ob_qualifier}; + if (column_set.end() != column_set.find(ob_column)) { + TuplePtr tuple(new Tuple()); + // 创建跨行事务 + tuple->t = tera::NewTransaction(); + tuple->table = table; + tuple->row = rowkey; + tuple->observed_column = ob_column; + read_thread_pool_->AddTask(std::bind(&Scanner::DoReadValue, this, tuple)); + } else { + LOG(ERROR) << "miss observed column, table_name" << table->GetName() << + " cf=" << ob_family << " qu=" << ob_qualifier; + } + } else { + LOG(ERROR) << "parse notify qualifier failed: " << result_stream->Qualifier(); + } + + result_stream->Next(); + if (result_stream->Done(&err)) { + break; + } + } + } + delete result_stream; + return ret; +} + +bool Scanner::DoReadValue(TuplePtr tuple) { + bool ret = true; + tera::RowReader* row_reader = tuple->table->NewRowReader(tuple->row); + row_reader->AddColumn(tuple->observed_column.family, tuple->observed_column.qualifier); + // 事务读 + tuple->t->Get(row_reader); + if (tera::ErrorCode::kOK == row_reader->GetError().GetType()) { + tuple->value = row_reader->Value(); + tuple->timestamp = row_reader->Timestamp(); + // 触发observer计算 + executor_impl_->Process(tuple); + } else { + LOG(ERROR) << "[read failed] cf=" << tuple->observed_column.family << + " qu=" << tuple->observed_column.qualifier << " row=" << tuple->row << + " err=" << row_reader->GetError().GetReason(); + ret = false; + } + delete row_reader; + return ret; +} + +bool Scanner::RandomStartKey(tera::Table* table, std::string* Key) { + /// todo + *Key = ""; + return true; +} + +} // namespace observer diff --git a/src/observer/executor/scanner.h b/src/observer/executor/scanner.h new file mode 100644 index 000000000..462e84317 --- /dev/null +++ b/src/observer/executor/scanner.h @@ -0,0 +1,54 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_SCANNER_H_ +#define OBSERVER_SCANNER_H_ + +#include "common/thread.h" +#include "executor_impl.h" + +namespace observer { + +class ExecutorImpl; + +class Scanner { +public: + Scanner(ExecutorImpl* executor_impl); + ~Scanner(); + + bool Init(); + + bool Close(); + + // 执行scan操作 + void DoScan(tera::Table* table, ColumnSet& column_set); + +private: + Scanner(const Scanner&); + void operator=(const Scanner&); + + // 数据列family+qualifier构成通知列的qualifier + bool ParseNotifyQualifier(const std::string& notify_qualifier, + std::string* data_family, + std::string* data_qualfier); + + // table的一次scan + bool ScanTable(tera::Table* table, + ColumnSet& column_set, + const std::string& start_key, + const std::string& end_key); + + bool DoReadValue(TuplePtr tuple); + + bool RandomStartKey(tera::Table* table, std::string* Key); + +private: + ExecutorImpl* executor_impl_; + std::vector scan_thread_list_; + scoped_ptr read_thread_pool_; +}; + +} // namespace observer + +#endif // OBSERVER_SCANNER_H_ diff --git a/src/observer/executor/tuple.h b/src/observer/executor/tuple.h new file mode 100644 index 000000000..3cd7a1616 --- /dev/null +++ b/src/observer/executor/tuple.h @@ -0,0 +1,45 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_TUPLE_H_ +#define OBSERVER_TUPLE_H_ + +#include +#include +#include +#include "tera.h" +#include "observer/observer.h" + +namespace observer { + +typedef std::set ColumnSet; + +struct Tuple { + Tuple() : t(NULL), table(NULL) {} + ~Tuple() { + // release Transaction + if (t) { + delete t; + } + } + // 跨行事务 + tera::Transaction* t; + // Tera表 + tera::Table* table; + // 行Key + std::string row; + // 被观察列 + Column observed_column; + // 列值 + std::string value; + // 时间戳 + int64_t timestamp; +}; + +typedef boost::shared_ptr TuplePtr; +typedef std::vector Tuples; + +} // namespace observer + +#endif // OBSERVER_TUPLE_H_ diff --git a/src/observer/observer_demo.cc b/src/observer/observer_demo.cc new file mode 100644 index 000000000..53d4f2a3a --- /dev/null +++ b/src/observer/observer_demo.cc @@ -0,0 +1,145 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include "observer/executor.h" +#include "observer/observer.h" + +namespace observer { +namespace demo { + +/// Parser Worker /// +class Parser : public observer::Observer { +public: + Parser(const std::string& observer_name, + observer::ColumnList& observed_columns): + observer::Observer(observer_name, observed_columns) {} + virtual ~Parser() {} + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp); + virtual bool Init() { return true; } + virtual bool Close() { return true; } +private: +}; + +bool Parser::OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp) { + LOG(ERROR) << "[Notify Parser] table:family:qualifer=" << column.table_name << ":" << + column.family << ":" << column.qualifier << " row=" << row << + " value=" << value << " timestamp=" << timestamp; + // todo: read other columns ... + // write ForwordIndex column + tera::RowMutation* mutation = table->NewRowMutation(row); + mutation->Put("Data", "ForwordIndex", "FIValue_" + row); + t->ApplyMutation(mutation); + // t->Notify() + // t->Ack() + t->Commit(); + delete mutation; + + // notify downstream observers, equal to t->Notify() + observer::ColumnList notify_columns; + observer::Column c1 = {"observer_test_table", "Data", "ForwordIndex"}; + notify_columns.push_back(c1); + // GetStartTimestamp接口暂不支持 + // Notify(notify_columns, row, t->GetStartTimestamp()); + Notify(notify_columns, row, -1); + // clear notification, equal to t->Ack() + observer::ColumnList ack_columns; + observer::Column c2 = {"observer_test_table", "Data", "Page"}; + observer::Column c3 = {"observer_test_table", "Data", "Link"}; + // ... + ack_columns.push_back(c2); + ack_columns.push_back(c3); + // GetStartTimestamp接口暂不支持 + // Ack(ack_columns, row, t->GetStartTimestamp()); + Ack(ack_columns, row, -1); + + return true; +} + +/// Builder Worker /// +class Builder : public observer::Observer { +public: + Builder(const std::string& observer_name, + observer::ColumnList& observed_columns): + observer::Observer(observer_name, observed_columns) {} + virtual ~Builder() {} + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp); + virtual bool Init() { return true; } + virtual bool Close() { return true; } +private: +}; + +bool Builder::OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp) { + LOG(ERROR) << "[Notify Builder] table:family:qualifer=" << column.table_name << ":" << + column.family << ":" << column.qualifier << " row=" << row << + " value=" << value << " timestamp=" << timestamp; + + // todo: read other columns ... + // todo: write InvertIndex columns ... + // t->Notify() + // t->Ack() + // t->Commit(); + + // clear notification, equal to t->Ack() + observer::ColumnList ack_columns; + observer::Column c1 = {"observer_test_table", "Data", "ForwordIndex"}; + ack_columns.push_back(c1); + // GetStartTimestamp接口暂不支持 + // Ack(ack_columns, row, t->GetStartTimestamp()); + Ack(ack_columns, row, -1); + + return true; +} + +} // namespace demo +} // namespace observer + +int main(int argc, char** argv) { + google::ParseCommandLineFlags(&argc, &argv, true); + + // 创建observer + // 传入唯一标示名和需要观察的列(table_name+family+qualifier) + observer::ColumnList columns; + observer::Column c1 = {"observer_test_table", "Data", "Page"}; + observer::Column c2 = {"observer_test_table", "Data", "Link"}; + columns.push_back(c1); + columns.push_back(c2); + observer::demo::Parser parser("Parser", columns); + + columns.clear(); + observer::Column c3 = {"observer_test_table", "Data", "ForwordIndex"}; + columns.push_back(c3); + observer::demo::Builder builder("Builder", columns); + + // 注册并启动observers + observer::Executor* executor = observer::Executor::NewExecutor(); + executor->RegisterObserver(&parser); + executor->RegisterObserver(&builder); + executor->Run(); + + return 0; +} + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ From 06850823ac0357140580e3dc65143262a71e4b90 Mon Sep 17 00:00:00 2001 From: "yipeng01@baidu.com" Date: Wed, 31 May 2017 17:23:18 +0800 Subject: [PATCH 2/6] add observer --- Makefile | 26 +++++++++++++++++++------- src/tera_flags.cc | 9 +++++++++ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 2644902ef..a9c212ffd 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,8 @@ TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \ src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \ src/master/test/master_impl_test.cc src/io/test/load_test.cc \ src/common/test/thread_pool_test.cc +OBSERVER_SRC := $(wildcard src/observer/executor/*.cc) +OBSERVER_DEMO_SRC := $(wildcard src/observer/observer_demo.cc) TEST_OUTPUT := test_output UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest @@ -69,14 +71,17 @@ MONITOR_OBJ := $(MONITOR_SRC:.cc=.o) MARK_OBJ := $(MARK_SRC:.cc=.o) HTTP_OBJ := $(HTTP_SRC:.cc=.o) TEST_OBJ := $(TEST_SRC:.cc=.o) +OBSERVER_OBJ := $(OBSERVER_SRC:.cc=.o) +OBSERVER_DEMO_OBJ := $(OBSERVER_DEMO_SRC:.cc=.o) ALL_OBJ := $(MASTER_OBJ) $(TABLETNODE_OBJ) $(IO_OBJ) $(SDK_OBJ) $(PROTO_OBJ) \ $(JNI_TERA_OBJ) $(OTHER_OBJ) $(COMMON_OBJ) $(SERVER_OBJ) $(CLIENT_OBJ) \ $(TEST_CLIENT_OBJ) $(TERA_C_OBJ) $(MONITOR_OBJ) $(MARK_OBJ) $(TEST_OBJ) \ - $(SERVER_WRAPPER_OBJ) + $(SERVER_WRAPPER_OBJ) \ + $(OBSERVER_OBJ) $(OBSERVER_DEMO_OBJ) LEVELDB_LIB := src/leveldb/libleveldb.a LEVELDB_UTIL := src/leveldb/util/histogram.o src/leveldb/port/port_posix.o -PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test +PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test observer_demo LIBRARY = libtera.a SOLIBRARY = libtera.so TERA_C_SO = libtera_c.so @@ -85,13 +90,14 @@ BENCHMARK = tera_bench tera_mark TESTS = prop_tree_test tprinter_test string_util_test tablet_io_test \ tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test \ thread_pool_test +OBSERVER_LIBRARY = libobserver.a .PHONY: all clean cleanall test -all: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) +all: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(OBSERVER_LIBRARY) mkdir -p build/include build/lib build/bin build/log build/benchmark cp $(PROGRAM) build/bin - cp $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) build/lib + cp $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) build/lib cp src/leveldb/tera_bench . cp -r benchmark/*.sh benchmark/ycsb4tera/ $(BENCHMARK) build/benchmark cp -r include build/ @@ -113,7 +119,7 @@ check: test clean: rm -rf $(ALL_OBJ) $(PROTO_OUT_CC) $(PROTO_OUT_H) $(TEST_OUTPUT) $(MAKE) clean -C src/leveldb - rm -rf $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(TESTS) terahttp + rm -rf $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(TESTS) terahttp $(OBSERVER_LIBRARY) cleanall: $(MAKE) clean @@ -162,6 +168,12 @@ src/leveldb/libleveldb.a: FORCE tera_bench: +libobserver.a: $(OBSERVER_OBJ) + $(AR) -rs $@ $^ + +observer_demo : $(OBSERVER_DEMO_OBJ) $(OBSERVER_LIBRARY) $(LIBRARY) + $(CXX) -o $@ $^ $(LDFLAGS) + # unit test thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY) $(CXX) -o $@ $^ $(LDFLAGS) @@ -216,8 +228,8 @@ proto: $(PROTO_OUT_CC) $(PROTO_OUT_H) # install output into system directories .PHONY: install -install: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) +install: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) mkdir -p $(INSTALL_PREFIX)/bin $(INSTALL_PREFIX)/include $(INSTALL_PREFIX)/lib cp -rf $(PROGRAM) $(INSTALL_PREFIX)/bin cp -rf include/* $(INSTALL_PREFIX)/include - cp -rf $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(INSTALL_PREFIX)/lib + cp -rf $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) $(INSTALL_PREFIX)/lib diff --git a/src/tera_flags.cc b/src/tera_flags.cc index efe0201e0..988493cf6 100644 --- a/src/tera_flags.cc +++ b/src/tera_flags.cc @@ -281,3 +281,12 @@ DEFINE_int64(tera_sdk_status_timeout, 600, "(s) check tablet/tabletnode status t DEFINE_string(tera_http_port, "8657", "the http proxy port of tera"); DEFINE_int32(tera_http_request_thread_num, 30, "the http proxy thread num for handle client request"); DEFINE_int32(tera_http_ctrl_thread_num, 10, "the http proxy thread num for it self"); + +//////// observer /////// +DEFINE_string(observer_tera_flag_file, "./tera.flag", "tera flag file"); +DEFINE_int32(observer_proc_thread_num, 100, "the max number of user process thread"); +DEFINE_int64(observer_proc_pending_num_max, 10000, "the max number of pending process task"); +DEFINE_int32(observer_scan_thread_num, 3, "the max number of table scan thread"); +DEFINE_bool(observer_scan_async_switch, false, "enable to async scan table"); +DEFINE_int32(observer_read_thread_num, 200, "the max number of table read thread"); +DEFINE_string(observer_notify_column_name, "Notify", "the column family name of notification"); From c0a7a206aa6eac8e28a8740bb05df019fa9fc187 Mon Sep 17 00:00:00 2001 From: "yipeng01@baidu.com" Date: Mon, 5 Jun 2017 17:00:38 +0800 Subject: [PATCH 3/6] update observer src --- Makefile | 8 +- include/observer/executor.h | 5 +- src/observer/executor/executor_impl.cc | 38 +++------ src/observer/executor/executor_impl.h | 12 +-- src/observer/executor/scanner.cc | 6 +- src/observer/observer_demo.cc | 17 ++-- src/observer/test/observer_impl_test.cc | 105 ++++++++++++++++++++++++ src/tera_flags.cc | 2 +- 8 files changed, 146 insertions(+), 47 deletions(-) create mode 100644 src/observer/test/observer_impl_test.cc diff --git a/Makefile b/Makefile index a9c212ffd..570bf08d9 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,8 @@ MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \ src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \ src/master/test/master_impl_test.cc src/io/test/load_test.cc \ - src/common/test/thread_pool_test.cc + src/common/test/thread_pool_test.cc \ + src/observer/test/observer_impl_test.cc OBSERVER_SRC := $(wildcard src/observer/executor/*.cc) OBSERVER_DEMO_SRC := $(wildcard src/observer/observer_demo.cc) @@ -89,7 +90,7 @@ JNILIBRARY = libjni_tera.so BENCHMARK = tera_bench tera_mark TESTS = prop_tree_test tprinter_test string_util_test tablet_io_test \ tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test \ - thread_pool_test + thread_pool_test observer_impl_test OBSERVER_LIBRARY = libobserver.a .PHONY: all clean cleanall test @@ -175,6 +176,9 @@ observer_demo : $(OBSERVER_DEMO_OBJ) $(OBSERVER_LIBRARY) $(LIBRARY) $(CXX) -o $@ $^ $(LDFLAGS) # unit test +observer_impl_test: src/observer/test/observer_impl_test.o $(OBSERVER_LIBRARY) $(LIBRARY) + $(CXX) -o $@ $^ $(LDFLAGS) + thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY) $(CXX) -o $@ $^ $(LDFLAGS) diff --git a/include/observer/executor.h b/include/observer/executor.h index c806c01e3..7b8acbd4e 100644 --- a/include/observer/executor.h +++ b/include/observer/executor.h @@ -20,7 +20,10 @@ class Executor { // 启动接口 virtual bool Run() = 0; - + + // 退出接口 + virtual void Quit() = 0; + Executor() {} virtual ~Executor() {} diff --git a/src/observer/executor/executor_impl.cc b/src/observer/executor/executor_impl.cc index 756beb8ca..02960858b 100644 --- a/src/observer/executor/executor_impl.cc +++ b/src/observer/executor/executor_impl.cc @@ -15,7 +15,7 @@ DECLARE_string(observer_notify_column_name); namespace observer { static tera::Client* g_tera_client = NULL; -static Mutex g_table_mutex; +// static Mutex g_table_mutex; static TableMap g_table_map; Executor* Executor::NewExecutor() { @@ -49,7 +49,7 @@ bool ExecutorImpl::RegisterObserver(Observer* observer) { ColumnMap& column_map = observer->GetColumnMap(); ColumnMap::iterator it = column_map.begin(); for (; it != column_map.end(); ++it) { - MutexLock locker(&g_table_mutex); + // MutexLock locker(&g_table_mutex); if (g_table_map.end() == g_table_map.find(it->first)) { // init table tera::Table* new_table = g_tera_client->OpenTable(it->first, &err); @@ -79,7 +79,12 @@ bool ExecutorImpl::Run() { LOG(ERROR) << "no observer, please register observers first"; return false; } - + + // init observers (user definition) + for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { + (*it)->Init(); + } + // init scanner scanner_ = new Scanner(this); if (!scanner_->Init()) { @@ -87,39 +92,22 @@ bool ExecutorImpl::Run() { Quit(); return false; } - - // init observers (user definition) - for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { - (*it)->Init(); - } while (!quit_) { ThisThread::Sleep(1); } - // close observers (user definition) - for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { - (*it)->Close(); - } - // close scanner if (scanner_ != NULL) { scanner_->Close(); delete scanner_; } - - // close table - for (TableMap::iterator it = g_table_map.begin(); it != g_table_map.end(); ++it) { - if (it->second != NULL) { - delete it->second; - } - } - // close tera client - if (g_tera_client != NULL) { - delete g_tera_client; + // close observers (user definition) + for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { + (*it)->Close(); } - + return true; } @@ -161,7 +149,7 @@ bool ExecutorImpl::SetOrClearNotification(ColumnList& columns, // reduce columns ColumnReduceMap reduce_map; for (size_t idx = 0; idx < columns.size(); ++idx) { - MutexLock locker(&g_table_mutex); + // MutexLock locker(&g_table_mutex); TableMap::iterator it = g_table_map.find(columns[idx].table_name); if (g_table_map.end() == it) { tera::Table* new_table = g_tera_client->OpenTable(columns[idx].table_name, &err); diff --git a/src/observer/executor/executor_impl.h b/src/observer/executor/executor_impl.h index 135e02ce4..32a2732ba 100644 --- a/src/observer/executor/executor_impl.h +++ b/src/observer/executor/executor_impl.h @@ -42,15 +42,15 @@ class ExecutorImpl : public Executor { // 启动接口 virtual bool Run(); - - bool Process(TuplePtr tuple); - bool ProcTaskPendingFull(); - - // 进程退出 - void Quit() { + // 退出接口 + virtual void Quit() { quit_ = true; } + + bool Process(TuplePtr tuple); + + bool ProcTaskPendingFull(); bool GetQuit() const { return quit_; diff --git a/src/observer/executor/scanner.cc b/src/observer/executor/scanner.cc index 09b6dd64b..89e7df628 100644 --- a/src/observer/executor/scanner.cc +++ b/src/observer/executor/scanner.cc @@ -42,6 +42,9 @@ bool Scanner::Init() { } bool Scanner::Close() { + for (size_t idx = 0; idx < scan_thread_list_.size(); ++idx) { + scan_thread_list_[idx].Join(); + } return true; } @@ -108,7 +111,8 @@ bool Scanner::ScanTable(tera::Table* table, } while (!result_stream->Done(&err)) { if (executor_impl_->GetQuit()) { - return true; + ret = true; + break; } if (tera::ErrorCode::kOK != err.GetType()) { LOG(ERROR) << "table scanning failed"; diff --git a/src/observer/observer_demo.cc b/src/observer/observer_demo.cc index 53d4f2a3a..2561ab4c3 100644 --- a/src/observer/observer_demo.cc +++ b/src/observer/observer_demo.cc @@ -23,9 +23,6 @@ class Parser : public observer::Observer { const observer::Column& column, const std::string& value, int64_t timestamp); - virtual bool Init() { return true; } - virtual bool Close() { return true; } -private: }; bool Parser::OnNotify(tera::Transaction* t, @@ -34,9 +31,11 @@ bool Parser::OnNotify(tera::Transaction* t, const observer::Column& column, const std::string& value, int64_t timestamp) { - LOG(ERROR) << "[Notify Parser] table:family:qualifer=" << column.table_name << ":" << - column.family << ":" << column.qualifier << " row=" << row << + LOG(INFO) << "[Notify Parser] table:family:qualifer=" << + column.table_name << ":" << column.family << ":" << + column.qualifier << " row=" << row << " value=" << value << " timestamp=" << timestamp; + // todo: read other columns ... // write ForwordIndex column tera::RowMutation* mutation = table->NewRowMutation(row); @@ -54,6 +53,7 @@ bool Parser::OnNotify(tera::Transaction* t, // GetStartTimestamp接口暂不支持 // Notify(notify_columns, row, t->GetStartTimestamp()); Notify(notify_columns, row, -1); + // clear notification, equal to t->Ack() observer::ColumnList ack_columns; observer::Column c2 = {"observer_test_table", "Data", "Page"}; @@ -61,7 +61,6 @@ bool Parser::OnNotify(tera::Transaction* t, // ... ack_columns.push_back(c2); ack_columns.push_back(c3); - // GetStartTimestamp接口暂不支持 // Ack(ack_columns, row, t->GetStartTimestamp()); Ack(ack_columns, row, -1); @@ -81,9 +80,6 @@ class Builder : public observer::Observer { const observer::Column& column, const std::string& value, int64_t timestamp); - virtual bool Init() { return true; } - virtual bool Close() { return true; } -private: }; bool Builder::OnNotify(tera::Transaction* t, @@ -92,7 +88,7 @@ bool Builder::OnNotify(tera::Transaction* t, const observer::Column& column, const std::string& value, int64_t timestamp) { - LOG(ERROR) << "[Notify Builder] table:family:qualifer=" << column.table_name << ":" << + LOG(INFO) << "[Notify Builder] table:family:qualifer=" << column.table_name << ":" << column.family << ":" << column.qualifier << " row=" << row << " value=" << value << " timestamp=" << timestamp; @@ -106,7 +102,6 @@ bool Builder::OnNotify(tera::Transaction* t, observer::ColumnList ack_columns; observer::Column c1 = {"observer_test_table", "Data", "ForwordIndex"}; ack_columns.push_back(c1); - // GetStartTimestamp接口暂不支持 // Ack(ack_columns, row, t->GetStartTimestamp()); Ack(ack_columns, row, -1); diff --git a/src/observer/test/observer_impl_test.cc b/src/observer/test/observer_impl_test.cc new file mode 100644 index 000000000..04d28a238 --- /dev/null +++ b/src/observer/test/observer_impl_test.cc @@ -0,0 +1,105 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include +#include +#include "observer/executor.h" +#include "observer/observer.h" +#include "utils/utils_cmd.h" +#include "common/thread.h" + +namespace observer { + +class TestWorker : public Observer { +public: + TestWorker(const std::string& observer_name, ColumnList& observed_columns): + Observer(observer_name, observed_columns), notified_(false) {} + virtual ~TestWorker() {} + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const Column& column, + const std::string& value, + int64_t timestamp) { + row_ = row; + column_ = column; + value_ = value; + timestamp_ = timestamp; + notified_ = true; + return true; + } + std::string row_; + Column column_; + std::string value_; + int64_t timestamp_; + + std::atomic notified_; +}; + +class ObserverImplTest : public ::testing::Test { +public: + void OnNotifyTest() { + tera::ErrorCode err; + tera::Client* client = tera::Client::NewClient("./tera.flag", &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "new client failed"; + return; + } + tera::Table* table = client->OpenTable("observer_test_table", &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "open table failed"; + return; + } + tera::Transaction* t = tera::NewTransaction(); + assert(t != NULL); + tera::RowMutation* mu0 = table->NewRowMutation("www.baidu.com"); + mu0->Put("Data", "Page", "hello world"); + t->ApplyMutation(mu0); + t->Commit(); + tera::RowMutation* mu1 = table->NewRowMutation("www.baidu.com"); + // mu1->Put("Notify", "Data+Page", "1", t->GetStartTimestamp()); + mu1->Put("Notify", "Data+Page", "1", -1); + table->ApplyMutation(mu1); + delete t; + delete mu0; + delete mu1; + + ColumnList columns; + Column c = {"observer_test_table", "Data", "Page"}; + columns.push_back(c); + TestWorker worker("TestWorker", columns); + + Executor* executor = Executor::NewExecutor(); + executor->RegisterObserver(&worker); + run_thread_.Start(std::bind(&ObserverImplTest::DoRun, this, executor)); + while (!worker->notified_) { + sleep(1); + } + executor->Quit(); + EXPECT_TRUE(("www.baidu.com" == worker.row_) && + ("observer_test_table" == worker.column_.table_name) && + ("Data" == worker.column_.family) && + ("Page" == worker.column_.qualifier) && + ("hello world" == worker.value_)); + } +private: + void DoRun(Executor* executor) { + executor->Run(); + } + common::Thread run_thread_; +}; + +TEST_F(ObserverImplTest, OnNotifyTest) { + OnNotifyTest(); +} + +} // namespace observer + +int main(int argc, char** argv) { + ::google::ParseCommandLineFlags(&argc, &argv, true); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/tera_flags.cc b/src/tera_flags.cc index 988493cf6..628bc5653 100644 --- a/src/tera_flags.cc +++ b/src/tera_flags.cc @@ -286,7 +286,7 @@ DEFINE_int32(tera_http_ctrl_thread_num, 10, "the http proxy thread num for it se DEFINE_string(observer_tera_flag_file, "./tera.flag", "tera flag file"); DEFINE_int32(observer_proc_thread_num, 100, "the max number of user process thread"); DEFINE_int64(observer_proc_pending_num_max, 10000, "the max number of pending process task"); -DEFINE_int32(observer_scan_thread_num, 3, "the max number of table scan thread"); +DEFINE_int32(observer_scan_thread_num, 1, "the max number of table scan thread"); DEFINE_bool(observer_scan_async_switch, false, "enable to async scan table"); DEFINE_int32(observer_read_thread_num, 200, "the max number of table read thread"); DEFINE_string(observer_notify_column_name, "Notify", "the column family name of notification"); From fad2158b5b23f25b472331979ed6ebdebabbd09b Mon Sep 17 00:00:00 2001 From: "yipeng01@baidu.com" Date: Mon, 5 Jun 2017 19:43:04 +0800 Subject: [PATCH 4/6] update observer test src --- src/observer/test/observer_impl_test.cc | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/observer/test/observer_impl_test.cc b/src/observer/test/observer_impl_test.cc index 04d28a238..7eabfd450 100644 --- a/src/observer/test/observer_impl_test.cc +++ b/src/observer/test/observer_impl_test.cc @@ -16,7 +16,7 @@ namespace observer { class TestWorker : public Observer { public: TestWorker(const std::string& observer_name, ColumnList& observed_columns): - Observer(observer_name, observed_columns), notified_(false) {} + Observer(observer_name, observed_columns), counter_(0), notified_(false) {} virtual ~TestWorker() {} virtual bool OnNotify(tera::Transaction* t, tera::Table* table, @@ -29,13 +29,21 @@ class TestWorker : public Observer { value_ = value; timestamp_ = timestamp; notified_ = true; + + ++counter_; + + observer::ColumnList ack_columns; + observer::Column c = {"observer_test_table", "Data", "Page"}; + ack_columns.push_back(c); + Ack(ack_columns, row, -1); + return true; } std::string row_; Column column_; std::string value_; int64_t timestamp_; - + std::atomic counter_; std::atomic notified_; }; @@ -75,7 +83,7 @@ class ObserverImplTest : public ::testing::Test { Executor* executor = Executor::NewExecutor(); executor->RegisterObserver(&worker); run_thread_.Start(std::bind(&ObserverImplTest::DoRun, this, executor)); - while (!worker->notified_) { + while (!worker.notified_) { sleep(1); } executor->Quit(); From ccf28d00b51589fffa07c1fd01a5f2e5942f3fba Mon Sep 17 00:00:00 2001 From: "yipeng01@baidu.com" Date: Tue, 6 Jun 2017 11:09:16 +0800 Subject: [PATCH 5/6] update observer src --- src/observer/executor/executor_impl.cc | 5 ++++- src/observer/executor/scanner.cc | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/observer/executor/executor_impl.cc b/src/observer/executor/executor_impl.cc index 02960858b..6a5748417 100644 --- a/src/observer/executor/executor_impl.cc +++ b/src/observer/executor/executor_impl.cc @@ -118,7 +118,10 @@ bool ExecutorImpl::Process(TuplePtr tuple) { LOG(ERROR) << "no match observers, table=" << tuple->observed_column.table_name << " cf=" << tuple->observed_column.family << " qu=" << tuple->observed_column.qualifier; return false; - } + } + + // todo: analysis observers collision on Ack column + // notify observers for (size_t idx = 0; idx < it->second.size(); ++idx) { proc_thread_pool_->AddTask(std::bind(&ExecutorImpl::DoNotify, this, tuple, it->second[idx])); diff --git a/src/observer/executor/scanner.cc b/src/observer/executor/scanner.cc index 89e7df628..cb3236ecb 100644 --- a/src/observer/executor/scanner.cc +++ b/src/observer/executor/scanner.cc @@ -128,6 +128,8 @@ bool Scanner::ScanTable(tera::Table* table, ThisThread::Sleep(1); } + // todo: try lock row + std::string ob_family; std::string ob_qualifier; std::string rowkey = result_stream->RowName(); @@ -183,7 +185,7 @@ bool Scanner::DoReadValue(TuplePtr tuple) { } bool Scanner::RandomStartKey(tera::Table* table, std::string* Key) { - /// todo + // todo *Key = ""; return true; } From acba09f30453cd9244007e82689dc2ac7bb7c062 Mon Sep 17 00:00:00 2001 From: "yipeng01@baidu.com" Date: Thu, 8 Jun 2017 14:24:45 +0800 Subject: [PATCH 6/6] update observer src --- src/observer/executor/executor_impl.cc | 2 -- src/observer/executor/scanner.cc | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/observer/executor/executor_impl.cc b/src/observer/executor/executor_impl.cc index 6a5748417..033993029 100644 --- a/src/observer/executor/executor_impl.cc +++ b/src/observer/executor/executor_impl.cc @@ -120,8 +120,6 @@ bool ExecutorImpl::Process(TuplePtr tuple) { return false; } - // todo: analysis observers collision on Ack column - // notify observers for (size_t idx = 0; idx < it->second.size(); ++idx) { proc_thread_pool_->AddTask(std::bind(&ExecutorImpl::DoNotify, this, tuple, it->second[idx])); diff --git a/src/observer/executor/scanner.cc b/src/observer/executor/scanner.cc index cb3236ecb..29a02b9b6 100644 --- a/src/observer/executor/scanner.cc +++ b/src/observer/executor/scanner.cc @@ -164,6 +164,8 @@ bool Scanner::ScanTable(tera::Table* table, } bool Scanner::DoReadValue(TuplePtr tuple) { + // todo: if observers conflict on Ack column, return + bool ret = true; tera::RowReader* row_reader = tuple->table->NewRowReader(tuple->row); row_reader->AddColumn(tuple->observed_column.family, tuple->observed_column.qualifier);