RDD介绍1.RDD概念以及特性 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。(A Resilient Distributed Dataset)弹性分布式数据集合。并且是spark最基本的编程抽象,而且RDD是只读、可分区的、可以进行并行计算的一个对象。 1.1RDD弹性1) 自动进行内存和磁盘数据存储的切换 Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换 2) 基于血统的高效容错机制 在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。 3) Task如果失败会自动进行特定次数的重试 RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。 4) Stage如果失败会自动进行特定次数的重试 如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。 5) Checkpoint和Persist可主动或被动触发 RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。 6) 数据调度弹性 Spark把这个JOB执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。 7) 数据分片的高度弹性 可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。 RDD是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储(HDFS或磁盘)。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。 1.2RDD的五大属性1) 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。 如果文件的block个数 <=2 那么 sc.textFile(“file:///wordcount.txt”)分区个数为2 如果文件的block块个数 >2 那么 sc.textFile(“file:///wordcount.txt”)分区的个数等于block块的个数 2) 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。RDD的每一个算子操作比如map 都会通过compute方法作用在每个分区之上 3) RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。每一个RDD都有其依赖列表RDD的依赖关系 都是存在一个序列集合中,作用:容错 以及构建起血统机制 4) 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。 5) 一个列表,存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。 [url=][/url]
a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) Spark在读取hdfs文件的是,hdfs文件每一个block默认有多个备份,spark会获取每一个block块以及其备份的位置信息构建成列表,在进行计算的时候,spark会在位置列表中选取一个最佳位置进行任务分配。 移动数据不如移动计算的原则。 移动数据不如移动计算的原则最高境界:数据在当前运行程序的进程之中 RDD是如何确定优先位置? getPreferredLocations(split: Partition): Seq[String] 通过以上方法确定计算的最佳位置。 RDD的数据本地化: 5种[url=][/url]
2.RDD的构建方式3种构建方式 根据以后数据集合构建RDD val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8)) val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8))
根据外部文件 可以是本地文件也可是HDFS上文件 根据以后RDD创建新的RDD 需要经过算子操作
3.RDD的算子操作RDD的算子分为两类 转换算子(Transform算子) 重要算子操作 mapPartitions :作用于每个分区之上的 groupByKey算子和ReduceByKey算子的区别 coalesce 减少分区数据的算子 repartition 实际上是调用了 coalesce 算子 ,而且 repartition一定会进行shuffle操作,既可以增加也可以减少分区
Action算子
action算子内部都会有一个runJob方法进行提交一个Job任务 广播变量: 4.RDD的依赖关系RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
|