5. 导入Java依赖
要使用SparkSQL的API,首先要导入Scala,Spark,SparkSQL的依赖:
<properties>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.4</hadoop.version>
<spark.version>2.0.2</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.32</version>
</dependency>
<!-- spark sql 依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies> |
6. Java代码操作DataFrame
1.DataFrame作为SparkSQL的核心API,它是通过SparkContext来获取,代码如下:
//1.创建spark session,并指定appName,需要将任务提交到哪里。
val spark = new
SparkSession.Builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
//2.获取SparkContext,后面所有的SparkSQL操作都需要该上下文。
val sc: SparkContext = spark.sparkContext |
上文中,master指定的是sparkSQL的执行环境,可以是集群也可以是本地,这里的local[2]指定的是本地单机运行模
式,使用2条线程来执行任务,注意这里local必须是小写的。
SparkSession是SparkContext的升级版,他支持HiveContext和SparkContext。
2.通过SparkContext后我们可以获取到数据对应的DataFrame,代码如下:
//3.获取每一行内容的RDD,并通过schema将RDD转化成DF
val lineRdd: RDD[Array[String]] =
sc.textFile("hdfs://node01:8020/spark_res/people.txt").map(_.split(", "))
val peopleRdd: RDD[People] = lineRdd.map(x => People(x(0), x(1).toInt))
import spark.implicits._
val peopleDF: DataFrame = peopleRdd.toDF
//4.对DF进行操作
peopleDF.printSchema()
peopleDF.show()
println(peopleDF.head())
println(peopleDF.count())
peopleDF.columns.foreach(println) |
使用DataFrame之前,必须导包,否则无法使用 toDF 方法。
3.DataFrame操作SQL有两种方式,DSL和SQL,代码如下:
//DSL
peopleDF.select("name","age").show()
peopleDF.filter($"age">20).groupBy("name").count().show
//SQL
peopleDF.createOrReplaceTempView("t_people")
spark.sql("select * from t_people order by age desc").show |
4.SQL操作完毕后必须关闭sparkContext和SparkSession,代码分别是 sc.stop() 和 spark.stop()
除了读取普通文件,还可以读取mysql orcale数据,代码如下:
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")
//重点代码,连接JDBC
val ipLocationDF: DataFrame =
spark.read.jdbc("jdbc:mysql://localhost:3306/iplocation","iplocation",properties)
ipLocationDF.printSchema
ipLocationDF.show |
7. 保存DataFrame的结果
除了读取数据,DataFrame还提供了一整套保存处理数据结果的机制,代码如下:
object SparkSqlToMysql {
def main(args: Array[String]): Unit = {
val sc: SparkContext = ...
//1.通过spark session读取json数据,并返回DataFrame
val peopleDF: DataFrame = spark.read.json(args(0))
//2.将DataFrame注册为t_people表,并对该表进行SQL语句操作
peopleDF.createOrReplaceTempView("t_people")
val resultDF = spark.sql("select * from t_people")
//3.对上个SQL语句的操作结果进行保存
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")
resultDF.write.jdbc("jdbc:mysql://192.168.52.105:3306/iplocation","spark_save_result",properties)
//close sparkcontext sparksession..
}
} |
需要注意的是resultDF.write,其返回DataFrameWriter。
1. 该类可以保存任何SQL的结果,并且由于API的便利性,可以保存成多种格式,如 text ,json ,orc,csv,jdbc
等。
2. 对于保存的数据,系统提供了几种保存模式,可以通过mode(String)来指定:
overwrite : 重写文件内部数据
append : 将新增内容添加到文件末尾
ignore : 如果文件已存在 则忽略操作
error : default option, 如果文件存在,则抛出异常
8. 总结
1. 在SparkSQL系列中,我们首先介绍了SparkSQL的核心API DataFrame,DataFrame内部分为RDD基础分布式
数据集和Schema元信息。DataFrame的SQL代码在执行之前会经过Catalyst优化,变成高效的处理代码。接着
我们介绍了通过 spark-shell 和 java api 两种客户端窗口操作DataFrame。
2. 创建DataFrame有两种方式:
1. 通过 rdd.toDF 直接将rdd转换成DataFrame。
2. 通过 spark.read 直接读取各种格式的数据。
3. 查看DataFrame的内容有两种:
1. 通过 df.printSchema 查看数据结构。
2. 通过 df.show 查看数据内容。
4. df提供了DSL和SQL两种风格的来操作数据。对于DSL风格,常见的方法有 select() filter() 等。
5. 本文在后半部分主要介绍SparkSQL如何与mysql进行交互,除此之外,还支持Parquet,ORC,JSON,Hive,
JDBC , avro协议文件等交互,可以通过 官方网站学习:http://spark.apache.org/docs/latest/sql-data-source
s.html。
|
|