A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© xiaozuoquan 中级黑马   /  2019-11-22 13:19  /  1465 人查看  /  0 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

本帖最后由 xiaozuoquan 于 2019-11-22 13:26 编辑

Flink集群搭建
Flink支持多种安装模式。
1) local(本地)——单机模式,一般不使用
2) standalone——独立模式,Flink自带集群,开发测试环境使用
3) yarn——计算资源统一由Hadoop YARN管理,生产环境测试

standalone集群环境准备工作
1) jdk1.8及以上【配置JAVA_HOME环境变量】
2) ssh免密码登录【集群内节点之间免密登录】
下载安装包
[AppleScript] 纯文本查看 复制代码
https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.7.2/flink-1.7.2-bin-hadoop26-scala_2.11.tgz

集群规划
master(JobManager)+slave/worker(TaskManager)
node01(master+slave)   node02(slave)   node03(slave)

步骤
1) 解压Flink压缩包到指定目录
2) 配置Flink
3) 配置Slaves节点
4) 分发Flink到各个节点
5) 启动集群
6) 递交wordcount程序测试
7) 查看Flink WebUI

具体操作
1) 上传Flink压缩包到指定目录
2) 解压缩flink到 /export/servers 目录
[AppleScript] 纯文本查看 复制代码
[root@node01 softwares]# tar -zxvf flink-1.7.2-bin-hadoop26-scala_2.11.tgz
3) 修改安装目录下conf文件夹内的flink-conf.yaml配置文件,指定JobManager
# 配置Master的机器名(IP地址)
jobmanager.rpc.address: node01
# 配置每个taskmanager生成的临时文件夹
taskmanager.tmp.dirs: /export/servers/flink-1.7.2/tmp
4) 修改安装目录下conf文件夹内的slave配置文件,指定TaskManager
node01
node02
node03
5) 使用vi修改 /etc/profile 系统环境变量配置文件,添加HADOOP_CONF_DIR目录
[AppleScript] 纯文本查看 复制代码
export HADOOP_CONF_DIR=/export/servers/hadoop-2.7.2-cdh5.14.0/etc/hadoop
6) 分发/etc/profile到其他两个节点
[AppleScript] 纯文本查看 复制代码
scp -r /etc/profile node02:/etc
scp -r /etc/profile node03:/etc
7) 每个节点重新加载环境变量
[AppleScript] 纯文本查看 复制代码
[root@node01 servers]# source /etc/profile
8) 将配置好的Flink目录分发给其他的两台节点
[AppleScript] 纯文本查看 复制代码
[root@node01 servers]# for i in {2..3}; do scp -r flink-1.7.2/ node0$i:$PWD; done
9) 启动Flink集群
[AppleScript] 纯文本查看 复制代码
[root@node01 flink-1.7.2]# bin/start-cluster.sh
10) 通过jps查看进程信息
--------------------- node01 ----------------
86583 Jps
85963 StandaloneSessionClusterEntrypoint
86446 TaskManagerRunner

--------------------- node02 ----------------
44099 Jps
43819 TaskManagerRunner

--------------------- node03 ----------------
29461 TaskManagerRunner
29678 Jps
11) 启动HDFS集群
12) HDFS中创建/test/input目录
[AppleScript] 纯文本查看 复制代码
[root@node01 flink-1.7.2]# hadoop fs -mkdir -p /test/input
13) 上传wordcount.txt文件到HDFS /test/input目录
[AppleScript] 纯文本查看 复制代码
[root@node01 flink-1.7.2]# hadoop fs -put /root/wordcount.txt /test/input
14) 并运行测试任务
[AppleScript] 纯文本查看 复制代码
[root@node01 flink-1.7.2]# bin/flink run /export/servers/flink-1.7.2/examples/batch/WordCount.jar --input hdfs://node01:8020/test/input/wordcount.txt --output hdfs://node01:8020/test/output/0916001
15) 浏览Flink Web UI界面
http://node01:8081


启动/停止flink集群
启动:./bin/start-cluster.sh  
停止:./bin/stop-cluster.sh

Flink集群的重启或扩容
启动/停止jobmanager
如果集群中的jobmanager进程挂了,执行下面命令启动
bin/jobmanager.sh start
bin/jobmanager.sh stop

启动/停止taskmanager
添加新的taskmanager节点或者重启taskmanager节点
bin/taskmanager.sh start
bin/taskmanager.sh stop
Standalone集群架构
l client客户端提交任务给JobManager
l JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行
l TaskManager定期向JobManager汇报状态
高可用HA模式
从上述架构图中,可发现JobManager存在单点故障,一旦JobManager出现意外,整个集群无法工作。所以,为了确保集群的高可用,需要搭建Flink的HA。(如果是部署在YARN上,部署YARN的HA),我们这里演示如何搭建Standalone 模式HA。



集群规划
master(JobManager)+slave/worker(TaskManager)
node01(master+slave)   node02(master+slave)   node03(slave)

步骤
1) flink-conf.yaml中添加zookeeper配置
2) 将配置过的HA的 flink-conf.yaml 分发到另外两个节点
3) 分别到另外两个节点中修改flink-conf.yaml中的配置
4) masters 配置文件中添加多个节点
5) 分发masters配置文件到另外两个节点
6) 启动 zookeeper 集群
7) 启动 flink 集群

具体操作
1) flink-conf.yaml中添加zookeeper配置
#开启HA,使用文件系统作为快照存储
state.backend: filesystem
#默认为none,用于指定checkpoint的data files和meta data存储的目录
state.checkpoints.dir: hdfs://node01:8020/flink-checkpoints
#默认为none,用于指定savepoints的默认目录
state.savepoints.dir: hdfs://node01:8020/flink-checkpoints
#使用zookeeper搭建高可用
high-availability: zookeeper
# 存储JobManager的元数据到HDFS,用来恢复JobManager 所需的所有元数据
high-availability.storageDir: hdfs://node01:8020/flink/ha/
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
2) 将配置过的HA的 flink-conf.yaml 分发到另外两个节点
[AppleScript] 纯文本查看 复制代码
[root@node01 conf]# for i in {2..3}; do scp -r /export/servers/flink-1.7.2/conf/flink-conf.yaml node0$i:$PWD; done
3) 到节点2中修改flink-conf.yaml中的配置,将JobManager设置为自己节点的名称
jobmanager.rpc.address: node02
4) masters 配置文件中添加多个节点
node1:8081
node2:8081
5) 分发masters配置文件到另外两个节点
[AppleScript] 纯文本查看 复制代码
[root@node01 servers]# for i in {2..3}; do scp -r flink-1.7.2/ node0$i:$PWD; done
6) 启动 zookeeper 集群
[AppleScript] 纯文本查看 复制代码
[root@node01 servers]# /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start
[root@node02 servers]# /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start
[root@node03 servers]# /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start
7) 启动 HDFS 集群
8) 启动 flink 集群
[AppleScript] 纯文本查看 复制代码
[root@node01 flink-1.7.2]# bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node01.hadoop.com.
Starting standalonesession daemon on host node02.hadoop.com.
Starting taskexecutor daemon on host node01.hadoop.com.
Starting taskexecutor daemon on host node02.hadoop.com.
Starting taskexecutor daemon on host node03.hadoop.com.
9) 分别查看两个节点的Flink Web UI
10) kill掉一个节点,查看另外的一个节点的Web UI

注意事项
切记搭建HA,需要将第二个节点的 jobmanager.rpc.address 修改为node02

yarn集群环境
在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行;
flink on yarn的前提是:hdfs、yarn均启动

准备工作
1) jdk1.8及以上【配置JAVA_HOME环境变量】
2) ssh免密码登录【集群内节点之间免密登录】
3) 至少hadoop2.2
4) hdfs & yarn

集群规划
master(JobManager)+slave/worker(TaskManager)
node01(master)   node02(slave)   node03(slave)

修改hadoop的配置参数
[AppleScript] 纯文本查看 复制代码
vim etc/hadoop/yarn-site.xml
添加:
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job

修改全局变量/etc/profile
添加:
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/Hadoop
YARN_CONF_DIR或者HADOOP_CONF_DIR必须将环境变量设置为读取YARN和HDFS配置
Flink on Yarn的运行机制
从图中可以看出,Yarn的客户端需要获取hadoop的配置信息,连接Yarn的ResourceManager。所以要有设置有 YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home的环境变量,都区成功将会尝试加载$HADOOP_HOME/etc/hadoop的配置文件。
1、当启动一个Flink Yarn会话时,客户端首先会检查本次请求的资源是否足够。资源足够将会上传包含HDFS配置信息和Flink的jar包到HDFS。
2、随后客户端会向Yarn发起请求,启动applicationMaster,随后NodeManager将会加载有配置信息和jar包,一旦完成,ApplicationMaster(AM)便启动。
3、当JobManager and AM 成功启动时,他们都属于同一个container,从而AM就能检索到JobManager的地址。此时会生成新的Flink配置信息以便TaskManagers能够连接到JobManager。同时,AM也提供Flink的WEB接口。用户可并行执行多个Flink会话。
4、随后,AM将会开始为分发从HDFS中下载的jar以及配置文件的container给TaskMangers.完成后Fink就完全启动并等待接收提交的job.

Flink on Yarn 的两种使用方式
yarn-session提供两种模式
1) 会话模式
使用Flink中的yarn-session(yarn客户端),会启动两个必要服务 JobManager 和 TaskManagers
客户端通过yarn-session提交作业
yarn-session会一直启动,不停地接收客户端提交的作用
有大量的小作业,适合使用这种方式


2) 分离模式
直接提交任务给YARN
大作业,适合使用这种方式

第一种方式:YARN session
Ø yarn-session.sh(开辟资源)+flink run(提交任务)
这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和Task-managers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)
通过./bin/yarn-session.sh脚本启动YARN Session
脚本可以携带的参数:
Usage:
   Required
     -n,--container <arg>               分配多少个yarn容器 (=taskmanager的数量)  
   Optional
     -D <arg>                        动态属性
     -d,--detached                   独立运行 (以分离模式运行作业)
     -id,--applicationId <arg>            YARN集群上的任务id,附着到一个后台运行的yarn session中
     -j,--jar <arg>                      Path to Flink jar file
     -jm,--jobManagerMemory <arg>    [size=9.0000pt] JobManager的内存 [in MB]
     -m,--jobmanager <host:port>        指定需要连接的jobmanager(主节点)地址  
                                    使用这个参数可以指定一个不同于配置文件中的jobmanager  
     -n,--container <arg>               分配多少个yarn容器 (=taskmanager的数量)
     -nm,--name <arg>                [size=9.0000pt] 在YARN上为一个自定义的应用设置一个名字
     -q,--query                        显示yarn中可用的资源 (内存, cpu核数)
     -qu,--queue <arg>                 指定YARN队列
     -s,--slots <arg>                   每个TaskManager使用的slots数量
     -st,--streaming                   在流模式下启动Flink
     -tm,--taskManagerMemory <arg>   [size=9.0000pt] 每个TaskManager的内存 [in MB]
     -z,--zookeeperNamespace <arg>     针对HA模式在zookeeper上创建NameSpace
注意:
如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d或--detached。

Ø 启动:
[AppleScript] 纯文本查看 复制代码
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

上面的命令的意思是,同时向Yarn申请3个container(即便只申请了两个,因为ApplicationMaster和Job Manager有一个额外的容器。一旦将Flink部署到YARN群集中,它就会显示Job Manager的连接详细信息),其中 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 800M 的内存,以及一个ApplicationMaster(Job Manager)。


yarn页面:ip:8088可以查看当前提交的flink session

点击ApplicationMaster进入任务页面:

上面的页面就是使用:yarn-session.sh提交后的任务页面;
Ø 然后使用flink提交任务
[AppleScript] 纯文本查看 复制代码
bin/flink run examples/batch/WordCount.jar

在控制台中可以看到wordCount.jar计算出来的任务结果;

yarn-session.sh提交后的任务页面中也可以观察到当前提交的任务:

点击查看任务细节:

Ø 停止当前任务:
yarn application -kill  application_1527077715040_0007

第二种方式:在YARN上运行一个Flink作业
上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业,这里我们还是使用./bin/flink,但是不需要事先启动YARN session:
Ø 使用flink直接提交任务
[AppleScript] 纯文本查看 复制代码
bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
以上命令在参数前加上y前缀,-yn表示TaskManager个数
8088页面观察:

Ø 停止yarn-cluster
[AppleScript] 纯文本查看 复制代码
yarn application -kill application的ID
注意:
在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改conf/flink-conf.yaml;
可以通过:-D <arg>                        Dynamic properties
来覆盖原有的配置信息:比如:
-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

如果使用的是flink on yarn方式,想切换回standalone模式的话,需要删除问题:【/tmp/.yarn-properties-root】
因为默认查找当前yarn集群中已有的yarn-session信息中的jobmanager

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马