forked from agocs/rabbit-mq-stress-tester
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
115 lines (92 loc) · 2.98 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
"log"
"os"
"time"
"github.com/codegangsta/cli"
"github.com/streadway/amqp"
)
// Package-level accumulators, both starting at zero.
// NOTE(review): neither is referenced in this part of the file; presumably the
// consumer code elsewhere in the package updates them — confirm.
var (
	totalTime  int64
	totalCount int64
)
// MqMessage groups a timestamp, a sequence number and a string payload.
// NOTE(review): it is not constructed in this part of the file; presumably it
// is the record the producer serializes and the consumer decodes — confirm.
type MqMessage struct {
// TimeNow is a timestamp carried with the message (presumably its creation time — confirm).
TimeNow time.Time
// SequenceNumber is an integer identifying the message (presumably its position in the run — confirm).
SequenceNumber int
// Payload is free-form string content (presumably the padding sized by the "bytes" flag — confirm).
Payload string
}
// main defines the command-line interface of the stress tester and runs it.
//
// All flags are declared here with their defaults; the action simply forwards
// the parsed context to runApp. Any error returned by the CLI framework (for
// example a flag that fails to parse) is reported through log.Fatal so the
// process exits with a non-zero status instead of failing silently.
func main() {
	app := cli.NewApp()
	app.Name = "tester"
	app.Usage = "Make the rabbit cry"
	app.Flags = []cli.Flag{
		// Connection settings.
		cli.StringFlag{Name: "server, s", Value: "rabbit-mq-test.cs1cloud.internal", Usage: "Hostname for RabbitMQ server"},
		cli.StringFlag{Name: "port, P", Value: "5672", Usage: "Port for RabbitMQ server"},
		cli.StringFlag{Name: "user, u", Value: "guest", Usage: "user for RabbitMQ server"},
		cli.StringFlag{Name: "password, pass", Value: "guest", Usage: "user pasword for RabbitMQ server"},
		// Workload settings. The producer default (0) and the consumer
		// default (-1) both mean "do not start" as far as runApp is concerned.
		cli.IntFlag{Name: "producer, p", Value: 0, Usage: "Number of messages to produce, -1 to produce forever"},
		cli.IntFlag{Name: "wait, w", Value: 0, Usage: "Number of nanoseconds to wait between publish events"},
		cli.IntFlag{Name: "consumer, c", Value: -1, Usage: "Number of messages to consume. 0 consumes forever"},
		cli.IntFlag{Name: "bytes, b", Value: 0, Usage: "number of extra bytes to add to the RabbitMQ message payload. About 50K max"},
		cli.IntFlag{Name: "concurrency, n", Value: 50, Usage: "number of reader/writer Goroutines"},
		cli.BoolFlag{Name: "quiet, q", Usage: "Print only errors to stdout"},
		cli.BoolFlag{Name: "wait-for-ack, a", Usage: "Wait for an ack or nack after enqueueing a message"},
	}
	app.Action = func(c *cli.Context) error {
		runApp(c)
		return nil
	}

	// Do not drop the framework's error: surface it and exit non-zero.
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
// runApp assembles the AMQP URI from the parsed flags and starts the
// requested work.
//
// Consumers are started when the "consumer" flag is greater than -1, and
// producers when the "producer" flag is non-zero. The consumer step is
// invoked first; the producer step is only reached once makeConsumers has
// returned.
func runApp(c *cli.Context) {
	println("Running!")

	const scheme = "amqp://"
	credentials := c.String("user") + ":" + c.String("password")
	endpoint := c.String("server") + ":" + c.String("port")
	uri := scheme + credentials + "@" + endpoint

	if toConsume := c.Int("consumer"); toConsume > -1 {
		makeConsumers(uri, c.Int("concurrency"), toConsume)
	}

	toProduce := c.Int("producer")
	if toProduce == 0 {
		// Producing was not requested.
		return
	}
	config := ProducerConfig{uri, c.Int("bytes"), c.Bool("quiet"), c.Bool("wait-for-ack")}
	makeProducers(toProduce, c.Int("wait"), c.Int("concurrency"), config)
}
// MakeQueue declares the queue named "stress-test-exchange" on channel c and
// returns it. The queue is declared durable, with auto-delete, exclusive and
// no-wait all disabled and no extra arguments. A declaration error terminates
// the process through log.Fatal.
func MakeQueue(c *amqp.Channel) amqp.Queue {
	const (
		queueName  = "stress-test-exchange"
		durable    = true
		autoDelete = false
		exclusive  = false
		noWait     = false
	)

	queue, declareErr := c.QueueDeclare(queueName, durable, autoDelete, exclusive, noWait, nil)
	if declareErr != nil {
		log.Fatal(declareErr)
	}
	return queue
}
// makeProducers starts concurrency goroutines running Produce and hands them
// consecutive integers, starting at 0, over an unbuffered channel.
//
// n is the number of integers to hand out. A negative n (the CLI documents
// -1) means "produce forever": the hand-off loop has no upper bound and the
// function never returns. wait is the pause after each hand-off, in
// nanoseconds. config is passed unchanged to every Produce goroutine.
//
// For a non-negative n the function closes the channel after a short grace
// period and logs the time elapsed since the first hand-off began.
func makeProducers(n int, wait int, concurrency int, config ProducerConfig) {
	taskChan := make(chan int)
	for i := 0; i < concurrency; i++ {
		go Produce(config, taskChan)
	}

	pause := time.Duration(wait)
	start := time.Now()
	// The original bound `i < n` ran zero iterations for n == -1, so the
	// documented "produce forever" mode produced nothing. A negative n now
	// disables the bound.
	for i := 0; n < 0 || i < n; i++ {
		taskChan <- i
		time.Sleep(pause)
	}

	// Grace period of 10000ns (10µs) before closing the channel.
	// NOTE(review): presumably meant to let the last hand-off finish; it is
	// not a real synchronization with the Produce goroutines — confirm.
	time.Sleep(time.Duration(10000))
	close(taskChan)
	log.Printf("Finished: %s", time.Since(start))
}
// makeConsumers starts concurrency goroutines running Consume against uri and
// counts the signals they send on a shared channel.
//
// When toConsume is positive it waits for exactly that many signals, logging
// each one, and then logs the elapsed time. For any other value it receives
// signals forever and never returns.
func makeConsumers(uri string, concurrency int, toConsume int) {
	done := make(chan bool)
	for worker := 0; worker < concurrency; worker++ {
		go Consume(uri, done)
	}

	began := time.Now()

	// Unbounded mode: keep draining signals; nothing after this loop runs.
	if toConsume <= 0 {
		for {
			<-done
		}
	}

	for received := 0; received < toConsume; received++ {
		<-done
		// The clock is restarted once the second signal has arrived.
		if received == 1 {
			began = time.Now()
		}
		log.Println("Consumed: ", received)
	}
	log.Printf("Done consuming! %s", time.Since(began))
}