This library is used to read and write TsFile in Spark (write support is still under development).
It exposes one or more TsFiles as a table in SparkSQL. You may specify a single directory, or use wildcards to match multiple directories. When multiple TsFiles are loaded, the table schema is the union of the sensors in the individual TsFiles.
TsFile project: https://github.com/thulab/tsfile.git
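To illustrate the wildcard behavior described above, here is a minimal sketch (the path /data/*.tsfile is hypothetical):

```scala
import cn.edu.tsinghua.tsfile._

// Hypothetical glob: matches every TsFile under /data.
// Per the description above, the resulting schema is the union
// of the sensors found in the matched TsFiles.
val df = spark.read.tsfile("/data/*.tsfile")
df.printSchema()
```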
The versions required for Spark and Java are as follows:

Spark Version | Scala Version | Java Version | TsFile |
---|---|---|---|
2.0+ | 2.11 | 1.8 | 0.4.0 |
ATTENTION: Please check the jar packages in the root directory of your Spark installation and replace libthrift-0.9.2.jar and libfb303-0.9.2.jar with libthrift-0.9.1.jar and libfb303-0.9.1.jar, respectively.
This library uses the following mapping from TsFile data types to SparkSQL data types:
TsFile | SparkSQL |
---|---|
INT32 | IntegerType |
INT64 | LongType |
FLOAT | FloatType |
DOUBLE | DoubleType |
BYTE_ARRAY | StringType |
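A quick way to see this mapping in practice is to inspect the inferred schema; a minimal sketch, assuming test.tsfile has already been uploaded to HDFS as described in the examples below:

```scala
import cn.edu.tsinghua.tsfile._

// Each TsFile data type appears as its SparkSQL counterpart
// from the table above, e.g. FLOAT -> FloatType, INT32 -> IntegerType.
val df = spark.read.tsfile("/test.tsfile")
df.printSchema()
```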
The set of time-series data in section "Time-series Data" is used here to illustrate the mapping from TsFile schema to SparkSQL table structure.
delta_object: root.car.turbine1

sensor_1 | | sensor_2 | | sensor_3 | |
---|---|---|---|---|---|
time | value | time | value | time | value |
1 | 1.2 | 1 | 20 | 2 | 50 |
3 | 1.4 | 2 | 20 | 4 | 51 |
5 | 1.1 | 3 | 21 | 6 | 52 |
7 | 1.8 | 4 | 20 | 8 | 53 |
There are two reserved columns in the SparkSQL table:

- time: timestamp, LongType
- delta_object: delta_object ID, StringType

The SparkSQL table structure is as follows:
time(LongType) | delta_object(StringType) | sensor_1(FloatType) | sensor_2(IntegerType) | sensor_3(IntegerType) |
---|---|---|---|---|
1 | root.car.turbine1 | 1.2 | 20 | null |
2 | root.car.turbine1 | null | 20 | 50 |
3 | root.car.turbine1 | 1.4 | 21 | null |
4 | root.car.turbine1 | null | 20 | 51 |
5 | root.car.turbine1 | 1.1 | null | null |
6 | root.car.turbine1 | null | null | 52 |
7 | root.car.turbine1 | 1.8 | null | null |
8 | root.car.turbine1 | null | null | 53 |
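Both reserved columns can be filtered like ordinary columns. A minimal sketch, assuming the table has been registered as tsfile_table (see Example 1 below):

```scala
// Select the rows of one delta_object within a time range,
// using the two reserved columns.
spark.sql(
  "select time, delta_object, sensor_1 from tsfile_table " +
  "where delta_object = 'root.car.turbine1' and time <= 4").show()
```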
If you want to unfold the delta_object column into multiple columns, add an option when reading and writing, e.g.

```scala
option("delta_object_name", "root.device.turbine")
```

"delta_object_name" is a reserved key.
Then the SparkSQL table structure is as follows:

time(LongType) | device(StringType) | turbine(StringType) | sensor_1(FloatType) | sensor_2(IntegerType) | sensor_3(IntegerType) |
---|---|---|---|---|---|
1 | car | turbine1 | 1.2 | 20 | null |
2 | car | turbine1 | null | 20 | 50 |
3 | car | turbine1 | 1.4 | 21 | null |
4 | car | turbine1 | null | 20 | 51 |
5 | car | turbine1 | 1.1 | null | null |
6 | car | turbine1 | null | null | 52 |
7 | car | turbine1 | 1.8 | null | null |
8 | car | turbine1 | null | null | 53 |
Then you can group by any level of delta_object, and with the same option you can write the DataFrame back to a TsFile.
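For instance, a minimal sketch of grouping by one unfolded level, assuming df was read with the "delta_object_name" option (as in Example 4 below):

```scala
// Count rows per device, i.e. per the second level of delta_object.
df.groupBy("device").count().show()
```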
Build the package with:

```
mvn clean scala:compile compile package
```
The path of 'test.tsfile' used in the following examples is "data/test.tsfile". Please upload 'test.tsfile' to HDFS in advance; its HDFS path is "/test.tsfile".
- Example 1

```scala
import cn.edu.tsinghua.tsfile._

// read data in TsFile and create a table
val df = spark.read.tsfile("/test.tsfile")
df.createOrReplaceTempView("tsfile_table")

// query with filter
val newDf = spark.sql("select * from tsfile_table where s1 > 1.2").cache()
newDf.show()
```
- Example 2

```scala
val df = spark.read
  .format("cn.edu.tsinghua.tsfile")
  .load("/test.tsfile")
df.filter("time < 10").show()
```
- Example 3

```scala
// create a table in SparkSQL and build relation with a TsFile
spark.sql("create temporary view tsfile_table using cn.edu.tsinghua.tsfile options(path = \"/test.tsfile\")")
spark.sql("select * from tsfile_table where s1 > 1.2").show()
```
- Example 4 (using options to read)

```scala
import cn.edu.tsinghua.tsfile._

val df = spark.read.option("delta_object_name", "root.device.turbine").tsfile("/test.tsfile")

// create a table in SparkSQL and build relation with a TsFile
df.createOrReplaceTempView("tsfile_table")
spark.sql("select * from tsfile_table where turbine = 'd1' and device = 'car' and time < 10").show()
```
- Example 5 (write)

```scala
import cn.edu.tsinghua.tsfile._

spark.read.tsfile("/test.tsfile").write.tsfile("/out")
```
- Example 6 (using options to write)

```scala
import cn.edu.tsinghua.tsfile._

val df = spark.read.option("delta_object_name", "root.device.turbine").tsfile("/test.tsfile")
df.write.option("delta_object_name", "root.device.turbine").tsfile("/out")
```
Start spark-shell with the required jar packages:

```
./spark-2.0.1-bin-hadoop2.7/bin/spark-shell --jars tsfile-0.4.0.jar,tsfile-spark-connector-0.4.0.jar
```
ATTENTION:
- Please replace "spark-2.0.1-bin-hadoop2.7/bin/spark-shell" with the real path of your spark-shell.
- Multiple jar packages are separated by commas without any spaces.
- The latest version used is v0.4.0.
To connect to a Spark cluster, add the --master option:

```
./spark-2.0.1-bin-hadoop2.7/bin/spark-shell --jars tsfile-0.4.0.jar,tsfile-spark-connector-0.4.0.jar --master spark://ip:7077
```