Azkaban调度Spark
Azkaban是一个批量工作流任务调度器,可以构建、执行和管理包含复杂依赖关系的工作流。您可以在Azkaban Web界面调度云原生数据仓库 AnalyticDB MySQL 版的Spark作业。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
AnalyticDB for MySQL集群中已创建Job型资源组或Spark引擎的Interactive型资源组。
已安装Beeline。
已将运行Azkaban的服务器IP地址添加至AnalyticDB for MySQL集群的白名单中。
调度Spark SQL作业
AnalyticDB for MySQL支持使用批处理和交互式两种方法执行Spark SQL。选择的执行方式不同,调度的操作步骤也有所不同。详细步骤如下:
批处理
- 说明
您只需要配置
keyId
、secretId
、regionId
、clusterId
和rgName
这些必填参数。 编写工作流文件,并将其所在的文件夹压缩成ZIP格式。
nodes: - name: SparkPi type: command config: command: /<your path>/adb-spark-toolkit-submit/bin/spark-submit --class com.aliyun.adb.spark.sql.OfflineSqlTemplate local:///opt/spark/jars/offline-sql.jar "show databases" "select 100" dependsOn: - jobA - jobB - name: jobA type: command config: command: echo "This is an echoed text." - name: jobB type: command config: command: pwd
重要<your path>
需替换为Spark-Submit命令行工具的实际安装路径。command节点的命令输入请不要使用续行符(\)。
创建项目并上传步骤2压缩的工作流文件。
访问Azkaban Web界面,在顶部导航栏单击Project。
单击页面右上角的Create Project。
在弹出的Create Project对话框中配置Name、Description参数。
单击页面右上角的Upload。
在弹出的Upload Project Files对话框中上传工作流文件,单击Upload。
运行工作流。
在Project页面,单击Flows页签。
单击Execute Flow。
单击Execute。
在弹出的Flow submitted对话框中,单击Continue。
查看工作流详细信息。
在顶部导航栏单击Executing。
单击Recently Finished页签。
单击Execution Id,并单击Job List页签查看每一个Job的执行详情。
单击Log查看Job的日志信息。
交互式
获取Spark Interactive型资源组的连接地址。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击,单击资源组管理页签。
单击对应资源组操作列的详情,查看内网连接地址和公网连接地址。您可单击端口号括号内的
按钮,复制连接地址。
以下两种情况,您需要单击公网地址后的申请网络,手动申请公网连接地址。
提交Spark SQL作业的客户端工具部署在本地。
提交Spark SQL作业的客户端工具部署在ECS上,且ECS与AnalyticDB for MySQL不属于同一VPC。
编写工作流文件,并将其所在的文件夹压缩成ZIP格式。
nodes: - name: jobB type: command config: command: <path> -u "jdbc:hive2://amv-t4n83e67n7b****sparkwho.ads.aliyuncs.com:10000/adb_demo" -n spark_interactive_prod/spark_user -p "spark_password" -e "show databases;show tables;" dependsOn: - jobA - name: jobA type: command config: command: <path> -u "jdbc:hive2://amv-t4n83e67n7b****sparkwho.ads.aliyuncs.com:10000/adb_demo" -n spark_interactive_prod/spark_user -p "spark_password" -e "show tables;"
参数说明:
参数
说明
path
需替换为Beeline客户端所在的路径。例如:
/path/to/spark/bin/beeline
。-u
请填写步骤1中获取的连接地址。连接地址中的
default
需替换为实际的数据库名,并且需要删除连接地址中的resource_group=<资源组名称>
后缀。例如:
jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo
。-n
AnalyticDB for MySQL的数据库账号及资源组名称。格式为
资源组名称/数据库账号名称
。例如:本文示例资源组名称为spark_interactive_prod,数据库账号名称为spark_user,此处填写为
spark_interactive_prod/spark_user
。-p
AnalyticDB for MySQL数据库账号的密码。
-e
业务具体的SQL语句。多条SQL使用英文分号(;)分隔。
创建项目并上传步骤1压缩的工作流文件。
访问Azkaban Web界面,在顶部导航栏单击Project。
单击页面右上角的Create Project。
在弹出的Create Project对话框中配置Name、Description参数。
单击页面右上角的Upload。
在弹出的Upload Project Files对话框中上传工作流文件,单击Upload。
运行工作流。
在Project页面,单击Flows页签。
单击Execute Flow。
单击Execute。
在弹出的Flow submitted对话框中,单击Continue。
查看工作流详细信息。
在顶部导航栏单击Executing。
单击Recently Finished页签。
单击Execution Id,并单击Job List页签查看每一个Job的执行详情。
单击Log查看Job的日志信息。
调度Spark Jar作业
- 说明
您只需要配置
keyId
、secretId
、regionId
、clusterId
和rgName
这些必填参数。如果您的Spark Jar包在本地,还需要配置ossUploadPath
等OSS相关参数。 编写工作流文件,并将其所在的文件夹压缩成ZIP格式。
nodes: - name: SparkPi type: command config: command: /<your path>/adb-spark-toolkit-submit/bin/spark-submit --class org.apache.spark.examples.SparkPi --name SparkPi --conf spark.driver.resourceSpec=medium --conf spark.executor.instances=2 --conf spark.executor.resourceSpec=medium local:///tmp/spark-examples.jar 1000 dependsOn: - jobA - jobB - name: jobA type: command config: command: echo "This is an echoed text." - name: jobB type: command config: command: pwd
重要<your path>
需替换为Spark-Submit命令行工具的实际安装路径。command节点的命令输入请不要使用续行符(\)。
创建项目并上传步骤2压缩的工作流文件。
访问Azkaban Web界面,在顶部导航栏单击Project。
单击页面右上角的Create Project。
在弹出的Create Project对话框中配置Name、Description参数。
单击页面右上角的Upload。
在弹出的Upload Project Files对话框中上传工作流文件,单击Upload。
运行工作流。
在Project页面,单击Flows页签。
单击Execute Flow。
单击Execute。
在弹出的Flow submitted对话框中,单击Continue。
查看工作流详细信息。
在顶部导航栏单击Executing。
单击Recently Finished页签。
单击Execution Id,并单击Job List页签查看每一个Job的执行详情。
单击Log查看Job的日志信息。