Lindorm数据源

DataWorks数据集成支持使用Lindorm ReaderLindorm Writer插件读取和写入Lindorm双向通道的功能,本文为您介绍DataWorksLindorm数据读取与写入能力。

使用限制

说明

Lindorm为多模数据库,详情请参见Lindorm使用文档,当前DataWorks仅支持宽表引擎及计算引擎两种。

支持的字段类型

Lindorm ReaderLindorm Writer支持大部分Lindorm类型,但也存在个别没有支持的情况,请注意检查您的数据类型。

Lindorm ReaderLindorm Writer针对Lindorm类型的转换列表,如下所示。

类型分类

数据类型

整数类

INT、LONG、SHORT

浮点类

DOUBLE、FLOAT、DOUBLE

字符串类

STRING

日期时间类

DATE

布尔类

BOOLEAN

二进制类

BINARYSTRING

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

  • 配置一个宽表引擎Lindorm SQL Table抽取数据到本地的作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "caching": 128,
                    "column": [
                       "id",
                      "value" 
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "tableService",
                    "table": "lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "lindorm",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value" 
                    ],
                    "socketTimeout": 3600000,
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它.
            "byte": 1048576
          }
          //出错限制
          "errorLimit": {
            //出错的record条数上限,当大于该值即报错。
            "record": 0,
            //出错的record百分比上限 1.0表示100%,0.02表示2%
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • 配置一个宽表引擎Lindorm HbaseLike(WideColumn)表抽取数据到本地的作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "column":  [
                         "STRING|rowkey",
                          "INT|f:a"
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "wideColumn",
                    "table":"lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。
            "byte": 1048576
          }
            //出错限制
            "errorLimit": {
            //出错的record条数上限,当大于该值即报错。
            "record": 0,
            //出错的record百分比上限 1.0表示100%,0.02表示2%。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • 配置一个计算引擎表抽取数据到本地的作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                   "datasource": "lindorm_datasource",
                    "column": [
                       "id",
                       "value"
                    ],
                    "tableComment": "",
                    "where": "",
                    "session": [],
                    "splitPk": "id",
                    "table": "auto_ob_149912212480"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。
            "byte": 1048576
          }
            //出错限制
            "errorLimit": {
            //出错的record条数上限,当大于该值即报错。
            "record": 0,
            //出错的record百分比上限 1.0表示100%,0.02表示2%。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }

Reader脚本参数

参数

描述

是否必选

默认值

mode

宽表引擎特有,表示数据读取模式,包括固定列模式FixedColumn和动态列模式DynamicColumn

FixedColumn

tableMode

宽表引擎特有,包括普通表SQL模式table和宽表模式wideColumn。默认为table,如果选择table模式,可不填写。

默认不填写

table

表示所要读取的lindorm表名。lindorm表名对大小写敏感。

encoding

宽表引擎特有,编码方式,取值为UTF-8GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。

UTF-8

caching

宽表引擎特有,一次性批量获取的记录数大小,该值可以极大减少数据同步系统与Lindorm的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致Lindorm服务端压力过大或者数据同步运行进程OOM异常。

100

selects

宽表引擎特有,当前读取的Table类型数据不支持自动切割分片,默认单并发运行,因此需要手动配置selects参数进行数据切片,例如:

selects": [
    "where(compare(\"id\", LESS, 5))",
    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
    ],

使用限制

  • 仅允许主键列和索引列作为查询条件,如果使用普通列作为查询条件,将会触发大请求扫描,影响源集群的稳定性。

  • 当表包含多个主键列时,查询条件必须遵循主键的最左匹配原则,即左侧连续的n-1个主键列都需使用等于条件。例如:假设某表的主键为 [id, order_time],普通列为 [type, data],其中 type 列已创建二级索引。

    • 推荐语法示例

      SQL语法

      插件语法

      where id >= 1 and id < 100

      where(and(compare(\"id\", GREATER_OR_EQUAL, 1), compare(\"id\", LESS, 100)))

      where id = 1 and order_time > 1234567

      where(and(compare(\"id\", EQUAL, 1), compare(\"order_time\", GREATER, 1234567)))

      where type = 'pay'

      where(compare(\"type\", EQUAL, \"pay\"))
    • 不推荐语法示例

      SQL语法

      插件语法

      不推荐原因

      where order_time >= 1234567 and order_time < 5678910

      where(and(compare(\"order_time\", GREATER_OR_EQUAL, 1234567), compare(\"order_time\", LESS, 5678910)))

      缺少左侧主键列id

      where id > 1 and order_time > 1234567

      where(and(compare(\"id\", GREATER, 1), compare(\"order_time\", GREATER, 1234567)))

      左侧主键列id不是等于。

      where data > 'xxx'

      where(compare(\"data\", GREATER, \"xxx\"))

      data字段是非主键列。

session

计算引擎特有,Session粒度作业参数,例如set hive.execution.engine=tez

splitPk

计算引擎特有,切分键,计算引擎表读取特有,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效率。

  • 如果不填写splitPk,包括不提供splitPk或者splitPk值为空,数据同步视作使用单通道同步该表数据。

  • 目前splitPk仅支持整型数据切分,不支持字符串、浮点和日期等其他类型。

columns

读取字段列表。读取字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。

  • table类型的表,只需要填写列名即可,会自动从表的meta获取schema信息。示例如下:

    [
        "id",
        "name",
        "age",
        "birthday",
        "gender"
    ]
  • HbaseLike(widecolumn)类型的表。示例如下:

    [
        "STRING|rowkey",
        "INT|f:a",
        "DOUBLE|f:b"
    ]

Writer脚本Demo

  • 配置一个数据源为MySQL,需要写入数据到宽表引擎 Lindorm SQL Table的作业。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "checkSlave": true,
            "datasource": " ",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "socketTimeout": 3600000,
            "masterSlave": "slave",
            "connection": [
              {
                "datasource": " ",
                "table": []
              }
            ],
            "where": "",
            "splitPk": "",
            "encoding": "UTF-8",
            "print": true
          },
          "name": "mysqlReader",
          "category": "reader"
        },
        {
          "stepType": "lindorm",
          "parameter": {
            "nullMode": "skip",
            "datasource": "lindorm_datasource",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "dynamicColumn": "false",
            "table": "lindorm_table",
            "encoding": "utf8"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
          //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。
          "byte": 1048576
        },
        //出错限制
        "errorLimit": {
          //出错的record条数上限,当大于该值即报错。
          "record": 0,
          //出错的record百分比上限 1.0表示100%,0.02表示2%。
          "percentage": 0.02
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • 配置一个数据源为MySQL,需要写入数据到宽表引擎 Lindorm HbaseLike (WideColumn)表的作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                "datasource": "lindorm_datasource",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "column": [  //从源端按字段顺序映射
                      "ROW|STRING", //行键,固定配置,将源端第一个字段映射为行键,例如本示例中将id映射为行键。
                      "cf:name|STRING" //cf表示列族名,可修改,name表示目标端列名,可修改
                ]
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }
  • 配置一个数据源为MySQL,需要写入数据到计算引擎表的作业。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                  "datasource": "lindorm_datasource",
                  "table": "xxxxxx",
                  "column": [ 
                     "id",
                    "value"
                   ],
                  "formatType": "ICEBERG"
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }

Writer脚本参数

参数

描述

是否必选

默认值

table

表示所要写入的lindorm表名。lindorm表名对大小写敏感。

encoding

宽表引擎特有,编码方式,取值为UTF-8GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。

UTF-8

columns

写入字段列表。写入字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。

  • table类型的表,只需要填写列名即可,会自动从表的meta获取schema信息。

  • widecolumn类型或table类型的表。

nullMode

宽表引擎特有,表示在读取源头数据的值为null时,Lindorm Writer 中的nullMode参数可通过配置不同内容,实现不同的处理方式。

  • SKIP:表示不向Lindorm写这列。

  • EMPTY_BYTES:表示遇到字段值为空时,写入空字节数组到Lindorm对应的字段。

  • NULL:表示写入null值。

  • DELETE:表示遇到字段值为空时删除Lindorm中对应的字段。

EMPTY_BYTES

formatType

计算引擎特有,比较待同步表的类型,取值范围:

  • iceberg

  • parquet

  • orc