本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
使用SDK进行数据投递前,您需要了解使用数据湖投递功能的注意事项、接口等信息。创建投递任务后,表格存储数据表中的数据会自动投递到OSS Bucket中存储。
注意事项
目前支持使用数据湖投递功能的地域有华东1(杭州)、华东2(上海)、华北2(北京)和华北3(张家口)。
数据湖投递不支持同步删除操作,表格存储中的删除操作在数据投递时会被忽略,已投递到OSS中的数据不会被删除。
新建数据投递任务时存在最多1分钟的初始化时间。
数据同步存在延迟,写入速率稳定时,延迟在3分钟内。数据同步的P99延迟在10分钟内。
说明P99延迟表示过去10秒内最慢的1%的请求的平均延迟。
前提条件
在对象存储服务侧已完成如下操作:
已开通OSS服务且在表格存储实例所在地域创建Bucket,详情请参见开通OSS服务。
说明数据湖投递支持投递到和表格存储相同地域的任意OSS Bucket中。如需投递到其他数仓存储(例如MaxCompute),请提交工单申请。
在表格存储服务侧已完成如下操作:
在访问控制RAM服务侧完成如下操作:
已创建RAM用户并为RAM用户授予管理表格存储权限(AliyunOTSFullAccess)。具体操作,请参见创建RAM用户和为RAM用户授权。
警告阿里云账号AccessKey泄露会威胁您所有资源的安全。建议您使用RAM用户AccessKey进行操作,可以有效降低AccessKey泄露的风险。
已为RAM用户创建AccessKey。具体操作,请参见创建AccessKey。
已配置访问凭证。具体操作,请参见配置访问凭证。
接口
接口 | 说明 |
CreateDeliveryTask | 创建一个投递任务。 |
ListDeliveryTask | 列出一个数据表所有的投递任务信息。 |
DescribeDeliveryTask | 查询投递任务描述信息。 |
DeleteDeliveryTask | 删除一个投递任务。 |
参数
参数 | 说明 |
tableName | 数据表名称。 |
taskName | 投递任务名称。 名称只能包含英文小写字母(a~z)、数字和短横线(-),开头和结尾必须为英文小写字母或数字,且长度为3~16字符。 |
taskConfig | 投递任务配置,包括如下选项:
|
taskType | 投递任务的类型,包括如下选项:
|
使用
您可以通过Java SDK、Go SDK实现数据湖投递功能。此处以Java SDK为例介绍数据投递湖的操作。
以下示例用于为数据表创建投递任务。
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {
public static void main(String[] args) {
final String endPoint = "https://yourinstancename.cn-hangzhou.ots.aliyuncs.com";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "yourinstancename";
SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
try {
createDeliveryTask(client);
System.out.println("end");
} catch (TableStoreException e) {
System.err.println("操作失败,详情:" + e.getMessage() + e.getErrorCode() + e.toString());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("请求失败,详情:" + e.getMessage());
} finally {
client.shutdown();
}
}
private static void createDeliveryTask(SyncClient client){
String tableName = "sampleTable";
String taskName = "sampledeliverytask";
OSSTaskConfig taskConfig = new OSSTaskConfig();
taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
taskConfig.setOssBucket("datadeliverytest");
taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
//eventColumn为可选配置,指定按某一列数据的时间进行分区。如果不设置此参数,则按数据写入表格存储的时间进行分区。
EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
taskConfig.setEventTimeColumn(eventColumn);
taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
request.setTableName(tableName);
request.setTaskName(taskName);
request.setTaskConfig(taskConfig);
request.setTaskType(DeliveryTaskType.BASE_INC);
CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
System.out.println("resquestID: "+ response.getRequestId());
System.out.println("traceID: " + response.getTraceId());
System.out.println("create delivery task success");
}
}