DataWorks数据集成支持使用Lindorm Reader和Lindorm Writer插件读取和写入Lindorm双向通道的功能,本文为您介绍DataWorks的Lindorm数据读取与写入能力。
使用限制
Lindorm需要在DataWorks上配置数据源,通过Lindorm Reader和Lindorm Writer插件读取与写入Lindorm数据。
宽表引擎支持使用Serverless资源组(推荐)和独享数据集成资源组。
计算引擎仅支持使用Serverless资源组。
Lindorm为多模数据库,详情请参见Lindorm使用文档,当前DataWorks仅支持宽表引擎及计算引擎两种。
支持的字段类型
Lindorm Reader和Lindorm Writer支持大部分Lindorm类型,但也存在个别没有支持的情况,请注意检查您的数据类型。
Lindorm Reader和Lindorm Writer针对Lindorm类型的转换列表,如下所示。
类型分类 | 数据类型 |
整数类 | INT、LONG、SHORT |
浮点类 | DOUBLE、FLOAT、DOUBLE |
字符串类 | STRING |
日期时间类 | DATE |
布尔类 | BOOLEAN |
二进制类 | BINARYSTRING |
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
操作流程请参见脚本模式配置。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
配置一个宽表引擎Lindorm SQL Table抽取数据到本地的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "caching": 128, "column": [ "id", "value" ], "envType": 1, "datasource": "lindorm", "tableMode": "tableService", "table": "lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "lindorm", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. "byte": 1048576 } //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一个宽表引擎Lindorm HbaseLike(WideColumn)表抽取数据到本地的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "column": [ "STRING|rowkey", "INT|f:a" ], "envType": 1, "datasource": "lindorm", "tableMode": "wideColumn", "table":"lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。 "byte": 1048576 } //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一个计算引擎表抽取数据到本地的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "column": [ "id", "value" ], "tableComment": "", "where": "", "session": [], "splitPk": "id", "table": "auto_ob_149912212480" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。 "byte": 1048576 } //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 | ||||||||||||||||||||
mode | 宽表引擎特有,表示数据读取模式,包括固定列模式FixedColumn和动态列模式DynamicColumn。 | 是 | FixedColumn | ||||||||||||||||||||
tableMode | 宽表引擎特有,包括普通表SQL模式table和宽表模式wideColumn。默认为table,如果选择table模式,可不填写。 | 否 | 默认不填写 | ||||||||||||||||||||
table | 表示所要读取的lindorm表名。lindorm表名对大小写敏感。 | 是 | 无 | ||||||||||||||||||||
encoding | 宽表引擎特有,编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。 | 否 | UTF-8 | ||||||||||||||||||||
caching | 宽表引擎特有,一次性批量获取的记录数大小,该值可以极大减少数据同步系统与Lindorm的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致Lindorm服务端压力过大或者数据同步运行进程OOM异常。 | 否 | 100 | ||||||||||||||||||||
selects | 宽表引擎特有,当前读取的Table类型数据不支持自动切割分片,默认单并发运行,因此需要手动配置selects参数进行数据切片,例如:
使用限制:
| 否 | 无 | ||||||||||||||||||||
session | 计算引擎特有,Session粒度作业参数,例如 | 否 | 无 | ||||||||||||||||||||
splitPk | 计算引擎特有,切分键,计算引擎表读取特有,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效率。
| 否 | 无 | ||||||||||||||||||||
columns | 读取字段列表。读取字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。
| 是 | 无 |
Writer脚本Demo
配置一个数据源为MySQL,需要写入数据到宽表引擎 Lindorm SQL Table的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "checkSlave": true, "datasource": " ", "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "masterSlave": "slave", "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8", "print": true }, "name": "mysqlReader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "nullMode": "skip", "datasource": "lindorm_datasource", "envType": 1, "column": [ "id", "value" ], "dynamicColumn": "false", "table": "lindorm_table", "encoding": "utf8" }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。 "byte": 1048576 }, //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一个数据源为MySQL,需要写入数据到宽表引擎 Lindorm HbaseLike (WideColumn)表的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "value" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "table": "xxxxxx", "encoding": "utf8", "nullMode": "skip", "dynamicColumn": "false", "caching": 128, "column": [ //从源端按字段顺序映射 "ROW|STRING", //行键,固定配置,将源端第一个字段映射为行键,例如本示例中将id映射为行键。 "cf:name|STRING" //cf表示列族名,可修改,name表示目标端列名,可修改 ] }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一个数据源为MySQL,需要写入数据到计算引擎表的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "value" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "table": "xxxxxx", "column": [ "id", "value" ], "formatType": "ICEBERG" }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
table | 表示所要写入的lindorm表名。lindorm表名对大小写敏感。 | 是 | 无 |
encoding | 宽表引擎特有,编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。 | 否 | UTF-8 |
columns | 写入字段列表。写入字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。
| 是 | 无 |
nullMode | 宽表引擎特有,表示在读取源头数据的值为null时,Lindorm Writer 中的nullMode参数可通过配置不同内容,实现不同的处理方式。
| 否 | EMPTY_BYTES |
formatType | 计算引擎特有,比较待同步表的类型,取值范围:
| 否 | 无 |