本文为您介绍MySQL Reader支持的数据类型、字段映射和数据源等参数及配置示例。

前提条件

开始配置MySQL Reader插件前,请首先配置好数据源,详情请参见配置MySQL数据源

背景信息

MySQL Reader插件通过JDBC连接器连接至远程的MySQL数据库,根据您配置的信息生成查询SQL语句,发送至远程MySQL数据库,执行该SQL语句并返回结果。然后使用数据同步自定义的数据类型拼装为抽象的数据集,传递给下游Writer处理。

在底层实现上,MySQL Reader插件通过JDBC连接远程MySQL数据库,并执行相应的SQL语句,从MySQL库中抽取数据。

MySQL Reader插件支持读取表和视图。表字段可以依序指定全部列、指定部分列、调整列顺序、指定常量字段和配置MySQL的函数,例如now()等。

类型转换列表

MySQL Reader针对MySQL类型的转换列表,如下所示。
类型分类 MySQL数据类型
整数类 INT、TINYINT、SMALLINT、MEDIUMINT和BIGINT
浮点类 FLOAT、DOUBLE和DECIMAL
字符串类 VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT和LONGTEXT
日期时间类 DATE、DATETIME、TIMESTAMP、TIME和YEAR
布尔型 BIT和BOOL
二进制类 TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB和VARBINARY
注意
  • 除上述罗列字段类型外,其它类型均不支持。
  • MySQL Reader插件将tinyint(1)视作整型。

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。
table 选取的需要同步的表名称,一个数据集成Job只能同步一张表。
table用于配置范围的高级用法示例如下:
  • 您可以通过配置区间读取分库分表,例如'table_[0-99]'表示读取'table_0''table_1''table_2'直到'table_99'
  • 如果您的表数字后缀的长度一致,例如'table_000''table_001''table_002'直到'table_999',您可以配置为'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]'
说明 任务会读取匹配到的所有表,具体读取这些表中column配置项指定的列。如果表不存在,或者读取的列不存在,会导致任务失败。
column 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息 。默认使用所有列配置,例如[ * ]。
  • 支持列裁剪:列可以挑选部分列进行导出。
  • 支持列换序:列可以不按照表schema信息顺序进行导出。
  • 支持常量配置:您需要按照MySQL SQL语法格式,例如["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]
    • id为普通列名。
    • table为包含保留字的列名。
    • 1为整型数字常量。
    • 'mingya.wmy'为字符串常量(注意需要加上一对单引号)。
    • 关于null
      • " "表示空。
      • null表示null。
      • 'null'表示null这个字符串。
    • to_char(a+1)为计算字符串长度函数。
    • 2.3为浮点数。
    • true为布尔值。
  • column必须显示指定同步的列集合,不允许为空。
splitPk MySQL Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效能。
  • 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
  • 目前splitPk仅支持整型数据切分,不支持字符串、浮点和日期等其他类型 。如果您指定其他非支持类型,忽略splitPk功能,使用单通道进行同步。
  • 如果不填写splitPk,包括不提供splitPk或者splitPk值为空,数据同步视作使用单通道同步该表数据 。
where 筛选条件,在实际业务场景中,往往会选择当天的数据进行同步,将where条件指定为gmt_create>$bizdate
  • where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或value,数据同步均视作同步全量数据。
  • 不可以将where条件指定为limit 10,这不符合MySQL SQL WHERE子句约束。
querySql(高级模式,向导模式不提供) 在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置型来自定义筛选SQL。配置该项后,数据同步系统会忽略tables、columns和splitPk配置项,直接使用该项配置的内容对数据进行筛选。例如,需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。当您配置querySql时,MySQL Reader直接忽略table、column、where和splitPk条件的配置,querySql优先级大于tablecolumnwheresplitPk选项。datasource通过它解析出用户名和密码等信息。
说明 querySql需要区分大小写,例如,写为querysql会不生效。
singleOrMulti(仅适用于分库分表) 表示分库分表,向导模式转换成脚本模式主动生成此配置"singleOrMulti":"multi",但配置脚本任务模板不会直接生成此配置必须手动添加,否则只会识别第一个数据源。 multi

向导开发介绍

打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置任务

您需要在数据同步任务的编辑页面进行以下配置:
  1. 选择数据源。
    配置同步任务的数据来源数据去向数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。
    即上述参数说明中的table
    数据过滤 您将要同步数据的筛选条件,暂时不支持limit关键字过滤。SQL语法与选择的数据源一致。
    切分键 您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。
    读取数据时,根据配置的字段进行数据分片,实现并发读取,可以提升数据同步效率。
    说明 切分键与数据同步中的选择来源有关,配置数据来源时才显示切分键配置项。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除 。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。
    添加一行 单击添加一行,您可以输入以下类型的字段:
    • 可以输入常量,输入的值需要使用英文单引号,如'abc’'123’等。
    • 可以配合调度参数使用,例如${bizdate}等。
    • 可以输入关系数据库支持的函数,例如now()count(1)等。
    • 如果您输入的值无法解析,则类型显示为未识别。
  3. 通道控制。通道控制
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。

脚本开发介绍

使用脚本开发的详情请参见通过脚本模式配置任务

本文为您提供单库单表和分库分表的配置示例:
  • 配置单库单表
    {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
            {
                "stepType":"mysql",//插件名。
                "parameter":{
                    "column":[//列名。
                        "id"
                    ],
                    "connection":[
                        {   "querySql":["select a,b from join1 c join join2 d on c.id = d.id;"], //使用字符串的形式,将querySql写在connection中。
                            "datasource":"",//数据源。
                            "table":[//表名,即使只有一张表,也必须以[]的数组形式书写。
                                "xxx"
                            ]
                        }
                    ],
                    "where":"",//过滤条件。
                    "splitPk":"",//切分键。
                    "encoding":"UTF-8"//编码格式。
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//错误记录数。
            },
            "speed":{
                "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
                "concurrent":1,//作业并发数。
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 配置分库分表
    说明 分库分表是指在MySQL Reader端可以选择多个MySQL数据表,且表结构保持一致。
    {
        "type": "job",
        "version": "1.0",
        "configuration": {
            "reader": {
                "plugin": "mysql",
                "parameter": {
                    "connection": [
                        {
                            "table": [
                                "tbl1",
                                "tbl2",
                                "tbl3"
                            ],
                            "datasource": "datasourceName1"
                        },
                        {
                            "table": [
                                "tbl4",
                                "tbl5",
                                "tbl6"
                            ],
                            "datasource": "datasourceName2"
                        }
                    ],
                    "singleOrMulti": "multi",
                    "splitPk": "db_id",
                    "column": [
                        "id", "name", "age"
                    ],
                    "where": "1 < id and id < 100"
                }
            },
            "writer": {            
            }
        }
    }