1、注意join的使用,如果有较小的表可考虑使用广播的方式实现mapjoin,类似MR/HIVE。广播变量是一个executor一份副本 2、注意数据倾斜的问题,这个问题在分布式shuffle操作时都有可能出现,可采样key的分布情况,将倾斜的key单独抽离出来根据实际情况单独处理,或者在key上加随机值(当然也不一定加随机值比如按照某个key,count(distinct)操作)原来一个步骤拆成两个步骤,第一个步骤实现预聚合,第二个步骤再把随机值去掉聚合,减少数据倾斜量 3、注意大量小分区的问题,比如hdfs本身存储大量小文件,或者在spark filter操作后等等都有可能出现,大量小分区会造成启动大量小任务,任务启动,线程切换开销极大。coalesce减少分区,如果是hdfs有大量小文件,可sc.newAPIHadoopFile设置使用CombineTextInputFormat 4、本身任务并行度设置的问题,spark.default.parallelism控制shuffle后task个数,官方推荐cpu core的数量*2-3个的并行度,同时num-executor 、executor-memory(根据实际情况我们一般设置3-4G),executor-cores(官方推荐2-4core,我设置的是2) 5、同一份数据会多次访问,可使用cache/persist将数据缓存在内存 6、如果要实现按不同粒度计算最后产出到不同目录,可以flatMap,最后再multi insert 7、实JVM内存上的优化 尽量使用数据组类和一些基础数据类型,避免创建大量对象,对象会占用额外的空间,比如每个对象都有对象头。当然用不用对象还需要根据实际情况权衡,数据复杂的情况会造成代码可读性差,难扩展难维护 序列化,存在对象的情况,可在程序设置使用KyroSerializer序列化方式,相比java标准库速度更快,占用空间更少,序列化的本质是将对象数据转化为连续空间的字节数据存储的数据,指定会在shuffle阶段和rdd持久序列化(persist)阶段生效 指定Java参数中添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps可获得日志,executor的日志在executor的stdout中查看,观察gc触发的频率、gc耗时、造成程序pause的最长耗时等统计数据,是否有需要调整新生代,老生代比例,内存区域大小等,在一些大堆 JVM的场景下比如超过32G,尝试 G1 GC -XX:+UseG1GC。有助于提高gc性能 8、参数设置的优化 spark.shuffle.memoryFraction默认0.2,用于shuffle操作时做聚合作业的内存,如果cache操作较少,shuffle操作较多,可调高该比率 spark.storage.memoryFraction默认0.6,用于cache操作,如果作业频繁GC操作导致运行缓慢(可能作业有大量对象),可考虑调低该参数。 spark.akka.frameSize: 控制Spark中通信消息的最大容量 (如 task 的输出结果),默认为10M。当处理大数据时,task 的输出可能会大于这个值,需要根据实际数据设置一个更高的值,一般不需要设置 sort hash方式, - shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认为200)。类似于hash需要并发打开多个文件,内存消耗量增加,如果内存GC有问题,可以调低这个值(真的?sort-hash就不占用内存吗),反之也可以调高这个值
- 不是排序类的shuffle算子(比如reduceByKey)。reduceByKey会做排序吗?
|