From cbb88b5915262533fdddf07d3f9134a86dca9d61 Mon Sep 17 00:00:00 2001 From: wiggin77 Date: Mon, 18 Jul 2022 18:42:16 -0400 Subject: [PATCH] add concurrency config option --- config.go | 4 +++- main.go | 66 ++++++++++++++++++++++++++++++++++++++++--------------- user.go | 4 ++-- 3 files changed, 53 insertions(+), 21 deletions(-) diff --git a/config.go b/config.go index e0b2f2c..7261a6e 100644 --- a/config.go +++ b/config.go @@ -15,7 +15,8 @@ type Config struct { TeamName string `json:"team_name"` // optional: if empty then a new team will be created teamID string - UserCount int `json:"user_count"` // number of users to create + ConcurrentUsers int `json:"concurrent_users"` // number of users to simulate concurrently + UserCount int `json:"user_count"` // number of users to create ChannelsPerUser int `json:"channels_per_user"` BoardsPerChannel int `json:"boards_per_channel"` @@ -35,6 +36,7 @@ func createDefaultConfig(filename string) error { AdminUsername: "", AdminPassword: "", TeamName: "", + ConcurrentUsers: DefaultConcurrentUsers, UserCount: DefaultUserCount, ChannelsPerUser: DefaultChannelsPerUser, BoardsPerChannel: DefaultBoardsPerChannel, diff --git a/main.go b/main.go index 6e69316..17a7a12 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( ) const ( + DefaultConcurrentUsers = 3 DefaultUserCount = 5 DefaultChannelsPerUser = 3 DefaultBoardsPerChannel = 5 @@ -56,7 +57,7 @@ func main() { } defer func(l *logr.Logr) { - if lgr.IsShutdown() { + if l.IsShutdown() { return } ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) @@ -148,32 +149,61 @@ func main() { func run(ri *runInfo, workersExited chan struct{}) { defer close(workersExited) var wg sync.WaitGroup - for i := 0; i < ri.cfg.UserCount; i++ { - wg.Add(1) - username := strings.ToLower(makeName(".")) + var usersLeft int32 = int32(ri.cfg.UserCount) + concurrency := ri.cfg.ConcurrentUsers + + if !ri.quiet { + s := fmt.Sprintf("Creating %d users with %d concurrent threads.\n\n", usersLeft, concurrency) + ri.output.Write(s) + } - go func(u string) { + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { defer wg.Done() - stats, err := runUser(u, ri) - if err != nil { - ri.logger.Error("Cannot simulate user", logr.Err(err)) - } - - if !ri.quiet { - s := fmt.Sprintf("%s: channels=%d boards=%d cards=%d text=%d\n", - username, stats.ChannelCount, stats.BoardCount, stats.CardCount, stats.TextCount) - - ri.output.Write(s) - } - }(username) + runConcurrentUsers(ri, &usersLeft) + }() } wg.Wait() } +func runConcurrentUsers(ri *runInfo, usersLeft *int32) { + fmt.Println("Starting thread") + + for { + select { + case <-ri.abort: + fmt.Println("Exiting thread (abort)") + return + default: + } + + left := atomic.AddInt32(usersLeft, -1) + if left <= 0 { + fmt.Println("Exiting thread (userLeft <= 0)") + return + } + + username := strings.ToLower(makeName(".")) + + stats, err := runUser(username, ri) + if err != nil { + ri.logger.Error("Cannot simulate user", logr.String("username", username), logr.Err(err)) + } + + if !ri.quiet { + s := fmt.Sprintf("%s: channels=%d boards=%d cards=%d text=%d remaining=%d\n", + username, stats.ChannelCount, stats.BoardCount, stats.CardCount, stats.TextCount, left) + + ri.output.Write(s) + } + } +} + func setUpInterruptHandler(cleanUp func()) { - sig := make(chan os.Signal) + sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM) go func() { diff --git a/user.go b/user.go index b4df781..77dac83 100644 --- a/user.go +++ b/user.go @@ -60,7 +60,7 @@ func runUser(username string, ri *runInfo) (stats, error) { // create user user, err := ri.admin.CreateUser(username, ri.cfg.teamID) if err != nil { - return stats, err + return stats, fmt.Errorf("cannot create user: %w", err) } // add user to team @@ -73,7 +73,7 @@ func runUser(username string, ri *runInfo) (stats, error) { client, err := NewClient(ri.cfg.SiteURL, user.Username, password) if err != nil { - return stats, err + return stats, fmt.Errorf("cannot create client: %w", err) } // create channels, boards, cards, and content