1,概念
2,Spark 安装
2.1,Spark 安装依赖于 Scala
将 Scala tar 分别解压配置后安装在集群的所有节点设备上
2.2,解压 Spark tar 包 并完成配置
1)将 cdh 的 Spark 包解压到 /home/hduser/下
2)修改 Spark 主目录下/conf/slaves 内置配置如下:
node2
node3
3)修改 Spark主目录下/conf/spark-env.sh 内置配置如下:
export JAVA_HOME=/usr/java/jdk1.8.0.171
export HADOOP_HOME=/home/hduser/hadoop
export SCALA_HOME=/usr/local/scala-2.12.8
export HADOOP_CONF_DIR=/home/hduser/hadoop/etc/hadoop //hadoop 配置文件目录
export SPARK_LOCAL_DIRS=/home/hduserspark-1.6.0-cdh5.15.1
export SPARK_CLASSPATH=/home/hduserspark-1.6.0-cdh5.15.1/lib/* //配置spark classpath 指向lib包
export SPARK_MASTER_IP=node1 //主机pi
export SPARK_MASTER_PORT=8088 //端口
export SPARK_WORKER_MEMORY=512m //从机运行 spark job 最大内存
export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) //引用hadoop jar包
4)将4个jar包拷贝导 /spark/lib/文件夹下
jackson-annotations-2.4.0.jar
jackson-core-2.4.4.jar
jackson-databind-2.4.4.jar
parquet-hadoop-1.4.3.jar
5)将配置好的 spark主目录 远程拷贝到 node2,node3上
2.3,常用命令
启动和关闭命令都在 spark主目录/sbin/下,其他执行命令都在 spark主目录/bin/下
1)启动
$>spark 主目录/sbin/start-all.sh
2)关闭
$>spark 主目录/sbin/stop-all.sh
3)spark-shell
$>spark 主目录/bin/spark-shell
执行完改名了后会切换到spark命令窗口
2.4,测试是否安装成功
1)启动后看进程 主机master,从机worker
2)浏览器访问 spark 集群(http://node1:8080)
3)运行 $>spark主目录/bin/spark-shell 命令,切换到spark开发窗口
3,术语
3.1,SparkConf
SparkConf:主要是 Spark 的环境配置,主机地址,作业名称
val conf = new SparkConf().setMaster("主机地址").setAppName("作业名称");
类似于:
Configuration conf = new Configuration();
core-site:主机地址->hdfs://node1:9000
Job job = Job.getInstance(conf,"job名称");
3.2,SparkContext
SparkContext 是 Spark 环境的上下文
val sc = new SparkContext(conf); //其中 conf 是 SparkConf ,所有 Spark 操作都是在 SparkContext 里完成的。
3.3,RDD
RDD:弹性分布式数据,一个RDD可以代表一个文件,一批文件。但是RDD没有真实存储对应的物理数据,而可以通过RDD完成对物理数据的操作,RDD是只读的。
1)根据 hdfs 上的文件创建对应的RDD对象
var rdd1 = sc.textFile("hdfs://node1:9000/input/words.txt");
2)对 RDD 数据做 map 操作
rdd1.map(x=>x.split(" ")).map(x=>(x,1));
1)rdd.map(x=>x.数据处理) //返回一个新的 RDD 对象
2)rdd.flatMap() 与 rdd.map() 区别如下:
如有一个rdd 内容为:
hadoop is a java
mysql
rdd.map 处理后新的 rdd 只有两条数据,每一条都是 Array[String] 类型
rdd.flatMap 扁平化处理后得到的新的 rdd 有五条数据,每一条都是 String 类型
3)rdd[(key,value)].reduceByKey:处理的是键值对,将相同key的数据一起处理。
如:rdd 对象 r1 数据内容为(hadoop 1)(java 1)(is 1)(java 1)
//第一个_代表本数据之前的运算结果集,第二个_代表当前值。
//r2经过reduceByKey处理后变为(hadoop,1)(java,2)(is,1)
val r2 = rs.reduceByKey(_+_);
4,Spark shell
将词频 words.txt 上传到 hdfs://node1:9000/input/下
val rd1 = sc.textFile("hdfs 上 music 素材");
val rd2 = rd1.map(x=>{(x.split( "\t")(1),1)});
val rd3 = rd2.reduceByKey(_+_);
5,eclipse 开发 Spark
|
|