Skip to content
This repository has been archived by the owner on Aug 18, 2020. It is now read-only.

Commit

Permalink
Notice meta clear node offset after reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
CatKang committed Feb 9, 2018
1 parent e638973 commit f3ff64b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
34 changes: 25 additions & 9 deletions src/meta/zp_meta_info_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -664,19 +664,35 @@ Status ZPMetaInfoStore::UpdateNodeInfo(const ZPMeta::MetaCmd_Ping &ping) {
}

// Update offset
for (const auto& po : ping.offset()) {
bool has_clear = false;
for (int pindex = 0; pindex < ping.offset_size(); ++pindex) {
const auto& po = ping.offset(pindex);
std::string offset_key = NodeOffsetKey(po.table_name(), po.partition());
DLOG(INFO) << "update offset"
<< ", node: " << node
<< ", table partition: " << offset_key
<< ", offset: " << po.filenum() << "_" << po.offset();
if (po.filenum() == -1 || po.offset() == -1) {
if (po.table_name().empty() || po.partition() == -1) {
if (!has_clear
&& node_infos_.find(node) != node_infos_.end()) {
node_infos_.at(node).offsets.clear();
LOG(INFO) << "Clear all node offsets: "
<< " node: " << node;
// Traverse offset list from the beginning
// Since the order of protobuf repeated elements are not preserved
pindex = -1;
has_clear = true;
continue;
}
} else if (po.filenum() == -1 || po.offset() == -1) {
// Not in charge any more
LOG(INFO) << "Node not in charge any more: "
<< ", node: " << node
<< "node: " << node
<< ", table partiton: " << offset_key;
node_infos_.erase(offset_key);
if (node_infos_.find(node) != node_infos_.end()) {
node_infos_.at(node).offsets.erase(offset_key);
}
} else {
DLOG(INFO) << "update offset"
<< "node: " << node
<< ", table partition: " << offset_key
<< ", offset: " << po.filenum() << "_" << po.offset();
node_infos_[node].offsets[offset_key] = NodeOffset(po.filenum(),
po.offset());
}
Expand Down Expand Up @@ -745,7 +761,7 @@ Status ZPMetaInfoStore::GetNodeOffset(const ZPMeta::Node& node,
if (node_infos_.find(ip_port) == node_infos_.end()) {
return Status::NotFound("node not exist");
}
//LOG(INFO) << "node: " << node.ip() << ":" << node.port();
DLOG(INFO) << "node: " << node.ip() << ":" << node.port();
//node_infos_.at(ip_port).Dump();
return node_infos_.at(ip_port).GetOffset(table, partition_id, noffset);
}
Expand Down
12 changes: 11 additions & 1 deletion src/node/zp_ping_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ slash::Status ZPPingThread::Send() {
node->set_port(zp_data_server->local_port());
request.set_type(ZPMeta::Type::PING);

// Notice meta clear all offset of mine
if (last_offsets_.empty()) {
ZPMeta::SyncOffset *offset = ping->add_offset();
offset->set_table_name("");
offset->set_partition(-1);
offset->set_filenum(-1);
offset->set_offset(-1);
}

// Update meta
current_offsets_.clear();
zp_data_server->DumpTableBinlogOffsets("", &current_offsets_);
for (auto& item : current_offsets_) {
Expand All @@ -66,7 +76,7 @@ slash::Status ZPPingThread::Send() {
}
}

// Tell meta if we are not in charge of some partition any more
// Notice meta if we are not in charge of some partition any more
for (auto& last_item : last_offsets_) {
for (auto& last_p : last_item.second) {
if (current_offsets_.find(last_item.first) == current_offsets_.end()
Expand Down

0 comments on commit f3ff64b

Please sign in to comment.