Lindorm数据源

DataWorks数据集成支持使用Lindorm Reader和Lindorm Writer插件读取和写入Lindorm双向通道的功能,本文为您介绍DataWorks的Lindorm数据读取与写入能力。

使用限制

  • Lindorm需要在DataWorks上配置数据源,通过Lindorm Reader和Lindorm Writer插件读取与写入Lindorm数据。

  • Lindorm Reader和LindormWriter支持使用Serverless资源组(推荐)独享数据集成资源组

  • Lindorm 时序引擎目前不支持作为DataWorks数据集成的数据源。

  • LindormReader和LindormWriter的必填配置项configuration,可以通过Lindorm集群控制台查看连接Lindorm的相关配置项进行获取,并以JSON格式填写相关信息。

    说明

    Lindorm为多模数据库,LindormReader和LindormWriter支持读取table和widecolumn类型的数据,关于table和widecolumn类型的详细介绍请参见Lindorm使用文档,您也可以通过钉钉咨询Lindorm值班人员。

支持的字段类型

Lindorm Reader和Lindorm Writer支持大部分Lindorm类型,但也存在个别没有支持的情况,请注意检查您的数据类型。

Lindorm Reader和Lindorm Writer针对Lindorm类型的转换列表,如下所示。

类型分类

数据类型

整数类

INT、LONG、SHORT

浮点类

DOUBLE、FLOAT、DOUBLE

字符串类

STRING

日期时间类

DATE

布尔类

BOOLEAN

二进制类

BINARYSTRING

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

  • 配置一个Lindorm Table(对应SDK中的TableService模型)抽取数据到本地的作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                "caching": 128,
                    "configuration": {    //lindorm控制台中与连接相关的配置项,以JSON格式填写
                        "lindorm.client.username": "",
                        "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020",
                        "lindorm.client.namespace": "namespace",
                        "lindorm.client.password": ""
                    },
                    "columns": [
                        "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
                    "envType": 1,
                    "datasource": "_LINDORM",
                    "namespace": "namespace",
                    "table": "lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "columns": "columns": [
                        "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
             "selects": [
                        "where(compare(\"id\", LESS, 5))",
                        "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
                        "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它.
            "byte": 1048576
          }
          //出错限制
          "errorLimit": {
            //出错的record条数上限,当大于该值即报错。
            "record": 0,
            //出错的record百分比上限 1.0表示100%,0.02表示2%
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • 配置一个Lindorm wideColumn(对应SDK中的WideColumnService模型)抽取数据到本地的作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "configuration": {  //lindorm控制台中与连接相关的配置项,以JSON格式填写
                        "lindorm.client.username": "",
                        "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020",
                        "lindorm.client.namespace": "namespace",
                        "lindorm.client.password": ""
                    },
                    "columns":  [
                       "STRING|rowkey",
                          "INT|f:a",
                          "DOUBLE|f:b"
                    ],
                    "envType": 1,
                    "datasource": "_LINDORM",
                    "namespace": "namespace",
                    "tableMode": "wideColumn",
                    "table":"yourTableName"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。
            "byte": 1048576
          }
            //出错限制
            "errorLimit": {
            //出错的record条数上限,当大于该值即报错。
            "record": 0,
            //出错的record百分比上限 1.0表示100%,0.02表示2%。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }

Reader脚本参数

参数

描述

是否必选

默认值

configuration

表示每个lindorm集群提供给DataX客户端连接的配置信息,可以通过lindorm集群控制台查询,获取到配置信息后可以联系lindorm数据库管理员将其转换为如下JSON格式:{"key1":"value1","key2":"value2"}

例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}

说明

如果是手工编写的JSON代码,则需要将JSON格式中value值的双引号转义为\"

mode

表示数据读取模式,包括固定列模式FixedColumn和动态列模式DynamicColumn。默认选择FixedColumn

FixedColumn

tableMode

包括普通表模式table和宽表模式wideColumn。默认为table,如果选择table模式,可不填写。

默认不填写

table

表示所要读取的lindorm表名。lindorm表名对大小写敏感。

namespace

表示所要读取的lindorm表的命名空间。lindorm表的命名空间对大小写敏感。

encoding

编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。

UTF-8

caching

一次性批量获取的记录数大小,该值可以极大减少数据同步系统与Lindorm的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致Lindorm服务端压力过大或者数据同步运行进程OOM异常。

100

selects

当前读取的Table类型数据不支持自动切割分片,默认单并发运行,因此需要手动配置selects参数进行数据切片,例如:

selects": [
                    "where(compare(\"id\", LESS, 5))",
                    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
                    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
                ],

columns

读取字段列表。读取字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。

  • table类型的表,只需要填写列名即可,会自动从表的meta获取schema信息。示例如下:

    table类型:
    [
        "id",
        "name",
        "age",
        "birthday",
        "gender"
    ]
  • widecolumn类型的表。示例如下:

    Widecolumn类型:
    [
        "STRING|rowkey",
        "INT|f:a",
        "DOUBLE|f:b"
    ]

Writer脚本Demo

  • 配置一个数据源为MySQL,需要写入数据到Lindorm Table(对应SDK中的TableService模型)的作业。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "checkSlave": true,
            "datasource": " ",
            "envType": 1,
            "column": [
              "id",
              "value",
              "table"
            ],
            "socketTimeout": 3600000,
            "masterSlave": "slave",
            "connection": [
              {
                "datasource": " ",
                "table": []
              }
            ],
            "where": "",
            "splitPk": "",
            "encoding": "UTF-8",
            "print": true
          },
          "name": "mysqlReader",
          "category": "reader"
        },
        {
          "stepType": "lindorm",
          "parameter": {
            "configuration":  {
              "lindorm.client.seedserver": "xxxxxxx:30020",
              "lindorm.client.username": "xxxxxx",
              "lindorm.client.namespace": "default",
              "lindorm.client.password": "xxxxxx"
            },
            "nullMode": "skip",
            "datasource": "",
            "writeMode": "api",
            "envType": 1,
            "columns": [
              "id",
              "name",
              "age",
              "birthday",
              "gender"
            ],
            "dynamicColumn": "false",
            "table": "lindorm_table",
            "encoding": "utf8"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
          //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。
          "byte": 1048576
        },
        //出错限制
        "errorLimit": {
          //出错的record条数上限,当大于该值即报错。
          "record": 0,
          //出错的record百分比上限 1.0表示100%,0.02表示2%。
          "percentage": 0.02
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • 配置一个数据源为MySQL,需要写入数据到Lindorm wideColumn(对应SDK中的WideColumnService模型)作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                     "configuration":  {
                      "lindorm.client.seedserver": "xxxxxxx:30020",
                      "lindorm.client.username": "xxxxxx",
                      "lindorm.client.namespace": "default",
                      "lindorm.client.password": "xxxxxx"
                    },
                "writeMode": "api",
                "namespace": "default",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "columns": [
                      "ROW|STRING",
                      "cf:id|STRING",
                      "cf:age|INT",
                      "cf:birthday|STRING"
                    ]
                  },
              "name":"Writer",
        "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }

Writer脚本参数

参数

描述

是否必选

默认值

configuration

每个lindorm集群提供给DataX客户端连接的配置信息,可以通过lindorm集群控制台查询,获取到配置信息后可以联系lindorm数据库管理员将其转换为如下JSON格式:{"key1":"value1","key2":"value2"}

例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}

说明

如果是手写的JSON代码,则需要将双引号转义为\"

table

表示所要写入的lindorm表名。lindorm表名对大小写敏感。

namespace

表示所要写入的lindorm表的命名空间。lindorm表的命名空间对大小写敏感。

encoding

编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。

UTF-8

columns

写入字段列表。写入字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。

  • table类型的表,只需要填写列名即可,会自动从表的meta获取schema信息。

  • widecolumn类型或table类型的表。

nullMode

表示在读取源头数据的值为null时,Lindorm Writer 中的nullMode参数可通过配置不同内容,实现不同的处理方式。

  • SKIP:表示不向Lindorm写这列。

  • EMPTY_BYTES:表示遇到字段值为空时,写入空字节数组到Lindorm对应的字段。

  • NULL:表示写入null值。

  • DELETE:表示遇到字段值为空时删除Lindorm中对应的字段。

EMPTY_BYTES