diff --git a/.gitignore b/.gitignore index caf0afa9a..17c00d909 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,7 @@ .vscode *~ +bin + # High Dynamic Range (HDR) Histogram files *.hdr diff --git a/cmd/tsbs_load_clickhouse/main.go b/cmd/tsbs_load_clickhouse/main.go index ac7181d0c..a42b8d1c9 100644 --- a/cmd/tsbs_load_clickhouse/main.go +++ b/cmd/tsbs_load_clickhouse/main.go @@ -43,9 +43,12 @@ func init() { Host: viper.GetString("host"), User: viper.GetString("user"), Password: viper.GetString("password"), + Port: viper.GetInt("port"), LogBatches: viper.GetBool("log-batches"), Debug: viper.GetInt("debug"), DbName: loaderConf.DBName, + Secure: viper.GetBool("secure"), + SkipVerify: viper.GetBool("skip-verify"), } loader = load.GetBenchmarkRunner(loaderConf) diff --git a/cmd/tsbs_load_influx/creator.go b/cmd/tsbs_load_influx/creator.go index 28fc9a6bb..1f7f8d12c 100644 --- a/cmd/tsbs_load_influx/creator.go +++ b/cmd/tsbs_load_influx/creator.go @@ -33,8 +33,16 @@ func (d *dbCreator) DBExists(dbName string) bool { } func (d *dbCreator) listDatabases() ([]string, error) { + client := http.Client{} u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL) - resp, err := http.Get(u) + req, err := http.NewRequest("GET", u, nil) + if err != nil { return nil, fmt.Errorf("listDatabases error: %s", err.Error()) } + if authToken != "" { + req.Header = http.Header{ + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } + resp, err := client.Do(req) if err != nil { return nil, fmt.Errorf("listDatabases error: %s", err.Error()) } @@ -61,20 +69,30 @@ func (d *dbCreator) listDatabases() ([]string, error) { } ret := []string{} - for _, nestedName := range listing.Results[0].Series[0].Values { - name := nestedName[0] - // the _internal database is skipped: - if name == "_internal" { - continue + if len(listing.Results) > 0 && len(listing.Results[0].Series) > 0 { + for _, nestedName := range listing.Results[0].Series[0].Values { + name := nestedName[0] + // the _internal database is skipped: + if name == "_internal" { + continue + } + ret = append(ret, name) } - ret = 
append(ret, name) } return ret, nil } func (d *dbCreator) RemoveOldDB(dbName string) error { u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName) - resp, err := http.Post(u, "text/plain", nil) + client := http.Client{} + req, err := http.NewRequest("POST", u, nil) + if err != nil { return fmt.Errorf("drop db error: %s", err.Error()) } + // always send Content-Type; attach the token only when provided + req.Header.Set("Content-Type", "text/plain") + if authToken != "" { + req.Header.Set(headerAuthorization, fmt.Sprintf("Token %s", authToken)) + } + resp, err := client.Do(req) if err != nil { return fmt.Errorf("drop db error: %s", err.Error()) } @@ -99,6 +117,11 @@ func (d *dbCreator) CreateDB(dbName string) error { u.RawQuery = v.Encode() req, err := http.NewRequest("GET", u.String(), nil) if err != nil { return err } + if authToken != "" { + req.Header = http.Header{ + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } diff --git a/cmd/tsbs_load_influx/http_writer.go b/cmd/tsbs_load_influx/http_writer.go index b56ae2d8e..a53ce989f 100644 --- a/cmd/tsbs_load_influx/http_writer.go +++ b/cmd/tsbs_load_influx/http_writer.go @@ -14,6 +14,7 @@ import ( const ( httpClientName = "tsbs_load_influx" headerContentEncoding = "Content-Encoding" + headerAuthorization = "Authorization" headerGzip = "gzip" ) @@ -65,13 +66,16 @@ var ( textPlain = []byte("text/plain") ) -func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) { +func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool, authToken string) { req.Header.SetContentTypeBytes(textPlain) req.Header.SetMethodBytes(methodPost) req.Header.SetRequestURIBytes(w.url) if isGzip { req.Header.Add(headerContentEncoding, headerGzip) } + if authToken != "" { + req.Header.Add(headerAuthorization, fmt.Sprintf("Token %s", authToken)) + } req.SetBody(body) } @@ -96,7 +100,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response) func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) { req := 
fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, body, isGzip) + w.initializeReq(req, body, isGzip, authToken) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) diff --git a/cmd/tsbs_load_influx/http_writer_test.go b/cmd/tsbs_load_influx/http_writer_test.go index 170ae4ea3..ba27656c2 100644 --- a/cmd/tsbs_load_influx/http_writer_test.go +++ b/cmd/tsbs_load_influx/http_writer_test.go @@ -114,7 +114,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { defer fasthttp.ReleaseRequest(req) w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") if got := string(req.Body()); got != body { t.Errorf("non-gzip: body not correct: got '%s' want '%s'", got, body) @@ -129,7 +129,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { t.Errorf("non-gzip: Content-Encoding is not empty: got %s", got) } - w.initializeReq(req, []byte(body), true) + w.initializeReq(req, []byte(body), true, "") if got := string(req.Header.Peek(headerContentEncoding)); got != headerGzip { t.Errorf("gzip: Content-Encoding is not correct: got %s want %s", got, headerGzip) } @@ -144,7 +144,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" normalURL := w.url // save for later modification - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) lat, err := w.executeReq(req, resp) @@ -161,7 +161,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldBackoffParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err != errBackoff { t.Errorf("unexpected 
error response received (not backoff error): %v", err) @@ -176,7 +176,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldInvalidParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err == nil { t.Errorf("unexpected non-error response received") diff --git a/cmd/tsbs_load_influx/main.go b/cmd/tsbs_load_influx/main.go index f6b85f1b5..268ffa95c 100644 --- a/cmd/tsbs_load_influx/main.go +++ b/cmd/tsbs_load_influx/main.go @@ -30,6 +30,9 @@ var ( useGzip bool doAbortOnExist bool consistency string + authToken string // InfluxDB v2 + bucketId string // InfluxDB v2 + orgId string // InfluxDB v2 ) // Global vars @@ -73,6 +76,8 @@ func init() { csvDaemonURLs = viper.GetString("urls") replicationFactor = viper.GetInt("replication-factor") consistency = viper.GetString("consistency") + authToken = viper.GetString("auth-token") + orgId = viper.GetString("org") backoff = viper.GetDuration("backoff") useGzip = viper.GetBool("gzip") @@ -80,6 +85,12 @@ func init() { log.Fatalf("invalid consistency settings") } + if authToken != "" { + log.Println("Using Authorization header in benchmark") + } else { + log.Println("Given no Authorization header was provided will not send it in benchmark") + } + daemonURLs = strings.Split(csvDaemonURLs, ",") if len(daemonURLs) == 0 { log.Fatal("missing 'urls' flag") diff --git a/cmd/tsbs_run_queries_clickhouse/main.go b/cmd/tsbs_run_queries_clickhouse/main.go index 0058164bb..a7cbde367 100644 --- a/cmd/tsbs_run_queries_clickhouse/main.go +++ b/cmd/tsbs_run_queries_clickhouse/main.go @@ -20,11 +20,13 @@ import ( // Program option vars: var ( - chConnect string - hostsList []string - user string - password string - + chConnect string + hostsList []string + user string + password string + port int + secure bool + skipVerify bool 
showExplain bool ) @@ -45,7 +47,9 @@ func init() { "Comma separated list of ClickHouse hosts (pass multiple values for sharding reads on a multi-node setup)") pflag.String("user", "default", "User to connect to ClickHouse as") pflag.String("password", "", "Password to connect to ClickHouse") - + pflag.Int("port", 9000, "Port of ClickHouse instance") + pflag.Bool("secure", false, "establish secure connection (default false)") + pflag.Bool("skip-verify", false, "skip certificate verification (default false)") pflag.Parse() err := utils.SetupConfigFile() @@ -62,6 +66,9 @@ func init() { hosts = viper.GetString("hosts") user = viper.GetString("user") password = viper.GetString("password") + port = viper.GetInt("port") + secure = viper.GetBool("secure") + skipVerify = viper.GetBool("skip-verify") // Parse comma separated string of hosts and put in a slice (for multi-node setups) for _, host := range strings.Split(hosts, ",") { @@ -84,7 +91,7 @@ func getConnectString(workerNumber int) string { // Round robin the host/worker assignment by assigning a host based on workerNumber % totalNumberOfHosts host := hostsList[workerNumber%len(hostsList)] - return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", host, user, password, runner.DatabaseName()) + return fmt.Sprintf("tcp://%s:%d?username=%s&password=%s&database=%s&secure=%t&skip_verify=%t", host, port, user, password, runner.DatabaseName(), secure, skipVerify) } // prettyPrintResponse prints a Query and its response in JSON format with two diff --git a/cmd/tsbs_run_queries_influx/http_client.go b/cmd/tsbs_run_queries_influx/http_client.go index 24b7b4827..fbfd1b33b 100644 --- a/cmd/tsbs_run_queries_influx/http_client.go +++ b/cmd/tsbs_run_queries_influx/http_client.go @@ -14,6 +14,7 @@ import ( ) var bytesSlash = []byte("/") // heap optimization +var headerAuthorization = "Authorization" // HTTPClient is a reusable HTTP Client. 
type HTTPClient struct { @@ -22,6 +23,7 @@ type HTTPClient struct { Host []byte HostString string uri []byte + authToken string } // HTTPClientDoOptions wraps options uses when calling `Do`. @@ -46,12 +48,17 @@ func getHttpClient() *http.Client { } // NewHTTPClient creates a new HTTPClient. -func NewHTTPClient(host string) *HTTPClient { +func NewHTTPClient(host string, authToken string) *HTTPClient { + token := "" + if authToken != "" { + token = fmt.Sprintf("Token %s", authToken) + } return &HTTPClient{ client: getHttpClient(), Host: []byte(host), HostString: host, uri: []byte{}, // heap optimization + authToken: token, } } @@ -74,7 +81,9 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, if err != nil { panic(err) } - + if w.authToken != "" { + req.Header.Add(headerAuthorization, w.authToken) + } // Perform the request while tracking latency: start := time.Now() resp, err := w.client.Do(req) diff --git a/cmd/tsbs_run_queries_influx/main.go b/cmd/tsbs_run_queries_influx/main.go index 48a84d757..8e96cb83f 100644 --- a/cmd/tsbs_run_queries_influx/main.go +++ b/cmd/tsbs_run_queries_influx/main.go @@ -20,6 +20,7 @@ import ( var ( daemonUrls []string chunkSize uint64 + authToken string ) // Global vars: @@ -35,6 +36,7 @@ func init() { pflag.String("urls", "http://localhost:8086", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.") + pflag.String("auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. 
If empty will not send the Authorization header.") pflag.Parse() @@ -49,8 +51,13 @@ func init() { } csvDaemonUrls = viper.GetString("urls") + authToken = viper.GetString("auth-token") chunkSize = viper.GetUint64("chunk-response-size") - + if authToken != "" { + log.Println("Using Authorization header in benchmark") + } else { + log.Println("Given no Authorization header was provided will not send it in benchmark") + } daemonUrls = strings.Split(csvDaemonUrls, ",") if len(daemonUrls) == 0 { log.Fatal("missing 'urls' flag") @@ -78,7 +85,7 @@ func (p *processor) Init(workerNumber int) { database: runner.DatabaseName(), } url := daemonUrls[workerNumber%len(daemonUrls)] - p.w = NewHTTPClient(url) + p.w = NewHTTPClient(url, authToken) } func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { diff --git a/docs/clickhouse.md b/docs/clickhouse.md index 6327e2197..c9cd56e4a 100644 --- a/docs/clickhouse.md +++ b/docs/clickhouse.md @@ -38,6 +38,10 @@ cpu,1451606400000000000,58.1317132304976170,2.6224297271376256,24.99694950699478 Hostname of the ClickHouse server. +### `-port` (type: `int`, default: `9000`) + +Port of the ClickHouse server. The default port is 9000 + #### `-user` (type: `string`, default: `default`) User to use to connect to the ClickHouse server. Yes, default user is really called **default** @@ -46,6 +50,13 @@ User to use to connect to the ClickHouse server. Yes, default user is really cal Password to use to connect to the ClickHouse server. Default password is empty +#### `-secure` (type: `boolean`, default: `false`) + +Establish a secure connection. (default is false) + +#### `-skip-verify` (type: `boolean`, default: `false`) + +Skip certificate verification. (default is false) ### Miscellaneous @@ -68,6 +79,10 @@ system performance while writing data to the database. Comma separated list of hostnames for the ClickHouse servers. Workers are connected to a server in a round-robin fashion. 
+### `-port` (type: `int`, default: `9000`) + +Port of the ClickHouse server. The default port is 9000 + #### `-user` (type: `string`, default: `default`) User to use to connect to the ClickHouse server. Yes, default user is really called **default** @@ -76,6 +91,14 @@ User to use to connect to the ClickHouse server. Yes, default user is really cal Password to use to connect to the ClickHouse server. Default password is empty +#### `-secure` (type: `boolean`, default: `false`) + +Establish a secure connection. (default is false) + +#### `-skip-verify` (type: `boolean`, default: `false`) + +Skip certificate verification. (default is false) + --- ## How to run test. Ubuntu 16.04 LTS example diff --git a/docs/influx.md b/docs/influx.md index e7f27edba..fefe67e00 100644 --- a/docs/influx.md +++ b/docs/influx.md @@ -7,6 +7,33 @@ using the data importer (`tsbs_load_influx`), and additional flags available for the query runner (`tsbs_run_queries_influx`). **This should be read *after* the main README.** +## Setup steps InfluxDB v2 + +If on a new setup run the following command: + +```bash +influx setup +``` + +If you need to create a new bucket adjust the bucket name (`-n`) and the org name (`-o`) accordingly: + +```bash +influx bucket create -n bucket-perf -o org -r 0 +``` + +Create a DBRP mapping with the InfluxDB 1.x compatibility API ([official docs](https://docs.influxdata.com/influxdb/cloud/reference/cli/influx/v1/dbrp/create/)). + +Adjust bucket name and db accordingly: + +```bash +influx v1 dbrp create --db benchmark --rp 0 --bucket-id `influx bucket ls --name bucket-perf | awk -v i=2 -v j=1 'FNR == i {print $j}'` --default +``` + +Retrieve the auth token as follows: +```bash +influx auth list +``` + ## Data format Data generated by `tsbs_generate_data` for InfluxDB is serialized in a @@ -58,6 +85,11 @@ Whether to encode writes to the server with gzip. 
For best performance, encoding with gzip is the best choice, but if the server does not support or has gzip disabled, this flag should be set to false. +#### `-auth-token` (type: `string`, default: `""`) + +Use the Authorization header with the Token scheme to provide your token to InfluxDB. +If empty will not send the Authorization header. + --- ## `tsbs_run_queries_influx` Additional Flags @@ -76,3 +108,10 @@ everything in a single response. Comma-separated list of URLs to connect to for querying. Workers will be distributed in a round robin fashion across the URLs. + +### Miscellaneous + +#### `-auth-token` (type: `string`, default: `""`) + +Use the Authorization header with the Token scheme to provide your token to InfluxDB. +If empty will not send the Authorization header. diff --git a/pkg/targets/clickhouse/benchmark.go b/pkg/targets/clickhouse/benchmark.go index 3308c7004..f9b810be2 100644 --- a/pkg/targets/clickhouse/benchmark.go +++ b/pkg/targets/clickhouse/benchmark.go @@ -16,11 +16,14 @@ type ClickhouseConfig struct { Host string User string Password string + Port int LogBatches bool InTableTag bool Debug int DbName string + Secure bool + SkipVerify bool } // String values of tags and fields to insert - string representation @@ -43,10 +46,10 @@ func getConnectString(conf *ClickhouseConfig, db bool) string { // ClickHouse ex.: // tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000 if db { - return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s", conf.Host, conf.User, conf.Password, conf.DbName) + return fmt.Sprintf("tcp://%s:%d?username=%s&password=%s&database=%s&secure=%t&skip_verify=%t", conf.Host, conf.Port, conf.User, conf.Password, conf.DbName, conf.Secure, conf.SkipVerify) } - return fmt.Sprintf("tcp://%s:9000?username=%s&password=%s", conf.Host, conf.User, conf.Password) + return fmt.Sprintf("tcp://%s:%d?username=%s&password=%s&secure=%t&skip_verify=%t", 
conf.Host, conf.Port, conf.User, conf.Password, conf.Secure, conf.SkipVerify) } // Point is a single row of data keyed by which table it belongs diff --git a/pkg/targets/clickhouse/creator.go b/pkg/targets/clickhouse/creator.go index 9e991d36b..40a0475cf 100644 --- a/pkg/targets/clickhouse/creator.go +++ b/pkg/targets/clickhouse/creator.go @@ -143,7 +143,7 @@ func createMetricsTable(conf *ClickhouseConfig, db *sqlx.DB, tableName string, f tags_id UInt32, %s, additional_tags String DEFAULT '' - ) ENGINE = MergeTree(created_date, (tags_id, created_at), 8192) + ) ENGINE MergeTree() PARTITION BY toYYYYMM(created_date) ORDER BY (tags_id, created_at) SETTINGS index_granularity=8192 `, tableName, strings.Join(columnsWithType, ",")) @@ -180,7 +180,7 @@ func generateTagsTableQuery(tagNames, tagTypes []string) string { "created_at DateTime DEFAULT now(),\n"+ "id UInt32,\n"+ "%s"+ - ") ENGINE = MergeTree(created_date, (%s), 8192)", + ") ENGINE MergeTree() PARTITION BY toYYYYMM(created_date) ORDER BY (%s) SETTINGS index_granularity=8192", cols, index) } diff --git a/pkg/targets/clickhouse/implemented_target.go b/pkg/targets/clickhouse/implemented_target.go index 27cb53411..f50b5cb22 100644 --- a/pkg/targets/clickhouse/implemented_target.go +++ b/pkg/targets/clickhouse/implemented_target.go @@ -28,8 +28,11 @@ func (c clickhouseTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag. flagSet.String(flagPrefix+"host", "localhost", "Hostname of ClickHouse instance") flagSet.String(flagPrefix+"user", "default", "User to connect to ClickHouse as") flagSet.String(flagPrefix+"password", "", "Password to connect to ClickHouse") + flagSet.Int(flagPrefix+"port", 9000, "Port of ClickHouse instance") flagSet.Bool(flagPrefix+"log-batches", false, "Whether to time individual batches.") flagSet.Int(flagPrefix+"debug", 0, "Debug printing (choices: 0, 1, 2). 
(default 0)") + flagSet.Bool(flagPrefix+"secure", false, "establish secure connection (default false)") + flagSet.Bool(flagPrefix+"skip-verify", false, "skip certificate verification (default false)") } func (c clickhouseTarget) TargetName() string { diff --git a/pkg/targets/influx/implemented_target.go b/pkg/targets/influx/implemented_target.go index 736a37182..17cb88ba5 100644 --- a/pkg/targets/influx/implemented_target.go +++ b/pkg/targets/influx/implemented_target.go @@ -21,6 +21,8 @@ func (t *influxTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Fla flagSet.String(flagPrefix+"urls", "http://localhost:8086", "InfluxDB URLs, comma-separated. Will be used in a round-robin fashion.") flagSet.Int(flagPrefix+"replication-factor", 1, "Cluster replication factor (only applies to clustered databases).") flagSet.String(flagPrefix+"consistency", "all", "Write consistency. Must be one of: any, one, quorum, all.") + flagSet.String(flagPrefix+"auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. 
If empty will not send the Authorization header.") + flagSet.String(flagPrefix+"org", "", "Organization name (InfluxDB v2).") flagSet.Duration(flagPrefix+"backoff", time.Second, "Time to sleep between requests when server indicates backpressure is needed.") flagSet.Bool(flagPrefix+"gzip", true, "Whether to gzip encode requests (default true).") } diff --git a/pkg/targets/timescaledb/creator.go b/pkg/targets/timescaledb/creator.go index 195a39be5..0c38951a8 100644 --- a/pkg/targets/timescaledb/creator.go +++ b/pkg/targets/timescaledb/creator.go @@ -116,7 +116,6 @@ func (d *dbCreator) PostCreateDB(dbName string) error { } r = MustQuery(dbBench, checkTableQuery) } - return nil } } return nil @@ -197,7 +196,7 @@ func (d *dbCreator) createTableAndIndexes(dbBench *sql.DB, tableName string, fie if d.opts.UseHypertable { var creationCommand string = "create_hypertable" - var partitionsOption string = "replication_factor => NULL" + var partitionsOption string = "" MustExec(dbBench, "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE") @@ -213,18 +212,27 @@ func (d *dbCreator) createTableAndIndexes(dbBench *sql.DB, tableName string, fie // We assume a single partition hypertable. 
This provides an option to test // partitioning on regular hypertables if d.opts.NumberPartitions > 0 { - partitionsOption = fmt.Sprintf("partitioning_column => '%s'::name, number_partitions => %v::smallint", partitionColumn, d.opts.NumberPartitions) + partitionsOption = fmt.Sprintf("'%s'::name, %v::smallint", partitionColumn, d.opts.NumberPartitions) } if d.opts.ReplicationFactor > 0 { // This gives us a future option of testing the impact of // multi-node replication across data nodes + creationCommand = "create_distributed_hypertable" partitionsOption = fmt.Sprintf("partitioning_column => '%s'::name, replication_factor => %v::smallint", partitionColumn, d.opts.ReplicationFactor) } MustExec(dbBench, - fmt.Sprintf("SELECT %s('%s'::regclass, 'time'::name, %s, chunk_time_interval => %d, create_default_indexes=>FALSE)", - creationCommand, tableName, partitionsOption, d.opts.ChunkTime.Nanoseconds()/1000)) + fmt.Sprintf("SELECT %s('%s'::regclass, by_range('time'::name), create_default_indexes=>FALSE)", + creationCommand, tableName)) + MustExec(dbBench, + fmt.Sprintf("SELECT set_chunk_time_interval('%s'::regclass, %d)", + tableName, d.opts.ChunkTime.Nanoseconds()/1000)) + if partitionsOption != "" { + MustExec(dbBench, + fmt.Sprintf("SELECT add_dimension('%s'::regclass, by_range(%s))", + tableName, partitionsOption)) + } } } diff --git a/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh b/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh new file mode 100755 index 000000000..6c1cd8701 --- /dev/null +++ b/scripts/full_cycle_minitest/full_cycle_minitest_influx.sh @@ -0,0 +1,77 @@ +#!/bin/bash +# showcases the ftsb 3 phases for influxdb +# - 1) data and query generation +# - 2) data loading/insertion +# - 3) query execution + +SCALE=${SCALE:-"10"} +SEED=${SEED:-"123"} +FORMAT="influx" + +mkdir -p /tmp/bulk_data +rm /tmp/bulk_data/${FORMAT}_* + +# exit immediately on error +set -e + +# Load parameters - common +DATABASE_PORT=${DATABASE_PORT:-8086} 
+DATABASE_HOST=${DATABASE_HOST:-localhost} + +# All available query types (sorted alphabetically) +QUERY_TYPES_ALL="\ + cpu-max-all-1 \ + cpu-max-all-8 \ + double-groupby-1 \ + double-groupby-5 \ + double-groupby-all \ + groupby-orderby-limit \ + high-cpu-1 \ + high-cpu-all \ + lastpoint \ + single-groupby-1-1-1 \ + single-groupby-1-1-12 \ + single-groupby-1-8-1 \ + single-groupby-5-1-1 \ + single-groupby-5-1-12 \ + single-groupby-5-8-1" + +# What query types to generate +QUERY_TYPES=${QUERY_TYPES:-$QUERY_TYPES_ALL} + +# generate data +$GOPATH/bin/tsbs_generate_data --format ${FORMAT} --use-case cpu-only --scale=${SCALE} --seed=${SEED} --file /tmp/bulk_data/${FORMAT}_data + +for queryName in $QUERY_TYPES; do + echo "generating query: $queryName" + $GOPATH/bin/tsbs_generate_queries --format ${FORMAT} --use-case cpu-only --scale=${SCALE} --seed=${SEED} \ + --queries=10 \ + --query-type $queryName \ + --file /tmp/bulk_data/${FORMAT}_query_$queryName +done + +until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do + echo "Waiting for InfluxDB" + sleep 1 +done + +# Remove previous database +curl -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20benchmark + +# insert benchmark +$GOPATH/bin/tsbs_load_${FORMAT} \ + --db-name=benchmark \ + --backoff=1s \ + --workers=1 \ + --urls=http://${DATABASE_HOST}:${DATABASE_PORT} \ + --auth-token ${INFLUX_AUTH_TOKEN} \ + --file=/tmp/bulk_data/${FORMAT}_data + +# queries benchmark +for queryName in $QUERY_TYPES; do + echo "running query: $queryName" + $GOPATH/bin/tsbs_run_queries_${FORMAT} --print-responses \ + --workers=1 \ + --auth-token ${INFLUX_AUTH_TOKEN} \ + --file /tmp/bulk_data/${FORMAT}_query_$queryName +done diff --git a/scripts/load/load_influx.sh b/scripts/load/load_influx.sh index 90e9e13c0..f8c6774d9 100755 --- a/scripts/load/load_influx.sh +++ b/scripts/load/load_influx.sh @@ -10,6 +10,7 @@ fi # Load parameters - common DATA_FILE_NAME=${DATA_FILE_NAME:-influx-data.gz} 
DATABASE_PORT=${DATABASE_PORT:-8086} +INFLUX_AUTH_TOKEN=${INFLUX_AUTH_TOKEN:-""} EXE_DIR=${EXE_DIR:-$(dirname $0)} source ${EXE_DIR}/load_common.sh @@ -20,7 +21,10 @@ until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do done # Remove previous database -curl -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20${DATABASE_NAME} +curl --header "Authorization: Token $INFLUX_AUTH_TOKEN" \ + -X POST http://${DATABASE_HOST}:${DATABASE_PORT}/query?q=drop%20database%20${DATABASE_NAME} + + # Load new data cat ${DATA_FILE} | gunzip | $EXE_FILE_NAME \ --db-name=${DATABASE_NAME} \ @@ -28,4 +32,5 @@ cat ${DATA_FILE} | gunzip | $EXE_FILE_NAME \ --workers=${NUM_WORKERS} \ --batch-size=${BATCH_SIZE} \ --reporting-period=${REPORTING_PERIOD} \ + --auth-token=${INFLUX_AUTH_TOKEN} \ --urls=http://${DATABASE_HOST}:${DATABASE_PORT} diff --git a/scripts/run_queries/run_queries_influx.sh b/scripts/run_queries/run_queries_influx.sh index 749902e63..23c96554e 100755 --- a/scripts/run_queries/run_queries_influx.sh +++ b/scripts/run_queries/run_queries_influx.sh @@ -7,17 +7,25 @@ if [[ -z "$EXE_FILE_NAME" ]]; then exit 1 fi -# Default queries folder -BULK_DATA_DIR=${BULK_DATA_DIR:-"/tmp/bulk_queries"} -MAX_QUERIES=${MAX_QUERIES:-"0"} -# How many concurrent worker would run queries - match num of cores, or default to 4 -NUM_WORKERS=${NUM_WORKERS:-$(grep -c ^processor /proc/cpuinfo 2> /dev/null || echo 4)} +DATABASE_PORT=${DATABASE_PORT:-8086} +INFLUX_AUTH_TOKEN=${INFLUX_AUTH_TOKEN:-""} + +EXE_DIR=${EXE_DIR:-$(dirname $0)} +source ${EXE_DIR}/run_common.sh + + +until curl http://${DATABASE_HOST}:${DATABASE_PORT}/ping 2>/dev/null; do + echo "Waiting for InfluxDB" + sleep 1 +done + +# Ensure RESULTS DIR available +mkdir -p ${RESULTS_DIR} # # Run test for one file # -function run_file() -{ +function run_file() { # $FULL_DATA_FILE_NAME: /full/path/to/file_with.ext # $DATA_FILE_NAME: file_with.ext # $DIR: /full/path/to @@ -29,24 +37,36 @@ 
EXTENSION="${DATA_FILE_NAME##*.}" NO_EXT_DATA_FILE_NAME="${DATA_FILE_NAME%.*}" - # Several options on how to name results file - #OUT_FULL_FILE_NAME="${DIR}/result_${DATA_FILE_NAME}" - OUT_FULL_FILE_NAME="${DIR}/result_${NO_EXT_DATA_FILE_NAME}.out" - #OUT_FULL_FILE_NAME="${DIR}/${NO_EXT_DATA_FILE_NAME}.out" - if [ "${EXTENSION}" == "gz" ]; then GUNZIP="gunzip" else GUNZIP="cat" fi - echo "Running ${DATA_FILE_NAME}" - cat $FULL_DATA_FILE_NAME \ - | $GUNZIP \ - | $EXE_FILE_NAME \ - --max-queries $MAX_QUERIES \ - --workers $NUM_WORKERS \ - | tee $OUT_FULL_FILE_NAME + for run in $(seq ${REPETITIONS}); do + # Several options on how to name results file + #OUT_FULL_FILE_NAME="${DIR}/result_${DATA_FILE_NAME}" + OUT_FULL_FILE_NAME="${RESULTS_DIR}/result_${NO_EXT_DATA_FILE_NAME}_${run}.out" + #OUT_FULL_FILE_NAME="${DIR}/${NO_EXT_DATA_FILE_NAME}.out" + HDR_FULL_FILE_NAME="${RESULTS_DIR}/HDR_TXT_result_${NO_EXT_DATA_FILE_NAME}_${run}.out" + + echo "Running ${DATA_FILE_NAME}" + echo " Saving results to ${OUT_FULL_FILE_NAME}" + echo " Saving HDR results to ${HDR_FULL_FILE_NAME}" + + cat $FULL_DATA_FILE_NAME | + $GUNZIP | + $EXE_FILE_NAME \ + --max-queries=${MAX_QUERIES} \ + --db-name=${DATABASE_NAME} \ + --workers=${NUM_WORKERS} \ + --print-interval=${QUERIES_PRINT_INTERVAL} \ + --hdr-latencies=${HDR_FULL_FILE_NAME} \ + --auth-token=${INFLUX_AUTH_TOKEN} \ + --debug=${DEBUG} \ + --urls=http://${DATABASE_HOST}:${DATABASE_PORT} | + tee $OUT_FULL_FILE_NAME + done } if [ "$#" -gt 0 ]; then