diff --git a/worker/main.go b/worker/main.go
index cb57dde..9167c91 100644
--- a/worker/main.go
+++ b/worker/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"flag"
 	"fmt"
@@ -15,7 +16,7 @@ import (
 var config common.WorkerConf
 
 func init() {
-	log.SetLevel(log.DebugLevel)
+	log.SetLevel(log.InfoLevel)
 	log.SetFormatter(&log.TextFormatter{
 		FullTimestamp: true,
 	})
@@ -79,7 +80,7 @@ func connectToServer(serverAddress string) error {
 				return nil
 			}
 			log.Info("Starting to work")
-			PerfTest(config.Test, Workqueue)
+			PerfTest(config.Test, Workqueue, config.WorkerID)
 			encoder.Encode(common.WorkerMessage{Message: "work done"})
 			// Work is done - return to being a ready worker by reconnecting
 			return nil
@@ -88,7 +89,7 @@ func connectToServer(serverAddress string) error {
 }
 
 // PerfTest runs a performance test as configured in testConfig
-func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue) {
+func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string) {
 	workChannel := make(chan WorkItem, len(*Workqueue.Queue))
 	doneChannel := make(chan bool)
 	for worker := 0; worker < testConfig.ParallelClients; worker++ {
@@ -110,11 +111,15 @@ func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue) {
 		for _, work := range *Workqueue.Queue {
 			work.Clean()
 		}
+		for bucket := uint64(0); bucket < testConfig.Buckets.NumberMax; bucket++ {
+			deleteBucket(housekeepingSvc, fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket))
+		}
 		log.Info("Housekeeping finished")
 	}
 }
 
 func workUntilTimeout(Workqueue *Workqueue, workChannel chan WorkItem, runtime time.Duration) {
+	workContext, WorkCancel = context.WithCancel(context.Background())
 	timer := time.NewTimer(runtime)
 	for {
 		for _, work := range *Workqueue.Queue {
@@ -179,6 +184,10 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueu
 
 	bucketCount := common.EvaluateDistribution(testConfig.Buckets.NumberMin, testConfig.Buckets.NumberMax, &testConfig.Buckets.NumberLast, 1, testConfig.Buckets.NumberDistribution)
 	for bucket := uint64(0); bucket < bucketCount; bucket++ {
+		err := createBucket(housekeepingSvc, fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket))
+		if err != nil {
+			log.WithError(err).WithField("bucket", fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket)).Error("Error when creating bucket")
+		}
 		objectCount := common.EvaluateDistribution(testConfig.Objects.NumberMin, testConfig.Objects.NumberMax, &testConfig.Objects.NumberLast, 1, testConfig.Objects.NumberDistribution)
 		for object := uint64(0); object < objectCount; object++ {
 			objectSize := common.EvaluateDistribution(testConfig.Objects.SizeMin, testConfig.Objects.SizeMax, &testConfig.Objects.SizeLast, 1, testConfig.Objects.SizeDistribution)
@@ -212,8 +221,8 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueu
 			case "delete":
 				IncreaseOperationValue(nextOp, 1/float64(testConfig.DeleteWeight), Workqueue)
 				new := DeleteOperation{
-					Bucket:     fmt.Sprintf("%s%d", testConfig.BucketPrefix, bucket),
-					ObjectName: fmt.Sprintf("%s%d", testConfig.ObjectPrefix, object),
+					Bucket:     fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket),
+					ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
 					ObjectSize: objectSize,
 				}
 				*Workqueue.Queue = append(*Workqueue.Queue, new)
diff --git a/worker/s3.go b/worker/s3.go
index 1e6a8b5..9084ac5 100644
--- a/worker/s3.go
+++ b/worker/s3.go
@@ -22,10 +22,9 @@ import (
 
 var svc, housekeepingSvc *s3.S3
 var ctx context.Context
+var hc *http.Client
 
-// InitS3 initialises the S3 session
-// Also starts the Prometheus exporter on Port 8888
-func InitS3(config common.S3Configuration) {
+func init() {
 	// Then create the prometheus stat exporter
 	pe, err := prometheus.NewExporter(prometheus.Options{
 		Namespace: "gosbench",
@@ -36,13 +35,36 @@ func InitS3(config common.S3Configuration) {
 	if err != nil {
 		log.WithError(err).Fatalf("Failed to create the Prometheus exporter:")
 	}
+	hc = &http.Client{Transport: new(ochttp.Transport)}
+
+	if err := view.Register([]*view.View{
+		ochttp.ClientSentBytesDistribution,
+		ochttp.ClientReceivedBytesDistribution,
+		ochttp.ClientRoundtripLatencyDistribution,
+		ochttp.ClientCompletedCount,
+	}...); err != nil {
+		log.WithError(err).Fatalf("Failed to register HTTP client views:")
+	}
+	view.RegisterExporter(pe)
+	go func() {
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", pe)
+		// http://localhost:8888/metrics
+		if err := http.ListenAndServe(":8888", mux); err != nil {
+			log.WithError(err).Fatalf("Failed to run Prometheus /metrics endpoint:")
+		}
+	}()
+
+}
+// InitS3 initialises the S3 session
+// Also starts the Prometheus exporter on Port 8888
+func InitS3(config common.S3Configuration) {
 
 	// All clients require a Session. The Session provides the client with
 	// shared configuration such as region, endpoint, and credentials. A
 	// Session should be shared where possible to take advantage of
 	// configuration and credential caching. See the session package for
 	// more information.
-	hc := &http.Client{Transport: new(ochttp.Transport)}
 
 	sess := session.Must(session.NewSession(&aws.Config{
 		HTTPClient: hc,
@@ -61,24 +83,6 @@ func InitS3(config common.S3Configuration) {
 		S3ForcePathStyle: aws.Bool(true),
 	}))
 
-	if err := view.Register([]*view.View{
-		ochttp.ClientSentBytesDistribution,
-		ochttp.ClientReceivedBytesDistribution,
-		ochttp.ClientRoundtripLatencyDistribution,
-		ochttp.ClientCompletedCount,
-	}...); err != nil {
-		log.WithError(err).Fatalf("Failed to register HTTP client views:")
-	}
-	view.RegisterExporter(pe)
-	go func() {
-		mux := http.NewServeMux()
-		mux.Handle("/metrics", pe)
-		// http://localhost:8888/metrics
-		if err := http.ListenAndServe(":8888", mux); err != nil {
-			log.WithError(err).Fatalf("Failed to run Prometheus /metrics endpoint:")
-		}
-	}()
-
 	// Create a new instance of the service's client with a Session.
 	// Optional aws.Config values can also be provided as variadic arguments
 	// to the New function. This option allows you to provide service
diff --git a/worker/workItems.go b/worker/workItems.go
index d3634bc..670452b 100644
--- a/worker/workItems.go
+++ b/worker/workItems.go
@@ -75,7 +75,7 @@ func GetNextOperation(Queue *Workqueue) string {
 }
 
 func init() {
-	workContext, WorkCancel = context.WithCancel(context.Background())
+	workContext = context.Background()
 }
 
 var workContext context.Context
@@ -97,36 +97,24 @@ func IncreaseOperationValue(operation string, value float64, Queue *Workqueue) e
 
 // Prepare prepares the execution of the ReadOperation
 func (op ReadOperation) Prepare() error {
 	log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing ReadOperation")
-	err := createBucket(housekeepingSvc, op.Bucket)
-	if err != nil {
-		return err
-	}
 	return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket)
 }
 
 // Prepare prepares the execution of the WriteOperation
 func (op WriteOperation) Prepare() error {
 	log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing WriteOperation")
-	return createBucket(housekeepingSvc, op.Bucket)
+	return nil
 }
 
 // Prepare prepares the execution of the ListOperation
 func (op ListOperation) Prepare() error {
 	log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing ListOperation")
-	err := createBucket(housekeepingSvc, op.Bucket)
-	if err != nil {
-		return err
-	}
 	return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket)
 }
 
 // Prepare prepares the execution of the DeleteOperation
 func (op DeleteOperation) Prepare() error {
 	log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing DeleteOperation")
-	err := createBucket(housekeepingSvc, op.Bucket)
-	if err != nil {
-		return err
-	}
 	return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket)
 }
@@ -166,34 +154,22 @@ func (op Stopper) Do() error {
 
 // Clean removes the objects and buckets left from the previous ReadOperation
 func (op ReadOperation) Clean() error {
-	err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
-	if err != nil {
-		return err
-	}
-	return deleteBucket(housekeepingSvc, op.Bucket)
+	return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
 }
 
 // Clean removes the objects and buckets left from the previous WriteOperation
 func (op WriteOperation) Clean() error {
-	err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
-	if err != nil {
-		return err
-	}
-	return deleteBucket(housekeepingSvc, op.Bucket)
+	return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
 }
 
 // Clean removes the objects and buckets left from the previous ListOperation
 func (op ListOperation) Clean() error {
-	err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
-	if err != nil {
-		return err
-	}
-	return deleteBucket(housekeepingSvc, op.Bucket)
+	return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
 }
 
 // Clean removes the objects and buckets left from the previous DeleteOperation
 func (op DeleteOperation) Clean() error {
-	return deleteBucket(housekeepingSvc, op.Bucket)
+	return nil
 }
 
 // Clean does nothing here