Skip to content

Commit c7f9eb8

Browse files
authored
Merge pull request #22 from lambdaliu/fix_service_expired
fix: fix service expired
2 parents b97a1ba + 546aef7 commit c7f9eb8

File tree

16 files changed

+189
-110
lines changed

16 files changed

+189
-110
lines changed

include/polaris/plugin.h

+4
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,10 @@ class LocalRegistry : public Plugin {
285285
virtual ReturnCode UpdateSetCircuitBreakerData(
286286
const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets) = 0;
287287

288+
virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key,
289+
ServiceData*& service_data,
290+
std::vector<Instance*>& open_instances) = 0;
291+
288292
/// @brief 更新服务实例状态,properties存放的是状态值,当前支持2个key
289293
///
290294
/// 1. ReadyToServe: 故障熔断标识,true or false

polaris/cache/rcu_map.h

+10-6
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class RcuMap {
7878
~RcuMap();
7979

8080
/// @brief 根据Key获取指向Value的指针,key不存在返回NULL
81-
Value* Get(const Key& key);
81+
Value* Get(const Key& key, bool update_access_time = true);
8282

8383
/// @brief 更新Key对应的Value
8484
/// 如果key对应的value已存在,则将旧的value加入待释放列表,内部线程会延迟一定时间释放
@@ -166,20 +166,24 @@ RcuMap<Key, Value>::~RcuMap() {
166166
}
167167

168168
template <typename Key, typename Value>
169-
Value* RcuMap<Key, Value>::Get(const Key& key) {
169+
Value* RcuMap<Key, Value>::Get(const Key& key, bool update_access_time) {
170170
// 查询read map,获取结果
171171
Value* read_result = NULL;
172172
InnerMap* current_read = read_map_;
173173
typename InnerMap::iterator it = current_read->find(key);
174174
if (it != current_read->end()) { // MapValue包含的value指针在整个过程中是可能改变的
175-
it->second->used_time_ = Time::GetCurrentTimeMs();
176-
read_result = it->second->value_;
175+
if (update_access_time) {
176+
it->second->used_time_ = Time::GetCurrentTimeMs();
177+
}
178+
read_result = it->second->value_;
177179
} else {
178180
// 从read map未读到数据,则加锁进行后续操作
179181
sync::MutexGuard mutex_guard(dirty_lock_);
180182
if ((it = dirty_map_->find(key)) != dirty_map_->end()) {
181-
it->second->used_time_ = Time::GetCurrentTimeMs();
182-
read_result = it->second->value_;
183+
if (update_access_time) {
184+
it->second->used_time_ = Time::GetCurrentTimeMs();
185+
}
186+
read_result = it->second->value_;
183187
if (read_map_ == current_read) {
184188
miss_time_++; // 记录read map读失败,dirty map读成功次数
185189
}

polaris/context.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ void ContextImpl::ClearCache() {
797797
}
798798
pthread_rwlock_unlock(&cache_rwlock_);
799799
}
800+
service_context_map_->CheckGc(min_access_time);
800801
}
801802

802803
} // namespace polaris

polaris/plugin/health_checker/health_checker.cpp

+15-20
Original file line numberDiff line numberDiff line change
@@ -138,38 +138,32 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b
138138
}
139139

140140
ServiceData* service_data = NULL;
141-
local_registry_->GetServiceDataWithRef(service_key_, kServiceDataInstances, service_data);
142-
if (service_data == NULL) {
141+
std::vector<Instance*> health_check_instances;
142+
if (local_registry_->GetCircuitBreakerInstances(service_key_, service_data,
143+
health_check_instances) != kReturnOk) {
143144
return kReturnOk;
144145
}
145-
Service* service = service_data->GetService();
146+
146147
ServiceInstances service_instances(service_data);
147148
std::map<std::string, Instance*>& instance_map = service_instances.GetInstances();
148-
std::set<std::string> target_health_check_instances;
149149

150150
if (when_ == HealthCheckerConfig::kChainWhenAlways) {
151+
health_check_instances.clear();
151152
// 健康检查设置为always, 则探测所有非隔离实例
152153
for (std::map<std::string, Instance*>::iterator instance_iter = instance_map.begin();
153154
instance_iter != instance_map.end(); ++instance_iter) {
154155
if (!instance_iter->second->isIsolate()) {
155-
target_health_check_instances.insert(instance_iter->first);
156+
health_check_instances.push_back(instance_iter->second);
156157
}
157158
}
158-
} else if (when_ == HealthCheckerConfig::kChainWhenOnRecover) {
159-
// 健康检查设置为on_recover, 则探测半开实例
160-
target_health_check_instances = service->GetCircuitBreakerOpenInstances();
159+
} else if (when_ != HealthCheckerConfig::kChainWhenOnRecover) {
160+
// 健康检查设置不为on_recover, 则探测半开实例
161+
health_check_instances.clear();
161162
}
162-
for (std::set<std::string>::iterator it = target_health_check_instances.begin();
163-
it != target_health_check_instances.end(); ++it) {
164-
const std::string& instance_id = *it;
165-
std::map<std::string, Instance*>::iterator iter = instance_map.find(instance_id);
166-
if (iter == instance_map.end()) {
167-
POLARIS_LOG(LOG_INFO, "The health checker of service[%s/%s] getting instance[%s] failed",
168-
service_key_.namespace_.c_str(), service_key_.name_.c_str(), instance_id.c_str());
169-
continue;
170-
}
163+
164+
for (std::size_t i = 0; i < health_check_instances.size(); ++i) {
171165
bool is_detect_success = false;
172-
Instance* instance = iter->second;
166+
Instance* instance = health_check_instances[i];
173167
for (std::size_t i = 0; i < health_checker_list_.size(); ++i) {
174168
HealthChecker*& detector = health_checker_list_[i];
175169
DetectResult detector_result;
@@ -195,15 +189,16 @@ ReturnCode HealthCheckerChainImpl::DetectInstance(CircuitBreakerChain& circuit_b
195189
// 探活插件成功,则将熔断实例置为半开状态,其他实例状态不变
196190
// 探活插件失败,则将健康实例置为熔断状态,其他实例状态不变
197191
if (is_detect_success) {
198-
circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerOpen,
192+
circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerOpen,
199193
kCircuitBreakerHalfOpen);
200194
POLARIS_LOG(LOG_INFO,
201195
"service[%s/%s] getting instance[%s-%s:%d] detectoring success, change to "
202196
"half-open status",
203197
service_key_.namespace_.c_str(), service_key_.name_.c_str(),
204198
instance->GetId().c_str(), instance->GetHost().c_str(), instance->GetPort());
205199
} else {
206-
circuit_breaker_chain.TranslateStatus(instance_id, kCircuitBreakerClose, kCircuitBreakerOpen);
200+
circuit_breaker_chain.TranslateStatus(instance->GetId(), kCircuitBreakerClose,
201+
kCircuitBreakerOpen);
207202
POLARIS_LOG(LOG_INFO,
208203
"service[%s/%s] getting instance[%s-%s:%d] detectoring failed, change to "
209204
"open status",

polaris/plugin/local_registry/local_registry.cpp

+81-30
Original file line numberDiff line numberDiff line change
@@ -138,20 +138,38 @@ void InMemoryRegistry::RunGcTask() {
138138
service_circuit_breaker_config_data_.CheckGc(min_gc_time);
139139
}
140140

141-
Service* InMemoryRegistry::GetOrCreateServiceInLock(const ServiceKey& service_key) {
141+
Service* InMemoryRegistry::CreateServiceInLock(const ServiceKey& service_key) {
142142
Service* service = NULL;
143143
pthread_rwlock_wrlock(&rwlock_);
144144
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
145-
if (service_it == service_cache_.end()) {
146-
service = new Service(service_key, ++next_service_id_);
147-
service_cache_[service_key] = service;
148-
} else {
145+
POLARIS_ASSERT(service_it == service_cache_.end())
146+
service = new Service(service_key, ++next_service_id_);
147+
service_cache_[service_key] = service;
148+
pthread_rwlock_unlock(&rwlock_);
149+
return service;
150+
}
151+
152+
Service* InMemoryRegistry::GetServiceInLock(const ServiceKey& service_key) {
153+
Service* service = NULL;
154+
pthread_rwlock_wrlock(&rwlock_);
155+
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
156+
if (service_it != service_cache_.end()) {
149157
service = service_it->second;
150158
}
151159
pthread_rwlock_unlock(&rwlock_);
152160
return service;
153161
}
154162

163+
void InMemoryRegistry::DeleteServiceInLock(const ServiceKey& service_key) {
164+
pthread_rwlock_wrlock(&rwlock_);
165+
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
166+
if (service_it != service_cache_.end()) {
167+
delete service_it->second;
168+
service_cache_.erase(service_it);
169+
}
170+
pthread_rwlock_unlock(&rwlock_);
171+
}
172+
155173
void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time,
156174
RcuMap<ServiceKey, ServiceData>& rcu_cache,
157175
ServiceDataType service_data_type) {
@@ -166,16 +184,16 @@ void InMemoryRegistry::CheckExpireServiceData(uint64_t min_access_time,
166184
if (service_data_notify_map_.erase(service_key_with_type) > 0) { // 有通知对象表示注册过handler
167185
context_->GetServerConnector()->DeregisterEventHandler(expired_services[i],
168186
service_data_type);
169-
} else { // 没有通知对象,表示未注册过handler,从磁盘加载后从未访问过的数据,直接删除数据
170-
rcu_cache.Delete(expired_services[i]);
171-
context_impl->GetServiceRecord()->ServiceDataDelete(expired_services[i], service_data_type);
172-
context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(expired_services[i],
173-
service_data_type, "");
174187
}
175-
pthread_rwlock_unlock(&notify_rwlock_);
176188
if (service_data_type == kServiceDataInstances) { // 清除实例数据时对应的服务级别插件也删除
177189
context_impl->DeleteServiceContext(expired_services[i]);
190+
DeleteServiceInLock(expired_services[i]);
178191
}
192+
rcu_cache.Delete(expired_services[i]);
193+
context_impl->GetServiceRecord()->ServiceDataDelete(expired_services[i], service_data_type);
194+
context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(expired_services[i],
195+
service_data_type, "");
196+
pthread_rwlock_unlock(&notify_rwlock_);
179197
}
180198
}
181199

@@ -248,6 +266,9 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service
248266
if (interval_it != service_interval_map_.end()) {
249267
refresh_interval = interval_it->second;
250268
}
269+
if (data_type == kServiceDataInstances) {
270+
CreateServiceInLock(service_key);
271+
}
251272
// 先加载磁盘缓存数据
252273
CachePersist& cache_persist = context_->GetContextImpl()->GetCacheManager()->GetCachePersist();
253274
ServiceData* disk_service_data = cache_persist.LoadServiceData(service_key, data_type);
@@ -268,25 +289,21 @@ ReturnCode InMemoryRegistry::LoadServiceDataWithNotify(const ServiceKey& service
268289
return kReturnOk;
269290
}
270291

271-
void InMemoryRegistry::DeleteServiceInLock(const ServiceKey& service_key) {
272-
pthread_rwlock_wrlock(&rwlock_);
273-
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
274-
if (service_it != service_cache_.end()) {
275-
delete service_it->second;
276-
service_cache_.erase(service_it);
277-
}
278-
pthread_rwlock_unlock(&rwlock_);
279-
}
280-
281292
ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key,
282293
ServiceDataType data_type,
283294
ServiceData* service_data) {
284-
if (service_data != NULL) { // 更新服务数据指向服务
285-
Service* service = GetOrCreateServiceInLock(service_key);
295+
Service* service = GetServiceInLock(service_key);
296+
if (service != NULL) { // 更新服务数据指向服务
286297
service->UpdateData(service_data);
287298
}
288299
ContextImpl* context_impl = context_->GetContextImpl();
289300
if (data_type == kServiceDataInstances) {
301+
if (service == NULL) { // 服务被反注册了
302+
if (service_data != NULL) {
303+
service_data->DecrementRef();
304+
}
305+
return kReturnOk;
306+
}
290307
ServiceData* old_service_data = service_instances_data_.Get(service_key);
291308
if (old_service_data != NULL) {
292309
PluginManager::Instance().OnPreUpdateServiceData(old_service_data, service_data);
@@ -307,13 +324,6 @@ ReturnCode InMemoryRegistry::UpdateServiceData(const ServiceKey& service_key,
307324
POLARIS_ASSERT(false);
308325
}
309326
if (service_data == NULL) { // Server Connector反注册Handler触发更新为NULL
310-
if (data_type == kServiceDataInstances) { // 删除服务实例数据时,同时删除服务
311-
DeleteServiceInLock(service_key);
312-
}
313-
context_impl->GetServiceRecord()->ServiceDataDelete(service_key,
314-
data_type); // 同步记录Service数据删除
315-
context_impl->GetCacheManager()->GetCachePersist().PersistServiceData(
316-
service_key, data_type, ""); // 异步删除磁盘服务数据
317327
return kReturnOk;
318328
}
319329
context_impl->GetServiceRecord()->ServiceDataUpdate(service_data); // 同步记录Service版本变化
@@ -389,4 +399,45 @@ ReturnCode InMemoryRegistry::UpdateSetCircuitBreakerData(
389399
return service->WriteCircuitBreakerUnhealthySets(unhealthy_sets);
390400
}
391401

402+
ReturnCode InMemoryRegistry::GetCircuitBreakerInstances(const ServiceKey& service_key,
403+
ServiceData*& service_data,
404+
std::vector<Instance*>& open_instances) {
405+
service_data = service_instances_data_.Get(service_key, false);
406+
if (service_data == NULL) {
407+
return kReturnServiceNotFound;
408+
}
409+
if (service_data->GetDataStatus() < kDataIsSyncing) {
410+
service_data->DecrementRef();
411+
return kReturnServiceNotFound;
412+
}
413+
// 由于此处获取service data没有更新访问时间,服务可能淘汰,不能直接使用其关联的服务数据
414+
pthread_rwlock_rdlock(&rwlock_);
415+
std::map<ServiceKey, Service*>::iterator service_it = service_cache_.find(service_key);
416+
if (service_it == service_cache_.end()) {
417+
pthread_rwlock_unlock(&rwlock_);
418+
return kReturnServiceNotFound;
419+
}
420+
std::set<std::string> open_instance = service_it->second->GetCircuitBreakerOpenInstances();
421+
pthread_rwlock_unlock(&rwlock_);
422+
423+
ServiceInstances service_instances(service_data);
424+
std::map<std::string, Instance*>& instance_map = service_instances.GetInstances();
425+
for (std::set<std::string>::iterator it = open_instance.begin(); it != open_instance.end();
426+
++it) {
427+
const std::string& instance_id = *it;
428+
std::map<std::string, Instance*>::iterator iter = instance_map.find(instance_id);
429+
if (iter == instance_map.end()) {
430+
POLARIS_LOG(LOG_INFO, "The outlier detector of service[%s/%s] getting instance[%s] failed",
431+
service_key.namespace_.c_str(), service_key.name_.c_str(), instance_id.c_str());
432+
continue;
433+
}
434+
open_instances.push_back(iter->second);
435+
}
436+
if (open_instances.empty()) {
437+
return kReturnInstanceNotFound;
438+
}
439+
service_data->IncrementRef();
440+
return kReturnOk;
441+
}
442+
392443
} // namespace polaris

polaris/plugin/local_registry/local_registry.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ class InMemoryRegistry : public LocalRegistry {
9595
virtual ReturnCode UpdateSetCircuitBreakerData(
9696
const ServiceKey& service_key, const CircuitBreakUnhealthySetsData& unhealthy_sets);
9797

98+
virtual ReturnCode GetCircuitBreakerInstances(const ServiceKey& service_key,
99+
ServiceData*& service_data,
100+
std::vector<Instance*>& open_instances);
101+
98102
virtual ReturnCode UpdateDynamicWeight(const ServiceKey& service_key,
99103
const DynamicWeightData& dynamic_weight_data);
100104

@@ -105,7 +109,9 @@ class InMemoryRegistry : public LocalRegistry {
105109
ServiceDataNotify* GetOrCreateDataNotify(const ServiceKey& service_key, ServiceDataType data_type,
106110
bool& new_create);
107111

108-
Service* GetOrCreateServiceInLock(const ServiceKey& service_key);
112+
Service* CreateServiceInLock(const ServiceKey& service_key);
113+
114+
Service* GetServiceInLock(const ServiceKey& service_key);
109115

110116
void DeleteServiceInLock(const ServiceKey& service_key);
111117

0 commit comments

Comments
 (0)