打开一个spark-shell
,输入spark.read.
之后按一下tab
键会出现好多数据加载的方式
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
这里以几个经典的读取数据方式为例,基本语法:
// load 通用数据加载方式
spark.read[.format("...")][.option("...")].load("...")
// json 读取json文件
spark.read.json("...")
// csv
spark.read.format("csv").option("seq",";").option("inferSchema","true").option("header","true").load("...")
-
format("...")
:指定加载的数据类型,可以是csv
、json
、txt
、jdbc
等 -
option("...")
:在jdbc
格式下需要传入jdbc相应的参数,url、user、password、和dbtable等 -
spark.read.load
是加载数据的通用方式,默认情况下只能加载parquet
格式的数据:
案例:
# 使用load默认读取parquet格式的数据
scala> spark.read.load("examples/src/main/resources/users.parquet").show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
# 读取json格式的数据
# 等同于spark.read.json(path)
scala> spark.read.format("json").load("examples/src/main/resources/people.json").show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
# 也可以按照文件查询,哪种格式的数据只需要在前面加个文件格式就好了
scala> spark.sql("select * from json.`examples/src/main/resources/people.json`").show
22/05/09 19:28:08 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
# 读取CSV格式的数据
scala> spark.read.format("csv").option("sep",";").option("inferSchema","true").option("header","true").load("examples/src/main/resources/people.csv").show
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
+-----+---+---------+
DataFrame.write.save
是保存数据的通用方法:
DataFrame.write.mode("方式").format("文件格式")[.options("...")].save("路径")
-
mode
:指定文件保存的方式,包括追加、覆盖、忽略等:Scala/Java Any Language Meaning SaveMode.ErrorIfExists(default) "error"(default) 如果文件已经存在则抛出异常 SaveMode.Append "append" 如果文件已经存在则追加 SaveMode.Overwrite "overwrite" 如果文件已经存储则覆盖 SaveMode.Ignore "ignore" 如果文件已经存在则忽略 -
format("...")
:指定保存的数据类型,包括csv\jdbc\json\orc\parquet和textFile等 -
save("...")
:需要传递保存数据的路径 -
option("...")
:在jdbc
格式下需要传入JDBC相应参数,url\user\password和dbtable等
案例:
# 创建一个DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
# 保存文件
scala> df.write.json("output")
# 再次写入就会保存
scala> df.write.json("output")
org.apache.spark.sql.AnalysisException: path file:/opt/module/spark-local/output already exists.;
# 追加的形式保存文件
scala> df.write.mode("append").json("output")
# 覆盖
scala> df.write.mode("overwrite").json("output")
# 忽略
scala> df.write.mode("ignore").json("output")
# 通用方式保存文件
scala> df.write.mode("overwrite").format("json").save("output")
在开始之前,先打开hadoop102
主机的数据库创建一张表,并导入一些数据:
create database spark_sql;
use spark_sql;
create table user(
id int primary key AUTO_INCREMENT,
name varchar(128),
age int
);
insert into user (name,age) VALUES ("wzq",12);
insert into user (name,age) VALUES ("wtt",12);
insert into user (name,age) VALUES ("haha",18);
insert into user (name,age) VALUES ("abc",16);
insert into user (name,age) VALUES ("bbb",20);
insert into user (name,age) VALUES ("na",60);
这部分在idea里面操作,打开idea项目的pom.xml
文件,第一步添加mysql
驱动依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
操作MySQL表的数据,需要添加option
package com.wzq.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util.Properties
object SparkSQL05_MySQL {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MySQL")
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 读取MySQL表中的数据
val df: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/spark_sql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "root")
.option("dbtable", "user")
.load()
df.show()
// 第二种读取数据的方式
val properties: Properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "root")
val df1: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop102:3306/spark_sql", "user", properties)
df1.show()
// 保存数据
df.write.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/spark_sql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "root")
.option("dbtable", "user1")
.mode(SaveMode.Append)
.save()
// 保存数据的第二种方式
df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark_sql", "user2", properties)
spark.close()
}
}
在实际中,基本不用内置的Hive
在这部分开始之前应该启动HDFS
与Hive
:
[wzq@hadoop102 spark-local]$ myhadoop.sh start
[wzq@hadoop102 spark-local]$ hiveservices.sh start
[wzq@hadoop102 spark-local]$ hiveservices.sh status
Metastore服务运行正常
HiveServer2服务运行正常
Spark连接外部的Hive其实很简单:
- 把
Hive
配置文件目录下的hive-site.xml
拷贝到spark
的conf/
目录下 - 把
MySQL
驱动拷贝到Spark的jars/
目录下 - 如果访问不到HDFS,则需要把hadoop的
core-site.xml
和hdfs-site.xml
拷贝Spark的conf/
目录下即可 - 重启
spark-shell
scala> spark.sql("show databases").show
22/05/09 20:10:32 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
+---------+
|namespace|
+---------+
| db_hive|
| default|
|gulivideo|
+---------+
scala> spark.sql("use db_hive").show
scala> spark.sql("show tables").show
+--------+------------------+-----------+
|database| tableName|isTemporary|
+--------+------------------+-----------+
| db_hive| bigtable| false|
| db_hive| bigtable2| false|
| db_hive| bigtable_buck1| false|
| db_hive| bigtable_buck2| false|
| db_hive| business| false|
| db_hive| dept| false|
| db_hive| dept_par| false|
| db_hive| dept_par2| false|
| db_hive| dept_par3| false|
| db_hive|dept_par_by_deptno| false|
| db_hive| emp| false|
| db_hive| emp_sex| false|
| db_hive| jointable| false|
| db_hive| log_parquet| false|
| db_hive| log_text| false|
| db_hive| log_text_orc| false|
| db_hive| movie_info| false|
| db_hive| nullidtable| false|
| db_hive| person_info| false|
| db_hive| score| false|
+--------+------------------+-----------+
only showing top 20 rows
这样就成功的连接到了Hive
的仓库
如果觉得这样每次都要写spark.sql("...").show
,Spark为用户提供了两种方式只写sql即可:
# 方式一:运行spark-sql
[wzq@hadoop102 spark-local]$ bin/spark-sql
# 方式二:beeline的方式
[wzq@hadoop102 spark-local]$ sbin/start-thriftserver.sh
[wzq@hadoop102 spark-local]$ bin/beeline -u jdbc:hive2://hadoop102:10000 -n wzq
代码操作Hive,在开始之前,也是首先要导入Hive
和MySQL
的依赖,因为Hive的元数据存储在MySQL:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
</dependency>
<!-- 添加对应的 jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.0</version>
</dependency>
还需要把hive-site.xml
拷贝到resources
里面
需要注意的是在创建SparkSession
对象的时候,需要加上.enableHiveSupport
然后的操作可以通过spark.sql("...")
写HiveQL