场景示例

更新时间: 2024-10-10 15:37:12

本文介绍运行任务并查看结果、重试失败文件的完整流程。

场景一: 运行任务

以下示例用于创建通道、创建代理、创建数据地址、创建迁移任务,运行任务并查看运行结果。

说明

在不使用代理的情况下,请忽略下面代码中的创建通道和代理的部分。使用代理前需要先部署代理,详情请参见 代理管理

package sample;

import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;
import com.google.gson.Gson;

import java.util.LinkedList;
import java.util.List;

public class Demo {
	/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
	static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
	static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
	/** 填写主账号ID。*/
	static String userId = "11470***876***55";
	private static final String AVAILABLE = "available";
	private static final String JOBFINISH = "IMPORT_JOB_FINISHED";

	public static void main(String[] args) {
		// 填写数据地址名称。
		String addressName = "exampleaddress";
		try {
			/** 步骤1:初始化 */
                        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
			// 这里以北京区域为例。
			config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
			config.setRegionId(accessKeyId);
			config.setAccessKeyId(accessKeyId);
			config.setAccessKeySecret(accessKeySecret);
			Client client = new Client(config);

			//* 创建通道和代理,代理可以创建多个关联一个通道。 */
			CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest();
			CreateTunnelInfo createTunnelInfo = new CreateTunnelInfo();
			TunnelQos qos = new TunnelQos();
			// MaxBandwidth, MaxQps 默认为0, 表示没有限制, MaxBandWidth的单位是bit,请按照实际需求填写。
			qos.setMaxBandwidth(1073741824L);
			qos.setMaxQps(1000);
			createTunnelInfo.setTunnelQos(qos);
			createTunnelRequest.setImportTunnel(createTunnelInfo);
			CreateTunnelResponse createTunnelResponse = client.createTunnel(userId, createTunnelRequest);

			String tunnelId = createTunnelResponse.headers.get("x-oss-import-tunnel-id");
			// 填写代理名称。
			String agentName = "exampleagent";
			// 使用网络,公网请填写public, 专线或者VPN请填写vpc。
			String agentEndpoint = "public";
			// 部署方式,目前仅支持填写default。
			String deployMethod = "default";
			CreateAgentRequest createAgentRequest = new CreateAgentRequest();
			CreateAgentInfo createAgentInfo = new CreateAgentInfo();
			createAgentInfo.setName(agentName);
			createAgentInfo.setTunnelId(tunnelId);
			createAgentInfo.setAgentEndpoint(agentEndpoint);
			createAgentInfo.setDeployMethod(deployMethod);
			createAgentRequest.setImportAgent(createAgentInfo);
			client.createAgent(userId, createAgentRequest);

			/** 步骤2:创建源地址和目的地址并验证可用性,这里以创建源为oss,目的为oss的地址为例,其它源示例请参见数据地址篇创建数据地址小节。 */
			String srcAddress = "examplesrcaddress";
			String destAddress = "exampledestaddress";

			AddressDetail addrssdetail = new AddressDetail();
			addrssdetail.addressType = "oss";
			// 以下参数请根据实际值填写。
			addrssdetail.bucket = "examplebucket";
			addrssdetail.prefix = "***/";
			addrssdetail.regionId = "oss-cn-beijing";
			addrssdetail.role = "rolename_xxxxx";
			CreateAddressRequest createAddressRequest = new CreateAddressRequest();
			CreateAddressInfo createAddressInfo = new CreateAddressInfo();
			createAddressInfo.name = addressName;
			createAddressInfo.setAddressDetail(addrssdetail);
			createAddressRequest.setImportAddress(createAddressInfo);
			client.createAddress(userId, createAddressRequest);
                        // 验证源端数据地址可用性。
			VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, srcAddress);
			if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
				System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
				return;
			}

			addrssdetail = new AddressDetail();
			addrssdetail.addressType = "oss";
			// 以下参数请根据实际值填写。
			addrssdetail.bucket = "examplebucket";
			addrssdetail.prefix = "***/";
			addrssdetail.regionId = "oss-cn-beijing";
			addrssdetail.role = "rolename_xxxxx";
			createAddressRequest = new CreateAddressRequest();
			createAddressInfo = new CreateAddressInfo();
			createAddressInfo.name = addressName;
			createAddressInfo.setAddressDetail(addrssdetail);
			createAddressRequest.setImportAddress(createAddressInfo);
			client.createAddress(userId, createAddressRequest);

			verifyAddressResponse = client.verifyAddress(userId, destAddress);
			if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
				System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
				return;
			}

			/** 步骤3:创建并且启动任务 */
			// 填写任务名称。
			String jobName = "examplejob";
			CreateJobInfo createJobInfo = new CreateJobInfo();
			createJobInfo.name = jobName;
			createJobInfo.srcAddress = srcAddress;
			createJobInfo.destAddress = destAddress;
			
			/*
                          overwriteMode和transferMode需要组合使用,具体组合含义如下
                          always,all 全覆盖
                          always,lastmodified 根据文件最后修改时间覆盖
                          never,all 不覆盖
                        */
			/* 文件覆盖方式, 可能得值为 1.never  2.always */
			createJobInfo.overwriteMode = "always";
			/* 文件传输的方式, 可能得值为 1.changed 2.all 3.lastmodified */
			createJobInfo.transferMode = "lastmodified";

			// 如果maxImportTaskQps值为0或者不设置,会被设置为默认值;MaxBandWidth值为0或者不设置,会被设置为默认值,maxBandWidth值单位为bits。
			ImportQos importQos = new ImportQos();
			// maxBandWidth,maxImportTaskQps 根据实际需求值填写。
			importQos.maxBandWidth = 1073741824L;
			importQos.maxImportTaskQps = 1000L;
			createJobInfo.setImportQos(importQos);

			// 配置调度规则,具体参数含义请参看API文档。
			ScheduleRule scheduleRule = new ScheduleRule();
			// maxScheduleCount,startCronExpression,suspendCronExpression 根据实际需求值填写。
			scheduleRule.maxScheduleCount = 2L;
			scheduleRule.startCronExpression = "0 0 10 * * ?";
			scheduleRule.suspendCronExpression = "0 0 14 * * ?";
			createJobInfo.setScheduleRule(scheduleRule);

			// 配置过滤规则,具体参数含义请参看API文档。
			FilterRule filterRule = new FilterRule();
			// 文件过滤器,根据实际需求值填写。
			KeyFilters keyFilters = new KeyFilters();
			KeyFilterItem includekeyFilterItem = new KeyFilterItem();
			List<String> includeRegex = new LinkedList<>();
			includeRegex.add(".*.jpg");
			includeRegex.add(".*.jpeg");
			includekeyFilterItem.setRegex(includeRegex);
			KeyFilterItem excludekeyFilterItem = new KeyFilterItem();
			List<String> excludeRegex = new LinkedList<>();
			excludeRegex.add(".*.txt");
			excludeRegex.add(".*.js");
			excludekeyFilterItem.setRegex(excludeRegex);
			keyFilters.setIncludes(includekeyFilterItem);
			keyFilters.setExcludes(excludekeyFilterItem);
			filterRule.setKeyFilters(keyFilters);

			// 时间过滤器, 时间格式遵循UTC时间格式,根据实际需求值填写。
			LastModifiedFilters lastModifiedFilters = new LastModifiedFilters();
			LastModifyFilterItem includeLastModifyFilterItem = new LastModifyFilterItem();
			List<TimeFilter> includeRegexs = new LinkedList<>();
			TimeFilter includeTimeFilter = new TimeFilter();
			includeTimeFilter.startTime = "2006-01-01T00:00:00Z";
			includeTimeFilter.endTime = "2006-12-31T23:59:59Z";
			includeRegexs.add(includeTimeFilter);
			includeLastModifyFilterItem.setTimeFilter(includeRegexs);
			LastModifyFilterItem excludeLastModifyFilterItem = new LastModifyFilterItem();
			List<TimeFilter> excludeRegexs = new LinkedList<>();
			TimeFilter excludeTimeFilter = new TimeFilter();
			excludeTimeFilter.startTime = "2008-01-01T00:00:00Z";
			excludeTimeFilter.endTime = "2008-12-31T23:59:59Z";
			excludeRegexs.add(excludeTimeFilter);
			excludeLastModifyFilterItem.setTimeFilter(excludeRegexs);
			lastModifiedFilters.setIncludes(includeLastModifyFilterItem);
			lastModifiedFilters.setExcludes(excludeLastModifyFilterItem);
			filterRule.setLastModifiedFilters(lastModifiedFilters);
			createJobInfo.setFilterRule(filterRule);

			CreateJobRequest createJobRequest = new CreateJobRequest();
			createJobRequest.setImportJob(createJobInfo);
			client.createJob(userId, createJobRequest);

			UpdateJobRequest updateJobRequest = new UpdateJobRequest();
			UpdateJobInfo updateJobInfo = new UpdateJobInfo();
			updateJobInfo.setStatus("IMPORT_JOB_LAUNCHING");
			updateJobRequest.setImportJob(updateJobInfo);
			client.updateJob(userId, jobName, updateJobRequest);

			/** 步骤4:循环查看当前任务状态 */
                        for(;;) {
				GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
				if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
					System.out.println("job is interrupted");
					return;
				}
				if ("IMPORT_JOB_FINISHED".equals(response.getBody().importJob.status)) {
					System.out.println("job is finished");
					break;
				}
				Thread.sleep(6000);
			}
			
			/** 步骤5:任务结束后查看结果。 */
			ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
			ListJobHistoryResponse listJobHistoryResp = client.listJobHistory(userId, jobName, listJobHistoryRequest);
                        System.out.println(new Gson().toJson(listJobHistoryResp.body.jobHistoryList.jobHistory))
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

场景二:重试失败文件

迁移任务运行完成后,可能有失败文件,迁移服务会为这些失败文件构造一份失败文件列表,以下示例先获取失败文件列表的详情,然后据此创建一个新的数据地址,再创建一个重试子任务。通过这种方式,您能够重新迁移这些失败的文件。

package sample;

import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;

public class Demo {
	/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
	static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
	static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
	/** 填写主账号ID。*/
	static String userId = "11470***876***55";
	
	private static final String AVAILABLE = "available";
	private static final String JOBFINISH = "IMPORT_JOB_FINISHED";
	private static final String INTERRUPT = "Interrupt";
	private static final String NONEED = "NoNeed";
	private static final String READY = "Ready";
	private static final String SYSTEM = "SYSTEM";
	private static final String LAUNCHING = "IMPORT_JOB_LAUNCHING";

	public static void main(String[] args) {
		try {
			/** 步骤1:初始化 */
			com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
			// 这里以北京区域为例。
			config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
			config.setRegionId(accessKeyId);
			config.setAccessKeyId(accessKeyId);
			config.setAccessKeySecret(accessKeySecret);
			Client client = new Client(config);

			// 下列参数值请根据实际需求填写。
			int runtimeId = 1;
			String jobName = "examplejob";
			String srcAddress = "examplesrcaddress";
			String destAddress = "exampledestaddress";
                        
                         /** 步骤2:循环查看任务状态,查看任务是否结束或者中断。 */
                         for(;;) {
				GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
				if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
					System.out.println("job is interrupted");
					return;
				}
				if ("IMPORT_JOB_FINISHED".equals(response.getBody().importJob.status)) {
					System.out.println("job is finished");
					break;
				}
				Thread.sleep(6000);
			}
			
			/** 步骤3:ListJobHistory,查看是否有失败文件。 */
			ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
			listJobHistoryRequest.runtimeId = runtimeId;
			ListJobHistoryResponse listJobHistoryResponse = client.listJobHistory(userId, jobName, listJobHistoryRequest);
			if (listJobHistoryResponse.body.jobHistoryList.jobHistory.size() < 1) {
				System.out.println("list job history length less than 1");
				return;
			}
			if (listJobHistoryResponse.body.jobHistoryList.jobHistory.get(0).failedCount <= 0) {
				System.out.println("job no need retry");
				return;
			}
			
			/** 步骤4:循环GetJobResult,等待Ready。 */
			for (;;) {
				GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
				getJobResultRequest.setRuntimeId(runtimeId);
				GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
				if ("NoNeed".equals(getJobResultResponse.body.importJobResult.readyRetry)) {
					System.out.println("job no need retry");
					return;
				}
				if ("Ready".equals(getJobResultResponse.body.importJobResult.readyRetry)) {
					break;
				}
				Thread.sleep(6000);
			}
			
	                retry(client, jobName, srcAddress, destAddress, runtimeId);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}


	static void retry(Client client, String jobName, String srcAddress, String destAddress, int runtimeId) throws Exception {
		/** 步骤5:根据失败文件清单信息创建新的数据地址并且验证是否可用。 */
		GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
		getJobResultRequest.runtimeId = runtimeId;
		GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
		GetJobResponse getJobResponse = client.getJob(userId, jobName, new GetJobRequest());
		GetAddressResponse getAddressResponse = client.getAddress(userId, srcAddress);

		CreateAddressRequest request = new CreateAddressRequest();
		CreateAddressInfo info = new CreateAddressInfo();
		long time = System.currentTimeMillis();
		info.name = srcAddress + "_" + time + "_retry";
		AddressDetail detail = new AddressDetail();
		detail.addressType = getJobResultResponse.body.importJobResult.addressType;
		detail.invLocation = getJobResultResponse.body.importJobResult.invLocation;
		detail.invBucket = getJobResultResponse.body.importJobResult.invBucket;
		detail.invAccessId = SYSTEM;
		detail.invAccessSecret = SYSTEM;
		detail.invPath = getJobResultResponse.body.importJobResult.invPath;
		detail.invRegionId = getJobResultResponse.body.importJobResult.invRegionId;
		detail.domain = getAddressResponse.body.importAddress.addressDetail.domain;
		detail.regionId = getAddressResponse.body.importAddress.addressDetail.regionId;
		detail.accessId = getAddressResponse.body.importAddress.addressDetail.accessId;
		detail.accessSecret = getAddressResponse.body.importAddress.addressDetail.accessSecret;
		detail.agentList = getAddressResponse.body.importAddress.addressDetail.agentList;
		detail.role = getAddressResponse.body.importAddress.addressDetail.role;
		detail.prefix = getAddressResponse.body.importAddress.getAddressDetail().prefix;
		detail.invRole = getAddressResponse.body.importAddress.addressDetail.invRole;
		detail.bucket = getAddressResponse.body.importAddress.addressDetail.bucket;
		info.setAddressDetail(detail);
		request.setImportAddress(info);
		client.createAddress(userId, request);

		VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, info.name);
		if (!verifyAddressResponse.body.verifyAddressResponse.status.equals(AVAILABLE)) {
			throw new Exception("retry srcaddress unavailable");
		}
		
		/** 步骤6:创建重试子任务并启动。 */
		CreateJobRequest createJobRequest = new CreateJobRequest();
		CreateJobInfo createJobInfo = new CreateJobInfo();
		createJobInfo.name = jobName + "_" + time + "_retry";
		createJobInfo.transferMode = getJobResponse.body.importJob.transferMode;
		createJobInfo.overwriteMode = getJobResponse.body.importJob.overwriteMode;
		createJobInfo.srcAddress = info.name;
		createJobInfo.destAddress = destAddress;
		createJobInfo.parentVersion = getJobResultResponse.body.importJobResult.version;
		createJobInfo.filterRule = getJobResponse.body.importJob.filterRule;
		createJobInfo.audit = getJobResponse.body.importJob.audit;
		createJobInfo.createReport = getJobResponse.body.importJob.createReport;
		createJobRequest.setImportJob(createJobInfo);
		client.createJob(userId, createJobRequest);
		UpdateJobRequest updateJobRequest = new UpdateJobRequest();
		UpdateJobInfo updateJobInfo = new UpdateJobInfo();
		updateJobInfo.status = LAUNCHING;
		updateJobRequest.setImportJob(updateJobInfo);
		client.updateJob(userId, createJobInfo.name, updateJobRequest);
	}
}
上一篇: 任务 下一篇: Python SDK