本文为您介绍MySQL升级为PolarDB后，如何将之前从MySQL同步到MaxCompute的离线同步任务批量修改为从PolarDB同步到MaxCompute。
使用限制
目前仅适用于将MySQL升级为PolarDB的场景，不适用于其他任何数据源替换场景。
说明
通过API直接修改同步任务的原始JSON配置存在风险，因此不建议将本方法用于其他业务目的，否则可能导致同步任务运行出错，甚至引发数据质量问题。
POM 依赖示例代码
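<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.20</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dataworks-public</artifactId>
<version>3.4.4</version>
</dependency>
<!-- 示例代码使用fastjson解析和修改任务JSON配置 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>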
Java SDK调用示例代码
package com.alibaba.eas;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;
/**
 * Sample that repoints a DataWorks offline (DI_OFFLINE) sync task from a MySQL reader to a
 * PolarDB reader, then submits it, deploys it and triggers a run through the DataWorks OpenAPI.
 *
 * <p>The lower-case class name and the PascalCase method names are kept so existing callers
 * of the published sample still compile.
 */
public class updateOfflineTask {

    /** DataWorks workspace (project) ID sent with every request; replace with your own. */
    private static final Long PROJECT_ID = 1911L;

    /** Environment used when looking up and running nodes. */
    private static final String PROJECT_ENV = "PROD";

    /** Business date used by {@link #RunCycleDagNodes(Long)} when no date is given. */
    private static final String DEFAULT_BIZ_DATE = "2021-09-29 00:00:00";

    // NOTE(review): GetDeployment status codes assumed to be 0 = ready, 1 = success,
    // 2 = failure per the GetDeployment API reference — confirm for your SDK version.
    private static final String DEPLOYMENT_SUCCESS = "1";
    private static final String DEPLOYMENT_FAILED = "2";

    /** Delay between GetDeployment polls, and the maximum number of polls (about 2 minutes). */
    private static final long DEPLOYMENT_POLL_INTERVAL_MS = 5000L;
    private static final int DEPLOYMENT_MAX_POLLS = 24;

    /** Shared OpenAPI client; assigned in {@link #main(String[])} before any request is sent. */
    static IAcsClient client;

    /**
     * Looks up a file in the workspace by folder path and exact file name.
     *
     * <p>{@code fileName} is sent as the search keyword, so the response may also contain other
     * files whose names merely contain it. Only an exact name match is returned, because editing
     * a look-alike file would modify the wrong sync task.
     * NOTE(review): only the single page returned by this call is scanned.
     *
     * @param filePath folder path to search in
     * @param fileName exact name of the file to find
     * @return the matching file, or {@code null} if no returned file has exactly that name
     * @throws Exception if the OpenAPI call fails
     */
    public static ListFilesResponse.Data.File ListFiles(String filePath, String fileName) throws Exception {
        ListFilesRequest request = new ListFilesRequest();
        request.setProjectId(PROJECT_ID);
        request.setFileFolderPath(filePath);
        request.setKeyword(fileName);
        ListFilesResponse response = client.getAcsResponse(request);
        if (response.getData() == null || response.getData().getFiles() == null) {
            return null;
        }
        for (ListFilesResponse.Data.File file : response.getData().getFiles()) {
            if (fileName.equals(file.getFileName())) {
                return file;
            }
        }
        return null;
    }

    /**
     * Fetches the content (the sync task's JSON configuration) of a file.
     *
     * @param fileId ID of the file to read
     * @return the file content as returned by GetFile
     * @throws Exception if the OpenAPI call fails
     */
    public static String GetFiles(Long fileId) throws Exception {
        GetFileRequest request = new GetFileRequest();
        request.setProjectId(PROJECT_ID);
        request.setFileId(fileId);
        GetFileResponse response = client.getAcsResponse(request);
        return response.getData().getFile().getContent();
    }

    /**
     * Overwrites the configuration of an offline (DI_OFFLINE) sync task.
     *
     * @param fileId ID of the sync task's file
     * @param content the new JSON configuration
     * @throws IllegalStateException if the response reports a failed update
     * @throws Exception if the OpenAPI call fails
     */
    public static void UpdateDISyncTask(Long fileId, String content) throws Exception {
        UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
        request.setProjectId(PROJECT_ID);
        request.setFileId(fileId);
        request.setTaskContent(content);
        request.setTaskType("DI_OFFLINE");
        UpdateDISyncTaskResponse response = client.getAcsResponse(request);
        // NOTE(review): Data.Status is assumed to be "success" or "fail" per the API reference.
        // Stop here rather than submitting and deploying a task whose update did not apply.
        if (response.getData() != null
                && "fail".equalsIgnoreCase(String.valueOf(response.getData().getStatus()))) {
            throw new IllegalStateException(
                    "UpdateDISyncTask failed for file " + fileId + ": " + response.getData().getMessage());
        }
    }

    /**
     * Submits a file.
     *
     * @param fileId ID of the file to submit
     * @return ID of the deployment package created by the submission
     * @throws Exception if the OpenAPI call fails
     */
    public static Long submitFile(Long fileId) throws Exception {
        SubmitFileRequest request = new SubmitFileRequest();
        request.setProjectId(PROJECT_ID);
        request.setFileId(fileId);
        SubmitFileResponse acsResponse = client.getAcsResponse(request);
        return acsResponse.getData();
    }

    /**
     * Prints the current status of a deployment package to standard output.
     *
     * @param deploymentId ID returned by {@link #submitFile} or {@link #deploy}
     * @throws Exception if the OpenAPI call fails
     */
    public static void getDeployment(Long deploymentId) throws Exception {
        System.out.println(deploymentStatus(deploymentId));
    }

    /** Returns the GetDeployment status of a deployment package, rendered as a string. */
    private static String deploymentStatus(Long deploymentId) throws Exception {
        GetDeploymentRequest request = new GetDeploymentRequest();
        request.setProjectId(PROJECT_ID);
        request.setDeploymentId(deploymentId);
        GetDeploymentResponse acsResponse = client.getAcsResponse(request);
        return String.valueOf(acsResponse.getData().getDeployment().getStatus());
    }

    /**
     * Polls a deployment package until it reports success, printing every observed status.
     *
     * <p>A fixed sleep followed by the next step would carry on even when the submission or
     * deployment failed or was still in progress. This method fails fast instead.
     *
     * @param deploymentId ID returned by {@link #submitFile} or {@link #deploy}
     * @throws IllegalStateException if the deployment fails or does not finish in time
     * @throws Exception if an OpenAPI call fails or the wait is interrupted
     */
    private static void waitForDeployment(Long deploymentId) throws Exception {
        for (int attempt = 0; attempt < DEPLOYMENT_MAX_POLLS; attempt++) {
            String status = deploymentStatus(deploymentId);
            System.out.println(status);
            if (DEPLOYMENT_SUCCESS.equals(status)) {
                return;
            }
            if (DEPLOYMENT_FAILED.equals(status)) {
                throw new IllegalStateException("Deployment " + deploymentId + " failed");
            }
            Thread.sleep(DEPLOYMENT_POLL_INTERVAL_MS);
        }
        throw new IllegalStateException("Deployment " + deploymentId + " did not finish after "
                + DEPLOYMENT_MAX_POLLS + " polls");
    }

    /**
     * Deploys a submitted file.
     *
     * @param fileId ID of the file to deploy
     * @return ID of the resulting deployment package
     * @throws Exception if the OpenAPI call fails
     */
    public static Long deploy(Long fileId) throws Exception {
        DeployFileRequest request = new DeployFileRequest();
        request.setProjectId(PROJECT_ID);
        request.setFileId(fileId);
        DeployFileResponse acsResponse = client.getAcsResponse(request);
        return acsResponse.getData();
    }

    /**
     * Finds the ID of the PROD node whose name is exactly {@code nodeName}.
     *
     * @param nodeName exact node name
     * @return the node ID
     * @throws IllegalStateException if no returned node has exactly that name
     * @throws Exception if the OpenAPI call fails
     */
    public static Long listNode(String nodeName) throws Exception {
        ListNodesRequest request = new ListNodesRequest();
        request.setProjectId(PROJECT_ID);
        request.setNodeName(nodeName);
        request.setProjectEnv(PROJECT_ENV);
        ListNodesResponse acsResponse = client.getAcsResponse(request);
        List<ListNodesResponse.Data.NodesItem> nodesItemList =
                acsResponse.getData() == null ? null : acsResponse.getData().getNodes();
        if (nodesItemList != null) {
            for (ListNodesResponse.Data.NodesItem node : nodesItemList) {
                if (nodeName.equals(node.getNodeName())) {
                    return node.getNodeId();
                }
            }
        }
        throw new IllegalStateException(
                "No " + PROJECT_ENV + " node named " + nodeName + " in project " + PROJECT_ID);
    }

    /**
     * Triggers a RunCycleDagNodes run ("rerun_job") of a single node for {@link #DEFAULT_BIZ_DATE}.
     *
     * @param nodeId ID of the node to run
     * @throws Exception if the OpenAPI call fails
     */
    public static void RunCycleDagNodes(Long nodeId) throws Exception {
        RunCycleDagNodes(nodeId, DEFAULT_BIZ_DATE, DEFAULT_BIZ_DATE);
    }

    /**
     * Triggers a RunCycleDagNodes run ("rerun_job") of a single node over a business-date range.
     *
     * @param nodeId ID of the node to run; it is both the root node and the only included node
     * @param startBizDate first business date, formatted like "2021-09-29 00:00:00"
     * @param endBizDate last business date, same format
     * @throws Exception if the OpenAPI call fails
     */
    public static void RunCycleDagNodes(Long nodeId, String startBizDate, String endBizDate) throws Exception {
        RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
        request.setIncludeNodeIds(nodeId.toString());
        request.setName("rerun_job");
        request.setParallelism(false);
        request.setProjectEnv(PROJECT_ENV);
        request.setRootNodeId(nodeId);
        request.setStartBizDate(startBizDate);
        request.setEndBizDate(endBizDate);
        client.getAcsResponse(request);
    }

    /*
     * Example for modifyContent(content, "polardb", "polardb"). The MySQL reader step
     *
     *   {
     *     "stepType": "mysql",
     *     "parameter": {
     *       "envType": 0,
     *       "datasource": "mysql_from_polardb",
     *       "column": ["id", "name", "create_time", "create_user"],
     *       "tableComment": "配置表",
     *       "connection": [
     *         {
     *           "selectedDatabase": "polardb_db1",
     *           "datasource": "mysql_from_polardb",
     *           "table": ["lcl_test_demo"]
     *         }
     *       ],
     *       "where": "",
     *       "splitPk": "id",
     *       "encoding": "UTF-8"
     *     },
     *     "name": "Reader",
     *     "category": "reader"
     *   }
     *
     * comes back with "stepType": "polardb" and both "datasource" fields set to "polardb".
     * The "odps" writer step and the top-level "type", "version", "setting" and "order"
     * sections are left unchanged.
     */

    /**
     * Rewrites a sync task's JSON configuration so that its MySQL reader steps use another source.
     *
     * <p>For each step whose "category" is "reader" and whose "stepType" is "mysql", this sets
     * "stepType" to {@code newStepType}. It also sets "parameter.datasource" and every
     * "parameter.connection[*].datasource" to {@code newDatasource}. All other steps are left
     * untouched.
     *
     * @param content the task's JSON configuration
     * @param newStepType reader type to switch to, e.g. "polardb"
     * @param newDatasource name of the target data source configured in DataWorks
     * @return the modified configuration serialized back to JSON
     * @throws IllegalArgumentException if {@code content} parses to nothing (null or empty)
     */
    public static String modifyContent(String content, String newStepType, String newDatasource) {
        JSONObject jsonObject = JSON.parseObject(content);
        if (jsonObject == null) {
            throw new IllegalArgumentException("Sync task content is empty");
        }
        JSONArray steps = jsonObject.getJSONArray("steps");
        if (steps != null) {
            for (int i = 0; i < steps.size(); ++i) {
                JSONObject step = steps.getJSONObject(i);
                if (isMysqlReader(step)) {
                    step.put("stepType", newStepType);
                    replaceDatasources(step.getJSONObject("parameter"), newDatasource);
                }
            }
        }
        return jsonObject.toJSONString();
    }

    /** Returns true if {@code step} is a reader step whose type is "mysql". */
    private static boolean isMysqlReader(JSONObject step) {
        return step != null
                && "reader".equals(step.getString("category"))
                && "mysql".equals(step.getString("stepType"));
    }

    /** Points parameter.datasource and every parameter.connection[*].datasource at newDatasource. */
    private static void replaceDatasources(JSONObject parameter, String newDatasource) {
        if (parameter == null) {
            return;
        }
        parameter.put("datasource", newDatasource);
        JSONArray connections = parameter.getJSONArray("connection");
        if (connections == null) {
            return;
        }
        for (int j = 0; j < connections.size(); ++j) {
            JSONObject connection = connections.getJSONObject(j);
            if (connection != null) {
                connection.put("datasource", newDatasource);
            }
        }
    }

    /**
     * End-to-end flow:
     * <ol>
     *   <li>Find the sync task file.</li>
     *   <li>Switch its MySQL reader to PolarDB.</li>
     *   <li>Update, submit and deploy it, waiting for each deployment to succeed.</li>
     *   <li>Trigger a run of the PROD node.</li>
     * </ol>
     *
     * <p>Credentials are read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
     * ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables so they are not kept in source code.
     *
     * @param args unused
     * @throws Exception if any step fails
     */
    public static void main(String[] args) throws Exception {
        String akId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String akSecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        if (akId == null || akSecret == null) {
            throw new IllegalStateException(
                    "Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        }
        String regionId = "cn-chengdu";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
        client = new DefaultAcsClient(profile);

        String folderPath = "业务流程/重兴/数据集成/";
        String filename = "mysql_to_odps";
        ListFilesResponse.Data.File file = ListFiles(folderPath, filename);
        if (file == null) {
            throw new IllegalStateException("File not found: " + folderPath + filename);
        }
        Long fileId = file.getFileId();
        System.out.println(fileId);

        String content = GetFiles(fileId);
        // "polardb" is the target reader type (stepType): this example replaces mysql with polardb.
        // "polardb_datasource" is the name of the PolarDB data source in DataWorks; use your own.
        String contentModified = modifyContent(content, "polardb", "polardb_datasource");
        UpdateDISyncTask(fileId, contentModified);

        // Submit first, then deploy; each step must succeed before the next one starts.
        waitForDeployment(submitFile(fileId));
        waitForDeployment(deploy(fileId));

        Long nodeId = listNode(filename);
        // Use RunCycleDagNodes(nodeId, start, end) to run other business dates.
        RunCycleDagNodes(nodeId);
    }
}
文档内容是否对您有帮助?