DataWorks数据集成支持使用Lindorm Reader和Lindorm Writer插件读取和写入Lindorm双向通道的功能,本文为您介绍DataWorks的Lindorm数据读取与写入能力。
使用限制
Lindorm需要在DataWorks上配置数据源,通过Lindorm Reader和Lindorm Writer插件读取与写入Lindorm数据。
Lindorm Reader和LindormWriter仅支持使用独享数据集成资源组。
Lindorm 时序引擎目前不支持作为DataWorks数据集成的数据源。
LindormReader和LindormWriter的必填配置项configuration,可以通过Lindorm集群控制台查看连接Lindorm的相关配置项进行获取,并以JSON格式填写相关信息。
说明Lindorm为多模数据库,LindormReader和LindormWriter支持读取table和widecolumn类型的数据,关于table和widecolumn类型的详细介绍请参见Lindorm使用文档,您也可以通过钉钉咨询Lindorm值班人员。
支持的字段类型
Lindorm Reader和Lindorm Writer支持大部分Lindorm类型,但也存在个别没有支持的情况,请注意检查您的数据类型。
Lindorm Reader和Lindorm Writer针对Lindorm类型的转换列表,如下所示。
类型分类 | 数据类型 |
整数类 | INT、LONG、SHORT |
浮点类 | DOUBLE、FLOAT、DOUBLE |
字符串类 | STRING |
日期时间类 | DATE |
布尔类 | BOOLEAN |
二进制类 | BINARYSTRING |
数据同步任务开发
操作流程请参见通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
附录:脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
Reader脚本Demo
配置一个Lindorm Table(对应SDK中的TableService模型)抽取数据到本地的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "caching": 128, "configuration": { //lindorm控制台中与连接相关的配置项,以JSON格式填写 "lindorm.client.username": "", "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020", "lindorm.client.namespace": "namespace", "lindorm.client.password": "" }, "columns": [ "id", "name", "age", "birthday", "gender" ], "envType": 1, "datasource": "_LINDORM", "namespace": "namespace", "table": "lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "columns": "columns": [ "id", "name", "age", "birthday", "gender" ], "selects": [ "where(compare(\"id\", LESS, 5))", "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))", "where(compare(\"id\", GREATER_OR_EQUAL, 10))" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. "byte": 1048576 } //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一个Lindorm wideColumn(对应SDK中的WideColumnService模型)抽取数据到本地的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "configuration": { //lindorm控制台中与连接相关的配置项,以JSON格式填写 "lindorm.client.username": "", "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020", "lindorm.client.namespace": "namespace", "lindorm.client.password": "" }, "columns": [ "STRING|rowkey", "INT|f:a", "DOUBLE|f:b" ], "envType": 1, "datasource": "_LINDORM", "namespace": "namespace", "tableMode": "wideColumn", "table":"yourTableName" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。 "byte": 1048576 } //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
configuration | 表示每个lindorm集群提供给DataX客户端连接的配置信息,可以通过lindorm集群控制台查询,获取到配置信息后可以联系lindorm数据库管理员将其转换为如下JSON格式:{"key1":"value1","key2":"value2"}。 例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"} 说明 如果是手工编写的JSON代码,则需要将JSON格式中value值的双引号转义为\"。 | 是 | 无 |
mode | 表示数据读取模式,包括固定列模式FixedColumn和动态列模式DynamicColumn。默认选择FixedColumn。 | 是 | FixedColumn |
tableMode | 包括普通表模式table和宽表模式wideColumn。默认为table,如果选择table模式,可不填写。 | 否 | 默认不填写 |
table | 表示所要读取的lindorm表名。lindorm表名对大小写敏感。 | 是 | 无 |
namespace | 表示所要读取的lindorm表的命名空间。lindorm表的命名空间对大小写敏感。 | 是 | 无 |
encoding | 编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。 | 否 | UTF-8 |
caching | 一次性批量获取的记录数大小,该值可以极大减少数据同步系统与Lindorm的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致Lindorm服务端压力过大或者数据同步运行进程OOM异常。 | 否 | 100 |
selects | 当前读取的Table类型数据不支持自动切割分片,默认单并发运行,因此需要手动配置selects参数进行数据切片,例如:
| 否 | 无 |
columns | 读取字段列表。读取字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。
| 是 | 无 |
Writer脚本Demo
配置一个数据源为MySQL,需要写入数据到Lindorm Table(对应SDK中的TableService模型)的作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "checkSlave": true, "datasource": " ", "envType": 1, "column": [ "id", "value", "table" ], "socketTimeout": 3600000, "masterSlave": "slave", "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8", "print": true }, "name": "mysqlReader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "configuration": { "lindorm.client.seedserver": "xxxxxxx:30020", "lindorm.client.username": "xxxxxx", "lindorm.client.namespace": "default", "lindorm.client.password": "xxxxxx" }, "nullMode": "skip", "datasource": "", "writeMode": "api", "envType": 1, "columns": [ "id", "name", "age", "birthday", "gender" ], "dynamicColumn": "false", "table": "lindorm_table", "encoding": "utf8" }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。 "byte": 1048576 }, //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一个数据源为MySQL,需要写入数据到Lindorm wideColumn(对应SDK中的WideColumnService模型)作业。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "name", "age", "birthday", "gender" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "configuration": { "lindorm.client.seedserver": "xxxxxxx:30020", "lindorm.client.username": "xxxxxx", "lindorm.client.namespace": "default", "lindorm.client.password": "xxxxxx" }, "writeMode": "api", "namespace": "default", "table": "xxxxxx", "encoding": "utf8", "nullMode": "skip", "dynamicColumn": "false", "caching": 128, "columns": [ "ROW|STRING", "cf:id|STRING", "cf:age|INT", "cf:birthday|STRING" ] }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
configuration | 每个lindorm集群提供给DataX客户端连接的配置信息,可以通过lindorm集群控制台查询,获取到配置信息后可以联系lindorm数据库管理员将其转换为如下JSON格式:{"key1":"value1","key2":"value2"}。 例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"} 说明 如果是手写的JSON代码,则需要将双引号转义为\"。 | 是 | 无 |
table | 表示所要写入的lindorm表名。lindorm表名对大小写敏感。 | 是 | 无 |
namespace | 表示所要写入的lindorm表的命名空间。lindorm表的命名空间对大小写敏感。 | 是 | 无 |
encoding | 编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。 | 否 | UTF-8 |
columns | 写入字段列表。写入字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。
| 是 | 无 |
nullMode | 表示在读取源头数据的值为null时,Lindorm Writer 中的nullMode参数可通过配置不同内容,实现不同的处理方式。
| 否 | EMPTY_BYTES |