@@ -63,7 +63,14 @@ Driver::Driver(Config&& config, ValidConfigTag)
63
63
scheduler_{std::move (config.scheduler )},
64
64
selector_{std::move (config.selector )},
65
65
enginePairs_{std::move (config.enginePairs )},
66
- admissionPolicy_{std::move (config.admissionPolicy )} {
66
+ admissionPolicy_{std::move (config.admissionPolicy )},
67
+ rejectedCountByEngine_{enginePairs_.size () > 1 ? enginePairs_.size () : 0 },
68
+ rejectedConcurrentInsertsCountByEngine_{
69
+ enginePairs_.size () > 1 ? enginePairs_.size () : 0 },
70
+ rejectedParcelMemoryCountByEngine_{
71
+ enginePairs_.size () > 1 ? enginePairs_.size () : 0 },
72
+ rejectedBytesByEngine_{enginePairs_.size () > 1 ? enginePairs_.size ()
73
+ : 0 } {
67
74
getRandomAllocDist = getDist (enginePairs_);
68
75
XLOGF (INFO, " Max concurrent inserts: {}" , maxConcurrentInserts_);
69
76
XLOGF (INFO, " Max parcel memory: {}" , maxParcelMemory_);
@@ -123,6 +130,7 @@ bool Driver::admissionTest(HashedKey hk, BufferView value) const {
123
130
size_t parcelSize = hk.key ().size () + value.size ();
124
131
auto currParcelMemory = parcelMemory_.add_fetch (parcelSize);
125
132
auto currConcurrentInserts = concurrentInserts_.add_fetch (1 );
133
+ auto enginePairIndex = selectEnginePair (hk);
126
134
127
135
if (!admissionPolicy_ ||
128
136
admissionPolicy_->accept (hk, value, estimateWriteSize (hk, value))) {
@@ -133,13 +141,25 @@ bool Driver::admissionTest(HashedKey hk, BufferView value) const {
133
141
return true ;
134
142
} else {
135
143
rejectedParcelMemoryCount_.inc ();
144
+ if (rejectedParcelMemoryCountByEngine_.size () > enginePairIndex) {
145
+ rejectedParcelMemoryCountByEngine_[enginePairIndex].inc ();
146
+ }
136
147
}
137
148
} else {
138
149
rejectedConcurrentInsertsCount_.inc ();
150
+ if (rejectedConcurrentInsertsCountByEngine_.size () > enginePairIndex) {
151
+ rejectedConcurrentInsertsCountByEngine_[enginePairIndex].inc ();
152
+ }
139
153
}
140
154
}
141
155
rejectedCount_.inc ();
142
156
rejectedBytes_.add (parcelSize);
157
+ if (rejectedCountByEngine_.size () > enginePairIndex) {
158
+ rejectedCountByEngine_[enginePairIndex].inc ();
159
+ }
160
+ if (rejectedBytesByEngine_.size () > enginePairIndex) {
161
+ rejectedBytesByEngine_[enginePairIndex].add (parcelSize);
162
+ }
143
163
144
164
// Revert counter modifications. Remember, can't assign back atomic.
145
165
concurrentInserts_.dec ();
@@ -152,6 +172,15 @@ Status Driver::insertAsync(HashedKey hk, BufferView value, InsertCallback cb) {
152
172
if (hk.key ().size () > kMaxKeySize ) {
153
173
rejectedCount_.inc ();
154
174
rejectedBytes_.add (hk.key ().size () + value.size ());
175
+
176
+ auto enginePairIndex = selectEnginePair (hk);
177
+ if (rejectedCountByEngine_.size () > enginePairIndex) {
178
+ rejectedCountByEngine_[enginePairIndex].inc ();
179
+ }
180
+ if (rejectedBytesByEngine_.size () > enginePairIndex) {
181
+ rejectedBytesByEngine_[enginePairIndex].add (hk.key ().size () +
182
+ value.size ());
183
+ }
155
184
return Status::Rejected;
156
185
}
157
186
@@ -306,6 +335,21 @@ void Driver::getCounters(const CounterVisitor& visitor) const {
306
335
visitor (folly::to<std::string>(name, " _" , idx, suffix), count);
307
336
}};
308
337
enginePairs_[idx].getCounters (pv);
338
+
339
+ visitor (folly::to<std::string>(" navy_rejected_" , idx, suffix),
340
+ rejectedCountByEngine_[idx].get (),
341
+ CounterVisitor::CounterType::RATE);
342
+ visitor (folly::to<std::string>(" navy_rejected_concurrent_inserts_" , idx,
343
+ suffix),
344
+ rejectedConcurrentInsertsCountByEngine_[idx].get (),
345
+ CounterVisitor::CounterType::RATE);
346
+ visitor (
347
+ folly::to<std::string>(" navy_rejected_parcel_memory_" , idx, suffix),
348
+ rejectedParcelMemoryCountByEngine_[idx].get (),
349
+ CounterVisitor::CounterType::RATE);
350
+ visitor (folly::to<std::string>(" navy_rejected_bytes_" , idx, suffix),
351
+ rejectedBytesByEngine_[idx].get (),
352
+ CounterVisitor::CounterType::RATE);
309
353
}
310
354
visitor (" navy_total_usable_size" , getUsableSize ());
311
355
} else {
0 commit comments