A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

在重写MR到Spark的过程中发现Spark的性能并没有达到我的预期,记录一次调优操作即效果对比

本文涉及调优手段:

持久化
分区数
未调优之前的代码:

val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath")

val check1RDD = jsonStringRDD.filter(js => check1(js))

val rdd2= check1RDD.filter(js => check2(js)).map(js => parse(js))

val count = rdd2.count()

println("*"*10, count, "*"*10)

rdd2.saveASTextFile(outPath)
现象1:

由于Spark的RDD的血缘容错计算规则,代码中的count,saveAsTextFile算子出发action操作,每次计算都是从第一部开始读取,然后计算整个步骤。

问题:两次应用rdd2,计算了两次

增加缓存优化后的代码:
可以考虑缓存,所以更改代码如下 :


val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath")

val check1RDD = jsonStringRDD.filter(js => check1(js))

val rdd2= check1RDD.filter(js => check2(js)).map(js => parse(js))

/** 缓存rdd2 */
rdd2.persist(StorageLevel.MEMORY_ONLY)

val count = rdd2.count()

println("*"*10, count, "*"*10)

rdd2.saveASTextFile(outPath)
在SparkHistory中可以看到,优化之后的计算几乎缩短了30倍时间,在stage2中的时间减少,这是由于spark的持久化机制导致的,其持久化是懒加载的,直到遇见action算子才会持久化该rdd,也就意味着 当stage2中savAsTextFile时没有经过前面的计算直接进行了写出操作。

现象2:整个计算时间即使持久化之后仍然比较长,这个计算需要47min,即使只计算一次。

考虑增加并行度来提高计算效率,缩短计算时间。

增加并行度优化:


//val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath")

/** 增加分区 */
val jsonStringRDD: RDD[String] = sc.textFile(s"$inputPath").repartition(200)

val check1RDD = jsonStringRDD.filter(js => check1(js))

val rdd2= check1RDD.filter(js => check2(js)).map(js => parse(js))

/** 缓存rdd2 */
rdd2.persist(StorageLevel.MEMORY_ONLY)

val count = rdd2.count()

println("*"*10, count, "*"*10)

rdd2.saveASTextFile(outPath)
需要注意的是增加并行度单纯的增加分区是不够的,需要在资源方面与之匹配,也就是集群的core数也要增加,并行在内存足够的情况下会取分区数(task个数)与core数的最小值。

调优之后整个执行时间20min,也就是并行度增加了一倍时间缩短了一半。
---------------------
作者:向阳飞行
来源:CSDN
原文:https://blog.csdn.net/bigdataprimary/article/details/85074182
版权声明:本文为博主原创文章,转载请附上博文链接!

1 个回复

倒序浏览
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马