diff --git a/regression-test/suites/query_profile/adaptive_pipeline_task_serial_read_on_limit.groovy b/regression-test/suites/query_profile/adaptive_pipeline_task_serial_read_on_limit.groovy new file mode 100644 index 00000000000000..8ace6d03572c1b --- /dev/null +++ b/regression-test/suites/query_profile/adaptive_pipeline_task_serial_read_on_limit.groovy @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import groovy.json.StringEscapeUtils + + +def getProfileList = { + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/rest/v1/query_profile").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() +} + + +def getProfile = { id -> + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() +} + +suite('adaptive_pipeline_task_serial_read_on_limit') { + sql """ + DROP TABLE IF EXISTS adaptive_pipeline_task_serial_read_on_limit; + """ + sql """ + CREATE TABLE if not exists `adaptive_pipeline_task_serial_read_on_limit` ( + `id` INT, + `name` varchar(32) + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`id`) BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // Insert data to table + sql """ + insert into adaptive_pipeline_task_serial_read_on_limit values + (1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K"); + """ + sql """ + insert into adaptive_pipeline_task_serial_read_on_limit values + (10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K"); + """ + sql """ + insert into adaptive_pipeline_task_serial_read_on_limit values + (101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K"); + """ + sql """ + insert into adaptive_pipeline_task_serial_read_on_limit values + (1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K"); + """ + + sql "set enable_profile=true" + // set parallel_pipeline_task_num to 1 so that only one scan operator is created, + // and we can check MaxScannerThreadNum in profile. + sql "set parallel_pipeline_task_num=1;" + + // Create a set queryShouldHasOnePeakRunningScanner + // to store the query that should have only one peak running scanner. + def queryShouldHaveOnePeakRunningScanner = new HashSet() + queryShouldHaveOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 10;") + queryShouldHaveOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 10000;") + + def queryShouldHasMoreThanOnePeakRunningScanner = new HashSet() + queryShouldHasMoreThanOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 9999;") + queryShouldHasMoreThanOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit where id > 10 limit 1;") + queryShouldHasMoreThanOnePeakRunningScanner.add("select \"enable_adaptive_pipeline_task_serial_read_on_limit=false\", * from adaptive_pipeline_task_serial_read_on_limit limit 1000000;") + + // With Limit, MaxScannerThreadNum = 1 + sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 10;" + sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 10000;" + // With Limit, but bigger then adaptive_pipeline_task_serial_read_on_limit, MaxScannerThreadNum = TabletNum + sql "set adaptive_pipeline_task_serial_read_on_limit=9998;" + sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 9999;" + // With limit, but with predicates too. MaxScannerThreadNum = TabletNum + sql "select * from adaptive_pipeline_task_serial_read_on_limit where id > 10 limit 1;" + // With large engough limit, but enable_adaptive_pipeline_task_serial_read_on_limit is false. MaxScannerThreadNum = TabletNum + sql "set enable_adaptive_pipeline_task_serial_read_on_limit=false;" + sql """select "enable_adaptive_pipeline_task_serial_read_on_limit=false", * from adaptive_pipeline_task_serial_read_on_limit limit 1000000;""" + // Sleep 500ms to wait for the profile collection + Thread.sleep(500) + + // Get profile list by using show query profile command + // SHOW QUERY PROFILE returns profile meta as a table. + // The first column is profile id, last column is query stmt. + // Compare the query stmt, and get profile id for each query that we just emitted. + + def queryProfiles = sql "show query profile limit 100;" + def profileList = queryProfiles.collect { row -> row.toList() } + List profileShouldHaveOnePeakRunningScanner = new ArrayList() + List profileShouldHaveMoreThanOnePeakRunningScanner = new ArrayList() + + for (def profileItem in profileList) { + if (profileShouldHaveMoreThanOnePeakRunningScanner.size() + profileShouldHaveOnePeakRunningScanner.size() == + queryShouldHasMoreThanOnePeakRunningScanner.size() + queryShouldHaveOnePeakRunningScanner.size()) { + break + } + + if (queryShouldHaveOnePeakRunningScanner.contains(profileItem[-1])) { + profileShouldHaveOnePeakRunningScanner.add(profileItem[0]) + continue + } + if (queryShouldHasMoreThanOnePeakRunningScanner.contains(profileItem[-1])) { + profileShouldHaveMoreThanOnePeakRunningScanner.add(profileItem[0]) + continue + } + } + + logger.info("profileShouldHaveOnePeakRunningScanner: ${profileShouldHaveOnePeakRunningScanner}") + logger.info("profileShouldHaveMoreThanOnePeakRunningScanner: ${profileShouldHaveMoreThanOnePeakRunningScanner}") + + assertTrue(profileShouldHaveOnePeakRunningScanner.size() == queryShouldHaveOnePeakRunningScanner.size()) + assertTrue(profileShouldHaveMoreThanOnePeakRunningScanner.size() == queryShouldHasMoreThanOnePeakRunningScanner.size()) + + for (def profileId : profileShouldHaveOnePeakRunningScanner) { + def profile = getProfile(profileId).toString() + logger.info("Profile ${profile}") + assertTrue(profile.contains("- MaxScannerThreadNum: 1")) + } + + for (def profileId : profileShouldHaveMoreThanOnePeakRunningScanner) { + def profile = getProfile(profileId).toString() + logger.info("Profile ${profile}") + assertTrue(!profile.contains("- MaxScannerThreadNum: 1")) + } +} \ No newline at end of file diff --git a/regression-test/suites/query_profile/scanner_profile.groovy b/regression-test/suites/query_profile/scanner_profile.groovy new file mode 100644 index 00000000000000..3678daba1ca3be --- /dev/null +++ b/regression-test/suites/query_profile/scanner_profile.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import groovy.json.StringEscapeUtils + + +def getProfileList = { + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/rest/v1/query_profile").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() +} + + +def getProfile = { id -> + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() +} + +suite('scanner_profile') { + sql """ + DROP TABLE IF EXISTS scanner_profile; + """ + sql """ + CREATE TABLE if not exists `scanner_profile` ( + `id` INT, + `name` varchar(32) + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // Insert data to table + sql """ + insert into scanner_profile values + (1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K"); + """ + sql """ + insert into scanner_profile values + (10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K"); + """ + sql """ + insert into scanner_profile values + (101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K"); + """ + sql """ + insert into scanner_profile values + (1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K"); + """ + + def uuidString = UUID.randomUUID().toString() + sql "set enable_profile=true" + // With Limit, MaxScannerThreadNum = 1 + sql """ + select "with_limit_1_${uuidString}", * from scanner_profile limit 10; + """ + + def wholeString = getProfileList() + List profileData = new JsonSlurper().parseText(wholeString).data.rows + String queryIdWithLimit1 = ""; + + + logger.info("{}", uuidString) + + for (def profileItem in profileData) { + if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) { + queryIdWithLimit1 = profileItem["Profile ID"].toString() + logger.info("profileItem: {}", profileItem) + } + } + + logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1) + + assertTrue(queryIdWithLimit1 != "") + + // Sleep 2 seconds to make sure profile collection is done + Thread.sleep(2000) + + def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString() + logger.info("Profile of ${queryIdWithLimit1} ${profileWithLimit1}") + assertTrue(profileWithLimit1.contains("- PeakRunningScannerPeak: 1")) +} \ No newline at end of file diff --git a/regression-test/suites/query_profile/test_execute_by_frontend.groovy b/regression-test/suites/query_profile/test_execute_by_frontend.groovy new file mode 100644 index 00000000000000..2b2d867f73b6bc --- /dev/null +++ b/regression-test/suites/query_profile/test_execute_by_frontend.groovy @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import groovy.json.StringEscapeUtils + +def getProfileList = { + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/rest/v1/query_profile").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() +} + +def getProfile = { id -> + def dst = 'http://' + context.config.feHttpAddress + def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + // set conn parameters + + return conn.getInputStream().getText() + } + +suite('test_execute_by_frontend') { + sql """ + CREATE TABLE if not exists `test_execute_by_frontend` ( + `id` INT, + `name` varchar(32) + )ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql "set enable_profile=true" + def simpleSql1 = "select * from test_execute_by_frontend" + sql "${simpleSql1}" + simpleSql2 = """select cast("1" as Int)""" + sql "${simpleSql2}" + def isRecorded = false + def wholeString = getProfileList() + List profileData = new JsonSlurper().parseText(wholeString).data.rows + String queryId1 = ""; + String queryId2 = ""; + + for (final def profileItem in profileData) { + if (profileItem["Sql Statement"].toString() == simpleSql1) { + isRecorded = true + queryId1 = profileItem["Profile ID"].toString() + assertEquals("internal", profileItem["Default Catalog"].toString()) + } + if (profileItem["Sql Statement"].toString() == simpleSql2) { + queryId2 = profileItem["Profile ID"].toString() + } + } + + assertTrue(isRecorded) + + String profileContent1 = getProfile(queryId1) + logger.info("Profile of ${queryId1}: ${profileContent1}") + def executionProfileIdx1 = profileContent1.indexOf("Executed By Frontend: true") + assertTrue(executionProfileIdx1 > 0) + String profileContent2 = getProfile(queryId2) + logger.info("Profile of ${queryId2}: ${profileContent2}") + def executionProfileIdx2 = profileContent2.indexOf("Executed By Frontend: true") + assertTrue(executionProfileIdx2 > 0) + + sql """ SET enable_profile = false """ + sql """ DROP TABLE IF EXISTS test_execute_by_frontend """ +} \ No newline at end of file