A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

5. 导入Java依赖


要使用SparkSQL的API,首先要导入Scala,Spark,SparkSQL的依赖:

[AppleScript] 纯文本查看 复制代码
<properties>
  <scala.version>2.11.8</scala.version>
  <hadoop.version>2.7.4</hadoop.version>
  <spark.version>2.0.2</spark.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.32</version>
  </dependency>
  <!-- spark sql 依赖-->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.2</version>
  </dependency>
</dependencies>


6. Java代码操作DataFrame


1.DataFrame作为SparkSQL的核心API,它是通过SparkContext来获取,代码如下:

[AppleScript] 纯文本查看 复制代码
//1.创建spark session,并指定appName,需要将任务提交到哪里。
val spark = new
SparkSession.Builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
//2.获取SparkContext,后面所有的SparkSQL操作都需要该上下文。
val sc: SparkContext = spark.sparkContext


上文中,master指定的是sparkSQL的执行环境,可以是集群也可以是本地,这里的local[2]指定的是本地单机运行模
式,使用2条线程来执行任务,注意这里local必须是小写的。
SparkSession是SparkContext的升级版,他支持HiveContext和SparkContext。


2.通过SparkContext后我们可以获取到数据对应的DataFrame,代码如下:

[AppleScript] 纯文本查看 复制代码
//3.获取每一行内容的RDD,并通过schema将RDD转化成DF
val lineRdd: RDD[Array[String]] =
sc.textFile("hdfs://node01:8020/spark_res/people.txt").map(_.split(", "))
val peopleRdd: RDD[People] = lineRdd.map(x => People(x(0), x(1).toInt))
import spark.implicits._
val peopleDF: DataFrame = peopleRdd.toDF
//4.对DF进行操作
peopleDF.printSchema()
peopleDF.show()
println(peopleDF.head())
println(peopleDF.count())
peopleDF.columns.foreach(println)



使用DataFrame之前,必须导包,否则无法使用  toDF 方法。


3.DataFrame操作SQL有两种方式,DSL和SQL,代码如下:


[AppleScript] 纯文本查看 复制代码
//DSL
peopleDF.select("name","age").show()
peopleDF.filter($"age">20).groupBy("name").count().show
//SQL
peopleDF.createOrReplaceTempView("t_people")
spark.sql("select * from t_people order by age desc").show


4.SQL操作完毕后必须关闭sparkContext和SparkSession,代码分别是 sc.stop() 和  spark.stop()


除了读取普通文件,还可以读取mysql orcale数据,代码如下:


[AppleScript] 纯文本查看 复制代码
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")
//重点代码,连接JDBC
val ipLocationDF: DataFrame =
spark.read.jdbc("jdbc:mysql://localhost:3306/iplocation","iplocation",properties)
ipLocationDF.printSchema
ipLocationDF.show


7. 保存DataFrame的结果



除了读取数据,DataFrame还提供了一整套保存处理数据结果的机制,代码如下:



[AppleScript] 纯文本查看 复制代码
object SparkSqlToMysql {
 def main(args: Array[String]): Unit = {
  val sc: SparkContext = ...
  //1.通过spark session读取json数据,并返回DataFrame
  val peopleDF: DataFrame = spark.read.json(args(0))
  //2.将DataFrame注册为t_people表,并对该表进行SQL语句操作
  peopleDF.createOrReplaceTempView("t_people")
  val resultDF = spark.sql("select * from t_people")
  //3.对上个SQL语句的操作结果进行保存
  val properties = new Properties()
  properties.setProperty("user","root")
  properties.setProperty("password","123456")
 resultDF.write.jdbc("jdbc:mysql://192.168.52.105:3306/iplocation","spark_save_result",pro
perties)
//close sparkcontext sparksession..
}
}


需要注意的是resultDF.write,其返回DataFrameWriter。
1. 该类可以保存任何SQL的结果,并且由于API的便利性,可以保存成多种格式,如 text ,json ,orc,csv,jdbc
等。
2. 对于保存的数据,系统提供了几种保存模式,可以通过mode(String)来指定:
overwrite : 重写文件内部数据
append : 将新增内容添加到文件末尾
ignore : 如果文件已存在 则忽略操作
error : default option, 如果文件存在,则抛出异常



8. 总结



1. 在SparkSQL系列中,我们首先介绍了SparkSQL的核心API DataFrame,DataFrame内部分为RDD基础分布式
数据集和Schema元信息。DataFrame的SQL代码在执行之前会经过Catalyst优化,变成高效的处理代码。接着
我们介绍了通过 spark-shell 和 java api 两种客户端窗口操作DataFrame。
2. 创建DataFrame有两种方式:
1. 通过 rdd.toDF 直接将rdd转换成DataFrame。
2. 通过  spark.read 直接读取各种格式的数据。
3. 查看DataFrame的内容有两种:
1. 通过  df.printSchema 查看数据结构。
2. 通过  df.show 查看数据内容。
4. df提供了DSL和SQL两种风格的来操作数据。对于DSL风格,常见的方法有  select() filter() 等。
5. 本文在后半部分主要介绍SparkSQL如何与mysql进行交互,除此之外,还支持Parquet,ORC,JSON,Hive,
JDBC , avro协议文件等交互,可以通过 官方网站学习:http://spark.apache.org/docs/latest/sql-data-source
s.html。

1 个回复

倒序浏览































回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马