Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devops workload extensions #5

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@
.vscode
*~

bin

# High Dynamic Range (HDR) Histogram files
*.hdr
3 changes: 3 additions & 0 deletions cmd/tsbs_load_clickhouse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func init() {
Host: viper.GetString("host"),
User: viper.GetString("user"),
Password: viper.GetString("password"),
Port: viper.GetInt("port"),
LogBatches: viper.GetBool("log-batches"),
Debug: viper.GetInt("debug"),
DbName: loaderConf.DBName,
Secure: viper.GetBool("secure"),
SkipVerify: viper.GetBool("skip-verify"),
}

loader = load.GetBenchmarkRunner(loaderConf)
Expand Down
39 changes: 31 additions & 8 deletions cmd/tsbs_load_influx/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ func (d *dbCreator) DBExists(dbName string) bool {
}

func (d *dbCreator) listDatabases() ([]string, error) {
client := http.Client{}
u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL)
resp, err := http.Get(u)
req, err := http.NewRequest("GET", u, nil)
if authToken != "" {
req.Header = http.Header{
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
}
resp, err := client.Do(req)

if err != nil {
return nil, fmt.Errorf("listDatabases error: %s", err.Error())
}
Expand All @@ -61,20 +69,30 @@ func (d *dbCreator) listDatabases() ([]string, error) {
}

ret := []string{}
for _, nestedName := range listing.Results[0].Series[0].Values {
name := nestedName[0]
// the _internal database is skipped:
if name == "_internal" {
continue
if len(listing.Results) > 0 {
for _, nestedName := range listing.Results[0].Series[0].Values {
name := nestedName[0]
// the _internal database is skipped:
if name == "_internal" {
continue
}
ret = append(ret, name)
}
ret = append(ret, name)
}
return ret, nil
}

func (d *dbCreator) RemoveOldDB(dbName string) error {
u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName)
resp, err := http.Post(u, "text/plain", nil)
client := http.Client{}
req, err := http.NewRequest("POST", u, nil)
if authToken != "" {
req.Header = http.Header{
"Content-Type": []string{"text/plain"},
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("drop db error: %s", err.Error())
}
Expand All @@ -99,6 +117,11 @@ func (d *dbCreator) CreateDB(dbName string) error {
u.RawQuery = v.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if authToken != "" {
req.Header = http.Header{
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
}
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/tsbs_load_influx/http_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
httpClientName = "tsbs_load_influx"
headerContentEncoding = "Content-Encoding"
headerAuthorization = "Authorization"
headerGzip = "gzip"
)

Expand Down Expand Up @@ -65,13 +66,16 @@ var (
textPlain = []byte("text/plain")
)

func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) {
func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool, authToken string) {
req.Header.SetContentTypeBytes(textPlain)
req.Header.SetMethodBytes(methodPost)
req.Header.SetRequestURIBytes(w.url)
if isGzip {
req.Header.Add(headerContentEncoding, headerGzip)
}
if authToken != "" {
req.Header.Add(headerAuthorization, fmt.Sprintf("Token %s", authToken))
}
req.SetBody(body)
}

Expand All @@ -96,7 +100,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response)
func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, body, isGzip)
w.initializeReq(req, body, isGzip, authToken)

resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
Expand Down
10 changes: 5 additions & 5 deletions cmd/tsbs_load_influx/http_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) {
defer fasthttp.ReleaseRequest(req)
w := NewHTTPWriter(testConf, testConsistency)
body := "this is a test body"
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")

if got := string(req.Body()); got != body {
t.Errorf("non-gzip: body not correct: got '%s' want '%s'", got, body)
Expand All @@ -129,7 +129,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) {
t.Errorf("non-gzip: Content-Encoding is not empty: got %s", got)
}

w.initializeReq(req, []byte(body), true)
w.initializeReq(req, []byte(body), true, "")
if got := string(req.Header.Peek(headerContentEncoding)); got != headerGzip {
t.Errorf("gzip: Content-Encoding is not correct: got %s want %s", got, headerGzip)
}
Expand All @@ -144,7 +144,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) {
w := NewHTTPWriter(testConf, testConsistency)
body := "this is a test body"
normalURL := w.url // save for later modification
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
lat, err := w.executeReq(req, resp)
Expand All @@ -161,7 +161,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) {
w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldBackoffParam))
req = fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
lat, err = w.executeReq(req, resp)
if err != errBackoff {
t.Errorf("unexpected error response received (not backoff error): %v", err)
Expand All @@ -176,7 +176,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) {
w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldInvalidParam))
req = fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
lat, err = w.executeReq(req, resp)
if err == nil {
t.Errorf("unexpected non-error response received")
Expand Down
11 changes: 11 additions & 0 deletions cmd/tsbs_load_influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var (
useGzip bool
doAbortOnExist bool
consistency string
authToken string // InfluxDB v2
bucketId string // InfluxDB v2
orgId string // InfluxDB v2
)

// Global vars
Expand Down Expand Up @@ -73,13 +76,21 @@ func init() {
csvDaemonURLs = viper.GetString("urls")
replicationFactor = viper.GetInt("replication-factor")
consistency = viper.GetString("consistency")
authToken = viper.GetString("auth-token")
orgId = viper.GetString("org")
backoff = viper.GetDuration("backoff")
useGzip = viper.GetBool("gzip")

if _, ok := consistencyChoices[consistency]; !ok {
log.Fatalf("invalid consistency settings")
}

if authToken != "" {
log.Println("Using Authorization header in benchmark")
} else {
log.Println("Given no Authorization header was provided will not send it in benchmark")
}

daemonURLs = strings.Split(csvDaemonURLs, ",")
if len(daemonURLs) == 0 {
log.Fatal("missing 'urls' flag")
Expand Down
21 changes: 14 additions & 7 deletions cmd/tsbs_run_queries_clickhouse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (

// Program option vars:
var (
chConnect string
hostsList []string
user string
password string

chConnect string
hostsList []string
user string
password string
port int
secure bool
skipVerify bool
showExplain bool
)

Expand All @@ -45,7 +47,9 @@ func init() {
"Comma separated list of ClickHouse hosts (pass multiple values for sharding reads on a multi-node setup)")
pflag.String("user", "default", "User to connect to ClickHouse as")
pflag.String("password", "", "Password to connect to ClickHouse")

pflag.Int("port", 9000, "Port of ClickHouse instance")
pflag.Bool("secure", false, "establish secure connection (default false)")
pflag.Bool("skip-verify", false, "skip certificate verification (default false)")
pflag.Parse()

err := utils.SetupConfigFile()
Expand All @@ -62,6 +66,9 @@ func init() {
hosts = viper.GetString("hosts")
user = viper.GetString("user")
password = viper.GetString("password")
port = viper.GetInt("port")
secure = viper.GetBool("secure")
skipVerify = viper.GetBool("skip-verify")

// Parse comma separated string of hosts and put in a slice (for multi-node setups)
for _, host := range strings.Split(hosts, ",") {
Expand All @@ -84,7 +91,7 @@ func getConnectString(workerNumber int) string {
// Round robin the host/worker assignment by assigning a host based on workerNumber % totalNumberOfHosts
host := hostsList[workerNumber%len(hostsList)]

return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", host, user, password, runner.DatabaseName())
return fmt.Sprintf("tcp://%s:%d?username=%s&password=%s&database=%s&secure=%t&skip_verify=%t", host, port, user, password, runner.DatabaseName(), secure, skipVerify)
}

// prettyPrintResponse prints a Query and its response in JSON format with two
Expand Down
13 changes: 11 additions & 2 deletions cmd/tsbs_run_queries_influx/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

var bytesSlash = []byte("/") // heap optimization
var headerAuthorization = "Authorization"

// HTTPClient is a reusable HTTP Client.
type HTTPClient struct {
Expand All @@ -22,6 +23,7 @@ type HTTPClient struct {
Host []byte
HostString string
uri []byte
authToken string
}

// HTTPClientDoOptions wraps options uses when calling `Do`.
Expand All @@ -46,12 +48,17 @@ func getHttpClient() *http.Client {
}

// NewHTTPClient creates a new HTTPClient.
func NewHTTPClient(host string) *HTTPClient {
func NewHTTPClient(host string, authToken string) *HTTPClient {
token := ""
if authToken != "" {
token = fmt.Sprintf("Token %s", authToken)
}
return &HTTPClient{
client: getHttpClient(),
Host: []byte(host),
HostString: host,
uri: []byte{}, // heap optimization
authToken: token,
}
}

Expand All @@ -74,7 +81,9 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64,
if err != nil {
panic(err)
}

if w.authToken != "" {
req.Header.Add(headerAuthorization, w.authToken)
}
// Perform the request while tracking latency:
start := time.Now()
resp, err := w.client.Do(req)
Expand Down
11 changes: 9 additions & 2 deletions cmd/tsbs_run_queries_influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
var (
daemonUrls []string
chunkSize uint64
authToken string
)

// Global vars:
Expand All @@ -35,6 +36,7 @@ func init() {

pflag.String("urls", "http://localhost:8086", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.")
pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.")
pflag.String("auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty will not send the Authorization header.")

pflag.Parse()

Expand All @@ -49,8 +51,13 @@ func init() {
}

csvDaemonUrls = viper.GetString("urls")
authToken = viper.GetString("auth-token")
chunkSize = viper.GetUint64("chunk-response-size")

if authToken != "" {
log.Println("Using Authorization header in benchmark")
} else {
log.Println("Given no Authorization header was provided will not send it in benchmark")
}
daemonUrls = strings.Split(csvDaemonUrls, ",")
if len(daemonUrls) == 0 {
log.Fatal("missing 'urls' flag")
Expand Down Expand Up @@ -78,7 +85,7 @@ func (p *processor) Init(workerNumber int) {
database: runner.DatabaseName(),
}
url := daemonUrls[workerNumber%len(daemonUrls)]
p.w = NewHTTPClient(url)
p.w = NewHTTPClient(url, authToken)
}

func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
Expand Down
23 changes: 23 additions & 0 deletions docs/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ cpu,1451606400000000000,58.1317132304976170,2.6224297271376256,24.99694950699478

Hostname of the ClickHouse server.

### `-port` (type: `int`, default: `9000`)

Port of the ClickHouse server. The default port is 9000

#### `-user` (type: `string`, default: `default`)

User to use to connect to the ClickHouse server. Yes, default user is really called **default**
Expand All @@ -46,6 +50,13 @@ User to use to connect to the ClickHouse server. Yes, default user is really cal

Password to use to connect to the ClickHouse server. Default password is empty

#### `-secure` (type: `boolean`, default: `false`)

Establish a secure connection. (default is false)

#### `-skip-verify` (type: `boolean`, default: `false`)

Skip certificate verification. (default is false)

### Miscellaneous

Expand All @@ -68,6 +79,10 @@ system performance while writing data to the database.
Comma separated list of hostnames for the ClickHouse servers.
Workers are connected to a server in a round-robin fashion.

### `-port` (type: `int`, default: `9000`)

Port of the ClickHouse server. The default port is 9000

#### `-user` (type: `string`, default: `default`)

User to use to connect to the ClickHouse server. Yes, default user is really called **default**
Expand All @@ -76,6 +91,14 @@ User to use to connect to the ClickHouse server. Yes, default user is really cal

Password to use to connect to the ClickHouse server. Default password is empty

#### `-secure` (type: `boolean`, default: `false`)

Establish a secure connection. (default is false)

#### `-skip-verify` (type: `boolean`, default: `false`)

Skip certificate verification. (default is false)

---

## How to run test. Ubuntu 16.04 LTS example
Expand Down
Loading