本文为您介绍Lindorm Reader支持的实现原理、参数定义及配置示例。

背景信息

LindormReader 插件提供了从Lindorm读取数据。在底层实现上,LindormReader通过Lindorm的Java客户端连接远程的Lindorm服务,并且支持通过对应的API读取Table或者WideColumn类型的数据,并将读取的数据使用DataX自定义的数据类型封装为抽象的数据集,传递给下游Writer处理。
说明
  • LindormReader的必填配置项configuration,可以通过Lindorm集群控制台查看连接Lindorm的相关配置项进行获取,并以JSON格式填写相关信息。
  • Lindorm为多模数据库,LindormReader支持读取table和widecolumn类型的数据,关于table和widecolumn类型的详细介绍请参见Lindorm使用文档,您也可以通过钉钉咨询Lindorm值班人员。

使用限制

Lindorm Reader仅支持使用新增和使用独享数据集成资源组,不支持使用使用公共资源组自定义资源组

类型转换

Lindorm Reader支持大部分Lindorm类型,但也存在个别没有支持的情况,请注意检查您的数据类型。

Lindorm Reader针对Lindorm类型的转换列表,如下所示。
类型分类 数据类型
整数类 INT、LONG、SHORT
浮点类 DOUBLE、FLOAT、DOUBLE
字符串类 STRING
日期时间类 DATE
布尔类 BOOLEAN
二进制类 BINARYSTRING

参数说明

参数 描述 是否必选 默认值
configuration 表示每个lindorm集群提供给DataX客户端连接的配置信息,可以通过lindorm集群控制台查询,获取到配置信息后可以联系lindorm数据库管理员将其转换为如下JSON格式:{"key1":"value1","key2":"value2"}

例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}

说明 如果是手工编写的JSON代码,则需要将JSON格式中value值的双引号转义为\"
mode 表示数据读取模式,包括固定列模式FixedColumn和动态列模式DynamicColumn。默认选择FixedColumn FixedColumn
tablemode 包括普通表模式table和宽表模式wideColumn。默认为table,如果选择table模式,可不填写。 默认不填写
table 表示所要读取的lindorm表名。lindorm表名对大小写敏感。
namespace 表示所要读取的lindorm表的命名空间。lindorm表的命名空间对大小写敏感。
encoding 编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。 UTF-8
selects 当前读取的Table类型数据不支持自动切割分片,默认单并发运行,因此需要手动配置selects参数进行数据切片,例如:
selects": [
                    "where(compare(\"id\", LESS, 5))",
                    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
                    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
                ],
columns 读取字段列表。读取字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。
  • table类型的表,只需要填写列名即可,会自动从表的meta获取schema信息。示例如下:
    table类型:
    [
        "id",
        "name",
        "age",
        "birthday",
        "gender"
    ]
  • widecolumn类型的表。示例如下:
    Widecolumn类型:
    [
        "STRING|rowkey",
        "INT|f:a",
        "DOUBLE|f:b"
    ]

向导开发介绍

暂不支持向导开发模式。

脚本开发介绍

  • 配置一个Lindorm Table(对应SDK中的TableService模型)抽取数据到本地的作业,使用脚本开发的详情请参见通过脚本模式配置离线同步任务
    说明 实际运行时,请删除下述代码中的注释。
    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                "caching": 128,
                    "configuration": {    //lindorm控制台中与连接相关的配置项,以JSON格式填写
                        "lindorm.client.username": "",
                        "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020",
                        "lindorm.client.namespace": "namespace",
                        "lindorm.client.password": ""
                    },
                    "columns": [
                        "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
                    "envType": 1,
                    "datasource": "_LINDORM",
                    "namespace": "namespace",
                    "table": "lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "columns": "columns": [
                        "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
             "selects": [
                        "where(compare(\"id\", LESS, 5))",
                        "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
                        "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它.
            "byte": 1048576
          }
          //出错限制
          "errorLimit": {
            //出错的record条数上限,当大于该值即报错。
            "record": 0,
            //出错的record百分比上限 1.0表示100%,0.02表示2%
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • 配置一个Lindorm wideColumn(对应SDK中的WideColumnService模型)抽取数据到本地的作业,使用脚本开发的详情请参见通过脚本模式配置离线同步任务
    说明 实际运行时,请删除下述代码中的注释。
    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "configuration": {  //lindorm控制台中与连接相关的配置项,以JSON格式填写
                        "lindorm.client.username": "",
                        "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020",
                        "lindorm.client.namespace": "namespace",
                        "lindorm.client.password": ""
                    },
                    "columns":  [
                       "STRING|rowkey",
                          "INT|f:a",
                          "DOUBLE|f:b"
                    ],
                    "envType": 1,
                    "datasource": "_LINDORM",
                    "namespace": "namespace",
                    "table": "wideColumn"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。
            "byte": 1048576
          }
            //出错限制
            "errorLimit": {
            //出错的record条数上限,当大于该值即报错。
            "record": 0,
            //出错的record百分比上限 1.0表示100%,0.02表示2%。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }