黑马程序员技术交流社区
标题: [上海校区]使用Java语言完成词频统计Spark应用程序开发 [打印本页]
作者: JackieZhu 时间: 2018-8-29 14:40
标题: [上海校区]使用Java语言完成词频统计Spark应用程序开发
使用Java语言完成词频统计应用程序开发
1 环境准备
环境名称
环境版本
maven
apache-maven-3.3.9
JDK
jdk1.8.0_141
Scala
Scala 2.11.8
IDE
IntelliJ IDEA Ultimate
2 新建maven工程
我们使用IDEA工具创建maven工程,首先需要创建一个maven工程
其次,我们指定maven工程的GAV三个元素,确定其坐标
创建好maven工程之后,我们就可以接着向下进行编程了.
3 配置pom.xml
[XML] 纯文本查看 复制代码
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>SaprkCloud4</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!--定义版本号-->
<properties>
<hadoop.version>2.7.4</hadoop.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.2.0</spark.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<!--Scala依赖库-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--spark核心依赖库-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!--mysql数据库依赖-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<!--hadoop版本库-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!--配置maven编译插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
4 编写Java程序
[Java] 纯文本查看 复制代码
public class WordCountJavaLocal {
public static void main(String[] args) {
//1:创建SparkConf对象,配置Spark应用程序得基本信息
SparkConf sparkConf = new SparkConf().setAppName("WordCountJavaLocal").setMaster("local[2]");
//2: 创建JavaSparkContext对象
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//3:要针对输入源(hdfs文件,本地文件),创建一个初始得RDD
JavaRDD<String> lines = sc.textFile("F:\\data\\words.txt");
//4: 对初始RDD进行transformation操作,也就是一些计算操作
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
String[] split = line.split(" ");
return Arrays.asList(split).iterator();
}
});
//5:接着,需要将每一个单词,映射为(单词,1)得这种格式
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String,Integer>(word,1);
}
});
//6: 接着,需要以单词作为key,统计每个单词出现得次数
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//7:到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数,将结果打印到控制台
wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1+" 出现了 "+wordCount._2+" 次");
}
});
sc.close();
}
}
5 运行结果
6 WordCount程序说明
SparkConf sparkConf = new SparkConf().setAppName("WordCountJavaLocal").setMaster("local[2]");编写Spark程序首先需要创建一个SparkConf对象,这个对象包含了spark应用程序的一些信息.从上述代码可以看出,setMaster可以设置Spark应用程序需要连接的Spark集群的master节点的url,但是,如果设置为local,则代表的是在本地进行测试实验.setAppName()其实是用来设置集群UI页面的显示名称的
JavaSparkContext sc = new JavaSparkContext(sparkConf);第二步就是需要创建一个SparkContext对象,SparkContext对象会告知Spark应用程序如何访问一个集群.也就是说 SparkContext对象是所有Spark应用程序的入口.无论我们是使用java,scala,还是python编写,都必须要有一个SparkContext对象.它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器(DAGScheduler,TaskScheduler),还会去到Spark Master节点上进行注册,等等.总而言之,SparkContext是Spark应用程序中最最重要的一个对象.
但是呢,在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的.如下所示:
程序开发
SparkContext对象类型
scala
SparkContext
Java
JavaSparkContext
Spark SQL
SQLContext/HiveContext
Spark Streaming
StreamingContext
注:在Spark2.0之后,Spark框架将所有的入口全部整合到了一个新的对象,这个新的对象是所有一切Spark应用程序的入口,这个对象是SparkSession,后期我们会对其进行讲解
JavaRDD<String> lines = sc.textFile("F:\\data\\words.txt");针对输入源(hdfs文件,本地文件 等等),创建一个初始的RDD,输入源中的数据会被打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集.在SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法,在Java中,创建的普通的RDD,都叫做JavaRDD.我们这里的lines这个RDD,其实就是文件里的每一行数据.
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
String[] split = line.split(" ");
return Arrays.asList(split).iterator();
}
});接下来就是对RDD进行transformation操作,也就是一些计算操作,通常操作会通过创建Function,并配合RDD的map,flatMap等算子来执行.Function是一个接口,如果实现比较简单,则创建指定Function的匿名内部类,但是,如果Function实现比较复杂,则会单独创建这个接口的实现类.而且Function接口是一个函数式接口,可以使用Java的新特性(lamdba表达式)来实现业务.
这里是将原始的RDD中的每一行数据拆分成单个的单词,调用flatMap方法,这个方法中需要一个FlatMapFunction的对象,这个FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型.在我们编写的程序中,输入的数据类型肯定是String,因为是一行一行的文本,输出也是String,因为是每一行的文本.
flatMap算子的作用,其实就是,将RDD的一个元素,给拆分成一个或多个元素.
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String,Integer>(word,1);
}
});我们需要将每一个单词,映射为(单词,1)的这种格式,因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数的累加.mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素.这里的Tuple2是scala类型,包含了两个值,mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型,第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值得类型.JavaPairRDD得两个泛型参数,分别代表了tuple元素得第一个值和第二个值得类型.
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});这一步需要我们以单词作为key,统计每个单词出现得次数,这个地方需要用到一个算子操作reduceByKey,对每个key对应得value,都进行reduce操作.
例如JavaPairRDD中得几个元素,分别为(spark,1) (spark,1) (spark,1) (hadoop,1).
reduce操作,相当于是把第一个值和第二个值进行计算,然后在将结果与第三个值进行计算.例如上述例子中得spark,就相当于1 + 1 = 2,然后在世 2 + 1 = 3;最后返回得JavaPairRDD中得元素,也是tuple,但是第一个值就是每个key,第二个值就是key得value,reduce之后得结果就相当于每个单词出现得次数.
wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1+"\t\t\t出现了\t\t"+wordCount._2+"\t次");
}
});
到这里,我们已经完成了词频统计得所有算子操作,也就是说,我们已经完成了词频统计,但是,之前我们得所有算子操作所使用得flatMap,mapToPair,ReduceByKey这种操作,都叫做transformation操作,这种操作都是懒加载得,不会主动执行得,必须等我调用action操作得时候,才可以触发这些算子操作.
作者: 梦缠绕的时候 时间: 2018-8-30 11:01
作者: 不二晨 时间: 2018-8-30 17:01
奈斯,加油加油
作者: 小影姐姐 时间: 2018-8-30 17:52
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) |
黑马程序员IT技术论坛 X3.2 |