黑马程序员技术交流社区

标题: [上海校区]使用Java语言完成词频统计Spark应用程序开发 [打印本页]

作者: JackieZhu    时间: 2018-8-29 14:40
标题: [上海校区]使用Java语言完成词频统计Spark应用程序开发
使用Java语言完成词频统计应用程序开发


1 环境准备


环境名称
环境版本

maven
apache-maven-3.3.9

JDK
jdk1.8.0_141

Scala
Scala 2.11.8

IDE
IntelliJ IDEA Ultimate


2 新建maven工程
我们使用IDEA工具创建maven工程,首先需要创建一个maven工程

其次,我们指定maven工程的GAV三个元素,确定其坐标

创建好maven工程之后,我们就可以接着向下进行编程了.
3 配置pom.xml
[XML] 纯文本查看 复制代码
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.itheima</groupId>
    <artifactId>SaprkCloud4</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <!--定义版本号-->
    <properties>
        <hadoop.version>2.7.4</hadoop.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
    </properties>
    <dependencies>
        <!--Scala依赖库-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!--spark核心依赖库-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--mysql数据库依赖-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.18</version>
        </dependency>
        <!--hadoop版本库-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!--配置maven编译插件-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

4 编写Java程序
[Java] 纯文本查看 复制代码
public class WordCountJavaLocal {
    public static void main(String[] args) {
        //1:创建SparkConf对象,配置Spark应用程序得基本信息
        SparkConf sparkConf = new SparkConf().setAppName("WordCountJavaLocal").setMaster("local[2]");
        //2: 创建JavaSparkContext对象
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //3:要针对输入源(hdfs文件,本地文件),创建一个初始得RDD
        JavaRDD<String> lines = sc.textFile("F:\\data\\words.txt");
        //4: 对初始RDD进行transformation操作,也就是一些计算操作
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] split = line.split(" ");
                return Arrays.asList(split).iterator();
            }
        });
        //5:接着,需要将每一个单词,映射为(单词,1)得这种格式
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word,1);
            }
        });
        //6: 接着,需要以单词作为key,统计每个单词出现得次数
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //7:到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数,将结果打印到控制台
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1+"  出现了  "+wordCount._2+" 次");
            }
        });
        sc.close();
    }
}

5 运行结果

6 WordCount程序说明
SparkConf sparkConf = new SparkConf().setAppName("WordCountJavaLocal").setMaster("local[2]");
编写Spark程序首先需要创建一个SparkConf对象,这个对象包含了spark应用程序的一些信息.从上述代码可以看出,setMaster可以设置Spark应用程序需要连接的Spark集群的master节点的url,但是,如果设置为local,则代表的是在本地进行测试实验.setAppName()其实是用来设置集群UI页面的显示名称的
JavaSparkContext sc = new JavaSparkContext(sparkConf);
第二步就是需要创建一个SparkContext对象,SparkContext对象会告知Spark应用程序如何访问一个集群.也就是说        SparkContext对象是所有Spark应用程序的入口.无论我们是使用java,scala,还是python编写,都必须要有一个SparkContext对象.它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器(DAGScheduler,TaskScheduler),还会去到Spark Master节点上进行注册,等等.总而言之,SparkContext是Spark应用程序中最最重要的一个对象.
但是呢,在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的.如下所示:
程序开发
SparkContext对象类型

scala
SparkContext

Java
JavaSparkContext

Spark SQL
SQLContext/HiveContext

Spark Streaming
StreamingContext
注:在Spark2.0之后,Spark框架将所有的入口全部整合到了一个新的对象,这个新的对象是所有一切Spark应用程序的入口,这个对象是SparkSession,后期我们会对其进行讲解
JavaRDD<String> lines = sc.textFile("F:\\data\\words.txt");
针对输入源(hdfs文件,本地文件 等等),创建一个初始的RDD,输入源中的数据会被打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集.在SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法,在Java中,创建的普通的RDD,都叫做JavaRDD.我们这里的lines这个RDD,其实就是文件里的每一行数据.
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] split = line.split(" ");
                return Arrays.asList(split).iterator();
            }
});
接下来就是对RDD进行transformation操作,也就是一些计算操作,通常操作会通过创建Function,并配合RDD的map,flatMap等算子来执行.Function是一个接口,如果实现比较简单,则创建指定Function的匿名内部类,但是,如果Function实现比较复杂,则会单独创建这个接口的实现类.而且Function接口是一个函数式接口,可以使用Java的新特性(lamdba表达式)来实现业务.
这里是将原始的RDD中的每一行数据拆分成单个的单词,调用flatMap方法,这个方法中需要一个FlatMapFunction的对象,这个FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型.在我们编写的程序中,输入的数据类型肯定是String,因为是一行一行的文本,输出也是String,因为是每一行的文本.
flatMap算子的作用,其实就是,将RDD的一个元素,给拆分成一个或多个元素.
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word,1);
            }
        });
我们需要将每一个单词,映射为(单词,1)的这种格式,因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数的累加.mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素.这里的Tuple2是scala类型,包含了两个值,mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型,第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值得类型.JavaPairRDD得两个泛型参数,分别代表了tuple元素得第一个值和第二个值得类型.
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
});
这一步需要我们以单词作为key,统计每个单词出现得次数,这个地方需要用到一个算子操作reduceByKey,对每个key对应得value,都进行reduce操作.
例如JavaPairRDD中得几个元素,分别为(spark,1) (spark,1) (spark,1) (hadoop,1).
reduce操作,相当于是把第一个值和第二个值进行计算,然后在将结果与第三个值进行计算.例如上述例子中得spark,就相当于1 + 1 = 2,然后在世 2 + 1 = 3;最后返回得JavaPairRDD中得元素,也是tuple,但是第一个值就是每个key,第二个值就是key得value,reduce之后得结果就相当于每个单词出现得次数.
wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
     @Override
     public void call(Tuple2<String, Integer> wordCount) throws Exception {
         System.out.println(wordCount._1+"\t\t\t出现了\t\t"+wordCount._2+"\t次");
     }
});
到这里,我们已经完成了词频统计得所有算子操作,也就是说,我们已经完成了词频统计,但是,之前我们得所有算子操作所使用得flatMap,mapToPair,ReduceByKey这种操作,都叫做transformation操作,这种操作都是懒加载得,不会主动执行得,必须等我调用action操作得时候,才可以触发这些算子操作.




作者: 梦缠绕的时候    时间: 2018-8-30 11:01

作者: 不二晨    时间: 2018-8-30 17:01
奈斯,加油加油
作者: 小影姐姐    时间: 2018-8-30 17:52





欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2