Skip to content

Latest commit

 

History

History
368 lines (267 loc) · 10.3 KB

18、SparkSQL-数据的加载和保存.md

File metadata and controls

368 lines (267 loc) · 10.3 KB

一、普通文件的加载和保存

1、加载数据

打开一个spark-shell,输入spark.read.之后按一下tab键会出现好多数据加载的方式

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

这里以几个经典的读取数据方式为例,基本语法:

// load 通用数据加载方式
spark.read[.format("...")][.option("...")].load("...")

// json 读取json文件
spark.read.json("...")

// csv
spark.read.format("csv").option("seq",";").option("inferSchema","true").option("header","true").load("...")
  • format("..."):指定加载的数据类型,可以是csvjsontxtjdbc

  • option("..."):在jdbc格式下需要传入jdbc相应的参数,url、user、password、和dbtable等

  • spark.read.load是加载数据的通用方式,默认情况下只能加载parquet格式的数据:

案例:

# 使用load默认读取parquet格式的数据
scala> spark.read.load("examples/src/main/resources/users.parquet").show
+------+--------------+----------------+                                        
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

# 读取json格式的数据
# 等同于spark.read.json(path)
scala> spark.read.format("json").load("examples/src/main/resources/people.json").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

# 也可以按照文件查询,哪种格式的数据只需要在前面加个文件格式就好了
scala> spark.sql("select * from json.`examples/src/main/resources/people.json`").show
22/05/09 19:28:08 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

# 读取CSV格式的数据
scala> spark.read.format("csv").option("sep",";").option("inferSchema","true").option("header","true").load("examples/src/main/resources/people.csv").show
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

2、保存数据

DataFrame.write.save是保存数据的通用方法:

DataFrame.write.mode("方式").format("文件格式")[.options("...")].save("路径")
  • mode:指定文件保存的方式,包括追加、覆盖、忽略等:

    Scala/Java Any Language Meaning
    SaveMode.ErrorIfExists(default) "error"(default) 如果文件已经存在则抛出异常
    SaveMode.Append "append" 如果文件已经存在则追加
    SaveMode.Overwrite "overwrite" 如果文件已经存储则覆盖
    SaveMode.Ignore "ignore" 如果文件已经存在则忽略
  • format("..."):指定保存的数据类型,包括csv\jdbc\json\orc\parquet和textFile等

  • save("..."):需要传递保存数据的路径

  • option("..."):在jdbc格式下需要传入JDBC相应参数,url\user\password和dbtable等

案例:

# 创建一个DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

# 保存文件
scala> df.write.json("output")

# 再次写入就会保存
scala> df.write.json("output")
org.apache.spark.sql.AnalysisException: path file:/opt/module/spark-local/output already exists.;

# 追加的形式保存文件
scala> df.write.mode("append").json("output")

# 覆盖
scala> df.write.mode("overwrite").json("output")

# 忽略
scala> df.write.mode("ignore").json("output")

# 通用方式保存文件
scala> df.write.mode("overwrite").format("json").save("output")

二、MySQL 读写

在开始之前,先打开hadoop102主机的数据库创建一张表,并导入一些数据:

create database spark_sql;
use spark_sql;

create table user(
	id int primary key AUTO_INCREMENT,
	name varchar(128),
	age int
);

insert into user (name,age) VALUES ("wzq",12);
insert into user (name,age) VALUES ("wtt",12);
insert into user (name,age) VALUES ("haha",18);
insert into user (name,age) VALUES ("abc",16);
insert into user (name,age) VALUES ("bbb",20);
insert into user (name,age) VALUES ("na",60);

这部分在idea里面操作,打开idea项目的pom.xml文件,第一步添加mysql驱动依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

操作MySQL表的数据,需要添加option

package com.wzq.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.util.Properties

object SparkSQL05_MySQL {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MySQL")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // 读取MySQL表中的数据
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark_sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "user")
      .load()

    df.show()

    // 第二种读取数据的方式
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    val df1: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop102:3306/spark_sql", "user", properties)
    df1.show()

    // 保存数据
    df.write.format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark_sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "user1")
      .mode(SaveMode.Append)
      .save()

    // 保存数据的第二种方式
    df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark_sql", "user2", properties)

    spark.close()
  }
}

三、Hive 读写

1、操作内置Hive

在实际中,基本不用内置的Hive

2、外部的Hive

在这部分开始之前应该启动HDFSHive

[wzq@hadoop102 spark-local]$ myhadoop.sh start
[wzq@hadoop102 spark-local]$ hiveservices.sh start
[wzq@hadoop102 spark-local]$ hiveservices.sh status
Metastore服务运行正常
HiveServer2服务运行正常

Spark连接外部的Hive其实很简单:

  • Hive配置文件目录下的hive-site.xml拷贝到sparkconf/目录下
  • MySQL驱动拷贝到Spark的jars/目录下
  • 如果访问不到HDFS,则需要把hadoop的core-site.xmlhdfs-site.xml拷贝Spark的conf/目录下即可
  • 重启spark-shell
scala> spark.sql("show databases").show
22/05/09 20:10:32 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
+---------+
|namespace|
+---------+
|  db_hive|
|  default|
|gulivideo|
+---------+

scala> spark.sql("use db_hive").show

scala> spark.sql("show tables").show
+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| db_hive|          bigtable|      false|
| db_hive|         bigtable2|      false|
| db_hive|    bigtable_buck1|      false|
| db_hive|    bigtable_buck2|      false|
| db_hive|          business|      false|
| db_hive|              dept|      false|
| db_hive|          dept_par|      false|
| db_hive|         dept_par2|      false|
| db_hive|         dept_par3|      false|
| db_hive|dept_par_by_deptno|      false|
| db_hive|               emp|      false|
| db_hive|           emp_sex|      false|
| db_hive|         jointable|      false|
| db_hive|       log_parquet|      false|
| db_hive|          log_text|      false|
| db_hive|      log_text_orc|      false|
| db_hive|        movie_info|      false|
| db_hive|       nullidtable|      false|
| db_hive|       person_info|      false|
| db_hive|             score|      false|
+--------+------------------+-----------+
only showing top 20 rows

这样就成功的连接到了Hive的仓库

如果觉得这样每次都要写spark.sql("...").show,Spark为用户提供了两种方式只写sql即可:

# 方式一:运行spark-sql
[wzq@hadoop102 spark-local]$ bin/spark-sql

# 方式二:beeline的方式
[wzq@hadoop102 spark-local]$ sbin/start-thriftserver.sh
[wzq@hadoop102 spark-local]$ bin/beeline -u jdbc:hive2://hadoop102:10000 -n wzq

代码操作Hive,在开始之前,也是首先要导入HiveMySQL的依赖,因为Hive的元数据存储在MySQL:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.0</version>
</dependency>
<!-- 添加对应的 jackson-core -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.0</version>
</dependency>

还需要把hive-site.xml拷贝到resources里面

需要注意的是在创建SparkSession对象的时候,需要加上.enableHiveSupport

然后的操作可以通过spark.sql("...")HiveQL

参考资料