From 0a412c12277905447a97b079a30b08a7f5943c00 Mon Sep 17 00:00:00 2001 From: Chris Blum Date: Mon, 15 Jul 2019 15:59:34 +0200 Subject: [PATCH 1/6] Worker: Only create the bucket once Speedup for preparation steps --- worker/main.go | 4 ++++ worker/workItems.go | 14 +------------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/worker/main.go b/worker/main.go index cb57dde..6cf1a7b 100644 --- a/worker/main.go +++ b/worker/main.go @@ -179,6 +179,10 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueu bucketCount := common.EvaluateDistribution(testConfig.Buckets.NumberMin, testConfig.Buckets.NumberMax, &testConfig.Buckets.NumberLast, 1, testConfig.Buckets.NumberDistribution) for bucket := uint64(0); bucket < bucketCount; bucket++ { + err := createBucket(housekeepingSvc, fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket)) + if err != nil { + log.WithError(err).WithField("bucket", fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket)).Error("Error when creating bucket") + } objectCount := common.EvaluateDistribution(testConfig.Objects.NumberMin, testConfig.Objects.NumberMax, &testConfig.Objects.NumberLast, 1, testConfig.Objects.NumberDistribution) for object := uint64(0); object < objectCount; object++ { objectSize := common.EvaluateDistribution(testConfig.Objects.SizeMin, testConfig.Objects.SizeMax, &testConfig.Objects.SizeLast, 1, testConfig.Objects.SizeDistribution) diff --git a/worker/workItems.go b/worker/workItems.go index d3634bc..5b58faf 100644 --- a/worker/workItems.go +++ b/worker/workItems.go @@ -97,36 +97,24 @@ func IncreaseOperationValue(operation string, value float64, Queue *Workqueue) e // Prepare prepares the execution of the ReadOperation func (op ReadOperation) Prepare() error { log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing ReadOperation") - err := createBucket(housekeepingSvc, op.Bucket) - if err != nil { - return err - } return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket) } // Prepare prepares the execution of the WriteOperation func (op WriteOperation) Prepare() error { log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing WriteOperation") - return createBucket(housekeepingSvc, op.Bucket) + return nil } // Prepare prepares the execution of the ListOperation func (op ListOperation) Prepare() error { log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing ListOperation") - err := createBucket(housekeepingSvc, op.Bucket) - if err != nil { - return err - } return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket) } // Prepare prepares the execution of the DeleteOperation func (op DeleteOperation) Prepare() error { log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing DeleteOperation") - err := createBucket(housekeepingSvc, op.Bucket) - if err != nil { - return err - } return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket) } From 128bfdd898335deb95350d436b707520213dc6c8 Mon Sep 17 00:00:00 2001 From: Chris Blum Date: Mon, 15 Jul 2019 16:04:56 +0200 Subject: [PATCH 2/6] Worker: Decrease verbosity --- worker/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/main.go b/worker/main.go index 6cf1a7b..a722733 100644 --- a/worker/main.go +++ b/worker/main.go @@ -15,7 +15,7 @@ import ( var config common.WorkerConf func init() { - log.SetLevel(log.DebugLevel) + log.SetLevel(log.InfoLevel) log.SetFormatter(&log.TextFormatter{ FullTimestamp: true, }) From 4ccf6da7ef944137b04ecdf121b6027d1129fb54 Mon Sep 17 00:00:00 2001 From: Chris Blum Date: Mon, 15 Jul 2019 16:25:41 +0200 Subject: [PATCH 3/6] Worker speed up cleaning Only delete buckets once --- worker/main.go | 7 +++++-- worker/workItems.go | 20 ++++---------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/worker/main.go b/worker/main.go index a722733..df27d5a 100644 --- a/worker/main.go +++ b/worker/main.go @@ -79,7 +79,7 @@ func connectToServer(serverAddress string) error { return nil } log.Info("Starting to work") - PerfTest(config.Test, Workqueue) + PerfTest(config.Test, Workqueue, config.WorkerID) encoder.Encode(common.WorkerMessage{Message: "work done"}) // Work is done - return to being a ready worker by reconnecting return nil @@ -88,7 +88,7 @@ func connectToServer(serverAddress string) error { } // PerfTest runs a performance test as configured in testConfig -func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue) { +func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string) { workChannel := make(chan WorkItem, len(*Workqueue.Queue)) doneChannel := make(chan bool) for worker := 0; worker < testConfig.ParallelClients; worker++ { @@ -110,6 +110,9 @@ func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue) { for _, work := range *Workqueue.Queue { work.Clean() } + for bucket := uint64(0); bucket < testConfig.Buckets.NumberMax; bucket++ { + deleteBucket(housekeepingSvc, fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket)) + } log.Info("Housekeeping finished") } } diff --git a/worker/workItems.go b/worker/workItems.go index 5b58faf..c139be5 100644 --- a/worker/workItems.go +++ b/worker/workItems.go @@ -154,34 +154,22 @@ func (op Stopper) Do() error { // Clean removes the objects and buckets left from the previous ReadOperation func (op ReadOperation) Clean() error { - err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket) - if err != nil { - return err - } - return deleteBucket(housekeepingSvc, op.Bucket) + return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket) } // Clean removes the objects and buckets left from the previous WriteOperation func (op WriteOperation) Clean() error { - err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket) - if err != nil { - return err - } - return deleteBucket(housekeepingSvc, op.Bucket) + return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket) } // Clean removes the objects and buckets left from the previous ListOperation func (op ListOperation) Clean() error { - err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket) - if err != nil { - return err - } - return deleteBucket(housekeepingSvc, op.Bucket) + return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket) } // Clean removes the objects and buckets left from the previous DeleteOperation func (op DeleteOperation) Clean() error { - return deleteBucket(housekeepingSvc, op.Bucket) + return nil } // Clean does nothing here From 9d75632961a7b6ab90648483837f69f43111dbbe Mon Sep 17 00:00:00 2001 From: Chris Blum Date: Mon, 15 Jul 2019 16:51:35 +0200 Subject: [PATCH 4/6] Set workCancel() only when needed --- worker/main.go | 2 ++ worker/workItems.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/worker/main.go b/worker/main.go index df27d5a..da5f2cd 100644 --- a/worker/main.go +++ b/worker/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "flag" "fmt" @@ -118,6 +119,7 @@ func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, wo } func workUntilTimeout(Workqueue *Workqueue, workChannel chan WorkItem, runtime time.Duration) { + workContext, WorkCancel = context.WithCancel(context.Background()) timer := time.NewTimer(runtime) for { for _, work := range *Workqueue.Queue { diff --git a/worker/workItems.go b/worker/workItems.go index c139be5..670452b 100644 --- a/worker/workItems.go +++ b/worker/workItems.go @@ -75,7 +75,7 @@ func GetNextOperation(Queue *Workqueue) string { } func init() { - workContext, WorkCancel = context.WithCancel(context.Background()) + workContext = context.Background() } var workContext context.Context From 46164a84cf7f390811980bffe622acb95ad05fbc Mon Sep 17 00:00:00 2001 From: Chris Blum Date: Mon, 15 Jul 2019 17:28:03 +0200 Subject: [PATCH 5/6] Do not recreate Prometheus exporter between tests --- worker/s3.go | 48 ++++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/worker/s3.go b/worker/s3.go index fb200fc..ca3b6d3 100644 --- a/worker/s3.go +++ b/worker/s3.go @@ -22,10 +22,9 @@ import ( var svc, housekeepingSvc *s3.S3 var ctx context.Context +var hc *http.Client -// InitS3 initialises the S3 session -// Also starts the Prometheus exporter on Port 8888 -func InitS3(config common.S3Configuration) { +func init() { // Then create the prometheus stat exporter pe, err := prometheus.NewExporter(prometheus.Options{ Namespace: "gosbench", @@ -36,13 +35,36 @@ func InitS3(config common.S3Configuration) { if err != nil { log.WithError(err).Fatalf("Failed to create the Prometheus exporter:") } + hc = &http.Client{Transport: new(ochttp.Transport)} + + if err := view.Register([]*view.View{ + ochttp.ClientSentBytesDistribution, + ochttp.ClientReceivedBytesDistribution, + ochttp.ClientRoundtripLatencyDistribution, + ochttp.ClientCompletedCount, + }...); err != nil { + log.WithError(err).Fatalf("Failed to register HTTP client views:") + } + view.RegisterExporter(pe) + go func() { + mux := http.NewServeMux() + mux.Handle("/metrics", pe) + // http://localhost:8888/metrics + if err := http.ListenAndServe(":8888", mux); err != nil { + log.WithError(err).Fatalf("Failed to run Prometheus /metrics endpoint:") + } + }() + +} +// InitS3 initialises the S3 session +// Also starts the Prometheus exporter on Port 8888 +func InitS3(config common.S3Configuration) { // All clients require a Session. The Session provides the client with // shared configuration such as region, endpoint, and credentials. A // Session should be shared where possible to take advantage of // configuration and credential caching. See the session package for // more information. - hc := &http.Client{Transport: new(ochttp.Transport)} sess := session.Must(session.NewSession(&aws.Config{ HTTPClient: hc, @@ -61,24 +83,6 @@ func InitS3(config common.S3Configuration) { S3ForcePathStyle: aws.Bool(true), })) - if err := view.Register([]*view.View{ - ochttp.ClientSentBytesDistribution, - ochttp.ClientReceivedBytesDistribution, - ochttp.ClientRoundtripLatencyDistribution, - ochttp.ClientCompletedCount, - }...); err != nil { - log.WithError(err).Fatalf("Failed to register HTTP client views:") - } - view.RegisterExporter(pe) - go func() { - mux := http.NewServeMux() - mux.Handle("/metrics", pe) - // http://localhost:8888/metrics - if err := http.ListenAndServe(":8888", mux); err != nil { - log.WithError(err).Fatalf("Failed to run Prometheus /metrics endpoint:") - } - }() - // Create a new instance of the service's client with a Session. // Optional aws.Config values can also be provided as variadic arguments // to the New function. This option allows you to provide service From 56a23e50fa9c9d6f15a8bf3164d9e4492b925b0f Mon Sep 17 00:00:00 2001 From: Chris Blum Date: Mon, 15 Jul 2019 18:25:23 +0200 Subject: [PATCH 6/6] Fix: Delete operations should have workerID prefix too! --- worker/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/main.go b/worker/main.go index da5f2cd..9167c91 100644 --- a/worker/main.go +++ b/worker/main.go @@ -221,8 +221,8 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueu case "delete": IncreaseOperationValue(nextOp, 1/float64(testConfig.DeleteWeight), Workqueue) new := DeleteOperation{ - Bucket: fmt.Sprintf("%s%d", testConfig.BucketPrefix, bucket), - ObjectName: fmt.Sprintf("%s%d", testConfig.ObjectPrefix, object), + Bucket: fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket), + ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object), ObjectSize: objectSize, } *Workqueue.Queue = append(*Workqueue.Queue, new)