Skip to content

Commit

Permalink
[regression-test](profile) Add some flaky profile tests back (#46720)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh authored Jan 14, 2025
1 parent 44834b6 commit 1b125cd
Show file tree
Hide file tree
Showing 3 changed files with 350 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.json.StringEscapeUtils


def getProfileList = {
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}


def getProfile = { id ->
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}

suite('adaptive_pipeline_task_serial_read_on_limit') {
sql """
DROP TABLE IF EXISTS adaptive_pipeline_task_serial_read_on_limit;
"""
sql """
CREATE TABLE if not exists `adaptive_pipeline_task_serial_read_on_limit` (
`id` INT,
`name` varchar(32)
) ENGINE=OLAP
DISTRIBUTED BY HASH(`id`) BUCKETS 5
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// Insert data to table
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
"""
sql """
insert into adaptive_pipeline_task_serial_read_on_limit values
(1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
"""

sql "set enable_profile=true"
// set parallel_pipeline_task_num to 1 so that only one scan operator is created,
// and we can check MaxScannerThreadNum in profile.
sql "set parallel_pipeline_task_num=1;"

// Create a set<String> queryShouldHasOnePeakRunningScanner
// to store the query that should have only one peak running scanner.
def queryShouldHaveOnePeakRunningScanner = new HashSet<String>()
queryShouldHaveOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 10;")
queryShouldHaveOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 10000;")

def queryShouldHasMoreThanOnePeakRunningScanner = new HashSet<String>()
queryShouldHasMoreThanOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit limit 9999;")
queryShouldHasMoreThanOnePeakRunningScanner.add("select * from adaptive_pipeline_task_serial_read_on_limit where id > 10 limit 1;")
queryShouldHasMoreThanOnePeakRunningScanner.add("select \"enable_adaptive_pipeline_task_serial_read_on_limit=false\", * from adaptive_pipeline_task_serial_read_on_limit limit 1000000;")

// With Limit, MaxScannerThreadNum = 1
sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 10;"
sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 10000;"
// With Limit, but bigger then adaptive_pipeline_task_serial_read_on_limit, MaxScannerThreadNum = TabletNum
sql "set adaptive_pipeline_task_serial_read_on_limit=9998;"
sql "select * from adaptive_pipeline_task_serial_read_on_limit limit 9999;"
// With limit, but with predicates too. MaxScannerThreadNum = TabletNum
sql "select * from adaptive_pipeline_task_serial_read_on_limit where id > 10 limit 1;"
// With large engough limit, but enable_adaptive_pipeline_task_serial_read_on_limit is false. MaxScannerThreadNum = TabletNum
sql "set enable_adaptive_pipeline_task_serial_read_on_limit=false;"
sql """select "enable_adaptive_pipeline_task_serial_read_on_limit=false", * from adaptive_pipeline_task_serial_read_on_limit limit 1000000;"""
// Sleep 500ms to wait for the profile collection
Thread.sleep(500)

// Get profile list by using show query profile command
// SHOW QUERY PROFILE returns profile meta as a table.
// The first column is profile id, last column is query stmt.
// Compare the query stmt, and get profile id for each query that we just emitted.

def queryProfiles = sql "show query profile limit 100;"
def profileList = queryProfiles.collect { row -> row.toList() }
List<String> profileShouldHaveOnePeakRunningScanner = new ArrayList<String>()
List<String> profileShouldHaveMoreThanOnePeakRunningScanner = new ArrayList<String>()

for (def profileItem in profileList) {
if (profileShouldHaveMoreThanOnePeakRunningScanner.size() + profileShouldHaveOnePeakRunningScanner.size() ==
queryShouldHasMoreThanOnePeakRunningScanner.size() + queryShouldHaveOnePeakRunningScanner.size()) {
break
}

if (queryShouldHaveOnePeakRunningScanner.contains(profileItem[-1])) {
profileShouldHaveOnePeakRunningScanner.add(profileItem[0])
continue
}
if (queryShouldHasMoreThanOnePeakRunningScanner.contains(profileItem[-1])) {
profileShouldHaveMoreThanOnePeakRunningScanner.add(profileItem[0])
continue
}
}

logger.info("profileShouldHaveOnePeakRunningScanner: ${profileShouldHaveOnePeakRunningScanner}")
logger.info("profileShouldHaveMoreThanOnePeakRunningScanner: ${profileShouldHaveMoreThanOnePeakRunningScanner}")

assertTrue(profileShouldHaveOnePeakRunningScanner.size() == queryShouldHaveOnePeakRunningScanner.size())
assertTrue(profileShouldHaveMoreThanOnePeakRunningScanner.size() == queryShouldHasMoreThanOnePeakRunningScanner.size())

for (def profileId : profileShouldHaveOnePeakRunningScanner) {
def profile = getProfile(profileId).toString()
logger.info("Profile ${profile}")
assertTrue(profile.contains("- MaxScannerThreadNum: 1"))
}

for (def profileId : profileShouldHaveMoreThanOnePeakRunningScanner) {
def profile = getProfile(profileId).toString()
logger.info("Profile ${profile}")
assertTrue(!profile.contains("- MaxScannerThreadNum: 1"))
}
}
108 changes: 108 additions & 0 deletions regression-test/suites/query_profile/scanner_profile.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.json.StringEscapeUtils


def getProfileList = {
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}


def getProfile = { id ->
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}

suite('scanner_profile') {
sql """
DROP TABLE IF EXISTS scanner_profile;
"""
sql """
CREATE TABLE if not exists `scanner_profile` (
`id` INT,
`name` varchar(32)
) ENGINE=OLAP
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// Insert data to table
sql """
insert into scanner_profile values
(1, "A"),(2, "B"),(3, "C"),(4, "D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
"""
sql """
insert into scanner_profile values
(10, "A"),(20, "B"),(30, "C"),(40, "D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
"""
sql """
insert into scanner_profile values
(101, "A"),(201, "B"),(301, "C"),(401, "D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
"""
sql """
insert into scanner_profile values
(1010, "A"),(2010, "B"),(3010, "C"),(4010, "D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
"""

def uuidString = UUID.randomUUID().toString()
sql "set enable_profile=true"
// With Limit, MaxScannerThreadNum = 1
sql """
select "with_limit_1_${uuidString}", * from scanner_profile limit 10;
"""

def wholeString = getProfileList()
List profileData = new JsonSlurper().parseText(wholeString).data.rows
String queryIdWithLimit1 = "";


logger.info("{}", uuidString)

for (def profileItem in profileData) {
if (profileItem["Sql Statement"].toString().contains("with_limit_1_${uuidString}")) {
queryIdWithLimit1 = profileItem["Profile ID"].toString()
logger.info("profileItem: {}", profileItem)
}
}

logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)

assertTrue(queryIdWithLimit1 != "")

// Sleep 2 seconds to make sure profile collection is done
Thread.sleep(2000)

def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
logger.info("Profile of ${queryIdWithLimit1} ${profileWithLimit1}")
assertTrue(profileWithLimit1.contains("- PeakRunningScannerPeak: 1"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.json.StringEscapeUtils

def getProfileList = {
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
return conn.getInputStream().getText()
}

def getProfile = { id ->
def dst = 'http://' + context.config.feHttpAddress
def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
conn.setRequestMethod("GET")
def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
(context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
conn.setRequestProperty("Authorization", "Basic ${encoding}")
// set conn parameters

return conn.getInputStream().getText()
}

suite('test_execute_by_frontend') {
sql """
CREATE TABLE if not exists `test_execute_by_frontend` (
`id` INT,
`name` varchar(32)
)ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

sql "set enable_profile=true"
def simpleSql1 = "select * from test_execute_by_frontend"
sql "${simpleSql1}"
simpleSql2 = """select cast("1" as Int)"""
sql "${simpleSql2}"
def isRecorded = false
def wholeString = getProfileList()
List profileData = new JsonSlurper().parseText(wholeString).data.rows
String queryId1 = "";
String queryId2 = "";

for (final def profileItem in profileData) {
if (profileItem["Sql Statement"].toString() == simpleSql1) {
isRecorded = true
queryId1 = profileItem["Profile ID"].toString()
assertEquals("internal", profileItem["Default Catalog"].toString())
}
if (profileItem["Sql Statement"].toString() == simpleSql2) {
queryId2 = profileItem["Profile ID"].toString()
}
}

assertTrue(isRecorded)

String profileContent1 = getProfile(queryId1)
logger.info("Profile of ${queryId1}: ${profileContent1}")
def executionProfileIdx1 = profileContent1.indexOf("Executed By Frontend: true")
assertTrue(executionProfileIdx1 > 0)
String profileContent2 = getProfile(queryId2)
logger.info("Profile of ${queryId2}: ${profileContent2}")
def executionProfileIdx2 = profileContent2.indexOf("Executed By Frontend: true")
assertTrue(executionProfileIdx2 > 0)

sql """ SET enable_profile = false """
sql """ DROP TABLE IF EXISTS test_execute_by_frontend """
}

0 comments on commit 1b125cd

Please sign in to comment.