diff --git a/src/meta/zp_meta_info_store.cc b/src/meta/zp_meta_info_store.cc index 408c6fb..835ba0b 100644 --- a/src/meta/zp_meta_info_store.cc +++ b/src/meta/zp_meta_info_store.cc @@ -664,19 +664,35 @@ Status ZPMetaInfoStore::UpdateNodeInfo(const ZPMeta::MetaCmd_Ping &ping) { } // Update offset - for (const auto& po : ping.offset()) { + bool has_clear = false; + for (int pindex = 0; pindex < ping.offset_size(); ++pindex) { + const auto& po = ping.offset(pindex); std::string offset_key = NodeOffsetKey(po.table_name(), po.partition()); - DLOG(INFO) << "update offset" - << ", node: " << node - << ", table partition: " << offset_key - << ", offset: " << po.filenum() << "_" << po.offset(); - if (po.filenum() == -1 || po.offset() == -1) { + if (po.table_name().empty() || po.partition() == -1) { + if (!has_clear + && node_infos_.find(node) != node_infos_.end()) { + node_infos_.at(node).offsets.clear(); + LOG(INFO) << "Clear all node offsets: " + << " node: " << node; + // Traverse offset list from the beginning + // Since the order of protobuf repeated elements are not preserved + pindex = -1; + has_clear = true; + continue; + } + } else if (po.filenum() == -1 || po.offset() == -1) { // Not in charge any more LOG(INFO) << "Node not in charge any more: " - << ", node: " << node + << "node: " << node << ", table partiton: " << offset_key; - node_infos_.erase(offset_key); + if (node_infos_.find(node) != node_infos_.end()) { + node_infos_.at(node).offsets.erase(offset_key); + } } else { + DLOG(INFO) << "update offset" + << "node: " << node + << ", table partition: " << offset_key + << ", offset: " << po.filenum() << "_" << po.offset(); node_infos_[node].offsets[offset_key] = NodeOffset(po.filenum(), po.offset()); } @@ -745,7 +761,7 @@ Status ZPMetaInfoStore::GetNodeOffset(const ZPMeta::Node& node, if (node_infos_.find(ip_port) == node_infos_.end()) { return Status::NotFound("node not exist"); } - //LOG(INFO) << "node: " << node.ip() << ":" << node.port(); + DLOG(INFO) << "node: " << node.ip() << ":" << node.port(); //node_infos_.at(ip_port).Dump(); return node_infos_.at(ip_port).GetOffset(table, partition_id, noffset); } diff --git a/src/node/zp_ping_thread.cc b/src/node/zp_ping_thread.cc index daee2e5..41bbe77 100644 --- a/src/node/zp_ping_thread.cc +++ b/src/node/zp_ping_thread.cc @@ -50,6 +50,16 @@ slash::Status ZPPingThread::Send() { node->set_port(zp_data_server->local_port()); request.set_type(ZPMeta::Type::PING); + // Notice meta clear all offset of mine + if (last_offsets_.empty()) { + ZPMeta::SyncOffset *offset = ping->add_offset(); + offset->set_table_name(""); + offset->set_partition(-1); + offset->set_filenum(-1); + offset->set_offset(-1); + } + + // Update meta current_offsets_.clear(); zp_data_server->DumpTableBinlogOffsets("", ¤t_offsets_); for (auto& item : current_offsets_) { @@ -66,7 +76,7 @@ slash::Status ZPPingThread::Send() { } } - // Tell meta if we are not in charge of some partition any more + // Notice meta if we are not in charge of some partition any more for (auto& last_item : last_offsets_) { for (auto& last_p : last_item.second) { if (current_offsets_.find(last_item.first) == current_offsets_.end()