48
48
#include " memtier_benchmark.h"
49
49
#include " obj_gen.h"
50
50
#include " shard_connection.h"
51
-
52
- #define KEY_INDEX_QUEUE_MAX_SIZE 1000000
51
+ #include " crc16_slottable.h"
53
52
54
53
#define MOVED_MSG_PREFIX " -MOVED"
55
54
#define MOVED_MSG_PREFIX_LEN 6
56
55
#define ASK_MSG_PREFIX " -ASK"
57
56
#define ASK_MSG_PREFIX_LEN 4
58
57
59
- #define MAX_CLUSTER_HSLOT 16383
60
58
static const uint16_t crc16tab[256 ]= {
61
59
0x0000 ,0x1021 ,0x2042 ,0x3063 ,0x4084 ,0x50a5 ,0x60c6 ,0x70e7 ,
62
60
0x8108 ,0x9129 ,0xa14a ,0xb16b ,0xc18c ,0xd1ad ,0xe1ce ,0xf1ef ,
@@ -100,24 +98,13 @@ static inline uint16_t crc16(const char *buf, size_t len) {
100
98
return crc;
101
99
}
102
100
103
- static uint32_t calc_hslot_crc16_cluster (const char *str, size_t length)
104
- {
105
- uint32_t rv = (uint32_t ) crc16 (str, length) & MAX_CLUSTER_HSLOT;
106
- return rv;
107
- }
108
-
109
101
// /////////////////////////////////////////////////////////////////////////////////////////////////////
110
102
111
103
cluster_client::cluster_client (client_group* group) : client(group)
112
104
{
113
105
}
114
106
115
107
cluster_client::~cluster_client () {
116
- for (unsigned int i = 0 ; i < m_key_index_pools.size (); i++) {
117
- key_index_pool* key_idx_pool = m_key_index_pools[i];
118
- delete key_idx_pool;
119
- }
120
- m_key_index_pools.clear ();
121
108
}
122
109
123
110
int cluster_client::connect (void ) {
@@ -128,11 +115,6 @@ int cluster_client::connect(void) {
128
115
// set main connection to send 'CLUSTER SLOTS' command
129
116
sc->set_cluster_slots ();
130
117
131
- // create key index pool for main connection
132
- key_index_pool* key_idx_pool = new key_index_pool;
133
- m_key_index_pools.push_back (key_idx_pool);
134
- assert (m_connections.size () == m_key_index_pools.size ());
135
-
136
118
// continue with base class
137
119
client::connect ();
138
120
@@ -166,22 +148,10 @@ shard_connection* cluster_client::create_shard_connection(abstract_protocol* abs
166
148
167
149
m_connections.push_back (sc);
168
150
169
- // create key index pool
170
- key_index_pool* key_idx_pool = new key_index_pool;
171
- assert (key_idx_pool != NULL );
172
-
173
- m_key_index_pools.push_back (key_idx_pool);
174
- assert (m_connections.size () == m_key_index_pools.size ());
175
-
176
151
return sc;
177
152
}
178
153
179
154
bool cluster_client::connect_shard_connection (shard_connection* sc, char * address, char * port) {
180
- // empty key index queue
181
- if (m_key_index_pools[sc->get_id ()]->size ()) {
182
- key_index_pool empty_queue;
183
- std::swap (*m_key_index_pools[sc->get_id ()], empty_queue);
184
- }
185
155
186
156
// save address and port
187
157
sc->set_address_port (address, port);
@@ -224,9 +194,12 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
224
194
*/
225
195
unsigned long prev_connections_size = m_connections.size ();
226
196
std::vector<bool > close_sc (prev_connections_size, true );
197
+ for (unsigned int i = 0 ; i < MAX_SLOTS; i++) {
198
+ m_conn_to_init_slot[i] = UINT16_MAX;
199
+ }
227
200
228
201
// run over response and create connections
229
- for (unsigned int i= 0 ; i< r->get_mbulk_value ()->mbulks_elements .size (); i++) {
202
+ for (unsigned int i = 0 ; i < r->get_mbulk_value ()->mbulks_elements .size (); i++) {
230
203
// create connection
231
204
mbulk_size_el* shard = r->get_mbulk_value ()->mbulks_elements [i]->as_mbulk_size ();
232
205
@@ -273,17 +246,26 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
273
246
connect_shard_connection (sc, addr, port);
274
247
}
275
248
276
- // update range
249
+ unsigned int sc_id = sc->get_id ();
250
+ // Set the initial slot for this shard connection
251
+ if (m_conn_to_init_slot[sc_id] == UINT16_MAX) {
252
+ m_conn_to_init_slot[sc_id] = min_slot;
253
+ }
277
254
for (int j = min_slot; j <= max_slot; j++) {
278
- m_slot_to_shard[j] = sc->get_id ();
255
+ if (j < max_slot) {
256
+ m_slot_lists[j] = j+1 ;
257
+ } else {
258
+ // Close the loop - point the last index to the first one owned by the shard connection
259
+ m_slot_lists[j] = m_conn_to_init_slot[sc_id];
260
+ }
279
261
}
280
262
281
263
free (addr);
282
264
free (port);
283
265
}
284
266
285
267
// check if some connections left with no slots, and need to be closed
286
- for (unsigned int i= 0 ; i < prev_connections_size; i++) {
268
+ for (unsigned int i = 0 ; i < prev_connections_size; i++) {
287
269
if ((close_sc[i] == true ) &&
288
270
(m_connections[i]->get_connection_state () != conn_disconnected)) {
289
271
@@ -299,8 +281,7 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
299
281
300
282
// don't exceed requests
301
283
if (m_config->requests ) {
302
- if (m_key_index_pools[conn_id]->empty () &&
303
- m_reqs_generated >= m_config->requests ) {
284
+ if (m_reqs_generated >= m_config->requests ) {
304
285
return true ;
305
286
}
306
287
}
@@ -309,53 +290,13 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
309
290
}
310
291
311
292
bool cluster_client::get_key_for_conn (unsigned int conn_id, int iter, unsigned long long * key_index) {
312
- // first check if we already have key in pool
313
- if (!m_key_index_pools[conn_id]->empty ()) {
314
- *key_index = m_key_index_pools[conn_id]->front ();
315
- m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s%llu" , m_obj_gen->get_key_prefix (), *key_index);
316
-
317
- m_key_index_pools[conn_id]->pop ();
318
- return true ;
319
- }
320
-
321
- // keep generate key till it match for this connection, or requests reached
322
- while (true ) {
323
- // generate key
324
- *key_index = m_obj_gen->get_key_index (iter);
325
- m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s%llu" , m_obj_gen->get_key_prefix (), *key_index);
326
-
327
- unsigned int hslot = calc_hslot_crc16_cluster (m_key_buffer, m_key_len);
328
-
329
- // check if the key match for this connection
330
- if (m_slot_to_shard[hslot] == conn_id) {
331
- m_reqs_generated++;
332
- return true ;
333
- }
334
-
335
- // handle key for other connection
336
- unsigned int other_conn_id = m_slot_to_shard[hslot];
337
293
338
- // in case we generated key for connection that is disconnected, 'slot to shard' map may need to be updated
339
- if (m_connections[other_conn_id]->get_connection_state () == conn_disconnected) {
340
- m_connections[conn_id]->set_cluster_slots ();
341
- return false ;
342
- }
343
-
344
- // in case connection is during cluster slots command, his slots mapping not relevant
345
- if (m_connections[other_conn_id]->get_cluster_slots_state () != setup_done)
346
- continue ;
347
-
348
- // store key for other connection, if queue is not full
349
- key_index_pool* key_idx_pool = m_key_index_pools[other_conn_id];
350
- if (key_idx_pool->size () < KEY_INDEX_QUEUE_MAX_SIZE) {
351
- key_idx_pool->push (*key_index);
352
- m_reqs_generated++;
353
- }
354
-
355
- // don't exceed requests
356
- if (m_config->requests > 0 && m_reqs_generated >= m_config->requests )
357
- return false ;
358
- }
294
+ *key_index = m_obj_gen->get_key_index (iter);
295
+ m_key_len = snprintf (m_key_buffer, sizeof (m_key_buffer)-1 , " %s{%s}%llu" ,
296
+ m_obj_gen->get_key_prefix (), crc16_slot_table[m_conn_to_init_slot[conn_id]], *key_index);
297
+ m_conn_to_init_slot[conn_id] = m_slot_lists[m_conn_to_init_slot[conn_id]];
298
+ m_reqs_generated++;
299
+ return true ;
359
300
}
360
301
361
302
// This function could use some urgent TLC -- but we need to do it without altering the behavior
@@ -432,10 +373,6 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
432
373
if (m_connections[conn_id]->get_cluster_slots_state () != setup_done)
433
374
return ;
434
375
435
- // queue may stored uncorrected mapping indexes, empty them
436
- key_index_pool empty_queue;
437
- std::swap (*m_key_index_pools[conn_id], empty_queue);
438
-
439
376
// set connection to send 'CLUSTER SLOTS' command
440
377
m_connections[conn_id]->set_cluster_slots ();
441
378
}
0 commit comments