@@ -157,13 +157,25 @@ func sendCmdLogicRadix(conn Client, newCmdS []string, enableMultiExec bool, key
157
157
datapointsChan <- datapoint {! (err != nil ), duration .Microseconds (), cacheHit }
158
158
}
159
159
160
+ func onInvalidations (messages []rueidis.RedisMessage ) {
161
+ if messages != nil {
162
+ cscInvalidationMutex .Lock ()
163
+ for range messages {
164
+ totalCachedInvalidations ++
165
+ }
166
+ cscInvalidationMutex .Unlock ()
167
+ }
168
+
169
+ }
170
+
160
171
func main () {
161
172
host := flag .String ("h" , "127.0.0.1" , "Server hostname." )
162
173
port := flag .Int ("p" , 12000 , "Server port." )
163
174
rps := flag .Int64 ("rps" , 0 , "Max rps. If 0 no limit is applied and the DB is stressed up to maximum." )
164
175
rpsburst := flag .Int64 ("rps-burst" , 0 , "Max rps burst. If 0 the allowed burst will be the ammount of clients." )
165
176
username := flag .String ("u" , "" , "Username for Redis Auth." )
166
177
password := flag .String ("a" , "" , "Password for Redis Auth." )
178
+ jsonOutFile := flag .String ("json-out-file" , "" , "Results file. If empty will not save." )
167
179
seed := flag .Int64 ("random-seed" , 12345 , "random seed to be used." )
168
180
clients := flag .Uint64 ("c" , 50 , "number of clients." )
169
181
keyspacelen := flag .Uint64 ("r" , 1000000 , "keyspace length. The benchmark will expand the string __key__ inside an argument with a number in the specified range from 0 to keyspacelen-1. The substitution changes every time a command is executed." )
@@ -248,6 +260,7 @@ func main() {
248
260
samplesPerClient := * numberRequests / * clients
249
261
client_update_tick := 1
250
262
latencies = hdrhistogram .New (1 , 90000000 , 3 )
263
+ latenciesTick = hdrhistogram .New (1 , 90000000 , 3 )
251
264
opts := radix.Dialer {}
252
265
if * password != "" {
253
266
opts .AuthPass = * password
@@ -310,6 +323,10 @@ func main() {
310
323
cmd := make ([]string , len (args ))
311
324
copy (cmd , args )
312
325
if * cscEnabled || * useRuedis {
326
+ var invalidationFunction func ([]rueidis.RedisMessage ) = nil
327
+ if * cscEnabled {
328
+ invalidationFunction = onInvalidations
329
+ }
313
330
clientOptions := rueidis.ClientOption {
314
331
InitAddress : []string {connectionStr },
315
332
Username : * username ,
@@ -323,6 +340,7 @@ func main() {
323
340
ReadBufferEachConn : 1024 ,
324
341
WriteBufferEachConn : 1024 ,
325
342
CacheSizeEachConn : * cscSizeBytes ,
343
+ OnInvalidations : invalidationFunction ,
326
344
}
327
345
clientOptions .Dialer .KeepAlive = * clientKeepAlive
328
346
ruedisClient , err = rueidis .NewClient (clientOptions )
@@ -350,8 +368,10 @@ func main() {
350
368
signal .Notify (c , os .Interrupt )
351
369
352
370
tick := time .NewTicker (time .Duration (client_update_tick ) * time .Second )
353
- closed , _ , duration , totalMessages , _ := updateCLI (tick , c , * numberRequests , * loop , datapointsChan )
354
- messageRate := float64 (totalMessages ) / float64 (duration .Seconds ())
371
+ closed , startT , endT , duration , _ , _ , _ , messageRateTs , cacheRateTs , cacheInvalidationsTs , percentilesTs := updateCLI (tick , c , * numberRequests , * loop , datapointsChan , int (* clients ))
372
+ messageRate := float64 (totalCommands ) / float64 (duration .Seconds ())
373
+ CSCHitRate := float64 (totalCached ) / float64 (duration .Seconds ())
374
+ CSCInvalidationRate := float64 (totalCachedInvalidations ) / float64 (duration .Seconds ())
355
375
avgMs := float64 (latencies .Mean ()) / 1000.0
356
376
p50IngestionMs := float64 (latencies .ValueAtQuantile (50.0 )) / 1000.0
357
377
p95IngestionMs := float64 (latencies .ValueAtQuantile (95.0 )) / 1000.0
@@ -362,10 +382,22 @@ func main() {
362
382
fmt .Printf ("Total Duration %.3f Seconds\n " , duration .Seconds ())
363
383
fmt .Printf ("Total Errors %d\n " , totalErrors )
364
384
fmt .Printf ("Throughput summary: %.0f requests per second\n " , messageRate )
385
+ fmt .Printf (" %.0f CSC Hits per second\n " , CSCHitRate )
386
+ fmt .Printf (" %.0f CSC Evicts per second\n " , CSCInvalidationRate )
365
387
fmt .Printf ("Latency summary (msec):\n " )
366
388
fmt .Printf (" %9s %9s %9s %9s\n " , "avg" , "p50" , "p95" , "p99" )
367
389
fmt .Printf (" %9.3f %9.3f %9.3f %9.3f\n " , avgMs , p50IngestionMs , p95IngestionMs , p99IngestionMs )
368
390
391
+ testResult := NewTestResult ("" , uint (* clients ), 0 )
392
+ testResult .FillDurationInfo (startT , endT , duration )
393
+ testResult .OverallClientLatencies = percentilesTs
394
+ testResult .OverallCommandRate = messageRateTs
395
+ testResult .OverallCSCHitRate = cacheRateTs
396
+ testResult .OverallCSCInvalidationRate = cacheInvalidationsTs
397
+ _ , overallLatencies := generateLatenciesMap (latencies , duration )
398
+ testResult .Totals = overallLatencies
399
+ saveJsonResult (testResult , * jsonOutFile )
400
+
369
401
if closed {
370
402
return
371
403
}
@@ -378,21 +410,27 @@ func main() {
378
410
os .Exit (0 )
379
411
}
380
412
381
- func updateCLI (tick * time.Ticker , c chan os.Signal , message_limit uint64 , loop bool , datapointsChan chan datapoint ) (bool , time.Time , time.Duration , uint64 , [ ]float64 ) {
413
+ func updateCLI (tick * time.Ticker , c chan os.Signal , message_limit uint64 , loop bool , datapointsChan chan datapoint , totalClients int ) (bool , time.Time , time.Time , time. Duration , uint64 , uint64 , uint64 , [] float64 , [] float64 , [] float64 , [] map [ string ]float64 ) {
382
414
var currentErr uint64 = 0
383
415
var currentCount uint64 = 0
384
416
var currentCachedCount uint64 = 0
385
417
start := time .Now ()
386
418
prevTime := time .Now ()
387
419
prevMessageCount := uint64 (0 )
420
+ prevMessageCached := uint64 (0 )
421
+ previousCachedInvalidations := uint64 (0 )
388
422
messageRateTs := []float64 {}
423
+ cacheRateTs := []float64 {}
424
+ cacheInvalidationsTs := []float64 {}
425
+ percentilesTs := []map [string ]float64 {}
389
426
var dp datapoint
390
- fmt .Printf ("%26s %7s %25s %25s %7s %25s %25s %7s %25s\n " , "Test time" , " " , "Total Commands" , "Total Errors" , "" , "Command Rate" , "Client Cache Hits" , "" , "p50 lat. (msec)" )
427
+ fmt .Printf ("%26s %7s %25s %25s %7s %25s %25s %25s %25s\n " , "Test time" , " " , "Total Commands" , "Total Errors" , "" , "Command Rate" , "CSC Hits/sec " , "CSC Invalidations/sec " , "p50 lat. (msec)" )
391
428
for {
392
429
select {
393
430
case dp = <- datapointsChan :
394
431
{
395
432
latencies .RecordValue (dp .duration_ms )
433
+ latenciesTick .RecordValue (dp .duration_ms )
396
434
if ! dp .success {
397
435
currentErr ++
398
436
}
@@ -412,41 +450,48 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop b
412
450
now := time .Now ()
413
451
took := now .Sub (prevTime )
414
452
messageRate := float64 (totalCommands - prevMessageCount ) / float64 (took .Seconds ())
453
+ InvalidationMessageRate := float64 (totalCachedInvalidations - previousCachedInvalidations ) / float64 (took .Seconds ())
454
+ CacheHitRate := float64 (totalCached - prevMessageCached ) / float64 (took .Seconds ())
415
455
completionPercentStr := "[----%]"
416
456
if ! loop {
417
457
completionPercent := float64 (totalCommands ) / float64 (message_limit ) * 100.0
418
458
completionPercentStr = fmt .Sprintf ("[%3.1f%%]" , completionPercent )
419
459
}
420
460
errorPercent := float64 (totalErrors ) / float64 (totalCommands ) * 100.0
421
- cachedPercent := 0.0
422
- if totalCached > 0 {
423
- cachedPercent = float64 (totalCached ) / float64 (totalCommands ) * 100.0
424
- }
425
-
426
461
p50 := float64 (latencies .ValueAtQuantile (50.0 )) / 1000.0
427
462
428
463
if prevMessageCount == 0 && totalCommands != 0 {
429
464
start = time .Now ()
430
465
}
431
466
if totalCommands != 0 {
432
467
messageRateTs = append (messageRateTs , messageRate )
468
+ cacheRateTs = append (cacheRateTs , CacheHitRate )
469
+ cacheInvalidationsTs = append (cacheInvalidationsTs , InvalidationMessageRate )
470
+ _ , perTickLatencies := generateLatenciesMap (latenciesTick , took )
471
+ percentilesTs = append (percentilesTs , perTickLatencies )
472
+ latenciesTick .Reset ()
433
473
}
474
+
434
475
prevMessageCount = totalCommands
476
+ prevMessageCached = totalCached
477
+ previousCachedInvalidations = totalCachedInvalidations
435
478
prevTime = now
436
479
437
- fmt .Printf ("%25.0fs %s %25d %25d [%3.1f%%] %25.0f %25d [%3.1f%%] %25.3f\t " , time .Since (start ).Seconds (), completionPercentStr , totalCommands , totalErrors , errorPercent , messageRate , totalCached , cachedPercent , p50 )
480
+ fmt .Printf ("%25.0fs %s %25d %25d [%3.1f%%] %25.0f %25.0f %25.0f %25.3f\t " , time .Since (start ).Seconds (), completionPercentStr , totalCommands , totalErrors , errorPercent , messageRate , CacheHitRate , InvalidationMessageRate , p50 )
438
481
fmt .Printf ("\r " )
439
482
//w.Flush()
440
483
if message_limit > 0 && totalCommands >= uint64 (message_limit ) && ! loop {
441
- return true , start , time .Since (start ), totalCommands , messageRateTs
484
+ end := time .Now ()
485
+ return true , start , end , time .Since (start ), totalCommands , totalCached , totalErrors , messageRateTs , cacheRateTs , cacheInvalidationsTs , percentilesTs
442
486
}
443
487
444
488
break
445
489
}
446
490
447
491
case <- c :
448
492
fmt .Println ("\n received Ctrl-c - shutting down" )
449
- return true , start , time .Since (start ), totalCommands , messageRateTs
493
+ end := time .Now ()
494
+ return true , start , end , time .Since (start ), totalCommands , totalCached , totalErrors , messageRateTs , cacheRateTs , cacheInvalidationsTs , percentilesTs
450
495
}
451
496
}
452
497
}
0 commit comments