文档

KingbaseES(人大金仓)数据源

更新时间:

KingbaseES数据源为您提供读取和写入KingbaseES双向通道的功能,本文为您介绍DataWorks的KingbaseES数据同步的能力支持情况。

使用限制

  • 目前该数据源仅支持独享数据集成资源组

  • 整个同步任务必须具备insert/replace into的权限。配置同步任务时,您可以在preSqlpostSql中,通过指定语句判断是否需要其它权限。

支持的字段类型

KingbaseES Reader针对KingbaseES类型的转换列表,如下所示。

类型分类

数据源的数据类型

整数类

INT、TINYINT、SMALLINT、MEDIUMINT和BIGINT

浮点类

FLOAT、DOUBLE和DECIMAL

字符串类

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT和LONGTEXT

日期时间类

DATE、DATETIME、TIMESTAMP、TIME和YEAR

布尔型

BIT和BOOL

二进制类

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB和VARBINARY

重要
  • 除上述罗列字段类型外,其它类型均不支持。

  • KingbaseES Reader插件将tinyint(1)视作整型。

数据同步任务开发

KingbaseES数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建与管理数据源

单表离线同步任务配置指导

附录:脚本Demo与参数说明

附录:离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。

Reader脚本Demo

  • 配置单库单表

    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"kingbasees",//插件名。
                "parameter":{
                    "column":[//列名。
                        "id"
                    ],
                    "connection":[
                        {   "querySql":["select a,b from join1 c join join2 d on c.id = d.id;"], //使用字符串的形式,将querySql写在connection中。
                            "datasource":"",//数据源。
                            "table":[//表名,即使只有一张表,也必须以[]的数组形式书写。
                                "xxx"
                            ]
                        }
                    ],
                    "where":"",//过滤条件。
                    "splitPk":"",//切分键。
                    "encoding":"UTF-8"//编码格式。
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                "concurrent":1, //作业并发数。
                "mbps":"12"//限流,此处1mbps = 1MB/s。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置分库分表

    说明

    分库分表是指在KingbaseES Reader端可以选择多个KingbaseES数据表,且表结构保持一致。

    {
        "type": "job",
        "version": "1.0",
        "configuration": {
            "reader": {
                "plugin": "kingbasees",
                "parameter": {
                    "connection": [
                        {
                            "table": [
                                "tbl1",
                                "tbl2",
                                "tbl3"
                            ],
                            "datasource": "datasourceName1"
                        },
                        {
                            "table": [
                                "tbl4",
                                "tbl5",
                                "tbl6"
                            ],
                            "datasource": "datasourceName2"
                        }
                    ],
                    "singleOrMulti": "multi",
                    "splitPk": "db_id",
                    "column": [
                        "id", "name", "age"
                    ],
                    "where": "1 < id and id < 100"
                }
            },
            "writer": {            
            }
        }
    }

Reader脚本参数

参数

描述

username

用户名。

password

密码。

column

需要同步的字段名称。如果需要同步所有列,请使用星号(*)。

table

需要同步的表名。

jdbcUrl

连接KingbaseES的JDBC URL。例如,jdbc:kingbase8://127.0.0.1:30215?currentschema=TEST

splitPk

KingbaseES表中的某个字段作为同步的切分字段,切分字段有助于多并发同步KingbaseES表。

切分字段需要是数值整型的字段,如果没有该类型,则可以不填。

Writer脚本Demo

脚本配置示例如下所示。

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"kingbasees",//插件名。
            "parameter":{
                "postSql":[],//导入后的准备语句。
                "datasource":"",//数据源。
                "column":[//列名。
                    "id",
                    "value"
                ],
                "batchSize":1024,//一次性批量提交的记录数大小。
                "table":"",//表名。
                "preSql":[
                     "delete from XXX;" //导入前的准备语句。
                   ]
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{//错误记录数。
            "record":"0"
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

table

选取的需要同步的表名称。

column

目标表需要写入数据的字段,字段之间用英文所逗号分隔,例如"column": ["id", "name", "age"]

如果要依次写入全部列,使用星号(*)表示, 例如"column":["*"]

说明

当来源端字段名称包含斜杠(/),需要使用反斜杠加双引号(\"your_column_name\")进行转义,例如,字段名称为/abc/efg,则转义后字段名称为\"/abc/efg\"

preSql

执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。例如,执行前清空表中的旧数据:

truncate table tablename

说明

当有多条SQL语句时,不支持SQL事务原子性。

postSql

执行数据同步任务之后执行的SQL语句,目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。例如,加上某一个时间戳:alter table tablenameadd colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

batchSize

一次性批量提交的记录数大小,该值可以极大减少数据同步系统与数据源的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程OOM异常。

1024