场景示例
本文介绍运行任务并查看结果、重试失败文件的完整流程。
场景一: 运行任务
以下示例用于创建通道、创建代理、创建数据地址、创建迁移任务,运行任务并查看运行结果。
说明
在不使用代理的情况下,请忽略下面代码中的创建通道和代理的部分。使用代理前需要先部署代理,详情请参见 代理管理。
package sample;
import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;
import com.google.gson.Gson;
import java.util.LinkedList;
import java.util.List;
public class Demo {
/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
/** 填写主账号ID。*/
static String userId = "11470***876***55";
private static final String AVAILABLE = "available";
private static final String JOBFINISH = "IMPORT_JOB_FINISHED";
public static void main(String[] args) {
// 填写数据地址名称。
String addressName = "exampleaddress";
try {
/** 步骤1:初始化 */
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
// 这里以北京区域为例。
config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
config.setRegionId(accessKeyId);
config.setAccessKeyId(accessKeyId);
config.setAccessKeySecret(accessKeySecret);
Client client = new Client(config);
//* 创建通道和代理,代理可以创建多个关联一个通道。 */
CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest();
CreateTunnelInfo createTunnelInfo = new CreateTunnelInfo();
TunnelQos qos = new TunnelQos();
// MaxBandwidth, MaxQps 默认为0, 表示没有限制, MaxBandWidth的单位是bit,请按照实际需求填写。
qos.setMaxBandwidth(1073741824L);
qos.setMaxQps(1000);
createTunnelInfo.setTunnelQos(qos);
createTunnelRequest.setImportTunnel(createTunnelInfo);
CreateTunnelResponse createTunnelResponse = client.createTunnel(userId, createTunnelRequest);
String tunnelId = createTunnelResponse.headers.get("x-oss-import-tunnel-id");
// 填写代理名称。
String agentName = "exampleagent";
// 使用网络,公网请填写public, 专线或者VPN请填写vpc。
String agentEndpoint = "public";
// 部署方式,目前仅支持填写default。
String deployMethod = "default";
CreateAgentRequest createAgentRequest = new CreateAgentRequest();
CreateAgentInfo createAgentInfo = new CreateAgentInfo();
createAgentInfo.setName(agentName);
createAgentInfo.setTunnelId(tunnelId);
createAgentInfo.setAgentEndpoint(agentEndpoint);
createAgentInfo.setDeployMethod(deployMethod);
createAgentRequest.setImportAgent(createAgentInfo);
client.createAgent(userId, createAgentRequest);
/** 步骤2:创建源地址和目的地址并验证可用性,这里以创建源为oss,目的为oss的地址为例,其它源示例请参见数据地址篇创建数据地址小节。 */
String srcAddress = "examplesrcaddress";
String destAddress = "exampledestaddress";
AddressDetail addrssdetail = new AddressDetail();
addrssdetail.addressType = "oss";
// 以下参数请根据实际值填写。
addrssdetail.bucket = "examplebucket";
addrssdetail.prefix = "***/";
addrssdetail.regionId = "oss-cn-beijing";
addrssdetail.role = "rolename_xxxxx";
CreateAddressRequest createAddressRequest = new CreateAddressRequest();
CreateAddressInfo createAddressInfo = new CreateAddressInfo();
createAddressInfo.name = addressName;
createAddressInfo.setAddressDetail(addrssdetail);
createAddressRequest.setImportAddress(createAddressInfo);
client.createAddress(userId, createAddressRequest);
// 验证源端数据地址可用性。
VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, srcAddress);
if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
return;
}
addrssdetail = new AddressDetail();
addrssdetail.addressType = "oss";
// 以下参数请根据实际值填写。
addrssdetail.bucket = "examplebucket";
addrssdetail.prefix = "***/";
addrssdetail.regionId = "oss-cn-beijing";
addrssdetail.role = "rolename_xxxxx";
createAddressRequest = new CreateAddressRequest();
createAddressInfo = new CreateAddressInfo();
createAddressInfo.name = addressName;
createAddressInfo.setAddressDetail(addrssdetail);
createAddressRequest.setImportAddress(createAddressInfo);
client.createAddress(userId, createAddressRequest);
verifyAddressResponse = client.verifyAddress(userId, destAddress);
if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
return;
}
/** 步骤3:创建并且启动任务 */
// 填写任务名称。
String jobName = "examplejob";
CreateJobInfo createJobInfo = new CreateJobInfo();
createJobInfo.name = jobName;
createJobInfo.srcAddress = srcAddress;
createJobInfo.destAddress = destAddress;
/*
overwriteMode和transferMode需要组合使用,具体组合含义如下
always,all 全覆盖
always,lastmodified 根据文件最后修改时间覆盖
never,all 不覆盖
*/
/* 文件覆盖方式, 可能得值为 1.never 2.always */
createJobInfo.overwriteMode = "always";
/* 文件传输的方式, 可能得值为 1.changed 2.all 3.lastmodified */
createJobInfo.transferMode = "lastmodified";
// 如果maxImportTaskQps值为0或者不设置,会被设置为默认值;MaxBandWidth值为0或者不设置,会被设置为默认值,maxBandWidth值单位为bits。
ImportQos importQos = new ImportQos();
// maxBandWidth,maxImportTaskQps 根据实际需求值填写。
importQos.maxBandWidth = 1073741824L;
importQos.maxImportTaskQps = 1000L;
createJobInfo.setImportQos(importQos);
// 配置调度规则,具体参数含义请参看API文档。
ScheduleRule scheduleRule = new ScheduleRule();
// maxScheduleCount,startCronExpression,suspendCronExpression 根据实际需求值填写。
scheduleRule.maxScheduleCount = 2L;
scheduleRule.startCronExpression = "0 0 10 * * ?";
scheduleRule.suspendCronExpression = "0 0 14 * * ?";
createJobInfo.setScheduleRule(scheduleRule);
// 配置过滤规则,具体参数含义请参看API文档。
FilterRule filterRule = new FilterRule();
// 文件过滤器,根据实际需求值填写。
KeyFilters keyFilters = new KeyFilters();
KeyFilterItem includekeyFilterItem = new KeyFilterItem();
List<String> includeRegex = new LinkedList<>();
includeRegex.add(".*.jpg");
includeRegex.add(".*.jpeg");
includekeyFilterItem.setRegex(includeRegex);
KeyFilterItem excludekeyFilterItem = new KeyFilterItem();
List<String> excludeRegex = new LinkedList<>();
excludeRegex.add(".*.txt");
excludeRegex.add(".*.js");
excludekeyFilterItem.setRegex(excludeRegex);
keyFilters.setIncludes(includekeyFilterItem);
keyFilters.setExcludes(excludekeyFilterItem);
filterRule.setKeyFilters(keyFilters);
// 时间过滤器, 时间格式遵循UTC时间格式,根据实际需求值填写。
LastModifiedFilters lastModifiedFilters = new LastModifiedFilters();
LastModifyFilterItem includeLastModifyFilterItem = new LastModifyFilterItem();
List<TimeFilter> includeRegexs = new LinkedList<>();
TimeFilter includeTimeFilter = new TimeFilter();
includeTimeFilter.startTime = "2006-01-01T00:00:00Z";
includeTimeFilter.endTime = "2006-12-31T23:59:59Z";
includeRegexs.add(includeTimeFilter);
includeLastModifyFilterItem.setTimeFilter(includeRegexs);
LastModifyFilterItem excludeLastModifyFilterItem = new LastModifyFilterItem();
List<TimeFilter> excludeRegexs = new LinkedList<>();
TimeFilter excludeTimeFilter = new TimeFilter();
excludeTimeFilter.startTime = "2008-01-01T00:00:00Z";
excludeTimeFilter.endTime = "2008-12-31T23:59:59Z";
excludeRegexs.add(excludeTimeFilter);
excludeLastModifyFilterItem.setTimeFilter(excludeRegexs);
lastModifiedFilters.setIncludes(includeLastModifyFilterItem);
lastModifiedFilters.setExcludes(excludeLastModifyFilterItem);
filterRule.setLastModifiedFilters(lastModifiedFilters);
createJobInfo.setFilterRule(filterRule);
CreateJobRequest createJobRequest = new CreateJobRequest();
createJobRequest.setImportJob(createJobInfo);
client.createJob(userId, createJobRequest);
UpdateJobRequest updateJobRequest = new UpdateJobRequest();
UpdateJobInfo updateJobInfo = new UpdateJobInfo();
updateJobInfo.setStatus("IMPORT_JOB_LAUNCHING");
updateJobRequest.setImportJob(updateJobInfo);
client.updateJob(userId, jobName, updateJobRequest);
/** 步骤4:循环查看当前任务状态 */
for(;;) {
GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
System.out.println("job is interrupted");
return;
}
if ("IMPORT_JOB_FINISHED".equals(response.getBody().importJob.status)) {
System.out.println("job is finished");
break;
}
Thread.sleep(6000);
}
/** 步骤5:任务结束后查看结果。 */
ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
ListJobHistoryResponse listJobHistoryResp = client.listJobHistory(userId, jobName, listJobHistoryRequest);
System.out.println(new Gson().toJson(listJobHistoryResp.body.jobHistoryList.jobHistory))
} catch (Exception e) {
e.printStackTrace();
}
}
}
场景二:重试失败文件
迁移任务运行完成后,可能有失败文件,迁移服务会为这些失败文件构造一份失败文件列表,以下示例先获取失败文件列表的详情,然后据此创建一个新的数据地址,再创建一个重试子任务。通过这种方式,您能够重新迁移这些失败的文件。
package sample;
import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;
public class Demo {
/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
/** 填写主账号ID。*/
static String userId = "11470***876***55";
private static final String AVAILABLE = "available";
private static final String JOBFINISH = "IMPORT_JOB_FINISHED";
private static final String INTERRUPT = "Interrupt";
private static final String NONEED = "NoNeed";
private static final String READY = "Ready";
private static final String SYSTEM = "SYSTEM";
private static final String LAUNCHING = "IMPORT_JOB_LAUNCHING";
public static void main(String[] args) {
try {
/** 步骤1:初始化 */
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
// 这里以北京区域为例。
config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
config.setRegionId(accessKeyId);
config.setAccessKeyId(accessKeyId);
config.setAccessKeySecret(accessKeySecret);
Client client = new Client(config);
// 下列参数值请根据实际需求填写。
int runtimeId = 1;
String jobName = "examplejob";
String srcAddress = "examplesrcaddress";
String destAddress = "exampledestaddress";
/** 步骤2:循环查看任务状态,查看任务是否结束或者中断。 */
for(;;) {
GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
System.out.println("job is interrupted");
return;
}
if ("IMPORT_JOB_FINISHED".equals(response.getBody().importJob.status)) {
System.out.println("job is finished");
break;
}
Thread.sleep(6000);
}
/** 步骤3:ListJobHistory,查看是否有失败文件。 */
ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
listJobHistoryRequest.runtimeId = runtimeId;
ListJobHistoryResponse listJobHistoryResponse = client.listJobHistory(userId, jobName, listJobHistoryRequest);
if (listJobHistoryResponse.body.jobHistoryList.jobHistory.size() < 1) {
System.out.println("list job history length less than 1");
return;
}
if (listJobHistoryResponse.body.jobHistoryList.jobHistory.get(0).failedCount <= 0) {
System.out.println("job no need retry");
return;
}
/** 步骤4:循环GetJobResult,等待Ready。 */
for (;;) {
GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
getJobResultRequest.setRuntimeId(runtimeId);
GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
if ("NoNeed".equals(getJobResultResponse.body.importJobResult.readyRetry)) {
System.out.println("job no need retry");
return;
}
if ("Ready".equals(getJobResultResponse.body.importJobResult.readyRetry)) {
break;
}
Thread.sleep(6000);
}
retry(client, jobName, srcAddress, destAddress, runtimeId);
} catch (Exception e) {
e.printStackTrace();
}
}
static void retry(Client client, String jobName, String srcAddress, String destAddress, int runtimeId) throws Exception {
/** 步骤5:根据失败文件清单信息创建新的数据地址并且验证是否可用。 */
GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
getJobResultRequest.runtimeId = runtimeId;
GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
GetJobResponse getJobResponse = client.getJob(userId, jobName, new GetJobRequest());
GetAddressResponse getAddressResponse = client.getAddress(userId, srcAddress);
CreateAddressRequest request = new CreateAddressRequest();
CreateAddressInfo info = new CreateAddressInfo();
long time = System.currentTimeMillis();
info.name = srcAddress + "_" + time + "_retry";
AddressDetail detail = new AddressDetail();
detail.addressType = getJobResultResponse.body.importJobResult.addressType;
detail.invLocation = getJobResultResponse.body.importJobResult.invLocation;
detail.invBucket = getJobResultResponse.body.importJobResult.invBucket;
detail.invAccessId = SYSTEM;
detail.invAccessSecret = SYSTEM;
detail.invPath = getJobResultResponse.body.importJobResult.invPath;
detail.invRegionId = getJobResultResponse.body.importJobResult.invRegionId;
detail.domain = getAddressResponse.body.importAddress.addressDetail.domain;
detail.regionId = getAddressResponse.body.importAddress.addressDetail.regionId;
detail.accessId = getAddressResponse.body.importAddress.addressDetail.accessId;
detail.accessSecret = getAddressResponse.body.importAddress.addressDetail.accessSecret;
detail.agentList = getAddressResponse.body.importAddress.addressDetail.agentList;
detail.role = getAddressResponse.body.importAddress.addressDetail.role;
detail.prefix = getAddressResponse.body.importAddress.getAddressDetail().prefix;
detail.invRole = getAddressResponse.body.importAddress.addressDetail.invRole;
detail.bucket = getAddressResponse.body.importAddress.addressDetail.bucket;
info.setAddressDetail(detail);
request.setImportAddress(info);
client.createAddress(userId, request);
VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, info.name);
if (!verifyAddressResponse.body.verifyAddressResponse.status.equals(AVAILABLE)) {
throw new Exception("retry srcaddress unavailable");
}
/** 步骤6:创建重试子任务并启动。 */
CreateJobRequest createJobRequest = new CreateJobRequest();
CreateJobInfo createJobInfo = new CreateJobInfo();
createJobInfo.name = jobName + "_" + time + "_retry";
createJobInfo.transferMode = getJobResponse.body.importJob.transferMode;
createJobInfo.overwriteMode = getJobResponse.body.importJob.overwriteMode;
createJobInfo.srcAddress = info.name;
createJobInfo.destAddress = destAddress;
createJobInfo.parentVersion = getJobResultResponse.body.importJobResult.version;
createJobInfo.filterRule = getJobResponse.body.importJob.filterRule;
createJobInfo.audit = getJobResponse.body.importJob.audit;
createJobInfo.createReport = getJobResponse.body.importJob.createReport;
createJobRequest.setImportJob(createJobInfo);
client.createJob(userId, createJobRequest);
UpdateJobRequest updateJobRequest = new UpdateJobRequest();
UpdateJobInfo updateJobInfo = new UpdateJobInfo();
updateJobInfo.status = LAUNCHING;
updateJobRequest.setImportJob(updateJobInfo);
client.updateJob(userId, createJobInfo.name, updateJobRequest);
}
}