|
10 | 10 |
|
11 | 11 | #include <tuple>
|
12 | 12 |
|
| 13 | +#include "logdevice/common/RetryHandler.h" |
| 14 | + |
13 | 15 | namespace facebook { namespace logdevice {
|
14 | 16 |
|
15 | 17 | RebuildingMarkerChecker::RebuildingMarkerChecker(
|
16 | 18 | const std::unordered_map<shard_index_t, membership::ShardState>& shards,
|
17 | 19 | NodeID my_node_id,
|
18 |
| - membership::MembershipVersion::Type storage_version, |
19 | 20 | configuration::NodesConfigurationAPI* nc_api,
|
20 | 21 | ShardedLocalLogStore* sharded_store)
|
21 | 22 | : shards_(shards),
|
22 | 23 | my_node_id_(std::move(my_node_id)),
|
23 |
| - storage_version_(storage_version), |
24 | 24 | nc_api_(nc_api),
|
25 | 25 | sharded_store_(sharded_store) {
|
26 | 26 | ld_check(sharded_store_);
|
@@ -124,37 +124,59 @@ Status RebuildingMarkerChecker::markShardsAsProvisioned(
|
124 | 124 | return Status::OK;
|
125 | 125 | }
|
126 | 126 |
|
127 |
| - ld_info("Will mark shards %s as provisioned in the NodesConfiguration", |
128 |
| - toString(to_be_updated).c_str()); |
129 |
| - |
130 |
| - using namespace configuration::nodes; |
131 |
| - using namespace membership; |
132 |
| - |
133 |
| - // Build the MARK_SHARD_PROVISIONED update |
134 |
| - NodesConfiguration::Update update; |
135 |
| - update.storage_config_update = std::make_unique<StorageConfig::Update>(); |
136 |
| - update.storage_config_update->membership_update = |
137 |
| - std::make_unique<StorageMembership::Update>(storage_version_); |
138 |
| - for (auto shard : shards) { |
139 |
| - update.storage_config_update->membership_update->addShard( |
140 |
| - ShardID(my_node_id_.index(), shard), |
141 |
| - { |
142 |
| - StorageStateTransition::MARK_SHARD_PROVISIONED, |
143 |
| - Condition::EMPTY_SHARD | Condition::LOCAL_STORE_READABLE | |
144 |
| - Condition::NO_SELF_REPORT_MISSING_DATA, |
145 |
| - }); |
146 |
| - } |
147 |
| - |
148 |
| - // Apply the update |
149 |
| - Status update_status; |
150 |
| - folly::Baton<> b; |
151 |
| - nc_api_->update(std::move(update), |
152 |
| - [&](Status st, std::shared_ptr<const NodesConfiguration>) { |
153 |
| - update_status = st; |
154 |
| - b.post(); |
155 |
| - }); |
156 |
| - b.wait(); |
157 |
| - return update_status; |
| 127 | + auto result = RetryHandler<Status>::syncRun( |
| 128 | + [&](size_t trial_num) { |
| 129 | + ld_info("Will mark shards %s as PROVISIONED in the NodesConfiguration " |
| 130 | + "(trial #%ld)", |
| 131 | + toString(to_be_updated).c_str(), |
| 132 | + trial_num); |
| 133 | + |
| 134 | + using namespace configuration::nodes; |
| 135 | + using namespace membership; |
| 136 | + |
| 137 | + auto storage_membership = nc_api_->getConfig()->getStorageMembership(); |
| 138 | + auto storage_version = storage_membership->getVersion(); |
| 139 | + |
| 140 | + // Build the MARK_SHARD_PROVISIONED update |
| 141 | + NodesConfiguration::Update update; |
| 142 | + update.storage_config_update = |
| 143 | + std::make_unique<StorageConfig::Update>(); |
| 144 | + update.storage_config_update->membership_update = |
| 145 | + std::make_unique<StorageMembership::Update>(storage_version); |
| 146 | + for (auto shard : shards) { |
| 147 | + auto shard_id = ShardID(my_node_id_.index(), shard); |
| 148 | + if (storage_membership->getShardState(shard_id)->storage_state != |
| 149 | + StorageState::PROVISIONING) { |
| 150 | + continue; |
| 151 | + } |
| 152 | + update.storage_config_update->membership_update->addShard( |
| 153 | + shard_id, |
| 154 | + { |
| 155 | + StorageStateTransition::MARK_SHARD_PROVISIONED, |
| 156 | + Condition::EMPTY_SHARD | Condition::LOCAL_STORE_READABLE | |
| 157 | + Condition::NO_SELF_REPORT_MISSING_DATA, |
| 158 | + }); |
| 159 | + } |
| 160 | + |
| 161 | + // Apply the update |
| 162 | + Status update_status; |
| 163 | + folly::Baton<> b; |
| 164 | + nc_api_->update( |
| 165 | + std::move(update), |
| 166 | + [&](Status st, std::shared_ptr<const NodesConfiguration>) { |
| 167 | + update_status = st; |
| 168 | + b.post(); |
| 169 | + }); |
| 170 | + b.wait(); |
| 171 | + return update_status; |
| 172 | + }, |
| 173 | + [](const Status& st) { return st == Status::VERSION_MISMATCH; }, |
| 174 | + /* num_tries */ 10, |
| 175 | + /* backoff_min */ std::chrono::seconds(1), |
| 176 | + /* backoff_max */ std::chrono::seconds(60), |
| 177 | + /* jitter_param */ 0.25); |
| 178 | + |
| 179 | + return result.hasValue() ? result.value() : result.error(); |
158 | 180 | }
|
159 | 181 |
|
160 | 182 | }} // namespace facebook::logdevice
|
0 commit comments