A Brief Look at Spark SQL

Posted by JustDoDT on May 21, 2019

Overview

Spark SQL is not just SQL: it can also process other structured data formats such as ORC, Parquet and JSON. It was born in Spark 1.0 and graduated (became stable) in Spark 1.3. The component is now called "Spark SQL, DataFrames and Datasets"; it used to be called simply Spark SQL.

Hive on Spark only requires setting set hive.execution.engine=spark; use it with caution in production.

SparkSQL

DataFrame came out in Spark 1.3; it was previously called SchemaRDD.

DataFrame is similar to a pandas DataFrame, except that pandas is single-machine while Spark's DataFrame is distributed. A DF can be thought of as a table: it carries column names, and it provides much more information about the structure of the data and how to compute over it (columnar storage, which is faster for analytical access).

Dataset appeared in Spark 1.6; it supports Scala and Java, but not Python.

In the Scala API, DataFrame = Dataset[Row].
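A quick way to see the relationship in spark-shell (a minimal sketch; people.json is the sample file shipped with Spark that is also used later in this post, and the spark-shell implicits are assumed):

// type DataFrame = Dataset[Row] -- DataFrame is just an alias in the Scala API
val df = spark.read.json("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")

// age is null for one record in the sample data, hence Option[Long]
case class Person(name: String, age: Option[Long])

val ds = df.as[Person]   // typed view: Dataset[Person]
val back = ds.toDF()     // back to a DataFrame, i.e. Dataset[Row]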

Using Spark SQL with Hive

[hadoop@hadoop001 conf]$ cp /home/hadoop/app/hive-1.1.0-cdh5.7.0/conf/hive-site.xml /home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/conf/

Then start spark-shell; it fails with the error below:

Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "BONECP" plugin to create a ConnectionPool gave an error : The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
  at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:259)
  at org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:131)
  at org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:85)
  ... 150 more
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.

The error above shows that the MySQL driver jar is missing from the classpath, so pass it in with --jars:

[hadoop@hadoop001 ~]$ spark-shell --jars ~/software/mysql-connector-java-5.1.43-bin.jar 
.
.
scala> spark.sql("show tables").show
19/05/21 18:05:44 ERROR metastore.ObjectStore: Version information found in metastore differs 1.1.0 from expected schema version 1.2.0. Schema verififcation is disabled hive.metastore.schema.verification so setting version.
19/05/21 18:05:44 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|page_views_rcfile...|      false|
+--------+--------------------+-----------+


# Pass false to show so long values are not truncated
scala> spark.sql("show tables").show(false)
+--------+----------------------+-----------+
|database|tableName             |isTemporary|
+--------+----------------------+-----------+
|default |page_views_rcfile_test|false      |
+--------+----------------------+-----------+

scala> spark.sql("select * from page_views_rcfile_test limit 10").show(false)
+----------+---+----------+-------+---+-----------+-------+
|track_time|url|session_id|referer|ip |end_user_id|city_id|
+----------+---+----------+-------+---+-----------+-------+
+----------+---+----------+-------+---+-----------+-------+


scala> spark.sql("use hive")
res33: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").show
+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|    hive|                 emp|      false|
|    hive|        emp_external|      false|
|    hive|           flow_info|      false|
|    hive|         managed_emp|      false|
|    hive|     order_partition|      false|
|    hive|          page_views|      false|
|    hive|      page_views_orc|      false|
|    hive| page_views_orc_none|      false|
|    hive|  page_views_parquet|      false|
|    hive|page_views_parque...|      false|
|    hive|       page_views_rc|      false|
|    hive|page_views_rcfile...|      false|
|    hive|      page_views_seq|      false|
|    hive|                   t|      false|
|    hive|                  t1|      false|
|    hive|                test|      false|
|    hive|        version_test|      false|
|        |              people|       true|
+--------+--------------------+-----------+

Note: pass the MySQL driver in with --jars instead of copying it into Spark's jars directory; a copied jar could affect other jobs.

You can also work through $SPARK_HOME/bin/spark-sql:

[hadoop@hadoop001 bin]$ ./spark-sql --jars ~/software/mysql-connector-java-5.1.43-bin.jar 

Running the command above fails with the following error:

Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=false&characterEncoding=UTF-8

The error says the driver could not find a suitable JDBC driver, even though spark-shell --help states that --jars puts jars on both the driver and executor classpaths:

 --jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.
                              
 --driver-class-path         Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.                              
In practice, the MySQL driver jar also has to be put on the driver classpath explicitly with --driver-class-path:

[hadoop@hadoop001 bin]$ ./spark-sql --jars ~/software/mysql-connector-java-5.1.43-bin.jar --driver-class-path ~/software/mysql-connector-java-5.1.43-bin.jar
.
.
spark-sql (default)> show tables;
19/05/21 18:25:08 INFO metastore.HiveMetaStore: 0: get_database: global_temp
19/05/21 18:25:08 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_database: global_temp
19/05/21 18:25:08 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
19/05/21 18:25:08 INFO metastore.HiveMetaStore: 0: get_database: default
19/05/21 18:25:08 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_database: default
19/05/21 18:25:08 INFO metastore.HiveMetaStore: 0: get_database: default
19/05/21 18:25:08 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_database: default
19/05/21 18:25:08 INFO metastore.HiveMetaStore: 0: get_tables: db=default pat=*
19/05/21 18:25:08 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_tables: db=default pat=*
19/05/21 18:25:08 INFO codegen.CodeGenerator: Code generated in 223.866114 ms
default page_views_rcfile_test  false
Time taken: 2.13 seconds, Fetched 1 row(s)
19/05/21 18:25:08 INFO thriftserver.SparkSQLCLIDriver: Time taken: 2.13 seconds, Fetched 1 row(s)
spark-sql (default)> 


spark-sql (default)> use hive;

19/05/22 07:37:30 INFO metastore.HiveMetaStore: 0: get_database: global_temp
19/05/22 07:37:30 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_database: global_temp
19/05/22 07:37:30 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
19/05/22 07:37:30 INFO metastore.HiveMetaStore: 0: get_database: hive
19/05/22 07:37:30 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_database: hive
Time taken: 3.488 seconds
19/05/22 07:37:30 INFO thriftserver.SparkSQLCLIDriver: Time taken: 3.488 seconds

spark-sql (default)> show tables;

19/05/22 07:37:43 INFO metastore.HiveMetaStore: 0: get_database: hive
19/05/22 07:37:43 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_database: hive
19/05/22 07:37:43 INFO metastore.HiveMetaStore: 0: get_database: hive
19/05/22 07:37:43 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_database: hive
19/05/22 07:37:43 INFO metastore.HiveMetaStore: 0: get_tables: db=hive pat=*
19/05/22 07:37:43 INFO HiveMetaStore.audit: ugi=hadoop  ip=unknown-ip-addr      cmd=get_tables: db=hive pat=*
19/05/22 07:37:44 INFO codegen.CodeGenerator: Code generated in 373.175585 ms
hive    emp     false
hive    emp_external    false
hive    flow_info       false
hive    managed_emp     false
hive    order_partition false
hive    page_views      false
hive    page_views_orc  false
hive    page_views_orc_none     false
hive    page_views_parquet      false
hive    page_views_parquet_gzip false
hive    page_views_rc   false
hive    page_views_rcfile_test  false
hive    page_views_seq  false
hive    t       false
hive    t1      false
hive    test    false
hive    version_test    false
Time taken: 0.88 seconds, Fetched 17 row(s)

In the Spark UI you can see the MySQL driver jar that was added by the user:

[Screenshot: Spark UI]

You can also use beeline:

[hadoop@hadoop001 sbin]$ ./start-thriftserver.sh --jars ~/software/mysql-connector-java-5.1.43-bin.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/logs/spark-hadoop-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hadoop001.out

# Check the log
[hadoop@hadoop001 conf]$ tail -200f /home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/logs/spark-hadoop-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hadoop001.out

19/05/21 18:29:43 INFO service.AbstractService: Service:HiveServer2 is started.
19/05/21 18:29:43 INFO thriftserver.HiveThriftServer2: HiveThriftServer2 started
19/05/21 18:29:43 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6e22d6bf{/sqlserver,null,AVAILABLE,@Spark}
19/05/21 18:29:43 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@32121140{/sqlserver/json,null,AVAILABLE,@Spark}
19/05/21 18:29:43 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6d4f266{/sqlserver/session,null,AVAILABLE,@Spark}
19/05/21 18:29:43 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6b756a62{/sqlserver/session/json,null,AVAILABLE,@Spark}
19/05/21 18:29:43 INFO thrift.ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads

The log shows that the server has started; now start the beeline client:

[hadoop@hadoop001 bin]$ ./beeline -u jdbc:hive2://localhost:10000 -n hadoop
Connecting to jdbc:hive2://localhost:10000
19/05/21 18:36:58 INFO jdbc.Utils: Supplied authorities: localhost:10000
19/05/21 18:36:58 INFO jdbc.Utils: Resolved authority: localhost:10000
19/05/21 18:36:58 INFO jdbc.HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 2.4.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://localhost:10000> 

0: jdbc:hive2://localhost:10000> show databases;
+---------------+--+
| databaseName  |
+---------------+--+
| default       |
| hive          |
+---------------+--+
2 rows selected (0.988 seconds)

Note: the options used at startup can also be added to spark-defaults.conf, so that spark-shell can be launched without passing them every time.
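For example, equivalent entries in spark-defaults.conf might look like this (a sketch; the path is assumed to be the expansion of the ~/software path used above):

spark.jars                    /home/hadoop/software/mysql-connector-java-5.1.43-bin.jar
spark.driver.extraClassPath   /home/hadoop/software/mysql-connector-java-5.1.43-bin.jar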

Viewing it in the Spark UI:

[Screenshots: Spark UI]

The Thrift server (jdbc:hive2) and beeline are described in the official documentation: http://spark.apache.org/docs/latest/sql-distributed-sql-engine.html

The show method prints only the first 20 rows by default, and by default any string longer than 20 characters is truncated:

def show(numRows: Int): Unit = show(numRows, truncate = true)

  /**
   * Displays the top 20 rows of Dataset in a tabular form. Strings more than 20 characters
   * will be truncated, and all cells will be aligned right.
   *
   * @group action
   * @since 1.6.0
   */
  def show(): Unit = show(20)

  /**
   * Displays the top 20 rows of Dataset in a tabular form.
   *
   * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
   *                 be truncated and all cells will be aligned right
   *
   * @group action
   * @since 1.6.0
   */
  def show(truncate: Boolean): Unit = show(20, truncate)

RDD VS DF VS DS

Spark 1.6 then introduced the concept of the Dataset. The characteristics of the three are as follows:

Advantages of RDD:

1. Powerful: many built-in operations such as group, map and filter make it convenient to process structured or unstructured data.

2. Object-oriented: it stores Java objects directly, and type conversions are type-safe.

Disadvantages of RDD:

1. Because it is as general-purpose as Hadoop, there are no optimizations for specific scenarios; processing structured data, for example, is far more cumbersome than with SQL.

2. Java serialization is used by default, which produces large serialized output, and the data lives in the JVM heap, leading to frequent GC.

Advantages of DataFrame:

1. Structured data processing is very convenient: it supports KV-style sources such as Avro, CSV, Elasticsearch and Cassandra, as well as traditional tables such as Hive tables and MySQL.

2. Targeted optimizations: since Spark already holds the schema metadata, serialization does not need to carry it, which greatly reduces the serialized size; data is kept in off-heap memory, reducing GC.

3. Hive compatibility: HQL, UDFs, and so on are supported.

Disadvantages of DataFrame:

1. No compile-time type-safety checks; problems only surface at runtime.

2. Not object-friendly: an RDD stores Java objects directly, whereas a DataFrame internally stores Row objects rather than custom objects.

Advantages of Dataset:

1. Dataset combines the advantages of RDD and DataFrame, supporting both structured and unstructured data.

2. Like RDD, it supports storing custom objects.

3. Like DataFrame, it supports SQL queries over structured data.

4. It uses off-heap storage and is GC-friendly.

5. Type conversions are safe, and the code is pleasant to work with.

6. The official recommendation is to use Dataset.

Reference: https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
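A minimal sketch in spark-shell (assuming the default implicits) showing the same filter expressed against all three APIs:

case class Person(name: String, age: Int)

val rdd = spark.sparkContext.parallelize(Seq(Person("Andy", 30), Person("Justin", 19)))

// RDD: typed objects, but no schema and no SQL optimizer
val adultsRdd = rdd.filter(_.age > 20)

// DataFrame: schema-aware and optimized, but columns are referenced by name (checked only at runtime)
val df = rdd.toDF()
val adultsDf = df.filter($"age" > 20)

// Dataset: schema-aware and typed -- a typo such as _.agee fails at compile time
val ds = df.as[Person]
val adultsDs = ds.filter(_.age > 20)

adultsDs.show()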

Working with DataFrames

Reading JSON

scala> spark.read.json("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

# Print the schema
scala> val peopleDF = spark.read.json("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 
 # Select a single column
 scala> peopleDF.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> peopleDF.select("name","age").show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

scala> peopleDF.select(peopleDF("name"),$"age",$"age"+10).show
+-------+----+----------+
|   name| age|(age + 10)|
+-------+----+----------+
|Michael|null|      null|
|   Andy|  30|        40|
| Justin|  19|        29|
+-------+----+----------+

scala> peopleDF.select($"name",$"age">20).show
+-------+----------+
|   name|(age > 20)|
+-------+----------+
|Michael|      null|
|   Andy|      true|
| Justin|     false|
+-------+----------+

scala> peopleDF.filter($"age">20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala> peopleDF.groupBy("age").count().show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

Registering a temporary view and querying it with Spark SQL

scala> val peopleDF = spark.read.format("json").load("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.createOrReplaceTempView("people")

scala> spark.sql("select * from people")
res4: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Reading Parquet

scala> spark.read.parquet("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").show
+------+--------------+----------------+                                        
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

# Print the schema
scala> val usersDF = spark.read.parquet("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
usersDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> usersDF.printSchema
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

Reading ORC

scala> spark.read.orc("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.orc").show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

# Print the schema
scala> val usersDF = spark.read.orc("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.orc")
usersDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> usersDF.printSchema
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

Reading Avro

https://github.com/gengliangwang/spark-avro

https://spark.apache.org/docs/latest/sql-data-sources-avro.html

[hadoop@hadoop001 software]$ spark-shell --jars spark-avro_2.11-2.4.2.jar 

scala> spark.read.format("avro").load("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.avro").show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

Reading CSV

scala> spark.read.csv("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.csv").show 
+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

# Print the schema
scala> val peopleDF = spark.read.csv("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.csv")
peopleDF: org.apache.spark.sql.DataFrame = [_c0: string]

scala> peopleDF.printSchema
root
 |-- _c0: string (nullable = true)
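The sample people.csv is semicolon-delimited with a header row, which is why every line lands in a single _c0 column. A sketch of reading it with explicit options (sep, header and inferSchema are standard CSV options of the DataFrameReader):

val peopleCsvDF = spark.read
  .option("sep", ";")           // the file uses ';' as the delimiter
  .option("header", "true")     // the first line is a header row
  .option("inferSchema", "true")
  .csv("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.csv")

peopleCsvDF.printSchema()       // expect name, age and job columns instead of _c0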

Reading plain text (txt)

scala> val peopleDF = spark.read.text("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt").show
+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

peopleDF: Unit = ()


# Print the schema
scala> val peopleDF = spark.read.text("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt")
peopleDF: org.apache.spark.sql.DataFrame = [value: string]

scala> peopleDF.printSchema
root
 |-- value: string (nullable = true)

Note: reading plain text files requires the text method.
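To turn the single value column into real columns you still have to parse each line yourself. A minimal sketch (assuming the spark-shell implicits and the peopleDF read with spark.read.text above):

val parsed = peopleDF.map { row =>
  val parts = row.getString(0).split(",")
  (parts(0), parts(1).trim.toInt)        // (name, age)
}.toDF("name", "age")

parsed.show()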

Note: the canonical form is format("...").load("..."):

scala> val peopleDF = spark.read.format("json").load("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]


scala> peopleDF.show(false)
+----+-------+
|age |name   |
+----+-------+
|null|Michael|
|30  |Andy   |
|19  |Justin |
+----+-------+

Writing the ORC data we just read back out in Parquet format

scala> val orcDF = spark.read.orc("file:///home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.orc")
orcDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> orcDF.write.parquet("file:///home/hadoop/app/data/spark/")

# Inspect the Parquet files
[hadoop@hadoop001 ~]$ cd /home/hadoop/app/data/spark
[hadoop@hadoop001 spark]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 966 May 22 05:41 part-00000-41a6a3bd-c6b8-4c8e-bbe3-62d789dd942d-c000.snappy.parquet
-rw-r--r--. 1 hadoop hadoop   0 May 22 05:41 _SUCCESS
[hadoop@hadoop001 spark]$ 

# Read back the Parquet files we just wrote
scala> val parDF = spark.read.format("parquet").load("file:///home/hadoop/app/data/spark").show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

parDF: Unit = ()

Note: RDD.map returns an RDD, while DataFrame.map returns a Dataset.
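A quick check (a sketch, using the peopleDF loaded from people.json above):

val r = spark.sparkContext.parallelize(1 to 3).map(_ * 2)
// r is an org.apache.spark.rdd.RDD[Int]

val d = peopleDF.map(row => row.getAs[String]("name"))
// d is an org.apache.spark.sql.Dataset[String]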

Partition Discovery

[hadoop@hadoop001 spark]$ hdfs dfs -mkdir -p /sparksql/table/gender=male/country=US
[hadoop@hadoop001 spark]$ hdfs dfs -mkdir -p /sparksql/table/gender=male/country=CN
[hadoop@hadoop001 spark]$ hdfs dfs -mkdir -p /sparksql/table/gender=female/country=CN
[hadoop@hadoop001 spark]$ hdfs dfs -mkdir -p /sparksql/table/gender=female/country=US
[hadoop@hadoop001 resources]$ hdfs dfs -put users.parquet /sparksql/table/gender=male/country=US

scala> val parquetpartitionDF = spark.read.format("parquet").load("hdfs://hadoop001:8020/sparksql/table/")
parquetpartitionDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 3 more fields]

scala> parquetpartitionDF.printSchema
root
 |-- name: string (nullable = true)               // from the data file
 |-- favorite_color: string (nullable = true)     // from the data file
 |-- favorite_numbers: array (nullable = true)    // from the data file
 |    |-- element: integer (containsNull = true)  // from the data file
 |-- gender: string (nullable = true)     // partition column 1
 |-- country: string (nullable = true)    // partition column 2
 
 # If a lower-level path is passed in, only partitions below that path are discovered
scala> val parquetpartitionDF = spark.read.format("parquet").load("hdfs://hadoop001:8020/sparksql/table/gender=male").printSchema
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- country: string (nullable = true)

parquetpartitionDF: Unit = ()

scala> val parquetpartitionDF = spark.read.format("parquet").load("hdfs://hadoop001:8020/sparksql/table/gender=male/country=US").printSchema
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

parquetpartitionDF: Unit = () 

Schema Merging

scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF: org.apache.spark.sql.DataFrame = [value: int, square: int]

scala> squaresDF.show
+-----+------+
|value|square|
+-----+------+
|    1|     1|
|    2|     4|
|    3|     9|
|    4|    16|
|    5|    25|
+-----+------+


scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF: org.apache.spark.sql.DataFrame = [value: int, cube: int]

scala> cubesDF.show
+-----+----+
|value|cube|
+-----+----+
|    6| 216|
|    7| 343|
|    8| 512|
|    9| 729|
|   10|1000|
+-----+----+

scala> squaresDF.write.parquet("data/test_table/key=1")
                                                                                
scala> cubesDF.write.parquet("data/test_table/key=2")

# Merge the schemas
scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

scala> mergedDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)

scala> mergedDF.show
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
|    4|    16|null|  1|
|    5|    25|null|  1|
|    9|  null| 729|  2|
|   10|  null|1000|  2|
|    1|     1|null|  1|
|    2|     4|null|  1|
|    3|     9|null|  1|
|    6|  null| 216|  2|
|    7|  null| 343|  2|
|    8|  null| 512|  2|
+-----+------+----+---+

Note: schema merging is a relatively expensive operation, so it has been turned off by default since Spark 1.5.0; the setting (spark.sql.parquet.mergeSchema) is also defined in SQLConf.scala.

scala> spark.sql("SET spark.sql.parquet.mergeSchema=true").show(false)
+-----------------------------+-----+
|key                          |value|
+-----------------------------+-----+
|spark.sql.parquet.mergeSchema|true |
+-----------------------------+-----+

spark-sql (default)> set spark.sql.parquet.mergeSchema=false;

spark-sql (default)> set spark.sql.parquet.mergeSchema;

spark.sql.parquet.mergeSchema   false

Hive Table

scala> import spark.implicits._
import spark.implicits._

scala> import spark.sql
import spark.sql

scala> sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")

scala> sql("LOAD DATA LOCAL INPATH '/home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/kv1.txt' INTO TABLE src")

scala> sql("SELECT * FROM src").show()

scala> sql("SELECT COUNT(*) FROM src").show()
+--------+
|count(1)|
+--------+
|     500|
+--------+

scala> val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
sqlDF: org.apache.spark.sql.DataFrame = [key: int, value: string]

scala> sqlDF.show
+---+-----+
|key|value|
+---+-----+
|  0|val_0|
|  0|val_0|
|  0|val_0|
|  2|val_2|
|  4|val_4|
|  5|val_5|
|  5|val_5|
|  5|val_5|
|  8|val_8|
|  9|val_9|
+---+-----+

For more examples, see the official documentation on Spark SQL and Hive tables: http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

Working with JDBC

scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.100.111:3306/hive").option("dbtable", "TBLS").option("user", "root").option("password", "123456").load()
jdbcDF: org.apache.spark.sql.DataFrame = [TBL_ID: bigint, CREATE_TIME: int ... 9 more fields]

scala> jdbcDF.show(false)

Joining Hive metastore tables

scala> val tbls = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.100.111:3306/hive").option("dbtable", "TBLS").option("user", "root").option("password", "123456").load()
tbls: org.apache.spark.sql.DataFrame = [TBL_ID: bigint, CREATE_TIME: int ... 9 more fields]

scala> val dbs = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.100.111:3306/hive").option("dbtable", "DBS").option("user", "root").option("password", "123456").load()
dbs: org.apache.spark.sql.DataFrame = [DB_ID: bigint, DESC: string ... 4 more fields]

scala> dbs.join(tbls,dbs("DB_ID")===tbls("DB_ID")).select("DB_LOCATION_URI" ,"NAME", "TBL_TYPE").show(false)

Note: pay close attention to the column names passed to select(...) after the join and make sure there are no stray spaces. I accidentally wrote "TBL_TYPE " with a trailing space, and the query kept failing with an analysis error.

As the code above shows, DataFrames coming from different data sources can be joined together.

In production, do not write out over JDBC with jdbcDF.write; use the foreachPartition operator instead, as sketched below.
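A rough sketch of such a write (the JDBC URL, credentials, resultDF and the target table/columns are placeholders for illustration): open one connection per partition and batch the inserts, instead of opening a connection per record.

import java.sql.DriverManager
import org.apache.spark.sql.Row

val writeToMysql: Iterator[Row] => Unit = { rows =>
  // one connection and one prepared statement per partition
  val conn = DriverManager.getConnection("jdbc:mysql://192.168.100.111:3306/test", "root", "123456")
  conn.setAutoCommit(false)
  val stmt = conn.prepareStatement("insert into result_table(col1, col2) values (?, ?)")
  try {
    rows.foreach { row =>
      stmt.setString(1, row.getString(0))
      stmt.setString(2, row.getString(1))
      stmt.addBatch()
    }
    stmt.executeBatch()
    conn.commit()
  } finally {
    stmt.close()
    conn.close()
  }
}

resultDF.foreachPartition(writeToMysql)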

Spark SQL Tuning

Spark SQL shuffle

spark-sql (default)> select deptno,count(1) from emp group by deptno;

The number of shuffle tasks in Spark SQL defaults to 200.
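This is controlled by spark.sql.shuffle.partitions and can be changed per session, for example:

spark-sql (default)> set spark.sql.shuffle.partitions=100;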

spark-sql (default)> create table dept(deptno int,dname string,loc string) row format delimited fields terminated by '\t';

# Load data
spark-sql (default)> load data local inpath '/home/hadoop/data/dept.txt' into table dept;


# Join emp and dept
spark-sql (default)> select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;

Looking at the Web UI, we can see that a broadcast was performed:

[Screenshots: Spark UI]

Change the parameter spark.sql.autoBroadcastJoinThreshold to -1, i.e. disable broadcasting:

spark-sql (default)> set spark.sql.autoBroadcastJoinThreshold=-1;

spark-sql (default)> set spark.sql.autoBroadcastJoinThreshold;
spark.sql.autoBroadcastJoinThreshold    -1

Checking the Web UI again, the time goes from the earlier 0.7s to 4s; the join now happens last and a shuffle appears:

[Screenshots: Spark UI]

The parameter spark.sql.autoBroadcastJoinThreshold defaults to 10 MB; set it to -1 if you do not want broadcasting. In production you almost certainly do want broadcast joins; size the threshold according to your own workload.
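The value is in bytes; for example, to raise the threshold to roughly 50 MB:

spark-sql (default)> set spark.sql.autoBroadcastJoinThreshold=52428800;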

Broadcast Hint for SQL Queries

import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()

This hint could end up broadcasting a large table, so prefer the threshold-based tuning above whenever possible.

Third-party Spark packages

https://spark-packages.org/

External Data Sources

[Screenshots: external data sources]

catalog

The Catalog API is a new feature introduced in Spark 2.0. It surfaces metadata (for example, Hive metadata that is stored in MySQL) directly inside Spark, which makes unified management easier.

scala> spark.catalog.listDatabases().show(false)
+-------+---------------------+-------------------------------------------------+
|name   |description          |locationUri                                      |
+-------+---------------------+-------------------------------------------------+
|default|Default Hive database|hdfs://hadoop001:8020/user/hive/warehouse        |
|hive   |                     |hdfs://hadoop001:8020/user/hive/warehouse/hive.db|
+-------+---------------------+-------------------------------------------------+


scala> spark.catalog.listTables.show(false)
19/05/23 04:32:12 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+----------------------+--------+-----------+---------+-----------+
|name                  |database|description|tableType|isTemporary|
+----------------------+--------+-----------+---------+-----------+
|hive_ints             |default |null       |EXTERNAL |false      |
|hive_records          |default |null       |MANAGED  |false      |
|page_views_rcfile_test|default |null       |MANAGED  |false      |
|src                   |default |null       |MANAGED  |false      |
+----------------------+--------+-----------+---------+-----------+



scala> spark.catalog.listTables("hive").show(false)
+-----------------------+--------+-----------+---------+-----------+
|name                   |database|description|tableType|isTemporary|
+-----------------------+--------+-----------+---------+-----------+
|dept                   |hive    |null       |MANAGED  |false      |
|emp                    |hive    |null       |MANAGED  |false      |
|emp_external           |hive    |null       |EXTERNAL |false      |
|flow_info              |hive    |null       |MANAGED  |false      |
|managed_emp            |hive    |null       |MANAGED  |false      |
|order_partition        |hive    |null       |MANAGED  |false      |
|page_views             |hive    |null       |MANAGED  |false      |
|page_views_orc         |hive    |null       |MANAGED  |false      |
|page_views_orc_none    |hive    |null       |MANAGED  |false      |
|page_views_parquet     |hive    |null       |MANAGED  |false      |
|page_views_parquet_gzip|hive    |null       |MANAGED  |false      |
|page_views_rc          |hive    |null       |MANAGED  |false      |
|page_views_rcfile_test |hive    |null       |MANAGED  |false      |
|page_views_seq         |hive    |null       |MANAGED  |false      |
|t                      |hive    |null       |MANAGED  |false      |
|t1                     |hive    |null       |MANAGED  |false      |
|test                   |hive    |null       |MANAGED  |false      |
|version_test           |hive    |null       |MANAGED  |false      |
+-----------------------+--------+-----------+---------+-----------+


scala> spark.catalog.listTables("default").show(false)
+----------------------+--------+-----------+---------+-----------+
|name                  |database|description|tableType|isTemporary|
+----------------------+--------+-----------+---------+-----------+
|hive_ints             |default |null       |EXTERNAL |false      |
|hive_records          |default |null       |MANAGED  |false      |
|page_views_rcfile_test|default |null       |MANAGED  |false      |
|src                   |default |null       |MANAGED  |false      |
+----------------------+--------+-----------+---------+-----------+

What format does a file have when it is simply put into HDFS?

[Screenshot]

Assigning a function to a variable

scala> def sayHello(name:String):Unit={
     |   println("Hello  " + name)
     | }
sayHello: (name: String)Unit

scala> // Assign the function to a variable

scala> val sayHelloFunc = sayHello _
sayHelloFunc: String => Unit = $$Lambda$1163/1868931587@1d0b447b

scala> sayHelloFunc("JustDoDT")
Hello  JustDoDT

Anonymous functions

(parameterName: ParameterType) => functionBody

Assigning an anonymous function to a variable

scala> (x:Int) => x+1
res2: Int => Int = $$Lambda$1168/1672524765@397dfbe8

scala> x:Int => x+1
<console>:1: error: ';' expected but '=>' found.
       x:Int => x+1
             ^

scala> {x:Int => x+1}
res3: Int => Int = $$Lambda$1171/959378687@6ed7c178

scala> val a = (x:Int) => x+1
a: Int => Int = $$Lambda$1172/1018394275@7022fb5c

scala> a(2)
res4: Int = 3

scala> val b = {x:Int => x+1}
b: Int => Int = $$Lambda$1181/1454795974@6f076c53

scala> b(5)
res5: Int = 6

Assigning an anonymous function to a def

scala> def sum = (x:Int,y:Int)=>x+y
sum: (Int, Int) => Int

scala> sum(2,3)
res6: Int = 5

Implicit conversions

Implicit: something done quietly, behind the scenes.

Purpose: to quietly enhance a class with extra methods.

Man ===> Superman

In Java, this can be achieved with a Proxy.

object ImplicitApp {

  def main(args: Array[String]): Unit = {
    // Define the implicit conversion
    implicit def man2superman(man:Man):Superman = new Superman(man.name)
    val man = new Man("JustDoDT")
    man.fly()

  }
}

class Man(val name:String){
  def eat():Unit={
    println("Man eat")
  }
}


class Superman(val name:String){
  def fly():Unit ={
    println(s"$name fly")
  }
}

Output:

JustDoDT fly

Example 2:

import java.io.File
import scala.io.Source

object ImplicitApp {

  def main(args: Array[String]): Unit = {
  
    implicit def file2RichFile(file:File):RichFile = new RichFile(file)
    val file = new File("E:/temp/students.txt")
    val context = file.read()
    println(context)
  }
}

class RichFile(val file:File){
  def read() = Source.fromFile(file.getPath).mkString
}

Output:

1 Tom 23
2 Mary 26
3 Mike 24
4 Jone 21

In practice, you can gather all the frequently used implicit conversions into a single object:

import java.io.File
object ImplicitAspect {
  implicit def man2superman(a:Man):Superman = new Superman(a.name)
  implicit def file2RichFile(haha:File):RichFile = new RichFile(haha)
}

Then call them directly:

import java.io.File
import scala.io.Source
import ImplicitAspect._

object ImplicitApp {

  def main(args: Array[String]): Unit = {

    val man = new Man("JustDoDT")
    man.fly()
    val file = new File("E:/temp/students.txt")
    val context = file.read()
    println(context)
  }
}

class RichFile(val file:File){
  def read() = Source.fromFile(file.getPath).mkString
}

class Man(val name:String){
  def eat():Unit={
    println("Man eat")
  }
}

class Superman(val name:String){
  def fly():Unit ={
    println(s"$name fly")
  }
}