A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 梦缠绕的时候 黑马粉丝团   /  2018-10-9 09:39  /  807 人查看  /  1 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

Spark的scala也好, 还是pyspark也好, 它们的编程思想都是函数式编程, 关于函数式编程的解析可以看这篇文章:
http://www.ruanyifeng.com/blog/2 ... al_programming.html
函数式编程只是返回新的值, 不修改原有的值, 所以在对RDD操作时一定要注意, 不要用对RDD操作以后, 引用了老的变量.


  • import pyspark







  • sc = pyspark.SparkContext()







  • rdd = sc.parallelize([1, 2, 3, 4, 5, 6, -1, 0])



  • rdd.sortBy(lambda x: x)



  • print rdd.collect()  # not ordered, because you are referencing the original variable 'rdd'







  • rdd = sc.parallelize([1, 2, 3, 4, 5, 6, -1, 0])



  • rdd.sortBy(lambda x: x).persist()



  • print rdd.collect()  # not ordered, same reason as above







  • rdd = sc.parallelize([1, 2, 3, 4, 5, 6, -1, 0])



  • print rdd.sortBy(lambda x : x).collect() # ordered




collect函数的作用把RDD的收集到当前的节点上, 注意如果RDD过大, 可能当前节点内存不能存放RDD,导致报错

Spark driver 是用来解析用户提交的代码, 并把这些代码转换成集群要执行的任务. 把这些任务交给master来分发执行.
master可以有很多选择, 比如yarn就是一个master(参考: https://blog.csdn.net/chengyuqiang/article/details/77864246#comments)
pyspark --master yarn --queue queue1 --deploy-mode client

通过配置spark-site.xml的HADOOP_CONF_DIR就能让spark知道yarn的访问路径
Spark 还分 client 模式和 cluster 模式, 详见:
https://blog.csdn.net/SummerMangoZz/article/details/72627518
https://blog.csdn.net/Trigl/article/details/72732241
简单来说如果你向spark集群提交任务的那台电脑和集群很近, 延迟很低, 那client就够了, 但是如果你用的是VPN在家里向公司的某个国外的集群提交spark任务, 那么由于网络延迟很大, 就用cluster模式吧


Spark也有类似hadoop中分布式缓存的实现, 叫broadcast variables.
Spark中有一个全局计数的实现叫Accumulators
Spark的map可以基于每一行, 去应用lambda表达式对各个元素进行转换, 也可以基于Partition来去转换, 以节省数据库的链接这样宝贵的资源.
reduce里指定的变换函数, 返回的结果必须和序列的元素的结构一致, 因为reducer的原理是 把第一个和第二个元素的转换结果和第三个元素再一起转换:


  • import pyspark







  • sc = pyspark.SparkContext()







  • rdd = sc.parallelize([1, 2, 3], 1)



  • # calculate the mean value



  • #the lambda function must return the same structure



  • print reduce( lambda x,y: x/y,rdd.map(lambda x: [x, 1.0]).reduce(lambda x,y:[x[0]+y[0],x[1]+y[1]]))





关于 Spark 的 fold 算子:


  • # -*- coding: utf-8 -*-







  • import pyspark







  • sc = pyspark.SparkContext()



  • baseline = 5



  • partition_count = 3



  • rdd = sc.parallelize([1, 2, 3], partition_count)



  • print rdd.fold(baseline, lambda x,y:x+y)



  • #equals



  • print sum([1, 2, 3]) + (len([1, 2, 3]) - 1) * baseline + (partition_count-1) * baseline




Spark-submit的参数:


  • spark-submit --master yarn --deploy-mode client



  • --executor-memory 1G



  • --queue yarnqueue



  • --num-executors 6



  • --executor-cores 1



  • somepythonfile.py



executor-memory:每个执行器的内存拥有量
num-execuotrs: 默认为2(yarn only)
queue: 使用yarn的哪个queue(yarn only)
executor-cores: 每个执行器的核心使用数, 默认1(yarn only)


---------------------本文来自 爱知菜 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/rav009/art ... 654?utm_source=copy

1 个回复

倒序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马