Last edited by xiaozuoquan on 2019-12-19 15:46.

1. Installing the Spark2 libraries into the Oozie sharelib

Reference: https://blog.csdn.net/zkf541076398/article/details/79941598

Following that guide, the concrete commands are as follows.

1. Create a spark2 directory under the Oozie sharelib on HDFS to hold the Spark2 jars:

hdfs dfs -mkdir -p /user/root/share/lib/lib_20190723215106/spark2


Next, add the Spark2 jars and oozie-sharelib-spark*.jar to this spark2 directory.

2. Upload the Spark2 jars to the newly created sharelib spark2 directory:

hdfs dfs -put /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/* /user/root/share/lib/lib_20190723215106/spark2

3. Copy the Oozie-Spark integration jar into the sharelib spark2 directory (also keeping a copy renamed to oozie-sharelib-spark.jar):

hdfs dfs -cp /user/root/share/lib/lib_20190723215106/spark/oozie-sharelib-spark-4.1.0-cdh5.14.0.jar /user/root/share/lib/lib_20190723215106/spark2/
hdfs dfs -cp /user/root/share/lib/lib_20190723215106/spark/oozie-sharelib-spark-4.1.0-cdh5.14.0.jar /user/root/share/lib/lib_20190723215106/spark2/oozie-sharelib-spark.jar


4. Update the Oozie sharelib:

oozie admin -oozie http://bigdata-cdh01.itcast.cn:11000/oozie -sharelibupdate

5. Confirm that spark2 has been added to the sharelib:

[root@bigdata-cdh01 ~]# oozie admin -oozie http://bigdata-cdh01.itcast.cn:11000/oozie -shareliblist

[Available ShareLib]
hive
spark2
distcp
mapreduce-streaming
spark
oozie
hcatalog
hive2
sqoop
pig
2. Oozie Spark Action

The official documentation for Oozie's Spark action:
http://bigdata-cdh01.itcast.cn:11000/oozie/docs/DG_SparkActionExtension.html

1. The spark action runs a Spark job.

2. The workflow job will wait until the Spark job completes before continuing to the next action.

3. To run the Spark job, you have to configure the spark action with the job-tracker, name-node, and Spark master elements, as well as the necessary elements, arguments, and configuration.

4. Spark options can be specified in an element called spark-opts.

5. A spark action can be configured to create or delete HDFS directories before starting the Spark job.
The workflow configuration syntax is as follows:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
                ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

When the Spark application runs on YARN, configure the following so that completed applications can still be monitored:

To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties either in spark-opts with --conf or from oozie.service.SparkConfigurationService.spark.configurations in oozie-site.xml.
   
1. spark.yarn.historyServer.address=SPH-HOST:18080

2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory

3. spark.eventLog.enabled=true

3. Testing Oozie scheduling of Spark2

Write the job.properties file, the workflow.xml configuration, and the coordinator.xml schedule, then run the official Spark example (computing Pi) in local mode (local[2]) and on the YARN cluster (client and cluster deploy modes).
3.1 Local mode: spark2-local

Write the configuration files to run the SparkPi program in local mode (local[2]), as follows.

  • Job properties file: job.properties
oozie.use.system.libpath=true
user.name=root

# HDFS and YARN
nameNode=hdfs://bigdata-cdh01.itcast.cn:8020
jobTracker=bigdata-cdh01.itcast.cn:8032
queueName=default
appPath=${nameNode}/user/${user.name}/oozie_works/wf_spark_pi/

# Spark Submit Options
master=local[2]
mode=client
sparkOptions= --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://bigdata-cdh01.itcast.cn:8020/spark/eventLogs
mainClass=org.apache.spark.examples.SparkPi
appName=SparkExamplePi
jarPath=${appPath}/lib/spark-examples_2.11-2.2.0.jar
appParam=10

# Oozie Parameters
oozie.wf.application.path=${appPath}
oozie.libpath=${nameNode}/user/${user.name}/share/lib/lib_20190723215106/spark2
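In the properties above, appPath is composed from ${nameNode} and ${user.name}, and jarPath from ${appPath}. As a rough sketch of how this kind of ${var} substitution resolves (a simplified illustration only; the class name and helper are hypothetical, and Oozie's real resolver also supports EL functions), the expansion can be modeled as:

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PropertyResolver {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Recursively expand ${name} references against the same Properties object;
    // unknown names are left untouched. Simplified model of job.properties substitution.
    public static String resolve(String value, Properties props) {
        Matcher m = VAR.matcher(value);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String replacement = props.getProperty(m.group(1));
            String expanded = (replacement == null) ? m.group(0) : resolve(replacement, props);
            m.appendReplacement(sb, Matcher.quoteReplacement(expanded));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("nameNode", "hdfs://bigdata-cdh01.itcast.cn:8020");
        p.setProperty("user.name", "root");
        String appPath = "${nameNode}/user/${user.name}/oozie_works/wf_spark_pi/";
        // prints hdfs://bigdata-cdh01.itcast.cn:8020/user/root/oozie_works/wf_spark_pi/
        System.out.println(resolve(appPath, p));
    }
}
```

This is why oozie.wf.application.path and jarPath can be kept relative to a single appPath definition.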


  • Workflow configuration file: workflow.xml
<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="wf_spark_pi">
    <start to="spark_node"/>
    <action name="spark_node">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark2</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <master>${master}</master>
            <mode>${mode}</mode>
            <name>${appName}</name>
            <class>${mainClass}</class>
            <jar>${jarPath}</jar>
            <spark-opts>${sparkOptions}</spark-opts>
            <arg>${appParam}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

  • Application jar and its dependencies: lib directory

  • Submit and run the workflow with the following commands.

Validate the configuration file:

/export/servers/oozie/bin/oozie validate -oozie http://bigdata-cdh01.itcast.cn:11000/oozie/ /root/wf_spark_pi/workflow.xml


Run the job:
/export/servers/oozie/bin/oozie job -oozie http://bigdata-cdh01.itcast.cn:11000/oozie/ -config /root/wf_spark_pi/job.properties -run


Because the Spark application runs in local mode, it executes inside the launcher MapTask of the Oozie workflow.

3.2 YARN cluster: spark2-client

Write the configuration files to run the SparkPi program on the YARN cluster in client deploy mode, as follows.

  • Job properties file: job.properties
oozie.use.system.libpath=true
user.name=root

# HDFS and YARN
nameNode=hdfs://bigdata-cdh01.itcast.cn:8020
jobTracker=bigdata-cdh01.itcast.cn:8032
queueName=default
appPath=${nameNode}/user/${user.name}/oozie_works/wf_yarn_pi/

# Spark Submit Options
master=yarn
mode=client
sparkOptions= --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 --conf spark.yarn.historyServer.address=http://bigdata-cdh01.itcast.cn:18080 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://bigdata-cdh01.itcast.cn:8020/spark/eventLogs --conf spark.yarn.jars=hdfs://bigdata-cdh01.itcast.cn:8020/spark/jars/*
mainClass=org.apache.spark.examples.SparkPi
appName=SparkExamplePi
jarPath=${appPath}/lib/spark-examples_2.11-2.2.0.jar
appParam=10

# Oozie Parameters
oozie.wf.application.path=${appPath}
oozie.libpath=${nameNode}/user/${user.name}/share/lib/lib_20190723215106/spark2

  • Workflow configuration file: workflow.xml

<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="wf_spark_pi">
    <start to="spark_node"/>
    <action name="spark_node">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark2</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <master>${master}</master>
            <mode>${mode}</mode>
            <name>${appName}</name>
            <class>${mainClass}</class>
            <jar>${jarPath}</jar>
            <spark-opts>${sparkOptions}</spark-opts>
            <arg>${appParam}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

  • Submit and run the workflow with the following commands.

Validate the configuration file:

/export/servers/oozie/bin/oozie validate -oozie http://bigdata-cdh01.itcast.cn:11000/oozie/ /root/wf_yarn_pi/workflow.xml


Run the job:
/export/servers/oozie/bin/oozie job -oozie http://bigdata-cdh01.itcast.cn:11000/oozie/ -config /root/wf_yarn_pi/job.properties -run

               

3.3 YARN cluster: spark2-cluster

Modify the job properties above so that the Spark application runs on YARN in cluster deploy mode.

1. Change the following in job.properties:

mode=cluster


2. Submit and run:
/export/servers/oozie/bin/oozie job -oozie http://bigdata-cdh01.itcast.cn:11000/oozie/ -config /root/wf_yarn_pi/job.properties -run

               

3.4 Testing Oozie scheduled execution

To trigger the job on a schedule, write the configuration files as follows:

  • Job properties file: job.properties
oozie.use.system.libpath=true
user.name=root

# HDFS and YARN
nameNode=hdfs://bigdata-cdh01.itcast.cn:8020
jobTracker=bigdata-cdh01.itcast.cn:8032
queueName=default
appPath=${nameNode}/user/${user.name}/oozie_works/cron_yarn_pi/

# Spark Submit Options
master=yarn
mode=cluster
sparkOptions= --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 --conf spark.yarn.historyServer.address=http://bigdata-cdh01.itcast.cn:18080 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://bigdata-cdh01.itcast.cn:8020/spark/eventLogs --conf spark.yarn.jars=hdfs://bigdata-cdh01.itcast.cn:8020/spark/jars/*
mainClass=org.apache.spark.examples.SparkPi
appName=SparkExamplePi
jarPath=${appPath}/lib/spark-examples_2.11-2.2.0.jar
appParam=10

# Crontab
start=2019-11-17T09:55+0800
freq=0/5 * * * *
end=2019-11-17T10:05+0800

# Oozie Parameters
oozie.coord.application.path=${appPath}
oozie.libpath=${nameNode}/user/root/share/lib/lib_20190723215106/spark2
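The start and end values above use the timestamp layout the coordinator expects (e.g. 2019-11-17T09:55+0800). As a small sketch of producing such timestamps from Java (the class name is hypothetical; it assumes the Asia/Shanghai timezone used by the coordinator):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class OozieTimestamps {
    // Format a Date in the yyyy-MM-dd'T'HH:mmZ layout used by the coordinator's
    // start/end attributes (e.g. 2019-11-17T09:55+0800).
    public static String format(Date d, TimeZone tz) {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mmZ");
        f.setTimeZone(tz);
        return f.format(d);
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        System.out.println(format(new Date(), shanghai));
    }
}
```

Generating the window programmatically avoids hand-editing job.properties for every test run.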

  • Workflow configuration file: workflow.xml

<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="wf_spark_pi">
    <start to="spark_pi_node"/>
    <action name="spark_pi_node">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark2</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <master>${master}</master>
            <mode>${mode}</mode>
            <name>${appName}</name>
            <class>${mainClass}</class>
            <jar>${jarPath}</jar>
            <spark-opts>${sparkOptions}</spark-opts>
            <arg>${appParam}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

  • Coordinator configuration file: coordinator.xml

<?xml version="1.0" encoding="utf-8"?>
<coordinator-app xmlns="uri:oozie:coordinator:0.4" name="cron_yarn_pi" frequency="${freq}" start="${start}" end="${end}" timezone="Asia/Shanghai">
    <action>
        <workflow>
            <app-path>${appPath}</app-path>
        </workflow>
    </action>
</coordinator-app>
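With freq=0/5 * * * *, the coordinator materializes one workflow action at each matching minute between start and end. The following is a simplified model of that materialization (an illustration only, not Oozie's actual scheduler; it assumes minute granularity, an exclusive end, and a hypothetical class name):

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

public class CronWindow {
    // List the instants in [start, end) whose minute-of-hour is divisible by 5,
    // mimicking the "0/5 * * * *" frequency. Simplified: minute granularity,
    // end treated as exclusive.
    public static List<String> instants(Date start, Date end, TimeZone tz) {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mmZ");
        f.setTimeZone(tz);
        Calendar c = Calendar.getInstance(tz);
        c.setTime(start);
        c.set(Calendar.SECOND, 0);
        c.set(Calendar.MILLISECOND, 0);
        List<String> out = new ArrayList<>();
        while (c.getTime().before(end)) {
            if (c.get(Calendar.MINUTE) % 5 == 0) {
                out.add(f.format(c.getTime()));
            }
            c.add(Calendar.MINUTE, 1);
        }
        return out;
    }

    public static void main(String[] args) {
        TimeZone tz = TimeZone.getTimeZone("Asia/Shanghai");
        // A 10-minute window starting at the epoch: the 08:00 and 08:05 local
        // minutes match, so two instants are printed.
        for (String s : instants(new Date(0L), new Date(10 * 60 * 1000L), tz)) {
            System.out.println(s);
        }
    }
}
```

For the window in job.properties (09:55 to 10:05), this model would yield actions at 09:55 and 10:00.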

Run the job:
/export/servers/oozie/bin/oozie job -oozie http://bigdata-cdh01.itcast.cn:11000/oozie/ -config /root/cron_yarn_pi/job.properties -run

        



4.1 Java API Example

To submit an Oozie job from a Java program, first package the program into a jar, define workflow.xml, and upload them to HDFS; then set the job properties in the program.

Official documentation: http://oozie.apache.org/docs/4.1.0/DG_Examples.html

The main class involved is OozieClient: pass it the Oozie URL, the workflow parameters, and the configuration, then submit. Pseudo-code:
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

import java.util.Properties;

    ...

    // get an OozieClient for the local Oozie
    OozieClient wc = new OozieClient("http://bar:11000/oozie");

    // create a workflow job configuration and set the workflow application path
    Properties conf = wc.createConfiguration();
    conf.setProperty(OozieClient.APP_PATH, "hdfs://foo:8020/usr/tucu/my-wf-app");

    // setting workflow parameters
    conf.setProperty("jobTracker", "foo:8021");
    conf.setProperty("inputDir", "/usr/tucu/inputdir");
    conf.setProperty("outputDir", "/usr/tucu/outputdir");
    ...

    // submit and start the workflow job
    String jobId = wc.run(conf);
    System.out.println("Workflow job submitted");

    // wait until the workflow job finishes, printing the status every 10 seconds
    while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
        System.out.println("Workflow job running ...");
        Thread.sleep(10 * 1000);
    }

    // print the final status of the workflow job
    System.out.println("Workflow job completed ...");
    System.out.println(wc.getJobInfo(jobId));
    ...
               
The POM file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>profile-tags_2.11</artifactId>
        <groupId>cn.itcast.tags</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>tags-oozie</artifactId>
    <!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <properties>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.0-cdh5.14.0</hadoop.version>
        <hbase.version>1.2.0-cdh5.14.0</hbase.version>
        <zookeeper.version>3.4.5-cdh5.14.0</zookeeper.version>
        <oozie.version>4.1.0-cdh5.14.0</oozie.version>
    </properties>

    <dependencies>
        <!-- Hadoop Client 依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Oozie Client API -->
        <dependency>
            <groupId>org.apache.oozie</groupId>
            <artifactId>oozie-client</artifactId>
            <version>${oozie.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.oozie</groupId>
            <artifactId>oozie-core</artifactId>
            <version>${oozie.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.oozie</groupId>
            <artifactId>oozie-hadoop</artifactId>
            <version>2.6.0-cdh5.14.0.oozie-4.1.0-cdh5.14.0</version>
        </dependency>
    </dependencies>
    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven 编译的插件 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4.2 Workflow Submit

Submit the SparkPi program to run on YARN in cluster deploy mode. The configuration files are as follows:

  • Job properties file: job.properties
oozie.use.system.libpath=true
user.name=root

# HDFS and YARN
nameNode=hdfs://bigdata-cdh01.itcast.cn:8020
jobTracker=bigdata-cdh01.itcast.cn:8032
queueName=default
appPath=${nameNode}/user/${user.name}/oozie_works/wf_yarn_pi/

# Spark Submit Options
master=yarn
mode=cluster
sparkOptions= --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 --conf spark.yarn.historyServer.address=http://bigdata-cdh01.itcast.cn:18080 --conf spark.eventLog.enabled=true --conf spark.eventLog.compress=true --conf spark.eventLog.dir=hdfs://bigdata-cdh01.itcast.cn:8020/spark/eventLogs --conf spark.yarn.jars=hdfs://bigdata-cdh01.itcast.cn:8020/spark/jars/*
mainClass=org.apache.spark.examples.SparkPi
appName=SparkExamplePi
jarPath=${appPath}/lib/spark-examples_2.11-2.2.0.jar
appParam=10

# Oozie Parameters
oozie.wf.application.path=${appPath}
oozie.libpath=${nameNode}/user/${user.name}/share/lib/lib_20190723215106/spark2

  • Workflow configuration file: workflow.xml
<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="wf_spark_pi">
    <start to="spark_node"/>
    <action name="spark_node">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark2</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <master>${master}</master>
            <mode>${mode}</mode>
            <name>${appName}</name>
            <class>${mainClass}</class>
            <jar>${jarPath}</jar>
            <spark-opts>${sparkOptions}</spark-opts>
            <arg>${appParam}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
  • Java program OozieWorkflowSubmit: build an OozieClient, load the key/value pairs from the properties file into a Properties object, and pass that object when running the workflow. The code is as follows:
package cn.itcast.bigdata.oozie.workflow;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

import java.util.Properties;

/**
 * Submit and run an Oozie workflow through the Oozie Java API.
 */
public class OozieWorkflowSubmit {

    public static void main(String[] args) throws OozieClientException, InterruptedException {

        String OOZIE_URL = "http://bigdata-cdh01.itcast.cn:11000/oozie/";

        // 1. Build the OozieClient instance
        OozieClient oozieClient = new OozieClient(OOZIE_URL);

        // 2. Set the workflow configuration parameters
        Properties jobConf = oozieClient.createConfiguration();
        // 2.1. System parameters
        jobConf.setProperty("oozie.use.system.libpath", "true");
        jobConf.setProperty("user.name", "root");
        jobConf.setProperty("oozie.libpath",
                "hdfs://bigdata-cdh01.itcast.cn:8020/user/root/share/lib/lib_20190723215106/spark2");
        // 2.2. Required parameters
        jobConf.setProperty("nameNode", "hdfs://bigdata-cdh01.itcast.cn:8020");
        jobConf.setProperty("jobTracker", "bigdata-cdh01.itcast.cn:8032");
        jobConf.setProperty("queueName", "default");
        // 2.3. YARN submission parameters
        jobConf.setProperty("master", "yarn");
        jobConf.setProperty("mode", "cluster");
        jobConf.setProperty("sparkOptions",
                " --driver-memory 512m " +
                "--executor-memory 512m " +
                "--num-executors 1 " +
                "--executor-cores 1 " +
                "--conf spark.yarn.historyServer.address=http://bigdata-cdh01.itcast.cn:18080 " +
                "--conf spark.eventLog.enabled=true " +
                "--conf spark.eventLog.dir=hdfs://bigdata-cdh01.itcast.cn:8020/spark/eventLogs " +
                "--conf spark.yarn.jars=hdfs://bigdata-cdh01.itcast.cn:8020/spark/jars/*"
        );
        jobConf.setProperty("mainClass", "org.apache.spark.examples.SparkPi");
        jobConf.setProperty("appName", "SparkExamplePi");
        jobConf.setProperty("jarPath",
                "hdfs://bigdata-cdh01.itcast.cn:8020/user/root/oozie_works/wf_yarn_pi/lib/spark-examples_2.11-2.2.0.jar");
        jobConf.setProperty("appParam", "10");
        // 2.4. Oozie workflow parameters
        jobConf.setProperty(OozieClient.APP_PATH,
                "hdfs://bigdata-cdh01.itcast.cn:8020/user/root/oozie_works/wf_yarn_pi/workflow.xml");

        // 3. Submit the workflow and get back the job ID
        String jobId = oozieClient.run(jobConf);
        System.out.println("JobId = " + jobId);

        // 4. Poll the job status by ID until it is no longer running
        while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow job completed ...");
    }
}

               


4.3 Coordinator Submit

Modify the code above: add the schedule time settings and point at the Coordinator configuration file, then submit. The code is as follows:
package cn.itcast.bigdata.oozie.schedule;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

import java.util.Properties;

/**
 * Submit and run an Oozie Coordinator job through the Oozie Java API.
 */
public class OozieCoordinatorSubmit {

    public static void main(String[] args) throws OozieClientException, InterruptedException {

        String OOZIE_URL = "http://bigdata-cdh01.itcast.cn:11000/oozie/";

        // 1. Build the OozieClient instance
        OozieClient oozieClient = new OozieClient(OOZIE_URL);

        // 2. Set the workflow configuration parameters
        Properties jobConf = oozieClient.createConfiguration();
        // 2.1. System parameters
        jobConf.setProperty("oozie.use.system.libpath", "true");
        jobConf.setProperty("user.name", "root");
        jobConf.setProperty("oozie.libpath",
                "hdfs://bigdata-cdh01.itcast.cn:8020/user/root/share/lib/lib_20190723215106/spark2");
        // 2.2. Required parameters
        jobConf.setProperty("nameNode", "hdfs://bigdata-cdh01.itcast.cn:8020");
        jobConf.setProperty("jobTracker", "bigdata-cdh01.itcast.cn:8032");
        jobConf.setProperty("queueName", "default");
        // 2.3. YARN submission parameters
        jobConf.setProperty("master", "yarn");
        jobConf.setProperty("mode", "cluster");
        jobConf.setProperty("sparkOptions",
                " --driver-memory 512m " +
                "--executor-memory 512m " +
                "--num-executors 1 " +
                "--executor-cores 1 " +
                "--conf spark.yarn.historyServer.address=http://bigdata-cdh01.itcast.cn:18080 " +
                "--conf spark.eventLog.enabled=true " +
                "--conf spark.eventLog.dir=hdfs://bigdata-cdh01.itcast.cn:8020/spark/eventLogs " +
                "--conf spark.yarn.jars=hdfs://bigdata-cdh01.itcast.cn:8020/spark/jars/*"
        );
        jobConf.setProperty("mainClass", "org.apache.spark.examples.SparkPi");
        jobConf.setProperty("appName", "SparkExamplePi");
        jobConf.setProperty("jarPath",
                "hdfs://bigdata-cdh01.itcast.cn:8020/user/root/oozie_works/cron_yarn_pi/lib/spark-examples_2.11-2.2.0.jar");
        jobConf.setProperty("appParam", "10");
        // 2.4. Schedule settings
        jobConf.setProperty("start", "2019-11-17T17:42+0800");
        jobConf.setProperty("freq", "0/3 * * * *");
        jobConf.setProperty("end", "2019-11-17T17:50+0800");
        // 2.5. Oozie Coordinator parameters
        jobConf.setProperty("appPath",
                "hdfs://bigdata-cdh01.itcast.cn:8020/user/root/oozie_works/cron_yarn_pi");
        jobConf.setProperty(OozieClient.COORDINATOR_APP_PATH,
                "hdfs://bigdata-cdh01.itcast.cn:8020/user/root/oozie_works/cron_yarn_pi/coordinator.xml");

        // 3. Submit the coordinator job and get back the job ID
        String jobId = oozieClient.run(jobConf);
        System.out.println("JobId = " + jobId);

        // 4. Poll the job status by ID until it is no longer running
        while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow job completed ...");
    }
}




