创建投递任务

如果要表格存储数据表中的数据投递到OSS Bucket中存储,您可以使用CreateDeliveryTask接口创建一个投递任务。

重要

表格存储Java SDK5.10.3版本开始支持数据湖投递功能,请确保已安装正确的表格存储Java SDK。关于表格存储Java SDK版本的更多信息,请参见Java SDK历史迭代版本

前提条件

  • 已开通OSS服务且在表格存储实例所在地域创建Bucket。具体操作,请参见开通OSS服务

  • 已通过控制台创建表格存储服务关联角色并记录角色的ARN。具体操作,请参见创建投递任务

    服务关联角色的ARN请通过RAM控制台获取,具体操作如下:

    RAM 角色管理界面,搜索AliyunServiceRoleForOTSDataDelivery后,单击角色名称,在角色详情界面,可以查看和复制角色的ARN信息。

    figrole

  • 已初始化OTSClient。具体操作,请参见初始化

  • 已创建数据表并写入数据。

参数

参数

说明

tableName

数据表名称。

taskName

投递任务名称。

名称只能包含英文小写字母(a~z)、数字和短横线(-),开头和结尾必须为英文小写字母或数字,且长度为3~16字符。

taskConfig

投递任务配置,包括如下选项:

  • ossPrefix:OSS Bucket中的目录前缀,将表格存储的数据投递到该OSS Bucket目录中。投递路径中支持引用$yyyy、$MM、$dd、$HH、$mm五种时间变量。

    • 当投递路径中引用时间变量时,可以按数据的写入时间动态生成OSS目录,实现hive partition naming style的数据时间分区,从而按照时间分区组织OSS中的文件分布。

    • 当投递路径中不引用时间变量时,所有文件会被投递到固定的OSS前缀目录中。

  • ossBucket:OSS Bucket名称。

  • ossEndpoint:OSS Bucket所在地域的服务地址。

  • ossStsRole:表格存储服务关联角色的ARN信息。

  • format:投递的数据的存储以Parquet列存格式存储,数据湖投递默认使用PLAIN编码方式,PLAIN编码方式支持任意类型数据。

  • eventTimeColumn:事件时间列,用于指定按某一列数据的时间进行分区。如果不设置此参数,则按数据写入表格存储的时间进行分区。

  • parquetSchema:指定需要投递的数据列,必须手动配置投递字段的源表字段、目标字段和目标字段类型。

    您可以选择任意字段以任意顺序、名称写入列存文件,OSS的列存数据会按Schema数组中的数据列先后顺序分布。

    重要

    投递数据的字段类型必须与数据源的字段类型匹配,否则会作为脏数据丢弃。字段类型映射详情请参见数据格式映射

taskType

投递任务的类型,包括如下选项:

  • INC:表示增量数据投递模式,只同步增量数据。

  • BASE:表示全量数据投递模式,一次性全表扫描数据同步。

  • BASE_INC(默认):表示全量&增量数据投递模式,全量数据同步完成后,再同步增量数据 其中增量数据同步时可以获取最新投递时间和了解当前投递状态。

示例

以下示例用于为数据表创建投递任务。

private static void createDeliveryTask(SyncClient client) {
    String tableName = "<TABLE_NANE>";
    String taskName = "<TASK_NAME>";
    OSSTaskConfig taskConfig = new OSSTaskConfig();
    taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
    taskConfig.setOssBucket("datadeliverytest");
    taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
    taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery"); //eventColumn为可选配置,指定按某一列数据的时间进行分区。如果不设置此参数,则按数据写入表格存储的时间进行分区。
    EventColumn eventColumn = new EventColumn("PK1", EventTimeFormat.RFC1123);
    taskConfig.setEventTimeColumn(eventColumn);
    taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
    taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
    taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
    CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
    request.setTableName(tableName);
    request.setTaskName(taskName);
    request.setTaskConfig(taskConfig);
    request.setTaskType(DeliveryTaskType.BASE_INC);
    CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
    System.out.println("resquestID: "+ response.getRequestId());
    System.out.println("traceID: " + response.getTraceId());
    System.out.println("create delivery task success");
}