DataWorks是基于MaxCompute作为计算和存储引擎的海量数据离线加工分析平台,用于工作流可视化开发和托管调度运维,支持按照时间和依赖关系的任务全面托管调度。您可以在DataWorks中,通过Shell节点或自定义节点调度和管理云原生数据仓库 AnalyticDB MySQL 版的Spark作业。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组。
AnalyticDB for MySQL集群与OSS存储空间位于相同地域。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
已开通DataWorks服务,并在AnalyticDB for MySQL同一地域创建工作空间。具体操作,请参见开通DataWorks服务和创建工作空间。
通过自定义节点在DataWorks中调度Spark作业时,DataWorks需为企业版。
通过Shell节点在DataWorks中调度Spark作业时,需购买并为工作空间绑定独享调度资源组。具体操作,请参见新增和使用独享调度资源组。
准备工作
通过DataWorks调度Spark作业依赖DataXADBSparkPlugin-1.0.0.jar
包,您需下载DataXADBSparkPlugin-1.0.0.jar包并将其上传至OSS。
通过Shell节点调度Spark作业(推荐)
通过Shell节点调度Spark作业时,您可以选择如下两种方法:
全局配置:通过运维助手全局配置
clusterId
、resourceGroup
、regionId
等参数和DataXADBSparkPlugin-1.0.0.jar
包所在的OSS路径,再提交Spark作业。单个Spark作业配置:提交某个Spark作业时在该业务代码中配置
clusterId
、resourceGroup
、regionId
等参数和DataXADBSparkPlugin-1.0.0.jar
包所在的OSS路径。
全局配置
通过运维助手安装第三方包时,需手动输入以下命令。具体操作,请参见运维助手。
rm -rf /home/admin/usertools/tools/.adb.config touch /home/admin/usertools/tools/.adb.config echo '[Credentials]' > /home/admin/usertools/tools/.adb.config echo 'endpoint = oss-cn-hangzhou.aliyuncs.com' >> /home/admin/usertools/tools/.adb.config echo 'accessKeyID = LTAI5U58FDtHnS3NzBXy****' >> /home/admin/usertools/tools/.adb.config echo 'accessKeySecret = 1Bvozs3DJDNSIRY57VVNPFXUt****' >> /home/admin/usertools/tools/.adb.config echo 'clusterId = amv-uf64134gxtjt****' >> /home/admin/usertools/tools/.adb.config echo 'resourceGroup = job' >> /home/admin/usertools/tools/.adb.config echo 'regionId = cn-hangzhou' >> /home/admin/usertools/tools/.adb.config cd /home/admin/usertools/tools/ rm -rf DataXADBSparkPlugin-1.0.0.jar wget --no-check-certificate https://testBucketname.oss-cn-hangzhou.aliyuncs.com/DataWorks/DataXADBSparkPlugin-1.0.0.jar
参数说明:
参数
说明
endpoint
OSS的Endpoint(地域节点)。
如何查看OSS的Endpoint,请参见访问域名。
accessKeyID
阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。
如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
accessKeySecret
阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。
如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
clusterId
AnalyticDB for MySQL的集群ID。
resourceGroup
AnalyticDB for MySQL的Job型资源组名称。
regionId
AnalyticDB for MySQL集群的地域ID。
wget
下载
DataXADBSparkPlugin-1.0.0.jar
包。如果
DataXADBSparkPlugin-1.0.0.jar
包的读写权限为公共读,则使用wget --no-check-certificate https://testBucketname.oss-cn-hangzhou.aliyuncs.com/DataWorks/DataXADBSparkPlugin-1.0.0.jar
命令下载。如果
DataXADBSparkPlugin-1.0.0.jar
包的读写权限为非公共读,则使用wget -O DataXADBSparkPlugin-1.0.0.jar "https://testBucketname.oss-cn-hangzhou.aliyuncs.com/DataWorks/DataXADBSparkPlugin-1.0.0.jar?Expires=1722****"
命令下载。
前缀为
https
的连接串是DataXADBSparkPlugin-1.0.0.jar
包的URL,如何获取DataXADBSparkPlugin-1.0.0.jar
包的URL,请参见获取单个文件的URL。创建业务流程。具体操作,请参见创建业务流程。
新建节点。
右击业务流程,选择新建节点>Shell。
在弹出的新建节点对话框中配置对应参数,单击确定。
节点创建完成后,在右侧代码框中输入执行语句,单击按钮执行。
Spark SQL作业
#!/bin/bash classpath="/home/admin/usertools/tools/DataXADBSparkPlugin-1.0.0.jar" if [ -f "$classpath" ]; then echo "Jar exists" else echo "Jar not exists" exit 1 fi mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain" conf='{ "AppType": "SQL", "SQL": "SHOW DATABASE; SHOW TABLES" }' java -cp "$classpath" "$mainclass" "$conf"
参数说明:
参数
说明
AppType
Spark SQL作业请填写SQL。
SQL
业务具体的SQL语句,多个SQL语句用英文分号(;)隔开。
Spark Jar作业
#!/bin/bash classpath="/home/admin/usertools/tools/DataXADBSparkPlugin-1.0.0.jar" if [ -f "$classpath" ]; then echo "Jar exists" else echo "Jar not exists" exit 1 fi mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain" conf='{ "args": [ "1000" ], "file": "oss://testBucketname/spark-examples.jar", "name": "SparkPi", "className": "org.apache.spark.examples.SparkPi", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 2, "spark.executor.resourceSpec": "medium" } }' java -cp "$classpath" "$mainclass" "$conf"
参数说明:
参数
说明
args
请根据业务需求,填写使用JAR包时需要使用的参数。多个参数之间以英文逗号(,)分隔。
file
Spark作业依赖Jar包所在的路径。
name
Spark应用名称。
className
Java程序入口类名称。
conf其他参数
与开源Spark中的配置项基本一致,参数格式为
key: value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明。运行成功后,在下方运行日志中查看Spark作业的执行结果和日志信息。
单个Spark作业配置
创建业务流程。具体操作,请参见创建业务流程。
新建节点。
右击业务流程,选择新建节点>Shell。
在弹出的新建节点对话框中配置对应参数,单击确定。
节点创建完成后,在右侧代码框中输入执行语句,单击按钮执行。
Spark SQL作业
#!/bin/bash /home/admin/usertools/tools/ossutil64 -e oss-cn-hangzhou-internal.aliyuncs.com -i LTAI5U58FDtHnS3NzBXy**** -k 1Bvozs3DJDNSIRY57VVNPFXUt**** cp oss://testBucketname/DataXADBSparkPlugin-1.0.0.jar DataXADBSparkPlugin-1.0.0.jar if [[ $? == 0 ]];then echo "get oss file success" else echo "get oss file failed" exit 1 fi if [ -n "$JAVA_HOME" ]; then echo "JAVA_HOME configured." else echo "JAVA_HOME not configured." exit 1 fi current_dir=$(pwd) classpath="$current_dir/DataXADBSparkPlugin-1.0.0.jar" if [ -f "$classpath" ]; then echo "Jar exists" else echo "Jar not exists" exit 1 fi mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain" conf='{ "ClusterId": "amv-uf64134gxtjt****", "ResourceGroup": "job", "RegionId": "cn-hangzhou", "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****", "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****", "AppType": "SQL", "SQL": "SHOW DATABASE; SHOW TABLES" }' java -cp "$classpath" "$mainclass" "$conf"
Spark Jar作业
#!/bin/bash /home/admin/usertools/tools/ossutil64 -e oss-cn-hangzhou-internal.aliyuncs.com -i LTAI5U58FDtHnS3NzBXy**** -k 1Bvozs3DJDNSIRY57VVNPFXUt**** cp oss://testBucketname/DataXADBSparkPlugin-1.0.0.jar DataXADBSparkPlugin-1.0.0.jar if [[ $? == 0 ]];then echo "get oss file success" else echo "get oss file failed" exit 1 fi if [ -n "$JAVA_HOME" ]; then echo "JAVA_HOME configured." else echo "JAVA_HOME not configured." exit 1 fi current_dir=$(pwd) classpath="$current_dir/DataXADBSparkPlugin-1.0.0.jar" if [ -f "$classpath" ]; then echo "Jar exists" else echo "Jar not exists" exit 1 fi mainclass="com.aliyun.adb.spark.datax.ADBSparkShellMain" conf='{ "ClusterId": "amv-uf64134gxtjt****", "ResourceGroup": "job", "RegionId": "cn-hangzhou", "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****", "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****", "args": [ "1000" ], "file": "oss://testBucketname/spark-examples.jar", "name": "SparkPi", "className": "org.apache.spark.examples.SparkPi", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 2, "spark.executor.resourceSpec": "medium" } }' java -cp "$classpath" "$mainclass" "$conf"
参数说明:
参数类型
参数
说明
公共参数
ClusterId
AnalyticDB for MySQL的集群ID。
ResourceGroup
AnalyticDB for MySQL的Job型资源组名称。
RegionId
AnalyticDB for MySQL集群的地域ID。
AccessKeyId
阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。
如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
AccessKeySec
阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。
如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
Spark SQL作业参数
AppType
Spark作业的类型。此处为SQL。
SQL
业务具体的SQL语句,多个SQL语句用英文分号(;)隔开。
Spark Jar作业参数
args
请根据业务需求,填写使用JAR包时需要使用的参数。多个参数之间以英文逗号(,)分隔。
file
Spark作业依赖Jar包所在的OSS路径。
name
Spark应用名称。
className
Java程序入口类名称。
conf其他参数
与开源Spark中的配置项基本一致,参数格式为
key: value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明。运行成功后,在下方运行日志中查看Spark作业的执行结果和日志信息。
通过自定义节点调度Spark作业
创建自定义节点插件。具体操作,请参见新增节点插件。
插件类型选择引擎型。
资源文件需上传准备工作中下载的
DataXADBSparkPlugin-1.0.0.jar
包。类名需填写为
com.aliyun.adb.spark.datax.ADBSparkWrapper
。参数模板填写为
{}
即可。
创建自定义节点。具体操作,请参见新增自定义节点。
插件类型选择为引擎型。
在选择插件下拉列表中选择步骤1创建的自定义插件。
创建业务流程。具体操作,请参见创建业务流程。
新建节点。
右击业务流程,选择新建节点。
选择步骤2创建的自定义节点。
在弹出的新建节点对话框中配置对应参数,单击确定。
节点创建完成后,在右侧代码框中输入执行语句,单击按钮执行。
Spark SQL作业
{ "ClusterId": "amv-uf64134gxtjt****", "ResourceGroup": "job", "RegionId": "cn-hangzhou", "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****", "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****", "AppType": "SQL", "SQL": "SHOW DATABASE; SHOW TABLES" }
Spark Jar作业
{ "ClusterId": "amv-uf64134gxtjt****", "ResourceGroup": "job", "RegionId": "cn-hangzhou", "AccessKeyId": "LTAI5U58FDtHnS3NzBXy****", "AccessKeySec": "1Bvozs3DJDNSIRY57VVNPFXUt****", "args": [ "1000" ], "file": "oss://testBucketname/spark-examples.jar", "name": "SparkPi", "className": "org.apache.spark.examples.SparkPi", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 2, "spark.executor.resourceSpec": "medium" } }
参数说明:
参数类型
参数
说明
公共参数
ClusterId
AnalyticDB for MySQL的集群ID。
ResourceGroup
AnalyticDB for MySQL的Job型资源组名称。
RegionId
AnalyticDB for MySQL集群的地域ID。
AccessKeyId
阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey ID。
如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
AccessKeySec
阿里云账号或具备AnalyticDB for MySQL访问权限的RAM用户的AccessKey Secret。
如何获取AccessKey ID和AccessKey Secret,请参见账号与权限。
Spark SQL作业参数
AppType
Spark作业的类型。此处为SQL。
SQL
业务具体的SQL语句,多个SQL语句用英文分号(;)隔开。
Spark Jar作业参数
args
请根据业务需求,填写使用JAR包时需要使用的参数。多个参数之间以英文逗号(,)分隔。
file
Spark作业依赖Jar包所在的OSS路径。
name
Spark应用名称。
className
Java程序入口类名称。
conf其他参数
与开源Spark中的配置项基本一致,参数格式为
key: value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明。运行成功后,在下方运行日志中查看Spark作业的执行结果和日志信息。