创建投递任务

如果要表格存储数据表中的数据投递到OSS Bucket中存储,您可以使用CreateDeliveryTask接口创建一个投递任务。

重要

请确认已安装支持数据湖投递功能的表格存储Go SDK。

前提条件

  • 已开通OSS服务且在表格存储实例所在地域创建Bucket。具体操作,请参见开通OSS服务

  • 已通过控制台创建表格存储服务关联角色并记录角色的ARN。具体操作,请参见创建投递任务

    服务关联角色的ARN请通过RAM控制台获取,具体操作如下:

    RAM 角色管理界面,搜索AliyunServiceRoleForOTSDataDelivery后,单击角色名称,在角色详情界面,可以查看和复制角色的ARN信息。

    figrole

  • 已初始化TableStoreClient。具体操作,请参见初始化

  • 已创建数据表并写入数据。

参数

参数

说明

TableName

数据表名称。

TaskName

投递任务名称。

名称只能包含英文小写字母(a~z)、数字和短横线(-),开头和结尾必须为英文小写字母或数字,且长度为3~16字符。

TaskConfig

投递任务配置,包括如下选项:

  • OssPrefix:OSS Bucket中的目录前缀,将表格存储的数据投递到该OSS Bucket目录中。投递路径中支持引用$yyyy、$MM、$dd、$HH、$mm五种时间变量。

    • 当投递路径中引用时间变量时,可以按数据的写入时间动态生成OSS目录,实现hive partition naming style的数据时间分区,从而按照时间分区组织OSS中的文件分布。

    • 当投递路径中不引用时间变量时,所有文件会被投递到固定的OSS前缀目录中。

  • OssBucket:OSS Bucket名称。

  • OssEndpoint:OSS Bucket所在地域的服务地址。

  • OssRoleName:表格存储服务关联角色的ARN信息。

  • Format:投递的数据的存储以Parquet列存格式存储,数据湖投递默认使用PLAIN编码方式,PLAIN编码方式支持任意类型数据。

  • EventTimeColumn:事件时间列,用于指定按某一列数据的时间进行分区。如果不设置此参数,则按数据写入表格存储的时间进行分区。

  • Schema:指定需要投递的数据列,必须手动配置投递字段的源表字段、目标字段和目标字段类型。

    您可以选择任意字段以任意顺序、名称写入列存文件,OSS的列存数据会按Schema数组中的数据列先后顺序分布。

    重要

    投递数据的字段类型必须与数据源的字段类型匹配,否则会作为脏数据丢弃。字段类型映射详情请参见数据格式映射

TaskType

投递任务的类型,包括如下选项:

  • IncTask:表示增量数据投递模式,只同步增量数据。

  • BaseTask:表示全量数据投递模式,一次性全表扫描数据同步。

  • BaseIncTask(默认):表示全量&增量数据投递模式,全量数据同步完成后,再同步增量数据。

    其中增量数据同步时可以获取最新投递时间和了解当前投递状态。

示例

以下示例用于为数据表创建投递任务。

func CreateTaskSample(client *tablestore.TableStoreClient) {
    createTask := &tablestore.CreateDeliveryTaskRequest{
        TableName: "<TABLE_NAME>",
        TaskName: "<TASK_NAME>",
        TaskType: tablestore.BaseIncTask,
        TaskConfig: &tablestore.OSSTaskConfig{
            OssPrefix:   "sample/year=$yyyy/month=$MM",
            OssBucket:      "datadeliverytest",
            OssEndpoint:    "oss-cn-hangzhou.aliyuncs.com",
            OssRoleName:    "acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery",
            Schema: []*tablestore.TaskSchema{
                {
                    ColumnName: "PK1",
                    OssColumnName: "PK1",
                    Type: tablestore.ParquetInt64,
                },
                {
                    ColumnName: "PK2",
                    OssColumnName: "PK2",
                    Type: tablestore.ParquetUtf8,
                },
                {
                    ColumnName: "Col1",
                    OssColumnName: "Col1",
                    Type: tablestore.ParquetDouble,
                },
            },
        },
    }
    createResp, err := client.CreateDeliveryTask(createTask)
    if err != nil {
        log.Fatal("create delivery task failed ", err)
    }
    fmt.Println("create delivery task success ", createResp.RequestId)
}