1.概述
早期的shark是spark团队在Hive之上设计的一套大规模数据仓库系统,shark继承了hive的复杂代码,并依赖spark
的版本,维护成本高,2014年官方从shark转战Spark SQL.spark SQL底层使用的是RDD 效率非常高。
SparkSQL的优点如下:
1. sql查询与spark rdd无缝整合。
2. 可以使用java,scala,python,R语言等来实现。
3. 可以对接多种数据源:Hive Avro ORC JSON JDBC/ODBC 等,同时兼容Hive语法。
2.核心API
SparkSQL的核心是DataFrame,在Spark 2.0以后,DataFrame和DataSet已经整合在一起。这里我们重点讲解
DataFrame:
1. DataFrame分为两部分:1.RDD基础分布式数据集。 2.Schema元信息(记录二维数据表中每一列的列名和类型)
2. 不同于RDD记录的一个个对象(spark不知道对象内部的信息),DataFrame却提供了每条数据的内部多列详细
信息。
3. DataFrame SQL的代码会经过Catalyst(Spark优化器)的优化,变成高效的处理代码。举个例子吧,比如使用
RDD读取某个文件,并取出第1条数据,此刻系统会先读取所有数据,再取第一条,消费了大部分没用的内存,
而SparkSQL经过优化后,会只将第一条数据加载到内存中。
3.利用spark-shell创建DataFrame
[root@node01 ~]# spark-shell --master spark://node01:7077Spark context available as 'sc' (master = spark://node01:7077, app id = app-20190619142652-0000).
Spark session available as 'spark'.
|
1. sc:SparkContext是所有Spark任务的入口,它包含了Spark程序的基本设置,如程序名字 内存大小 并行处理
粒度。
2. spark session:不仅有SparkContext的所有功能,还集成了DataFrame,Spark Streaming,Structed Streaming
提供的API,不用为不同功能分别定义Context。
要操作DataFrame,首先要知道如何创建,这介绍两种方式:
方式一:将RDD转换成DataFrame,这里case class就是定义DataFrame的schema
// 将spark/examples/src/main/resource下有各种资源,存放到hdfs上。
scala> val rdd1 = sc.textFile("/spark_res/people.txt").map(_.split(", "))
scala> case class People(val name:String,val age:Int)
scala> val rdd2 = rdd1.map(x => People(x(0),x(1).toInt))
scala> rdd2.toDF
这里报错:已存在元数据,但对本案例无影响
res3: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> res3.printSchema 查看数据结构
scala> res3.show 查看数据内容 |
方式二:直接通过spark session读取【普通文本数据、JSON数据、Parquet数据 】
scala> val df = spark.read.text("/spark_res/people.txt")df: org.apache.spark.sql.DataFrame = [value: string]
scala> val rdd1 = spark.read.json("/spark_res/people.json")
rdd1: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> val rdd1 = spark.read.parquet("/spark_res/users.parquet")
rdd1: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more
field]
|
方式三:直接通过spark session读取 【Mysql数据】
1.启动spark-shell指定驱动包
spark-shell \
--master spark://node01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \--driver-class-path /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar
2.从mysql中加载数据
val mysqlDF = spark.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://192.168.52.105:3306/iplocation",
"driver" -> "com.mysql.jdbc.Driver","dbtable" -> "iplocation",
"user" -> "root", "password" -> "123456")).load()
scala> mysqlDF.show
+----------+---------+-----------+
| longitude| latitude|total_count|
+----------+---------+-----------+
|108.948024|34.263161| 1824|
|116.405285|39.904989| 1535|
| 107.7601| 29.32548| 85|
+----------+---------+-----------+
|
4.利用spark-shell 操作DataFrame
上一节我们介绍了如何创建DataFrame,接着看看如何使用DataFrame操作数据,对于操作数据,系统提供了两种
方式:
操作方式一:DSL风格语法
scala> df1.select("name").show
scala> df1.select($"age",$"age"+1).show
scala> df1.filter("age is not null").select($"age",$"age"+1).show
scala> df1.filter(col("age")>21).show
|
操作方式二:SQL语法
scala> val df = rdd2.toDF
scala> val df = rdd2.toDF
scala> spark.sql("select * from t_people").show
scala> spark.sql("select * from t_people where age>20").show |
下一节,我们将介绍如何使用Java代码来操作 DataFrame。
|
|