[enhancement](test) Add show data test #45501

Merged 2 commits on Jan 3, 2025
@@ -1,5 +1,4 @@


// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -49,7 +48,7 @@ Suite.metaClass.listOssObjectWithPrefix = { OSS client, String bucketName, Strin
String nextMarker = null;
final int maxKeys = 500;
List<OSSObjectSummary> sums = null;

if (!client.doesBucketExist(bucketName)) {
logger.info("no bucket named ${bucketName} in ${endpoint}")
return
@@ -61,14 +60,14 @@ Suite.metaClass.listOssObjectWithPrefix = { OSS client, String bucketName, Strin
do {
objectListing = client.listObjects(new ListObjectsRequest(bucketName).
withPrefix(prefix).withMarker(nextMarker).withMaxKeys(maxKeys));

sums = objectListing.getObjectSummaries();
for (OSSObjectSummary s : sums) {
logger.info("\t" + s.getKey());
}

nextMarker = objectListing.getNextMarker();

} while (objectListing.isTruncated());
} catch (OSSException oe) {
logger.error("Caught an OSSException, which means your request made it to OSS, "
@@ -107,7 +106,7 @@ Suite.metaClass.calculateFolderLength = { OSS client, String bucketName, String
for (OSSObjectSummary s : sums) {
size += s.getSize();
}
} while (objectListing.isTruncated());
} while (objectListing.isTruncated());
return size;
}

@@ -143,7 +142,7 @@ Suite.metaClass.getOssAllDirSizeWithPrefix = { OSS client, String bucketName, St
logger.info(s.getKey() + " : " + (s.getSize() / (1024 * 1024 * 1024)) + "GB");
}
} while (objectListing.isTruncated());

} catch (OSSException oe) {
logger.error("Caught an OSSException, which means your request made it to OSS, "
+ "but was rejected with an error response for some reason.");
@@ -164,6 +163,3 @@ Suite.metaClass.getOssAllDirSizeWithPrefix = { OSS client, String bucketName, St
logger.info("Done!")
}
}



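The OSS helpers above are consumed later in this PR when the show-data suite measures per-tablet storage usage. A minimal usage sketch, assuming the cbsS3* config keys used further down; the helper names are taken from this changeset, and the tablet id is a placeholder for illustration only:

def ak       = context.config.otherConfigs.get("cbsS3Ak")
def sk       = context.config.otherConfigs.get("cbsS3Sk")
def endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
def bucket   = context.config.otherConfigs.get("cbsS3Bucket")
def prefix   = context.config.otherConfigs.get("cbsS3Prefix")
def tabletId = "12345" // placeholder tablet id, for illustration only

def client = initOssClient(ak, sk, endpoint)
try {
    // log every object belonging to the tablet, then sum the object sizes in bytes
    listOssObjectWithPrefix(client, bucket, prefix + "/data/" + tabletId)
    def bytes = calculateFolderLength(client, bucket, prefix + "/data/" + tabletId)
    logger.info("tablet ${tabletId} occupies ${bytes / (1024 * 1024)} MB")
} finally {
    shutDownOssClient(client)
}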
@@ -14,27 +14,45 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// The cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/tpcds
// and modified by Doris.
import groovy.json.JsonOutput
import org.apache.doris.regression.suite.Suite
import org.codehaus.groovy.runtime.IOGroovyMethods

// load the same data 10 times and expect the reported data size not to rise
suite("test_mow_show_data_in_cloud","p2") {
//cloud-mode
if (!isCloudMode()) {
logger.info("not cloud mode, not run")
return
Suite.metaClass.repeate_stream_load_same_data = { String tableName, int loadTimes, String filePath->
for (int i = 0; i < loadTimes; i++) {
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
file """${getS3Url()}/${filePath}"""
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
}

def repeate_stream_load_same_data = { String tableName, int loadTimes ->
for (int i = 0; i < loadTimes; i++) {
Suite.metaClass.stream_load_partial_update_data = { String tableName->
for (int i = 0; i < 20; i++) {
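// load i updates a sliding window of ten columns (id,a1..a10, then id,a11..a20, ... up to a200)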
int start = i * 10 + 1
int end = (i + 1) * 10
def elements = (start..end).collect { "a$it" }
String columns = "id," + elements.join(',')
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
set 'columns', columns
set 'partial_columns', 'true'
file """${getS3Url()}/regression/show_data/fullData.1.part${i+1}.gz"""
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
@@ -50,27 +68,27 @@ suite("test_mow_show_data_in_cloud","p2") {
}
}

def get_tablets_from_table = { String table ->
Suite.metaClass.get_tablets_from_table = { String table ->
def res = sql_return_maparray """show tablets from ${table}"""
return res
}

def show_tablet_compaction = { HashMap tablet ->
Suite.metaClass.show_tablet_compaction = { HashMap tablet ->
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET ")
sb.append(tablet["CompactionStatus"])
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
return parseJson(out.trim())
}

def trigger_tablet_compaction = { HashMap tablet, String compact_type ->
Suite.metaClass.trigger_tablet_compaction = { HashMap tablet, String compact_type ->
// supports triggering base/cumulative/full compaction
def tabletStatusBeforeCompaction = show_tablet_compaction(tablet)

@@ -82,10 +100,10 @@ suite("test_mow_show_data_in_cloud","p2") {
sb.append(triggerCompactionUrl)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
def outJson = parseJson(out)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
// if code = 0, compaction happened and the result needs to be checked
@@ -112,16 +130,16 @@ suite("test_mow_show_data_in_cloud","p2") {
}
}

def trigger_compaction = { List<List<Object>> tablets ->
Suite.metaClass.trigger_compaction = { List<List<Object>> tablets ->
for(def tablet: tablets) {
trigger_tablet_compaction(tablet, "cumulative")
trigger_tablet_compaction(tablet, "base")
trigger_tablet_compaction(tablet, "full")
}
}

def caculate_table_data_size_in_backend_storage = { List<List<Object>> tablets ->
storageType = context.config.otherConfigs.get("storageProvider")
Suite.metaClass.caculate_table_data_size_in_backend_storage = { List<List<Object>> tablets ->
def storageType = context.config.otherConfigs.get("storageProvider")
Double storageSize = 0

List<String> tabletIds = []
@@ -131,30 +149,30 @@ suite("test_mow_show_data_in_cloud","p2") {

if (storageType.toLowerCase() == "oss") {
//cbs means cluster backend storage
ak = context.config.otherConfigs.get("cbsS3Ak")
sk = context.config.otherConfigs.get("cbsS3Sk")
endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
bucketName = context.config.otherConfigs.get("cbsS3Bucket")
storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")
def ak = context.config.otherConfigs.get("cbsS3Ak")
def sk = context.config.otherConfigs.get("cbsS3Sk")
def endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
def bucketName = context.config.otherConfigs.get("cbsS3Bucket")
def storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")

client = initOssClient(ak, sk, endpoint)
def client = initOssClient(ak, sk, endpoint)
for(String tabletId: tabletIds) {
storageSize += calculateFolderLength(client, bucketName, storagePrefix + "/data/" + tabletId)
}
shutDownOssClient(client)
}

if (storageType.toLowerCase() == "hdfs") {
fsName = context.config.otherConfigs.get("cbsFsName")
isKerberosFs = context.config.otherConfigs.get("cbsFsKerberos")
fsUser = context.config.otherConfigs.get("cbsFsUser")
storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
def fsName = context.config.otherConfigs.get("cbsFsName")
def isKerberosFs = context.config.otherConfigs.get("cbsFsKerberos")
def fsUser = context.config.otherConfigs.get("cbsFsUser")
def storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
}

return storageSize
}

def translate_different_unit_to_MB = { String size, String unitField ->
Suite.metaClass.translate_different_unit_to_MB = { String size, String unitField ->
Double sizeKb = 0.0
if (unitField == "KB") {
sizeKb = Double.parseDouble(size) / 1024
@@ -168,7 +186,7 @@ suite("test_mow_show_data_in_cloud","p2") {
return sizeKb
}

def show_table_data_size_through_mysql = { String table ->
Suite.metaClass.show_table_data_size_through_mysql = { String table ->
def mysqlShowDataSize = 0L
def res = sql_return_maparray " show data from ${table}"
def tableSizeInfo = res[0]
@@ -181,7 +199,7 @@ suite("test_mow_show_data_in_cloud","p2") {
return mysqlShowDataSize
}

def caculate_table_data_size_through_api = { List<List<Object>> tablets ->
Suite.metaClass.caculate_table_data_size_through_api = { List<List<Object>> tablets ->
Double apiCaculateSize = 0
for (HashMap tablet in tablets) {
def tabletStatus = show_tablet_compaction(tablet)
@@ -199,42 +217,4 @@ suite("test_mow_show_data_in_cloud","p2") {

return apiCaculateSize
}

def main = {
tableName="lineitem_mow"
sql "DROP TABLE IF EXISTS ${tableName};"
sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
sql new File("""${context.file.parent}/ddl/lineitem_delete.sql""").text.replaceAll("\\\$\\{table\\}", tableName)
List<String> tablets = get_tablets_from_table(tableName)
def loadTimes = [1, 10]
Map<String, List> sizeRecords = ["apiSize":[], "mysqlSize":[], "cbsSize":[]]
for (int i in loadTimes){
// stream load 1 time, record each size
repeate_stream_load_same_data(tableName, i)
def rows = sql_return_maparray "select count(*) as count from ${tableName};"
logger.info("table ${tableName} has ${rows[0]["count"]} rows")
// trigger compaction
trigger_compaction(tablets)

// then sleep 5 minutes to wait for FE to finish reporting
sleep(300 * 1000)
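// record the size reported by the compaction API, the backend object storage, and SHOW DATA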

sizeRecords["apiSize"].add(caculate_table_data_size_through_api(tablets))
sizeRecords["cbsSize"].add(caculate_table_data_size_in_backend_storage(tablets))
sizeRecords["mysqlSize"].add(show_table_data_size_through_mysql(tableName))
sleep(300 * 1000)
logger.info("after ${i} times stream load, mysqlSize is: ${sizeRecords["mysqlSize"][-1]}, apiSize is: ${sizeRecords["apiSize"][-1]}, storageSize is: ${sizeRecords["cbsSize"][-1]}")

}

// expect mysqlSize == apiSize == storageSize
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["apiSize"][0])
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["cbsSize"][0])
// expect size after 1 load == size after 10 loads
assertEquals(sizeRecords["mysqlSize"][0], sizeRecords["mysqlSize"][1])
assertEquals(sizeRecords["apiSize"][0], sizeRecords["apiSize"][1])
assertEquals(sizeRecords["cbsSize"][0], sizeRecords["cbsSize"][1])
}

main()
}
//http://qa-build.oss-cn-beijing.aliyuncs.com/regression/show_data/fullData.1.part1.gz
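Since the helpers are now registered on Suite.metaClass rather than declared as local closures, other suites can call them directly. A rough sketch of how a consuming suite might drive them; the suite name, table name, and load count are illustrative, and the data path is the fullData.1.part1.gz object referenced in the comment above:

suite("show_data_usage_sketch", "p2") {
    def tableName = "lineitem_mow" // assumes the table was already created from its DDL

    // load the identical file ten times; on a merge-on-write table the reported size should stay flat
    repeate_stream_load_same_data(tableName, 10, "regression/show_data/fullData.1.part1.gz")

    def tablets = get_tablets_from_table(tableName)
    trigger_compaction(tablets)
    sleep(300 * 1000) // give FE time to report tablet sizes

    def apiSize   = caculate_table_data_size_through_api(tablets)
    def cbsSize   = caculate_table_data_size_in_backend_storage(tablets)
    def mysqlSize = show_table_data_size_through_mysql(tableName)
    logger.info("apiSize=${apiSize}, cbsSize=${cbsSize}, mysqlSize=${mysqlSize}")
}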
2 changes: 0 additions & 2 deletions regression-test/suites/show_data/ddl/lineitem_delete.sql

This file was deleted.

25 changes: 0 additions & 25 deletions regression-test/suites/show_data/ddl/lineitem_dup.sql

This file was deleted.

25 changes: 0 additions & 25 deletions regression-test/suites/show_data/ddl/lineitem_mow.sql

This file was deleted.
