A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 滓曰不曰 初级黑马   /  2019-12-28 21:11  /  1142 人查看  /  0 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

RDD及其特点
1、RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集。

2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)

3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。

4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDDpartition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

创建RDD
进行Spark核心编程的第一步就是创建一个初始的RDD。该RDD,通常就代表和包含了Spark应用程序的输入源数据。然后通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。

Spark Core提供了三种创建RDD的方式:

1.使用程序中的集合创建RDD(主要用于测试)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
2.使用本地文件创建RDD(主要用于临时性处理有大量数据的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();
3.使用HDFS文件创建RDD(生产环境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();
使用HDFS文件创建RDD对比使用本地文件创建RDD,需要修改的,只有两个地方:
第一,将SparkSession对象的master("local")方法去掉
第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件

操作RDD
Spark支持两种RDD操作:transformation和action。

transformation操作
transformation操作会针对已有的RDD创建一个新的RDD。transformation具有lazy特性,即transformation不会触发spark程序的执行,它们只是记录了对RDD所做的操作,不会自发的执行。只有执行了一个action,之前的所有transformation才会执行。

常用的transformation介绍:

map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。

filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。

flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。

groupByKey:根据key进行分组,每个key对应一个Iterable<value>。

reduceByKey:对每个key对应的value进行reduce操作。

sortByKey:对每个key对应的value进行排序操作。

join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。

cogroup:同join,但是每个key对应的Iterable<value>都会传入自定义函数进行处理。

action操作
action操作主要对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序。action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行,这是action的特性。

常用的action介绍:

reduce:将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。

collect:将RDD中所有元素获取到本地客户端(一般不建议使用)。

count:获取RDD元素总数。

take(n):获取RDD中前n个元素。



saveAsTextFile:将RDD元素保存到文件中,对每个元素调用toString方法。

countByKey:对每个key对应的值进行count计数。

foreach:遍历RDD中的每个元素。

RDD持久化
要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。但是cache()或者persist()的使用是有规则的,必须在transformation或者textFile等创建了一个RDD之后,直接连续调用cache()或persist()才可以。

如果你先创建一个RDD,然后单独另起一行执行cache()或persist()方法,是没有用的,而且会报错,大量的文件会丢失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马