本文为您介绍OTS Reader支持的数据类型、读取方式、字段映射和数据源等参数及配置示例。

OTS Reader插件实现了从Table Store(OTS)读取数据,通过您指定的抽取数据范围,可以方便地实现数据增量抽取的需求。目前支持以下三种抽取方式:
  • 全表抽取
  • 范围抽取
  • 指定分片抽取

Table Store是构建在阿里云飞天分布式系统之上的NoSQL数据库服务,提供海量结构化数据的存储和实时访问。Table Store以实例和表的形式组织数据,通过数据分片和负载均衡技术,实现规模上的无缝扩展。

OTS Reader通过Table Store官方Java SDK连接到Table Store服务端,获取并按照数据同步官方协议标准转为数据同步字段信息传递给下游Writer端。

OTS Reader会根据Table Store的表范围,按照数据同步并发的数目N,将范围等分为N份Task。每个Task都会有一个OTS Reader线程来执行。

目前OTS Reader支持所有Table Store类型,OTS Reader针对Table Store的类型转换表,如下所示。
类型分类 Table Store数据类型
整数类 INTEGER
浮点类 DOUBLE
字符串类 STRING
布尔型 BOOLEAN
二进制类 BINARY
说明 Table Store本身不支持日期型类型。应用层通常使用Long报错时间的Unix TimeStamp。

参数说明

参数 描述 是否必选 默认值
endpoint OTS Server的EndPoint(服务地址),详情请参见服务地址
accessId Table Store的accessId。
accessKey Table Store的accessKey。
instanceName Table Store的实例名称,实例是您使用和管理Table Store服务的实体。

您在开通Table Store服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。

实例是Table Store资源管理的基础单元,Table Store对应用程序的访问控制和资源计量都在实例级别完成。

table 所选取的需要抽取的表名称,这里有且只能填写一张表。在Table Store不存在多表同步的需求。
column 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。由于Table Store本身是NoSQL系统,在OTS Reader抽取数据过程中,必须指定相应的字段名称。
  • 支持普通的列读取,例如{"name":"col1"}
  • 支持部分列读取,如果您不配置该列,则OTS Reader不予读取。
  • 支持常量列读取,例如{"type":"STRING", "value":"DataX"}。使用type描述常量类型,目前支持String、Int、Double、Bool、Binary(使用Base64编码填写)、INF_MIN(Table Store的系统限定最小值,如果使用该值,您不能填写value属性,否则报错)、INF_MAX(Table Store的系统限定最大值,如果使用该值,您不能填写value属性,否则报错)。
  • 不支持函数或者自定义表达式,由于Table Store本身不提供类似SQL的函数或者表达式功能,OTS Reader也不能提供函数或表达式列功能。
beginend 该配置项必须配对使用,用于支持Table Store表范围抽取。beginend中描述的是Table Store PrimaryKey的区间分布状态,而且必须保证区间覆盖到所有的PrimaryKey,需要指定该表下所有的PrimaryKey范围。对于无限大小的区间,您可以使用{"type":"INF_MIN"}{"type":"INF_MAX"}指代。
例如,对一张主键为[DeviceID, SellerID]的双主键的Table Store进行抽取任务,beginend的配置如下所示。
说明 请确保主键数和配置项对应。
"range": {
      "begin": [
        {"type":"INF_MIN"},  //指定deviceID最小值。
        {"type":"INT", "value":"0"}  //指定SellerID最小值。
      ], 
      "end": [
        {"type":"INF_MAX"}, //指定deviceID抽取最大值。
        {"type":"INT", "value":"9999"} //指定SellerID抽取最大值。
      ]
    }
如果要对上述表抽取全表,可以使用如下配置。
"range": {
      "begin": [
        {"type":"INF_MIN"},  //指定deviceID最小值。
        {"type":"INF_MIN"} //指定SellerID最小值。
      ], 
      "end": [
        {"type":"INF_MAX"}, //指定deviceID抽取最大值。
          {"type":"INF_MAX"} //指定SellerID抽取最大值。
      ]
    }
split 该配置项属于高级配置项,是您自己定义切分配置信息,普通情况下不建议使用。

适用场景:通常在Table Store数据存储发生热点,使用OTS Reader自动切分的策略不能生效的情况下,使用您自定义的切分规则。

split指定在Begin、End区间内的切分点,且只能是partitionKey的切分点信息,即在split仅配置partitionKey,而不需要指定全部的PrimaryKey。

如果对一张主键为[DeviceID, SellerID]的Table Store进行抽取任务,配置如下。
"range": {
      "begin": {
        {"type":"INF_MIN"},  //指定DeviceID最小值。
        {"type":"INF_MIN"}  //指定SellerID最小值。
      }, 
      "end": {
        {"type":"INF_MAX"}, //指定DeviceID抽取最大值。
        {"type":"INF_MAX"} //指定SellerID抽取最大值。
      },
       // 您指定的切分点。如果指定了切分点,Job将按照begin、end和split进行Task的切分。切分的列只能是Partition Key(ParimaryKey的第一列)。
       //支持INF_MIN、INF_MAX、STRING和INT。
            "split":[
                                {"type":"STRING", "value":"1"},
                                {"type":"STRING", "value":"2"},
                                {"type":"STRING", "value":"3"},
                                {"type":"STRING", "value":"4"},
                                {"type":"STRING", "value":"5"}
                    ]
    }

向导开发介绍

暂不支持向导开发模式。

脚本开发介绍

配置一个从Table Store同步抽取数据到本地的作业,使用脚本模式开发的详情请参见通过脚本模式配置任务
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"ots",//插件名。
            "parameter":{
                "datasource":"",//数据源。
                "column":[//字段。
                    {
                        "name":"column1"//字段名。
                    },
                    {
                        "name":"column2"
                    },
                    {
                        "name":"column3"
                    },
                    {
                        "name":"column4"
                    },
                    {
                        "name":"column5"
                    }
                ],
                "range":{
                    "split":[
                        {
                            "type":"INF_MIN"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint1"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint2"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint3"
                        },
                        {
                            "type":"INF_MAX"
                        }
                    ],
                    "end":[
                        {
                            "type":"INF_MAX"
                        },
                        {
                            "type":"INF_MAX"
                        },
                        {
                            "type":"STRING",
                            "value":"end1"
                        },
                        {
                            "type":"INT",
                            "value":"100"
                        }
                    ],
                    "begin":[
                        {
                            "type":"INF_MIN"
                        },
                        {
                            "type":"INF_MIN"
                        },
                        {
                            "type":"STRING",
                            "value":"begin1"
                        },
                        {
                            "type":"INT",
                            "value":"0"
                        }
                    ]
                },
                "table":""//表名。
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":false,//false代表不限流,下面的限流的速度不生效,true代表限流。
            "concurrent":1 //作业并发数。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}