StarRocks provides a self-developed connector named StarRocks Connector for Apache Spark™ (Spark connector for short) to help you read data from a StarRocks table by using Spark. You can use Spark for complex processing and machine learning on the data you have read from StarRocks.
The Spark connector supports three reading methods: Spark SQL, Spark DataFrame, and Spark RDD.
You can use Spark SQL to create a temporary view on the StarRocks table, and then directly read data from the StarRocks table by using that temporary view.
You can also map the StarRocks table to a Spark DataFrame or a Spark RDD, and then read data from the Spark DataFrame or Spark RDD. We recommend the use of a Spark DataFrame.
NOTICE
Reading data from StarRocks tables with Spark connector needs SELECT privilege. If you do not have the privilege, follow the instructions provided in GRANT to grant the privilege to the user that you use to connect to your StarRocks cluster.
- You can filter data on StarRocks before you read the data, thereby reducing the amount of data transferred.
- If the overhead of reading data is substantial, you can employ appropriate table design and filter conditions to prevent Spark from reading an excessive amount of data at a time. As such, you can reduce I/O pressure on your disk and network connection, thereby ensuring routine queries can be run properly.
Spark connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 and later | 8 | 2.12 |
1.1.1 | 3.2, 3.3, 3.4 | 2.5 and later | 8 | 2.12 |
1.1.0 | 3.2, 3.3, 3.4 | 2.5 and later | 8 | 2.12 |
1.0.0 | 3.x | 1.18 and later | 8 | 2.12 |
1.0.0 | 2.x | 1.18 and later | 8 | 2.11 |
NOTICE
- Please see Upgrade Spark connector for behaviour changes among different connector versions.
- The connector does not provide MySQL JDBC driver since version 1.1.1, and you need import the driver to the spark classpath manually. You can find the driver on Maven Central.
- In version 1.0.0, the Spark connector only supports reading data from StarRocks. From version 1.1.0 onwards, the Spark connector supports both reading data from and writing data to StarRocks.
- Version 1.0.0 differs from version 1.1.0 in terms of parameters and data type mappings. See Upgrade Spark connector.
- In general cases, no new features will be added to version 1.0.0. We recommend that you upgrade your Spark connector at your earliest opportunity.
Use one of the following methods to obtain the Spark connector .jar package that suits your business needs:
- Download a compiled package.
- Use Maven to add the dependencies required by the Spark connector. (This method is supported only for Spark connector 1.1.0 and later.)
- Manually compile a package.
Spark connector .jar packages are named in the following format:
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
For example, if you want to use Spark connector 1.1.0 with Spark 3.2 and Scala 2.12, you can choose starrocks-spark-connector-3.2_2.12-1.1.0.jar
.
NOTICE
In normal cases, the latest Spark connector version can be used with the most recent three Spark versions.
You can obtain Spark connector .jar packages of various versions at Maven Central Repository.
Configure the dependencies required by the Spark connector as follows:
NOTICE
You must replace
spark_version
,scala_version
, andconnector_version
with the Spark version, Scala version, and Spark connector version you use.
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
For example, if you want to use Spark connector 1.1.0 with Spark 3.2 and Scala 2.12, configure the dependencies as follows:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
-
Download the Spark connector code.
-
Use the following command to compile the Spark connector:
NOTICE
You must replace
spark_version
with the Spark version you use.sh build.sh <spark_version>
For example, if you want to use the Spark connector with Spark 3.2, compile the Spark connector as follows:
sh build.sh 3.2
-
Go to the
target/
path, in which a Spark connector .jar package likestarrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar
is generated upon compilation.NOTICE
If you are using a Spark connector version that is not officially released, the name of the generated Spark connector .jar package contains
SNAPSHOT
as a suffix.
-
Download the Spark connector code.
NOTICE
You must switch to
spark-1.0
. -
Take one of the following actions to compile the Spark connector:
-
If you are using Spark 2.x, run the following command, which compiles the Spark connector to suit Spark 2.3.4 by default:
sh build.sh 2
-
If you are using Spark 3.x, run the following command, which compiles the Spark connector to suit Spark 3.1.2 by default:
sh build.sh 3
-
-
Go to the
output/
path, in which thestarrocks-spark2_2.11-1.0.0.jar
file is generated upon compilation. Then, copy the file to the classpath of Spark:- If your Spark cluster runs in
Local
mode, place the file into thejars/
path. - If your Spark cluster runs in
Yarn
mode, place the file into the pre-deployment package.
- If your Spark cluster runs in
You can use the Spark connector to read data from StarRocks only after you place the file into the specified location.
This section describes the parameters you need to configure when you use the Spark connector to read data from StarRocks.
The following parameters apply to all three reading methods: Spark SQL, Spark DataFrame, and Spark RDD.
Parameter | Default value | Description |
---|---|---|
starrocks.fenodes | None | The HTTP URL of the FE in your StarRocks cluster. Format <fe_host>:<fe_http_port> . You can specify multiple URLs, which must be separated by a comma (,). |
starrocks.table.identifier | None | The name of the StarRocks table. Format: <database_name>.<table_name> . |
starrocks.request.retries | 3 | The maximum number of times that Spark can retry to send a read request o StarRocks. |
starrocks.request.connect.timeout.ms | 30000 | The maximum amount of time after which a read request sent to StarRocks times out. |
starrocks.request.read.timeout.ms | 30000 | The maximum amount of time after which the reading for a request sent to StarRocks times out. |
starrocks.request.query.timeout.s | 3600 | The maximum amount of time after which a query of data from StarRocks times out. The default timeout period is 1 hour. -1 means that no timeout period is specified. |
starrocks.request.tablet.size | Integer.MAX_VALUE | The number of StarRocks tablets grouped into each Spark RDD partition. A smaller value of this parameter indicates that a larger number of Spark RDD partitions will be generated. A larger number of Spark RDD partitions means higher parallelism on Spark but greater pressure on StarRocks. |
starrocks.batch.size | 4096 | The maximum number of rows that can be read from BEs at a time. Increasing the value of this parameter can reduce the number of connections established between Spark and StarRocks, thereby mitigating extra time overheads caused by network latency. |
starrocks.exec.mem.limit | 2147483648 | The maximum amount of memory allowed per query. Unit: bytes. The default memory limit is 2 GB. |
starrocks.deserialize.arrow.async | false | Specifies whether to support asynchronously converting the Arrow memory format to RowBatches required for the iteration of the Spark connector. |
starrocks.deserialize.queue.size | 64 | The size of the internal queue that holds tasks for asynchronously converting the Arrow memory format to RowBatches. This parameter is valid when starrocks.deserialize.arrow.async is set to true . |
starrocks.filter.query | None | The condition based on which you want to filter data on StarRocks. You can specify multiple filter conditions, which must be joined by and . StarRocks filters the data from the StarRocks table based on the specified filter conditions before the data is read by Spark. |
starrocks.timezone | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert StarRocks DATETIME to Spark TimestampType . The default is the timezone of JVM returned by ZoneId#systemDefault() . The format could be a timezone name such as Asia/Shanghai , or a zone offset such as +08:00 . |
The following parameters apply only to the Spark SQL and Spark DataFrame reading methods.
Parameter | Default value | Description |
---|---|---|
starrocks.fe.http.url | None | The HTTP IP address of the FE. This parameter is supported from Spark connector 1.1.0 onwards. This parameter is equivalent to starrocks.fenodes . You only need to configure one of them. In Spark connector 1.1.0 and later, we recommend that you use starrocks.fe.http.url because starrocks.fenodes may be deprecated. |
starrocks.fe.jdbc.url | None | The address that is used to connect to the MySQL server of the FE. Format: jdbc:mysql://<fe_host>:<fe_query_port> .NOTICE In Spark connector 1.1.0 and later, this parameter is mandatory. |
user | None | The username of your StarRocks cluster account. |
starrocks.user | None | The username of your StarRocks cluster account. This parameter is supported from Spark connector 1.1.0 onwards. This parameter is equivalent to user . You only need to configure one of them. In Spark connector 1.1.0 and later, we recommend that you use starrocks.user because user may be deprecated. |
password | None | The password of your StarRocks cluster account. |
starrocks.password | None | The password of your StarRocks cluster account. This parameter is supported from Spark connector 1.1.0 onwards. This parameter is equivalent to password . You only need to configure one of them. In Spark connector 1.1.0 and later, we recommend that you use starrocks.password because password may be deprecated. |
starrocks.filter.query.in.max.count | 100 | The maximum number of values supported by the IN expression during predicate pushdown. If the number of values specified in the IN expression exceeds this limit, the filter conditions specified in the IN expression are processed on Spark. |
The following parameters apply only to the Spark RDD reading method.
Parameter | Default value | Description |
---|---|---|
starrocks.request.auth.user | None | The username of your StarRocks cluster account. |
starrocks.request.auth.password | None | The password of your StarRocks cluster account. |
starrocks.read.field | None | The StarRocks table column from which you want to read data. You can specify multiple columns, which must be separated by a comma (,). |
StarRocks data type | Spark data type |
---|---|
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
LARGEINT | DataTypes.StringType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
STRING | DataTypes.StringType |
DATE | DataTypes.DateType |
DATETIME | DataTypes.TimestampType |
JSON | DataTypes.StringType NOTE: Supported since version 1.1.2, and require StarRocks at least 2.5.13, 3.0.3, 3.1 or later |
ARRAY | Unsupported datatype |
HLL | Unsupported datatype |
BITMAP | Unsupported datatype |
StarRocks data type | Spark data type |
---|---|
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
LARGEINT | DataTypes.StringType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
DATE | DataTypes.StringType |
DATETIME | DataTypes.StringType |
ARRAY | Unsupported datatype |
HLL | Unsupported datatype |
BITMAP | Unsupported datatype |
The processing logic of the underlying storage engine used by StarRocks cannot cover an expected time range when DATE and DATETIME data types are directly used. Therefore, the Spark connector maps the DATE and DATETIME data types from StarRocks to the STRING data type from Spark, and generates readable string texts matching the date and time data read from StarRocks.
- Since 1.1.1, the Spark connector does not provide
mysql-connector-java
which is the official JDBC driver for MySQL, because of the limitations of the GPL license used bymysql-connector-java
. However, the Spark connector still needs themysql-connector-java
to connect to StarRocks for the table metadata, so you need to add the driver to the Spark classpath manually. You can find the driver on MySQL site or Maven Central.
-
In version 1.1.0, the Spark connector uses JDBC to access StarRocks to obtain more detailed table information. Therefore, you must configure
starrocks.fe.jdbc.url
. -
In version 1.1.0, some parameters are renamed. Both the old and new parameters are retained for now. For each pair of equivalent parameters, you only need to configure one of them, but we recommend that you use the new one because the old one may be deprecated.
starrocks.fenodes
is renamed asstarrocks.fe.http.url
.user
is renamed asstarrocks.user
.password
is renamed asstarrocks.password
.
-
In version 1.1.0, the mappings of some data types are adjusted based on Spark 3.x:
DATE
in StarRocks is mapped toDataTypes.DateType
(originallyDataTypes.StringType
) in Spark.DATETIME
in StarRocks is mapped toDataTypes.TimestampType
(originallyDataTypes.StringType
) in Spark.
The following examples assume you have created a database named test
in your StarRocks cluster and you have the permissions of user root
. The parameter settings in the examples are based on Spark Connector 1.1.0.
Do as follows to prepare a sample table:
-
Go to the
test
database and create a table namedscore_board
.MySQL [test]> CREATE TABLE `score_board` ( `id` int(11) NOT NULL COMMENT "", `name` varchar(65533) NULL DEFAULT "" COMMENT "", `score` int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`) PROPERTIES ( "replication_num" = "3" );
-
Insert data into the
score_board
table.MySQL [test]> INSERT INTO score_board VALUES (1, 'Bob', 21), (2, 'Stan', 21), (3, 'Sam', 22), (4, 'Tony', 22), (5, 'Alice', 22), (6, 'Lucy', 23), (7, 'Polly', 23), (8, 'Tom', 23), (9, 'Rose', 24), (10, 'Jerry', 24), (11, 'Jason', 24), (12, 'Lily', 25), (13, 'Stephen', 25), (14, 'David', 25), (15, 'Eddie', 26), (16, 'Kate', 27), (17, 'Cathy', 27), (18, 'Judy', 27), (19, 'Julia', 28), (20, 'Robert', 28), (21, 'Jack', 29);
-
Query the
score_board
table.MySQL [test]> SELECT * FROM score_board; +------+---------+-------+ | id | name | score | +------+---------+-------+ | 1 | Bob | 21 | | 2 | Stan | 21 | | 3 | Sam | 22 | | 4 | Tony | 22 | | 5 | Alice | 22 | | 6 | Lucy | 23 | | 7 | Polly | 23 | | 8 | Tom | 23 | | 9 | Rose | 24 | | 10 | Jerry | 24 | | 11 | Jason | 24 | | 12 | Lily | 25 | | 13 | Stephen | 25 | | 14 | David | 25 | | 15 | Eddie | 26 | | 16 | Kate | 27 | | 17 | Cathy | 27 | | 18 | Judy | 27 | | 19 | Julia | 28 | | 20 | Robert | 28 | | 21 | Jack | 29 | +------+---------+-------+ 21 rows in set (0.01 sec)
-
Run the following command in the Spark directory to start Spark SQL:
sh spark-sql
-
Run the following command to create a temporary view named
spark_starrocks
on thescore_board
table which belongs to thetest
database:spark-sql> CREATE TEMPORARY VIEW spark_starrocks USING starrocks OPTIONS ( "starrocks.table.identifier" = "test.score_board", "starrocks.fe.http.url" = "<fe_host>:<fe_http_port>", "starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>", "starrocks.user" = "root", "starrocks.password" = "" );
-
Run the following command to read data from the temporary view:
spark-sql> SELECT * FROM spark_starrocks;
Spark returns the following data:
1 Bob 21 2 Stan 21 3 Sam 22 4 Tony 22 5 Alice 22 6 Lucy 23 7 Polly 23 8 Tom 23 9 Rose 24 10 Jerry 24 11 Jason 24 12 Lily 25 13 Stephen 25 14 David 25 15 Eddie 26 16 Kate 27 17 Cathy 27 18 Judy 27 19 Julia 28 20 Robert 28 21 Jack 29 Time taken: 1.883 seconds, Fetched 21 row(s) 22/08/09 15:29:36 INFO thriftserver.SparkSQLCLIDriver: Time taken: 1.883 seconds, Fetched 21 row(s)
-
Run the following command in the Spark directory to start Spark Shell:
sh spark-shell
-
Run the following command to create a DataFrame named
starrocksSparkDF
on thescore_board
table which belongs to thetest
database:scala> val starrocksSparkDF = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.score_board") .option("starrocks.fe.http.url", s"<fe_host>:<fe_http_port>") .option("starrocks.fe.jdbc.url", s"jdbc:mysql://<fe_host>:<fe_query_port>") .option("starrocks.user", s"root") .option("starrocks.password", s"") .load()
-
Read data from the DataFrame. For example, if you want to read the first 10 rows, run the following command:
scala> starrocksSparkDF.show(10)
Spark returns the following data:
+---+-----+-----+ | id| name|score| +---+-----+-----+ | 1| Bob| 21| | 2| Stan| 21| | 3| Sam| 22| | 4| Tony| 22| | 5|Alice| 22| | 6| Lucy| 23| | 7|Polly| 23| | 8| Tom| 23| | 9| Rose| 24| | 10|Jerry| 24| +---+-----+-----+ only showing top 10 rows
NOTE
By default, if you do not specify the number of rows you want to read, Spark returns the first 20 rows.
-
Run the following command in the Spark directory to start Spark Shell:
sh spark-shell
-
Run the following command to create an RDD named
starrocksSparkRDD
on thescore_board
table which belongs to thetest
database.scala> import com.starrocks.connector.spark._ scala> val starrocksSparkRDD = sc.starrocksRDD ( tableIdentifier = Some("test.score_board"), cfg = Some(Map( "starrocks.fenodes" -> "<fe_host>:<fe_http_port>", "starrocks.request.auth.user" -> "root", "starrocks.request.auth.password" -> "" )) )
-
Read data from the RDD. For example, if you want to read the first 10 elements, run the following command:
scala> starrocksSparkRDD.take(10)
Spark returns the following data:
res0: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24])
To read the entire RDD, run the following command:
scala> starrocksSparkRDD.collect()
Spark returns the following data:
res1: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24], [11, Jason, 24], [12, Lily, 25], [13, Stephen, 25], [14, David, 25], [15, Eddie, 26], [16, Kate, 27], [17, Cathy, 27], [18, Judy, 27], [19, Julia, 28], [20, Robert, 28], [21, Jack, 29])
When you read data from StarRocks using the Spark connector, you can use the starrocks.filter.query
parameter to specify filter conditions based on which Spark prunes partitions, buckets, and prefix indexes to reduce the cost of data pulling. This section uses Spark DataFrame as an example to show how this is achieved.
Component | Version |
---|---|
Spark | Spark 2.4.4 and Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302) |
StarRocks | 2.2.0 |
Spark connector | starrocks-spark2_2.11-1.0.0.jar |
Do as follows to prepare a sample table:
-
Go to the
test
database and create a table namedmytable
.MySQL [test]> CREATE TABLE `mytable` ( `k` int(11) NULL COMMENT "bucket", `b` int(11) NULL COMMENT "", `dt` datetime NULL COMMENT "", `v` int(11) NULL COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`k`,`b`, `dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p202201 VALUES [('2022-01-01 00:00:00'), ('2022-02-01 00:00:00')), PARTITION p202202 VALUES [('2022-02-01 00:00:00'), ('2022-03-01 00:00:00')), PARTITION p202203 VALUES [('2022-03-01 00:00:00'), ('2022-04-01 00:00:00')) ) DISTRIBUTED BY HASH(`k`) PROPERTIES ( "replication_num" = "3" );
-
Insert data into
mytable
.MySQL [test]> INSERT INTO mytable VALUES (1, 11, '2022-01-02 08:00:00', 111), (2, 22, '2022-02-02 08:00:00', 222), (3, 33, '2022-03-02 08:00:00', 333);
-
Query the
mytable
table.MySQL [test]> select * from mytable; +------+------+---------------------+------+ | k | b | dt | v | +------+------+---------------------+------+ | 1 | 11 | 2022-01-02 08:00:00 | 111 | | 2 | 22 | 2022-02-02 08:00:00 | 222 | | 3 | 33 | 2022-03-02 08:00:00 | 333 | +------+------+---------------------+------+ 3 rows in set (0.01 sec)
-
Run the following command in the Spark directory to create a DataFrame named
df
onmytable
table which belongs to thetest
database:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .load()
-
View the FE log file fe.log of your StarRocks cluster, and find the SQL statement executed to read data. Example:
2022-08-09 18:57:38,091 INFO (nioEventLoopGroup-3-10|196) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable`] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
In the
test
database, use EXPLAIN to obtain the execution plan of the SELECTk
,b
,dt
,v
fromtest
.mytable
statement:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable`; +-----------------------------------------------------------------------+ | Explain String | +-----------------------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | partitions=3/3 | | rollup: mytable | | tabletRatio=9/9 | | tabletList=41297,41299,41301,41303,41305,41307,41309,41311,41313 | | cardinality=3 | | avgRowSize=4.0 | | numNodes=0 | +-----------------------------------------------------------------------+ 26 rows in set (0.00 sec)
In this example, no pruning is performed. Therefore, Spark scans all of the three partitions (as suggested by partitions=3/3
) that hold data, and scans all of the 9 tablets (as suggested by tabletRatio=9/9
) in those three partitions.
-
Run the following command, in which you use the
starrocks.filter.query
parameter to specify a filter conditiondt='2022-01-02 08:00:00
for partition pruning, in the Spark directory to create a DataFrame nameddf
on themytable
table which belongs to thetest
database:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .option("starrocks.filter.query", "dt='2022-01-02 08:00:00'") .load()
-
View the FE log file fe.log of your StarRocks cluster, and find the SQL statement executed to read data. Example:
2022-08-09 19:02:31,253 INFO (nioEventLoopGroup-3-14|204) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
In the
test
database, use EXPLAIN to obtain the execution plan of the SELECTk
,b
,dt
,v
fromtest
.mytable
where dt='2022-01-02 08:00:00' statement:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'; +------------------------------------------------+ | Explain String | +------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | PREDICATES: 3: dt = '2022-01-02 08:00:00' | | partitions=1/3 | | rollup: mytable | | tabletRatio=3/3 | | tabletList=41297,41299,41301 | | cardinality=1 | | avgRowSize=20.0 | | numNodes=0 | +------------------------------------------------+ 27 rows in set (0.01 sec)
In this example, only partition pruning is performed, whereas bucket pruning is not. Therefore, Spark scans one of the three partitions (as suggested by partitions=1/3
) and all of the tablets (as suggested by tabletRatio=3/3
) in that partition.
-
Run the following command, in which you use the
starrocks.filter.query
parameter to specify a filter conditionk=1
for bucket pruning, in the Spark directory to create a DataFrame nameddf
on themytable
table which belongs to thetest
database:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .option("starrocks.filter.query", "k=1") .load()
-
View the FE log file fe.log of your StarRocks cluster, and find the SQL statement executed to read data. Example:
2022-08-09 19:04:44,479 INFO (nioEventLoopGroup-3-16|208) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
In the
test
database, use EXPLAIN to obtain the execution plan of the SELECTk
,b
,dt
,v
fromtest
.mytable
where k=1 statement:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1; +------------------------------------------+ | Explain String | +------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | PREDICATES: 1: k = 1 | | partitions=3/3 | | rollup: mytable | | tabletRatio=3/9 | | tabletList=41299,41305,41311 | | cardinality=1 | | avgRowSize=20.0 | | numNodes=0 | +------------------------------------------+ 27 rows in set (0.01 sec)
In this example, only bucket pruning is performed, whereas partition pruning is not. Therefore, Spark scans all of the three partitions (as suggested by partitions=3/3
) that hold data, and scans all of the three tablets (as suggested by tabletRatio=3/9
) to retrieve Hash values that meet the k = 1
filter condition within those three partitions.
-
Run the following command, in which you use the
starrocks.filter.query
parameter to specify two filter conditionsk=7
anddt='2022-01-02 08:00:00'
for bucket pruning and partition pruning, in the Spark directory to create a DataFrame nameddf
on themytable
table on thetest
database:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"") .option("password", s"") .option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'") .load()
-
View the FE log file fe.log of your StarRocks cluster, and find the SQL statement executed to read data. Example:
2022-08-09 19:06:34,939 INFO (nioEventLoopGroup-3-18|212) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] t able [mytable]
-
In the
test
database, use EXPLAIN to obtain the execution plan of the SELECTk
,b
,dt
,v
fromtest
.mytable
where k=7 and dt='2022-01-02 08:00:00' statement:MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'; +----------------------------------------------------------+ | Explain String | +----------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v | | PARTITION: RANDOM | | | | RESULT SINK | | | | 0:OlapScanNode | | TABLE: mytable | | PREAGGREGATION: ON | | PREDICATES: 1: k = 7, 3: dt = '2022-01-02 08:00:00' | | partitions=1/3 | | rollup: mytable | | tabletRatio=1/3 | | tabletList=41301 | | cardinality=1 | | avgRowSize=20.0 | | numNodes=0 | +----------------------------------------------------------+ 17 rows in set (0.00 sec)
In this example, both partition pruning and bucket pruning are performed. Therefore, Spark scans only one of the three partitions (as suggested by partitions=1/3
) and only one tablet (as suggested by tabletRatio=1/3
) in that partition.
-
Insert more data records into a partition of the
mytable
table which belongs to thetest
database:MySQL [test]> INSERT INTO mytable VALUES (1, 11, "2022-01-02 08:00:00", 111), (3, 33, "2022-01-02 08:00:00", 333), (3, 33, "2022-01-02 08:00:00", 333), (3, 33, "2022-01-02 08:00:00", 333);
-
Query the
mytable
table:MySQL [test]> SELECT * FROM mytable; +------+------+---------------------+------+ | k | b | dt | v | +------+------+---------------------+------+ | 1 | 11 | 2022-01-02 08:00:00 | 111 | | 1 | 11 | 2022-01-02 08:00:00 | 111 | | 3 | 33 | 2022-01-02 08:00:00 | 333 | | 3 | 33 | 2022-01-02 08:00:00 | 333 | | 3 | 33 | 2022-01-02 08:00:00 | 333 | | 2 | 22 | 2022-02-02 08:00:00 | 222 | | 3 | 33 | 2022-03-02 08:00:00 | 333 | +------+------+---------------------+------+ 7 rows in set (0.01 sec)
-
Run the following command, in which you use the
starrocks.filter.query
parameter to specify a filter conditionk=1
for prefix index filtering, in the Spark directory to create a DataFrame nameddf
on themytable
table which belongs to thetest
database:scala> val df = spark.read.format("starrocks") .option("starrocks.table.identifier", s"test.mytable") .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>") .option("user", s"root") .option("password", s"") .option("starrocks.filter.query", "k=1") .load()
-
In the
test
database, setis_report_success
totrue
to enable profile reporting:MySQL [test]> SET is_report_success = true; Query OK, 0 rows affected (0.00 sec)
-
Use a browser to open the
http://<fe_host>:<http_http_port>/query
page, and view the profile of the SELECT * FROM mytable where k=1 statement. Example:OLAP_SCAN (plan_node_id=0): CommonMetrics: - CloseTime: 1.255ms - OperatorTotalTime: 1.404ms - PeakMemoryUsage: 0.00 - PullChunkNum: 8 - PullRowNum: 2 - __MAX_OF_PullRowNum: 2 - __MIN_OF_PullRowNum: 0 - PullTotalTime: 148.60us - PushChunkNum: 0 - PushRowNum: 0 - PushTotalTime: 0ns - SetFinishedTime: 136ns - SetFinishingTime: 129ns UniqueMetrics: - Predicates: 1: k = 1 - Rollup: mytable - Table: mytable - BytesRead: 88.00 B - __MAX_OF_BytesRead: 88.00 B - __MIN_OF_BytesRead: 0.00 - CachedPagesNum: 0 - CompressedBytesRead: 844.00 B - __MAX_OF_CompressedBytesRead: 844.00 B - __MIN_OF_CompressedBytesRead: 0.00 - CreateSegmentIter: 18.582us - IOTime: 4.425us - LateMaterialize: 17.385us - PushdownPredicates: 3 - RawRowsRead: 2 - __MAX_OF_RawRowsRead: 2 - __MIN_OF_RawRowsRead: 0 - ReadPagesNum: 12 - __MAX_OF_ReadPagesNum: 12 - __MIN_OF_ReadPagesNum: 0 - RowsRead: 2 - __MAX_OF_RowsRead: 2 - __MIN_OF_RowsRead: 0 - ScanTime: 154.367us - SegmentInit: 95.903us - BitmapIndexFilter: 0ns - BitmapIndexFilterRows: 0 - BloomFilterFilterRows: 0 - ShortKeyFilterRows: 3 - __MAX_OF_ShortKeyFilterRows: 3 - __MIN_OF_ShortKeyFilterRows: 0 - ZoneMapIndexFilterRows: 0 - SegmentRead: 2.559us - BlockFetch: 2.187us - BlockFetchCount: 2 - __MAX_OF_BlockFetchCount: 2 - __MIN_OF_BlockFetchCount: 0 - BlockSeek: 7.789us - BlockSeekCount: 2 - __MAX_OF_BlockSeekCount: 2 - __MIN_OF_BlockSeekCount: 0 - ChunkCopy: 25ns - DecompressT: 0ns - DelVecFilterRows: 0 - IndexLoad: 0ns - PredFilter: 353ns - PredFilterRows: 0 - RowsetsReadCount: 7 - SegmentsReadCount: 3 - __MAX_OF_SegmentsReadCount: 2 - __MIN_OF_SegmentsReadCount: 0 - TotalColumnsDataPageCount: 8 - __MAX_OF_TotalColumnsDataPageCount: 8 - __MIN_OF_TotalColumnsDataPageCount: 0 - UncompressedBytesRead: 508.00 B - __MAX_OF_UncompressedBytesRead: 508.00 B - __MIN_OF_UncompressedBytesRead: 0.00
In this example, the filter condition k = 1
can hit the prefix index. Therefore, Spark can filter out three rows (as suggested by ShortKeyFilterRows: 3
).