概述
Flink的程序可以在本地IDE运行,也可以打包上传到集群运行。Flink程序代码最开始都是要创建执行环
境,根据不同的运行场景对应不同的执行环境,分别为本地执行环境和集群执行环境。那么不同执行环
境该如何创建,本文将通过一些示例进行演示。
一个简单的Flink程序
[AppleScript] 纯文本查看 复制代码 def main(args: Array[String]): Unit = {
// 创建执行环境
val env: ExecutionEnvironment =
ExecutionEnvironment.getExecutionEnvironment
// 加载本地集合
val listDataSet: DataSet[String] = env.fromCollection(List("1,张三", "2,李
四", "3,王五", "4,赵六"))
// 进行map操作
val personDataSet: DataSet[Person] = listDataSet.map { text =>
val textArr = text.split(",")
Person(textArr(0).toLong, textArr(1))
}
// 打印结果
personDataSet.print()
}
从上面示例中可以看到,Flink的程序结构,一开始就是创建执行环境,加载数据进行转换,最后落地。
示例中采用 getExecutionEnvironment 方法创建执行环境。
本地执行环境
Flink支持两种不同的本地执行环境。
LocalExecutionEnvironment 是启动完整的Flink运行时(Flink Runtime),包括 JobManager 和
TaskManager 。这种方式包括内存管理和在集群模式下执行的所有内部算法。
LocalExecutionEnvironment 应用示例
[AppleScript] 纯文本查看 复制代码 def main(args: Array[String]): Unit = {
// 开始时间
val startTime: Long = new Date().getTime
// 创建执行环境
val env: ExecutionEnvironment =
ExecutionEnvironment.createLocalEnvironment
// 加载数据
val listDataSet = env.fromCollection(List("1","2","3","4"))
// 打印结果
listDataSet.print()
// 结束时间
val endTime: Long = new Date().getTime
println(endTime - startTime)
}
CollectionEnvironment 是在 Java 集合(Java Collections)上执行 Flink 程序。 此模式不会启动完整
的Flink运行 时(Flink Runtime),因此执行的开销非常低并且轻量化。 例如一个 DataSet.map() 变
换,会对Java list中所有元素应用 map() 函数。
CollectionEnvironment 应用示例
[AppleScript] 纯文本查看 复制代码 def main(args: Array[String]): Unit = {
// 开始时间
val startTime: Long = new Date().getTime
// 创建执行环境
val env: ExecutionEnvironment =
ExecutionEnvironment.createCollectionsEnvironment
// 加载数据
val listDataSet = env.fromCollection(List("1","2","3","4"))
// 打印结果
listDataSet.print()
// 结束时间
val endTime: Long = new Date().getTime
println(endTime - startTime)
}
集群执行环境是Flink程序在Flink集群上运行时创建的执行环境,Flink程序可以在许多机器的集群上分
布运行。有两种方法可将程序发送到群集以供执行
使用命令行界面提交
使用代码中的远程环境提交
使用命令行提交
这种方式是将编写好的Flink程序达成jar包,使用flink安装包的 bin 目录下的的内置命令flink进行提
交。
[AppleScript] 纯文本查看 复制代码 # 命令和参数说明
# bin/flink:flink安装包中bin目录下的执命令flink
# ./examples/batch/WordCount.jar:flink自带的测试案例,这个路径是相对路径
# --input file:///home/user/hamlet.txt:指定输入文件
# --output file:///home/user/wordcount_out:指定输出文件
bin/flink run ./examples/batch/WordCount.jar --input
file:///home/user/hamlet.txt --
output file:///home/user/wordcount_out
使用代码中远程环境提交
通过IDE,直接在远程环境上执行Flink Java程序。
[AppleScript] 纯文本查看 复制代码
/**
* node01:集群节点hostname
* 8081:flink访问端口
* E:\ideaWorkspace\flinik-base\target\flinik-base-1.0-SNAPSHOT.jar:
flink程序打成jar包后的本地路径
* hdfs://node01:8020/flink-data/score.csv:hdfs集群中的csv文件
*/
def main(args: Array[String]): Unit = {
// 创建执行环境
val env =
ExecutionEnvironment.createRemoteEnvironment("node01",8081,"E:\\ideaWorkspa
ce\\flinik-base\\target\\flinik-base-1.0-SNAPSHOT.jar")
// 加载数据
val scoreDataSet = env.readCsvFile[(Long, String, Long, Double)]
("hdfs://node01:8020/flink-data/score.csv")
// 根据姓名分组,按成绩倒序排列,取第一个值
val list: DataSet[(Long, String, Long, Double)] =
scoreDataSet.groupBy(1).sortGroup(3, Order.DESCENDING).first(1)
// 打印结果
list.print()
}
score.csv文件数据样式
[AppleScript] 纯文本查看 复制代码 # id,姓名,科目代号,成绩
1,张三,1,98
2,张三,2,77.5
3,张三,3,89
getExecutionEnvironment
[AppleScript] 纯文本查看 复制代码 默认的执行环境创建方式,它会根据上下文去创建正确的ExecutionEnvironment,如果你在IDE中执
行程序或者将程序作为一个常规的Java/Scala程序执行,那么它将为你创建一个本地的环境,你的程序
将在本地执行。如果你将你的程序打成jar包,并通过命令行调用它,那么Flink集群管理器将执行你的
main方法并且getExecutionEnvironment()方法将为你的程序在集群中执行生成一个执行环境。
所以getExecutionEnvironment方法在本地执行或者集群执行都可用这个方法。
createLocalEnvironment
[AppleScript] 纯文本查看 复制代码 本地执行环境创建方式,当程序在IDE中运行时候,可以通过createLocalEnvironment创建基于本地
的执行环境。
createCollectionsEnvironment
[AppleScript] 纯文本查看 复制代码 集合环境执行环境创建方式,使用与本地代码调试时使用。
createRemoteEnvironment
[AppleScript] 纯文本查看 复制代码 创建远程执行环境。远程环境将程序(部分)发送到集群以执行。适用于本地直接发送程序到集群上执行
测试。
结论
[AppleScript] 纯文本查看 复制代码 在不确定时候选择默认的执行环境创建方式(getExecutionEnvironment)
小结
[AppleScript] 纯文本查看 复制代码 上述内容对Flink创建执行环境方式和应用场景进行了介绍。通常在代码开发过程当中我们都是使用默认
的创建方式创建执行环境,其他方式的创建的产生是为了使我们更高效的在不同场景下对开发的程序进
行执行测试,节约执行时间和手动部署的操作。
|