A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始


一、SparkSQL概述
1、概念
   官网:http://spark.apache.org/sql/

       Spark SQK是Spark用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通过RDD获取)的一个模块

        外部的结构化数据源包括 Json,parquet(默认),rmdbs,hive等

2、Spark SQL的优点
     mapreduce             hive(sql框架)减少代码编写

     sparkcore              sparksql(sql框架) 

      hive将sql转换成mapreduce,然后提交到集群上执行,大大简化了mapreduce的程序的复杂性,由于mapreduce这种计算模型。因此spark sql就应运而生了 

优点: 1、容易整合

            2、同一的数据访问方式

            3、兼容hive

            4、标准的数据连接

3、Spark SQL版本迭代
1)sparkSQL的前身是shark

2)spark-1.1(2014-9-11)开始的引入sparksql,对hive进行无缝的兼容

3)spark-1.3:增加了DataFrame的API

4)spark-1.4:增加了窗口分析函数

5)spark-1.5:增加了UDF函数

6)spark-1.6:引入DataSet SparkSession

7)spark-2.x:SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入SparkSession 统一了 RDD,DataFrame,DataSet 的编程入口

 

二、SparkSession
1、介绍
       SparkSession实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样可以使用的。SparkSession内部封装了SparkContext,所有计算实际上是由SparkContext完成的。

2、特点
1)为用户提供了同一的切入点使用spark各项功能

2)允许用户通过它调用DataFrame和DataSet相关API来编写程序

3)减少用户需要了解的一些概念,可以很容易的与Spark进行交互

4)与spark交互之时不需要显示创建sparkconf,sparkcontext以及sparksql,这些对象已经封闭在sparksession中

5)sparksession提供对hive特征的内部支持,用hive sql写sql语句访问hive udfs,从hive表中读取数据

                      在创建对象的时候加上: enableHiveSupport();

                         hive url/metstore:元数据库在哪里;

                        hive  warehouse:真实数据在哪里

3、SparkSession的创建
  1)如果在spark-shell中

[qyl@qyl02 ~]$ ~/apps/spark-2.3.1-bin-hadoop2.7/bin/spark-shell \
> --master spark://qyl02:7077 \
> --executor-memory 512m \
> --total-executor-cores 1
2)在idea中创建SparkSession

val spark=SparkSession.builder()
                      .appName("sparksqlexample")
                      .master("local")
                      .getOrCreate()
 
三、RDD/DataFrame/DataSet
1、RDD的局限性
           RDD仅表示数据集,RDD没有元数据,也就是说没有字段语义定义

2、什么是DataFrame? 
             是按列名的方式去组织的一个分布式数据集(RDD)

          由于RDD的局限性,Spark产生了DataFrame

         DataFrame=RDD+Schema=schemaRDD

               其中Schenam就是元数据,是语义描述

特点:

      内部数据无类型,统一为Row

      DataFrame是一种特殊类型的DataSet,DataSet[Row]=DataFrame

      DataFrame自带优化器Catalyst,可以自动优化程序

      DataFrame提供了一整套的Data Source API

 

3、DataSet的产生
    Row 运行时类型检查,比如salary是字符串类型,下面的语句也只有运行时才进行类型检查

dataframe.filter("salary>1000").show()
 由于DataFrame的数据类型统一是Row,所以DataFrame也是有缺点的。

明显缺点:

            1、Row不能直接操作domain对象

            2、函数风格编程,没有面向对象风格的API

所以,Spark SQL引入了DataSet,扩展了DataFrmae的API,提供了编译时类型检查,面向对象风格的API

4、Spark SQL程序基本编写步骤
 1)创建SparkSession对象

val sparksession:SparkSession=SparkSession.builder()
                                          .appName("mysparkSession")
                                          .master("local")
                                          .getOrCreate()
2)创建DataFrame或者DataSet

3)在DataFrame或者DataSet之上进行转换和Action

4)返回结果

5、创建DataFrame
创建DataFrame有三种方式:  val studentRDD

1、导入隐式转换

import spark.implicits._
  val studentDF=studentRDD.toDF
2、使用spark.sqlcontext.createDataFrame(studentRDD,schema)方法创建

valstudentRowRDD=studentRDD.map(student=>Row(student.getName,student.getAge,student.
 getClass))
 
val schema=StructType(fields=List(
        StructField("name",DataTypes.StringType,false),
        StructField("age",DataTypes.IntegerType,false),
        StructField("class",DataTypes.StringTyps,false)
))
 
val studentDF=spark.createDataFrame(studentRowRDD,schema)
 
四、Spark SQL 的wordcount
1、准备数据 hello.txt
hello word hello test
hello you hello me
you is beautiful and is pure
2、编写代码
package com.qyl
 
import org.apache.spark.sql.SparkSession
 
object WordCount {
  def main(args: Array[String]): Unit = {
 
    /*
    * 1.创建编程入口
    * */
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("WordCount")
      .getOrCreate()
    /*
    * 2.读取文件,转成DataFrame
    * */
    import spark.implicits._
    val helloDF = spark.read.text("data/hello.txt").toDF("line")
    /*
    * 3.创建临时视图
    * */
    helloDF.createOrReplaceTempView("hellotable")
    println("--------------------split拆分--------------")
    var sql=
      """
        |select
        |split(line," ")
        |from hellotable
      """.stripMargin
    println("-----------explode压平----------------------")
    sql=
      """
        |select
        |explode(split(line," ")) as word
        |from hellotable
      """.stripMargin
    println("------------统计结果------------------")
    sql="""
      |select
      | tmp.word,
      | count(tmp.word) as count
      |from (
      |  select
      |     explode(split(line, '\\s+')) as word
      |  from hellotable
      |) tmp
      |group by tmp.word
      |order by count desc
    """.stripMargin
 
    spark.sql(sql).show()
    spark.stop()
  }
}
3、结果
|     word|count|
+---------+-----+
|    hello|    4|
|       is|    2|
|      you|    2|
|       me|    1|
|     pure|    1|
|     word|    1|
|     test|    1|
|      and|    1|
|beautiful|    1|
五、Spark SQL高级用法
1、SparkSQL自定义普通函数
/**
  * SparkSQL自定义UDF操作:
  *
  *  1、编写一个UDF函数,输入输出参数
  *  2、向SQLContext进行注册,该UDF
  *  3、就直接使用
  *
  *  案例:通过计算字符串长度的函数,来学习如何自定义UDF
  */
object _02SparkSQLUDFOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
 
        val spark = SparkSession.builder()
            .master("local[2]")
            .appName("_02SparkSQLUDFOps")
            //            .enableHiveSupport()
            .getOrCreate()
        //第二步:向SQLContext进行注册,该UDF
        spark.udf.register[Int, String]("strLen", str => strLen(str))
 
        val topnDF = spark.read.json("data/sql/people.json")
        topnDF.createOrReplaceTempView("people")
        //自定义字符串长度函数,来就去表中name的长度
        //第三步:就直接使用
        val sql =
            """
              |select
              | name,
              | strLen(name) nameLen
              |from people
            """.stripMargin
 
        spark.sql(sql).show()
        spark.stop()
    }
 
    //第一步:编写一个UDF函数,输入输出参数
    def strLen(str:String):Int = str.length
}
2、自定义聚集(UDAF)函数
/**
  * 自定义UDAF操作:
  *  1、编写一个UDAF类,extends UserDefinedAggregateFunction
  *     复写其中的若干方法
  *  2、和UDF一样向SQLContext进行注册,该UDAF
  *  3、就直接使用
  *  模拟count函数
  *  可以参考在sparkCore中学习的combineByKey或者aggregateByKey
  */
object _03SparkSQlUDAFOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
 
        val spark = SparkSession.builder()
            .master("local[2]")
            .appName("_02SparkSQLUDFOps")
            //            .enableHiveSupport()
            .getOrCreate()
        //2、和UDF一样向SQLContext进行注册,该UDAF
        spark.udf.register("myCount", new MyCountUDAF())
 
        val topnDF = spark.read.json("data/sql/people.json")
        topnDF.createOrReplaceTempView("people")
        topnDF.show()
 
        println("------------------------------")
        //3、就直接使用
        val sql =
            """
              |select
              |  age,
              |  myCount(age) countz
              |from people
              |group by age
            """.stripMargin
        spark.sql(sql).show()
        spark.stop()
    }
}
/**
  * 自定义UDAF
  */
class MyCountUDAF extends UserDefinedAggregateFunction {
    //该udaf的输入的数据类型
    override def inputSchema: StructType = {
        StructType(List(
            StructField("age", DataTypes.IntegerType, false)
        ))
    }
    /**
      * 在该udaf聚合过程中的数据的类型Schema
      */
    override def bufferSchema: StructType = {
        StructType(List(
            StructField("age", DataTypes.IntegerType, false)
        ))
    }
 
    //该udaf的输出的数据类型
    override def dataType: DataType = DataTypes.IntegerType
 
    //确定性判断,通常特定输入和输出的类型一致
    override def deterministic: Boolean = true
    /**
        初始化的操作
      var sum = 1
      for(i <- 0 to 9) {
        sum += i
      }
      row.get(0)
      @param buffer 就是我们计算过程中的临时的存储了聚合结果的Buffer(extends Row)
     */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer.update(0, 0)//更新当前buffer数组中的第1列(索引为0)的值为0
    }
    /**
      * 分区内的数据聚合合并
      * @param buffer 就是我们在initialize方法中声明初始化的临时缓冲区
      * @param input  聚合操作新传入的值
      */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val oldValue = buffer.getInt(0) //row.get(0)
        buffer.update(0,  oldValue + 1)
    }
    /**
      * 分区间的数据聚合合并
      * 聚合之后将结果传递给分区一
      * @param buffer1 分区一聚合的临时结果
      * @param buffer2 分区二聚合的临时结果
      *                reduce(v1, v2)
      */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val pav1 = buffer1.getInt(0)
        val pav2 = buffer2.getInt(0)
        buffer1.update(0, pav1 + pav2)
    }
    /**
      * 该聚合函数最终要返回的值
      * @param buffer 数据就被存储在该buffer中
      */
    override def evaluate(buffer: Row): Any = {
        buffer.getInt(0)
    }
}
3、SparkSQL开窗函数的使用
 * SparkSQL中的开窗函数的使用:
  *     row_number()         --->分组topN(必须掌握)
  *     sum() over()         --->分组累加
  *     avg/min/max() over() --->分组求最大
  *
  */
object _05SparkSQLWindowFuncOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val spark = SparkSession.builder()
            .master("local[2]")
            .appName("_04SparkSQLUDTFOps")
//            .enableHiveSupport()
            .getOrCreate()
        val topnDF = spark.read.json("data/sql/topn.json")
        topnDF.createOrReplaceTempView("stu_score")
        println("==================原始数据=========================")
        topnDF.show()
        println("===========计算各个科目学员成绩降序==================")
        val sql =
            """
              |select
              |    course,
              |    name,
              |    score,
              |    row_number() over(partition by course order by score desc) rank
              |from stu_score
            """.stripMargin
        spark.sql(sql).show()
        println("=========计算各个科目学员成绩降序Top3================")
//        val topnSQL =
//            """
//              |select
//              |    course,
//              |    name,
//              |    score,
//              |    row_number() over(partition by course order by score desc) rank
//              |from stu_score
//              |having rank < 4
//            """.stripMargin
        val topnSQL =
            """
              |select
              | tmp.*
              |from (
              |   select
              |      course,
              |      name,
              |      score,
              |      row_number() over(partition by course order by score desc) rank
              |   from stu_score
              |)tmp
              |where tmp.rank < 4
            """.stripMargin
        spark.sql(topnSQL).show
        spark.stop()
    }
}
所用pom.xml文件的配置:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.qyl</groupId>
  <artifactId>sparkSQL</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.3.2</spark.version>
    <hadoop.version>2.7.6</hadoop.version>
    <scala.compat.version>2.11</scala.compat.version>
  </properties>
 
  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
 
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
 
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- sparkStreaming -->
    <!-- https://mvnrepository.com/artifa ... ark/spark-streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
    <dependency>
      <groupId>org.dom4j</groupId>
      <artifactId>dom4j</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
 
  <build>
    <!--<sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>-->
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <!--<manifest>
              <mainClass></mainClass>
            </manifest>-->
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.10</version>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
              <sources>
                <source>src/main/java</source>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

1 个回复

倒序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马