Skip to content

Commit

Permalink
add profile action
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jul 17, 2024
1 parent 67f105f commit 4acc559
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.regression.action

import com.google.common.collect.Maps
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import groovy.util.logging.Slf4j
Expand All @@ -37,7 +38,9 @@ class HttpCliAction implements SuiteAction {
private String body
private String result
private String op
private Map<String, String> headers = Maps.newLinkedHashMap()
private Closure check
private boolean printResponse = true
SuiteContext context

HttpCliAction(SuiteContext context) {
Expand All @@ -60,6 +63,17 @@ class HttpCliAction implements SuiteAction {
this.uri = uri
}

void header(String key, String value) {
this.headers.put(key, value)
}

void basicAuthorization(String user, String password) {
String credentials = user + ":" + (password.is(null) ? "" : password)
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes())
String headerValue = "Basic " + encodedCredentials;
headers.put("Authorization", headerValue)
}

void body(Closure<String> bodySupplier) {
this.body = bodySupplier.call()
}
Expand All @@ -80,6 +94,10 @@ class HttpCliAction implements SuiteAction {
this.result = result
}

void printResponse(boolean printResponse) {
this.printResponse = printResponse
}

@Override
void run() {
try {
Expand All @@ -91,17 +109,25 @@ class HttpCliAction implements SuiteAction {

if (op == "get") {
HttpGet httpGet = new HttpGet(uri)
for (final def header in headers.entrySet()) {
httpGet.setHeader(header.getKey(), header.getValue())
}

client.execute(httpGet).withCloseable { resp ->
resp.withCloseable {
String respJson = EntityUtils.toString(resp.getEntity())
def respCode = resp.getStatusLine().getStatusCode()
log.info("respCode: ${respCode}, respJson: ${respJson}")
if (printResponse) {
log.info("respCode: ${respCode}, respJson: ${respJson}")
}
return new ActionResult(respCode, respJson)
}
}
} else {
HttpPost httpPost = new HttpPost(uri)
for (final def header in headers.entrySet()) {
httpPost.setHeader(header.getKey(), header.getValue())
}
StringEntity requestEntity = new StringEntity(
body,
ContentType.APPLICATION_JSON);
Expand All @@ -111,14 +137,18 @@ class HttpCliAction implements SuiteAction {
resp.withCloseable {
String respJson = EntityUtils.toString(resp.getEntity())
def respCode = resp.getStatusLine().getStatusCode()
log.info("respCode: ${respCode}, respJson: ${respJson}")
if (printResponse) {
log.info("respCode: ${respCode}, respJson: ${respJson}")
}
return new ActionResult(respCode, respJson)
}
}
}
}
log.info("result:${result}".toString())
log.info("this.result:${this.result}".toString())
if (printResponse) {
log.info("result:${result}".toString())
log.info("this.result:${this.result}".toString())
}
if (check != null) {
check.call(result.respCode, result.body)
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.apache.doris.regression.action

import groovy.json.JsonSlurper
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import groovy.util.logging.Slf4j
import org.apache.doris.regression.suite.SuiteContext
import org.apache.doris.regression.util.JdbcUtils

@Slf4j
class ProfileAction implements SuiteAction {
private String tag
private Runnable runCallback
private Closure<String> check
private SuiteContext context

ProfileAction(SuiteContext context) {
this.context = context
}

void tag(String tag) {
this.tag = tag
}
void run(@ClosureParams(value = FromString, options = []) Runnable run) {
runCallback = run
}

void check(
@ClosureParams(value = FromString, options = ["String"]) Closure<String> check) {
this.check = check
}

@Override
void run() {
if (tag.is(null)) {
throw new IllegalStateException("Missing tag")
}
if (runCallback.is(null)) {
throw new IllegalStateException("Missing tag")
}
if (check.is(null)) {
throw new IllegalStateException("Missing check")
}
def conn = context.getConnection()
try {
JdbcUtils.executeToList(conn, "set enable_profile=true")

this.runCallback.run()

def httpCli = new HttpCliAction(context)
httpCli.endpoint(context.config.feHttpAddress)
httpCli.uri("/rest/v1/query_profile")
httpCli.op("get")
httpCli.printResponse(false)

if (context.config.fetchRunMode()) {
httpCli.basicAuthorization(context.config.feCloudHttpUser, context.config.feCloudHttpPassword)
} else {
httpCli.basicAuthorization(context.config.feHttpUser, context.config.feHttpPassword)
}
httpCli.check { code, body ->
if (code != 200) {
throw new IllegalStateException("Get profile list failed, code: ${code}, body:\n${body}")
}

def jsonSlurper = new JsonSlurper()
List profileData = jsonSlurper.parseText(body).data.rows
for (final def profileItem in profileData) {
if (profileItem["Sql Statement"].toString().contains(tag)) {
def profileId = profileItem["Profile ID"].toString()

def profileCli = new HttpCliAction(context)
profileCli.endpoint(context.config.feHttpAddress)
profileCli.uri("/rest/v1/query_profile/${profileId}")
profileCli.op("get")
profileCli.printResponse(false)

if (context.config.fetchRunMode()) {
profileCli.basicAuthorization(context.config.feCloudHttpUser, context.config.feCloudHttpPassword)
} else {
profileCli.basicAuthorization(context.config.feHttpUser, context.config.feHttpPassword)
}
profileCli.check { profileCode, profileResp ->
if (profileCode != 200) {
throw new IllegalStateException("Get profile failed, url: ${"/rest/v1/query_profile/${profileId}"}, code: ${profileCode}, body:\n${profileResp}")
}

def jsonSlurper2 = new JsonSlurper()
def profileText = jsonSlurper2.parseText(profileResp).data
profileText = profileText.replace("&nbsp;", " ")
profileText = profileText.replace("</br>", "\n")
this.check(profileText)
}
profileCli.run()

break
}
}
}
httpCli.run()
} finally {
JdbcUtils.executeToList(conn, "set enable_profile=false")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList
import org.apache.commons.lang3.ObjectUtils
import org.apache.doris.regression.Config
import org.apache.doris.regression.action.BenchmarkAction
import org.apache.doris.regression.action.ProfileAction
import org.apache.doris.regression.action.WaitForAction
import org.apache.doris.regression.util.DataUtils
import org.apache.doris.regression.util.OutputUtils
Expand Down Expand Up @@ -620,6 +621,10 @@ class Suite implements GroovyInterceptable {
}
}

void profile(Closure<String> actionSupplier) {
runAction(new ProfileAction(context), actionSupplier)
}

void createMV(String sql) {
(new CreateMVAction(context, sql)).run()
}
Expand Down
4 changes: 3 additions & 1 deletion regression-test/framework/src/main/groovy/suite.gdsl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ bindAction("streamLoad", "org.apache.doris.regression.action.StreamLoadAction")
bindAction("httpTest", "org.apache.doris.regression.action.HttpCliAction")
bindAction("benchmark", "org.apache.doris.regression.action.BenchmarkAction")
bindAction("waitForSchemaChangeDone", "org.apache.doris.regression.action.WaitForAction")
bindAction("profile", "org.apache.doris.regression.action.ProfileAction")

// bind qt_xxx and order_qt_xxx methods
contributor([suiteContext]) {
Expand Down Expand Up @@ -81,7 +82,8 @@ contributor([suiteContext]) {
!enclosingCall("explain") &&
!enclosingCall("streamLoad") &&
!enclosingCall("httpTest") &&
!enclosingCall("waitForSchemaChangeDone"))) {
!enclosingCall("waitForSchemaChangeDone") &&
!enclosingCall("profile"))) {
// bind other suite method and field
def suiteClass = findClass(suiteClassName)
delegatesTo(suiteClass)
Expand Down
44 changes: 41 additions & 3 deletions regression-test/suites/query_p0/cache/sql_cache.groovy
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import java.util.concurrent.atomic.AtomicReference
import java.util.stream.Collectors

// Licensed to the Apache Software Foundation (ASF) under one
Expand Down Expand Up @@ -191,7 +192,16 @@ suite("sql_cache") {
sql 'set default_order_by_limit = 2'
sql 'set sql_select_limit = 1'

qt_sql_cache8 """

AtomicReference<Throwable> exception = new AtomicReference<>()

profile {
tag "sql_cache8"

run {
try {
qt_sql_cache8 """
-- sql_cache8
select
k1,
sum(k2) as total_pv
Expand All @@ -204,8 +214,25 @@ suite("sql_cache") {
order by
k1;
"""

qt_sql_cache9 """
} catch (Throwable t) {
exception.set(t)
}
}

check { def profileString ->
if (!exception.get().is(null)) {
logger.error("Profile failed, profile result:\n${profileString}", exception.get())
}
}
}

profile {
tag "sql_cache9"

run {
try {
qt_sql_cache9 """
-- sql_cache9
select
k1,
sum(k2) as total_pv
Expand All @@ -218,6 +245,17 @@ suite("sql_cache") {
order by
k1;
"""
} catch (Throwable t) {
exception.set(t)
}
}

check { def profileString ->
if (!exception.get().is(null)) {
logger.error("Profile failed, profile result:\n${profileString}", exception.get())
}
}
}

sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
}

0 comments on commit 4acc559

Please sign in to comment.