
Commit b3c76cc
Merge branch 'master' of github.com:mulbc/gosbench
mulbc committed Jul 24, 2019
2 parents 5f67bd4 + 56a23e5 commit b3c76cc
Showing 3 changed files with 46 additions and 57 deletions.
19 changes: 14 additions & 5 deletions worker/main.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
@@ -15,7 +16,7 @@ import (
var config common.WorkerConf

func init() {
log.SetLevel(log.DebugLevel)
log.SetLevel(log.InfoLevel)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
@@ -79,7 +80,7 @@ func connectToServer(serverAddress string) error {
return nil
}
log.Info("Starting to work")
PerfTest(config.Test, Workqueue)
PerfTest(config.Test, Workqueue, config.WorkerID)
encoder.Encode(common.WorkerMessage{Message: "work done"})
// Work is done - return to being a ready worker by reconnecting
return nil
@@ -88,7 +89,7 @@ func connectToServer(serverAddress string) error {
}

// PerfTest runs a performance test as configured in testConfig
func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue) {
func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string) {
workChannel := make(chan WorkItem, len(*Workqueue.Queue))
doneChannel := make(chan bool)
for worker := 0; worker < testConfig.ParallelClients; worker++ {
@@ -110,11 +111,15 @@ func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue) {
for _, work := range *Workqueue.Queue {
work.Clean()
}
for bucket := uint64(0); bucket < testConfig.Buckets.NumberMax; bucket++ {
deleteBucket(housekeepingSvc, fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket))
}
log.Info("Housekeeping finished")
}
}

func workUntilTimeout(Workqueue *Workqueue, workChannel chan WorkItem, runtime time.Duration) {
workContext, WorkCancel = context.WithCancel(context.Background())
timer := time.NewTimer(runtime)
for {
for _, work := range *Workqueue.Queue {
@@ -179,6 +184,10 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueu

bucketCount := common.EvaluateDistribution(testConfig.Buckets.NumberMin, testConfig.Buckets.NumberMax, &testConfig.Buckets.NumberLast, 1, testConfig.Buckets.NumberDistribution)
for bucket := uint64(0); bucket < bucketCount; bucket++ {
err := createBucket(housekeepingSvc, fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket))
if err != nil {
log.WithError(err).WithField("bucket", fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket)).Error("Error when creating bucket")
}
objectCount := common.EvaluateDistribution(testConfig.Objects.NumberMin, testConfig.Objects.NumberMax, &testConfig.Objects.NumberLast, 1, testConfig.Objects.NumberDistribution)
for object := uint64(0); object < objectCount; object++ {
objectSize := common.EvaluateDistribution(testConfig.Objects.SizeMin, testConfig.Objects.SizeMax, &testConfig.Objects.SizeLast, 1, testConfig.Objects.SizeDistribution)
@@ -212,8 +221,8 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueu
case "delete":
IncreaseOperationValue(nextOp, 1/float64(testConfig.DeleteWeight), Workqueue)
new := DeleteOperation{
Bucket: fmt.Sprintf("%s%d", testConfig.BucketPrefix, bucket),
ObjectName: fmt.Sprintf("%s%d", testConfig.ObjectPrefix, object),
Bucket: fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket),
ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
ObjectSize: objectSize,
}
*Workqueue.Queue = append(*Workqueue.Queue, new)
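The main.go changes prefix every bucket name with the worker's ID, so parallel workers operate on disjoint buckets, and the housekeeping phase now deletes those buckets after the queued objects have been cleaned. A minimal sketch of the naming scheme follows; the helper name bucketName and the example worker ID and prefix are illustrative, not part of gosbench.

package main

import "fmt"

// bucketName mirrors the fmt.Sprintf pattern in the diff: the worker ID is
// prepended to the configured bucket prefix so two workers never share a
// bucket. Helper name and example values are assumptions for illustration.
func bucketName(workerID, bucketPrefix string, bucket uint64) string {
	return fmt.Sprintf("%s%s%d", workerID, bucketPrefix, bucket)
}

func main() {
	// A worker with ID "worker1" and prefix "gosbench-" would create and later
	// delete "worker1gosbench-0", "worker1gosbench-1", "worker1gosbench-2".
	for bucket := uint64(0); bucket < 3; bucket++ {
		fmt.Println(bucketName("worker1", "gosbench-", bucket))
	}
}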
48 changes: 26 additions & 22 deletions worker/s3.go
@@ -22,10 +22,9 @@ import (

var svc, housekeepingSvc *s3.S3
var ctx context.Context
var hc *http.Client

// InitS3 initialises the S3 session
// Also starts the Prometheus exporter on Port 8888
func InitS3(config common.S3Configuration) {
func init() {
// Then create the prometheus stat exporter
pe, err := prometheus.NewExporter(prometheus.Options{
Namespace: "gosbench",
@@ -36,13 +35,36 @@ func InitS3(config common.S3Configuration) {
if err != nil {
log.WithError(err).Fatalf("Failed to create the Prometheus exporter:")
}
hc = &http.Client{Transport: new(ochttp.Transport)}

if err := view.Register([]*view.View{
ochttp.ClientSentBytesDistribution,
ochttp.ClientReceivedBytesDistribution,
ochttp.ClientRoundtripLatencyDistribution,
ochttp.ClientCompletedCount,
}...); err != nil {
log.WithError(err).Fatalf("Failed to register HTTP client views:")
}
view.RegisterExporter(pe)
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", pe)
// http://localhost:8888/metrics
if err := http.ListenAndServe(":8888", mux); err != nil {
log.WithError(err).Fatalf("Failed to run Prometheus /metrics endpoint:")
}
}()

}

// InitS3 initialises the S3 session
// Also starts the Prometheus exporter on Port 8888
func InitS3(config common.S3Configuration) {
// All clients require a Session. The Session provides the client with
// shared configuration such as region, endpoint, and credentials. A
// Session should be shared where possible to take advantage of
// configuration and credential caching. See the session package for
// more information.
hc := &http.Client{Transport: new(ochttp.Transport)}

sess := session.Must(session.NewSession(&aws.Config{
HTTPClient: hc,
@@ -61,24 +83,6 @@ func InitS3(config common.S3Configuration) {
S3ForcePathStyle: aws.Bool(true),
}))

if err := view.Register([]*view.View{
ochttp.ClientSentBytesDistribution,
ochttp.ClientReceivedBytesDistribution,
ochttp.ClientRoundtripLatencyDistribution,
ochttp.ClientCompletedCount,
}...); err != nil {
log.WithError(err).Fatalf("Failed to register HTTP client views:")
}
view.RegisterExporter(pe)
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", pe)
// http://localhost:8888/metrics
if err := http.ListenAndServe(":8888", mux); err != nil {
log.WithError(err).Fatalf("Failed to run Prometheus /metrics endpoint:")
}
}()

// Create a new instance of the service's client with a Session.
// Optional aws.Config values can also be provided as variadic arguments
// to the New function. This option allows you to provide service
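In s3.go the Prometheus/OpenCensus wiring moves out of InitS3 into a package-level init(), so the exporter, the ochttp client views, and the /metrics listener on port 8888 are set up exactly once per worker process, while InitS3 only builds the per-test S3 session on top of the shared, instrumented http.Client. The following self-contained sketch of that init() pattern is assembled from the calls visible in the diff; the demo main() and the example request are assumptions added for illustration.

package main

import (
	"net/http"

	"contrib.go.opencensus.io/exporter/prometheus"
	log "github.com/sirupsen/logrus"
	"go.opencensus.io/plugin/ochttp"
	"go.opencensus.io/stats/view"
)

// hc is the instrumented HTTP client that the S3 sessions later reuse.
var hc *http.Client

func init() {
	// Create the Prometheus stat exporter once per process.
	pe, err := prometheus.NewExporter(prometheus.Options{Namespace: "gosbench"})
	if err != nil {
		log.WithError(err).Fatalf("Failed to create the Prometheus exporter:")
	}
	hc = &http.Client{Transport: new(ochttp.Transport)}

	// Register the ochttp client views so sizes and latencies of every
	// request made through hc are recorded.
	if err := view.Register(
		ochttp.ClientSentBytesDistribution,
		ochttp.ClientReceivedBytesDistribution,
		ochttp.ClientRoundtripLatencyDistribution,
		ochttp.ClientCompletedCount,
	); err != nil {
		log.WithError(err).Fatalf("Failed to register HTTP client views:")
	}
	view.RegisterExporter(pe)

	// Serve the collected metrics on http://localhost:8888/metrics.
	go func() {
		mux := http.NewServeMux()
		mux.Handle("/metrics", pe)
		if err := http.ListenAndServe(":8888", mux); err != nil {
			log.WithError(err).Fatalf("Failed to run Prometheus /metrics endpoint:")
		}
	}()
}

func main() {
	// Illustrative only: any request made through hc shows up in the views.
	if resp, err := hc.Get("https://example.com"); err != nil {
		log.WithError(err).Error("request failed")
	} else {
		resp.Body.Close()
	}
	select {} // keep the process alive so /metrics stays reachable
}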
36 changes: 6 additions & 30 deletions worker/workItems.go
@@ -75,7 +75,7 @@ func GetNextOperation(Queue *Workqueue) string {
}

func init() {
workContext, WorkCancel = context.WithCancel(context.Background())
workContext = context.Background()
}

var workContext context.Context
@@ -97,36 +97,24 @@ func IncreaseOperationValue(operation string, value float64, Queue *Workqueue) e
// Prepare prepares the execution of the ReadOperation
func (op ReadOperation) Prepare() error {
log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing ReadOperation")
err := createBucket(housekeepingSvc, op.Bucket)
if err != nil {
return err
}
return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket)
}

// Prepare prepares the execution of the WriteOperation
func (op WriteOperation) Prepare() error {
log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing WriteOperation")
return createBucket(housekeepingSvc, op.Bucket)
return nil
}

// Prepare prepares the execution of the ListOperation
func (op ListOperation) Prepare() error {
log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing ListOperation")
err := createBucket(housekeepingSvc, op.Bucket)
if err != nil {
return err
}
return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket)
}

// Prepare prepares the execution of the DeleteOperation
func (op DeleteOperation) Prepare() error {
log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing DeleteOperation")
err := createBucket(housekeepingSvc, op.Bucket)
if err != nil {
return err
}
return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket)
}

@@ -166,34 +154,22 @@ func (op Stopper) Do() error {

// Clean removes the objects and buckets left from the previous ReadOperation
func (op ReadOperation) Clean() error {
err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
if err != nil {
return err
}
return deleteBucket(housekeepingSvc, op.Bucket)
return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
}

// Clean removes the objects and buckets left from the previous WriteOperation
func (op WriteOperation) Clean() error {
err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
if err != nil {
return err
}
return deleteBucket(housekeepingSvc, op.Bucket)
return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
}

// Clean removes the objects and buckets left from the previous ListOperation
func (op ListOperation) Clean() error {
err := deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
if err != nil {
return err
}
return deleteBucket(housekeepingSvc, op.Bucket)
return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
}

// Clean removes the objects and buckets left from the previous DeleteOperation
func (op DeleteOperation) Clean() error {
return deleteBucket(housekeepingSvc, op.Bucket)
return nil
}

// Clean does nothing here
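workItems.go no longer creates or deletes buckets inside Prepare() and Clean(); that now happens once in fillWorkqueue and in the housekeeping step of PerfTest. In the same spirit, the shared cancellation context is no longer built in init(): workUntilTimeout creates it for each run. The sketch below shows that per-run cancellation pattern; the names runFor and doWork are hypothetical stand-ins for the gosbench functions.

package main

import (
	"context"
	"fmt"
	"time"
)

// runFor mimics the structure of workUntilTimeout after the diff: the cancel
// context is created per test run, and a timer fires the cancel so workers
// stop once the configured runtime has elapsed.
func runFor(runtime time.Duration) {
	workContext, workCancel := context.WithCancel(context.Background())
	timer := time.NewTimer(runtime)

	go func() {
		<-timer.C
		workCancel() // signal every worker that the run is over
	}()

	for item := 0; ; item++ {
		select {
		case <-workContext.Done():
			fmt.Println("runtime over - stopping")
			return
		default:
			doWork(item)
		}
	}
}

// doWork stands in for executing one queued operation.
func doWork(item int) {
	time.Sleep(200 * time.Millisecond)
	fmt.Println("finished work item", item)
}

func main() {
	runFor(1 * time.Second)
}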
