在重写MR到Spark的过程中发现Spark的性能并没有达到我的预期,记录一次调优操作即效果对比
本文涉及调优手段:
持久化
分区数
未调优之前的代码:
val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath")
val check1RDD = jsonStringRDD.filter(js => check1(js))
val rdd2= check1RDD.filter(js => check2(js)).map(js => parse(js))
val count = rdd2.count()
println("*"*10, count, "*"*10)
rdd2.saveASTextFile(outPath)
现象1:
由于Spark的RDD的血缘容错计算规则,代码中的count,saveAsTextFile算子出发action操作,每次计算都是从第一部开始读取,然后计算整个步骤。
问题:两次应用rdd2,计算了两次
增加缓存优化后的代码:
可以考虑缓存,所以更改代码如下 :
val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath")
val check1RDD = jsonStringRDD.filter(js => check1(js))
val rdd2= check1RDD.filter(js => check2(js)).map(js => parse(js))
/** 缓存rdd2 */
rdd2.persist(StorageLevel.MEMORY_ONLY)
val count = rdd2.count()
println("*"*10, count, "*"*10)
rdd2.saveASTextFile(outPath)
在SparkHistory中可以看到,优化之后的计算几乎缩短了30倍时间,在stage2中的时间减少,这是由于spark的持久化机制导致的,其持久化是懒加载的,直到遇见action算子才会持久化该rdd,也就意味着 当stage2中savAsTextFile时没有经过前面的计算直接进行了写出操作。
现象2:整个计算时间即使持久化之后仍然比较长,这个计算需要47min,即使只计算一次。
考虑增加并行度来提高计算效率,缩短计算时间。
增加并行度优化:
//val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath")
/** 增加分区 */
val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath").repartition(200)
val check1RDD = jsonStringRDD.filter(js => check1(js))
val rdd2= check1RDD.filter(js => check2(js)).map(js => parse(js))
/** 缓存rdd2 */
rdd2.persist(StorageLevel.MEMORY_ONLY)
val count = rdd2.count()
println("*"*10, count, "*"*10)
rdd2.saveASTextFile(outPath)
需要注意的是增加并行度单纯的增加分区是不够的,需要在资源方面与之匹配,也就是集群的core数也要增加,并行在内存足够的情况下会取分区数(task个数)与core数的最小值。
调优之后整个执行时间20min,也就是并行度增加了一倍时间缩短了一半。
---------------------
作者:向阳飞行
来源:CSDN
原文:https://blog.csdn.net/bigdataprimary/article/details/85074182
版权声明:本文为博主原创文章,转载请附上博文链接!
|
|