本文为您介绍Lindorm Writer的实现原理、参数定义及配置示例。

背景信息

Lindorm Writer插件实现了将数据写入Lindorm的功能。在底层实现上,Lindorm Writer通过Lindorm的Java客户端远程连接Lindorm服务,并支持通过对应的API将数据写入Lindorm的table类型或wideColumn类型的表中。
说明
  • LindormWriter的必填配置项configuration,可以通过Lindorm集群控制台查看连接Lindorm相关的配置项进行获取,并以JSON格式填写相关信息。
  • Lindorm为多模数据库,Lindorm Wirter支持数据写入table和wideColumn类型的模型表,关于table和wideColumn类型的详细介绍请参见Lindorm使用文档您也可以通过钉钉咨询Lindorm值班人员。

使用限制

Lindorm Writer仅支持使用新增和使用独享数据集成资源组,不支持使用公共资源组自定义资源组

类型转换

Lindorm Writer支持大部分Lindorm类型,但也存在个别没有支持的情况,请注意检查您的数据类型。

Lindorm Writer针对Lindorm类型的转换列表,如下所示。
类型分类数据类型
整数类INT、LONG、SHORT
浮点类DOUBLE、FLOAT、DOUBLE
字符串类STRING
日期时间类DATE
布尔类BOOLEAN
二进制类BINARYSTRING

参数说明

参数描述是否必选默认值
configuration每个lindorm集群提供给DataX客户端连接的配置信息,可以通过lindorm集群控制台查询,获取到配置信息后可以联系lindorm数据库管理员将其转换为如下JSON格式:{"key1":"value1","key2":"value2"}

例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"}

说明 如果是手写的JSON代码,则需要将双引号转义为\"
dynamicColumn表示dynamic动态列模式,该模式配置较为复杂,一般不使用该模式。可选值包括truefalse,默认选择falsefalse
table表示所要写入的lindorm表名。lindorm表名对大小写敏感。
namespace表示所要写入的lindorm表的命名空间。lindorm表的命名空间对大小写敏感。
encoding编码方式,取值为UTF-8或GBK。一般用于将二进制存储的lindorm byte[]类型转换为String类型。UTF-8
columns写入字段列表。写入字段列表支持列裁剪和列换序,列裁剪指可以选择部分列进行导出,列换序指可以不按照表schema信息顺序进行导出。
  • table类型的表,只需要填写列名即可,会自动从表的meta获取schema信息。
  • widecolumn类型或table类型的表。

脚本开发介绍

  • 配置一个数据源为MySQL,需要写入数据到Lindorm Table(对应SDK中的TableService模型)的作业,使用脚本开发的详情请参见通过脚本模式配置离线同步任务
    说明 实际运行时,请删除下述代码中的注释。
    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "checkSlave": true,
            "datasource": " ",
            "envType": 1,
            "column": [
              "id",
              "value",
              "table"
            ],
            "socketTimeout": 3600000,
            "masterSlave": "slave",
            "connection": [
              {
                "datasource": " ",
                "table": []
              }
            ],
            "where": "",
            "splitPk": "",
            "encoding": "UTF-8",
            "print": true
          },
          "name": "mysqlReader",
          "category": "reader"
        },
        {
          "stepType": "lindorm",
          "parameter": {
            "configuration":  {
              "lindorm.client.seedserver": "xxxxxxx:30020",
              "lindorm.client.username": "xxxxxx",
              "lindorm.client.namespace": "default",
              "lindorm.client.password": "xxxxxx"
            },
            "nullMode": "skip",
            "datasource": "",
            "writeMode": "api",
            "envType": 1,
            "columns": [
              "id",
              "name",
              "age",
              "birthday",
              "gender"
            ],
            "dynamicColumn": "false",
            "table": "lindorm_table",
            "encoding": "utf8"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
          //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它。
          "byte": 1048576
        },
        //出错限制
        "errorLimit": {
          //出错的record条数上限,当大于该值即报错。
          "record": 0,
          //出错的record百分比上限 1.0表示100%,0.02表示2%。
          "percentage": 0.02
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • 配置一个数据源为MySQL,需要写入数据到Lindorm wideColumn(对应SDK中的WideColumnService模型)作业,使用脚本开发的详情请参见通过脚本模式配置离线同步任务
    说明 实际运行时,请删除下述代码中的注释。
    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                        "name",
                        "age",
                        "birthday",
                        "gender"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                     "configuration":  {
                      "lindorm.client.seedserver": "xxxxxxx:30020",
                      "lindorm.client.username": "xxxxxx",
                      "lindorm.client.namespace": "default",
                      "lindorm.client.password": "xxxxxx"
                    },
                "writeMode": "api",
                "namespace": "default",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "columns": [
                      "ROW|STRING",
                      "cf:id|STRING",
                      "cf:age|INT",
                      "cf:birthday|STRING"
                    ]
                  },
              "name": "Writer",
              "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }