MySQL数据源

MySQL数据源为您提供读取和写入MySQL的双向通道,本文为您介绍DataWorks的MySQL数据同步的能力支持情况。

支持的MySQL版本

  • 离线读写:

    支持MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、MySQL 8.0.x,兼容Amazon RDS for MySQLAzure MySQLAmazon Aurora MySQL

    离线同步支持读取视图表。

  • 实时读取:

    数据集成实时读取MySQL数据是基于实时订阅MySQL实现的,当前仅支持实时同步MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、MySQL 8.0.x(非8.0新特性,比如functional index,仅兼容原有功能)版本的MySQL数据,兼容Amazon RDS for MySQLAzure MySQLAmazon Aurora MySQL

    重要

    如果需要同步DRDS的MySQL,请不要将DRDS的MySQL配置为MySQL数据源,您可以参考配置DRDS数据源文档直接将其配置为DRDS数据源。

使用限制

实时读

  • 不支持同步MySQL只读库实例的数据。

  • 不支持同步含有Functional index的表。

  • 不支持XA ROLLBACK。

    针对已经XA PREPARE的事务数据,实时同步会将其同步到目标端,如果XA ROLLBACK,实时同步不会针对XA PREPARE的数据做回滚写入的操作。若要处理XA ROLLBACK场景,需要手动将XA ROLLBACK的表从实时同步任务中移除,再添加表后重新进行同步。

  • 仅支持同步MySQL服务器Binlog配置格式为ROW。

  • 实时同步不会同步被级联删除的关联表记录。

  • 对于Amazon Aurora MySQL数据库,需要连接到您的主/写数据库,因为AWS不允许在Aurora MySQL的只读副本上激活binlog功能。实时同步任务需要binlog来执行增量更新。

  • 实时同步在线DDL变更仅支持通过数据管理DMS对MySQL表进行加列(Add Column)在线DDL变更。

离线读

MySQL Reader插件在进行分库分表等多表同步时,若要对单表进行切分,则需要满足任务并发数大于表个数这一条件,否则切分的Task数目等于表的个数。

支持的字段类型

各版本MySQL的全量字段类型请参见MySQL官方文档。以下以MySQL 8.0.x为例,为您罗列当前主要字段的支持情况。

字段类型

离线读(MySQL Reader)

离线写(MySQL Writer)

实时读

实时写

TINYINT

支持

支持

支持

支持

SMALLINT

支持

支持

支持

支持

INTEGER

支持

支持

支持

支持

BIGINT

支持

支持

支持

支持

FLOAT

支持

支持

支持

支持

DOUBLE

支持

支持

支持

支持

DECIMAL/NUMBERIC

支持

支持

支持

支持

REAL

不支持

不支持

不支持

不支持

VARCHAR

支持

支持

支持

支持

JSON

支持

支持

支持

支持

TEXT

支持

支持

支持

支持

MEDIUMTEXT

支持

支持

支持

支持

LONGTEXT

支持

支持

支持

支持

VARBINARY

支持

支持

支持

支持

BINARY

支持

支持

支持

支持

TINYBLOB

支持

支持

支持

支持

MEDIUMBLOB

支持

支持

支持

支持

LONGBLOB

支持

支持

支持

支持

ENUM

支持

支持

支持

支持

SET

支持

支持

支持

支持

BOOLEAN

支持

支持

支持

支持

BIT

支持

支持

支持

支持

DATE

支持

支持

支持

支持

DATETIME

支持

支持

支持

支持

TIMESTAMP

支持

支持

支持

支持

TIME

支持

支持

支持

支持

YEAR

支持

支持

支持

支持

LINESTRING

不支持

不支持

不支持

不支持

POLYGON

不支持

不支持

不支持

不支持

MULTIPOINT

不支持

不支持

不支持

不支持

MULTILINESTRING

不支持

不支持

不支持

不支持

MULTIPOLYGON

不支持

不支持

不支持

不支持

GEOMETRYCOLLECTION

不支持

不支持

不支持

不支持

数据同步前准备:MySQL环境准备

在DataWorks上进行数据同步前,您需要参考本文提前在MySQL侧进行数据同步环境准备,以便在DataWorks上进行MySQL数据同步任务配置与执行时服务正常。以下为您介绍MySQL同步前的相关环境准备。

准备工作1:确认MySQL版本

数据集成对MySQL版本有要求,您可参考上文支持的MySQL版本章节,查看当前待同步的MySQL是否符合版本要求。您可以在MySQL数据库通过如下语句查看当前MySQL数据库版本。

SELECT version();

准备工作2:配置账号权限

建议您提前规划并创建一个专用于DataWorks访问数据源的MySQL账号,操作如下。

  1. 可选:创建账号。

    操作详情请参见创建MySQL账号

  2. 配置权限。

    • 离线

      在离线同步场景下:

      • 在离线读MySQL数据时,此账号需拥有同步表的读(SELECT)权限。

      • 在离线写MySQL数据时,此账号需拥有同步表的写(INSERTDELETEUPDATE)权限。

    • 实时

      在实时同步场景下,此账号需要拥有数据库的SELECTREPLICATION SLAVEREPLICATION CLIENT权限。

    您可以参考以下命令为账号添加权限,或直接给账号赋予SUPER权限。如下执行语句在实际使用时,请替换'同步账号'为上述创建的账号。

    -- CREATE USER '同步账号'@'%' IDENTIFIED BY '密码'; //创建同步账号并设置密码,使其可以通过任意主机登录数据库。%表示任意主机。
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '同步账号'@'%'; //授权同步账号数据库的 SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限。

    *.*表示授权同步账号对所有数据库的所有表拥有上述权限。您也可以指定授权同步账号对目标数据库的指定表拥有上述权限。例如,授权同步账号对test数据库的user表拥有上述权限,则可以使用GRANT SELECT, REPLICATION CLIENT ON test.user TO '同步账号'@'%';语句。

    说明

    REPLICATION SLAVE语句为全局权限,不能指定授权同步账号对目标数据库的指定表拥有相关权限。

准备工作3:(仅实时同步需要)开启MySQL Binlog

数据集成通过实时订阅MySQL Binlog实现增量数据实时同步,您需要在DataWorks配置同步前,先开启MySQL Binlog服务。操作如下:

重要
  • 如果Binlog在消费中,则无法被数据库删除。如果实时同步任务运行延迟将可能导致源端Binlog长时间被消费,请合理配置任务的延迟告警,并及时关注数据库的磁盘空间。

  • Binlog至少保留72小时以上,避免任务失败后因Binlog已经消失,再启动无法重置位点到故障发生前而导致的数据丢失(此时只能使用全量离线同步来补齐数据)。

  1. 检查Binlog是否开启。

    • 使用如下语句检查Binlog是否开启。

      SHOW variables like "log_bin";

      返回结果为ON时,表明已开启Binlog。

    • 如果您使用备用库同步数据,则还可以通过如下语句检查Binlog是否开启。

      SHOW variables LIKE "log_slave_updates";

      返回结果为ON时,表明备用库已开启Binlog。

    如果返回的结果与上述结果不符:

  2. 查询Binlog的使用格式。

    使用如下语句查询Binlog的使用格式。

    SHOW variables LIKE "binlog_format";

    返回结果说明:

    • 返回ROW,表示开启的Binlog格式为ROW

    • 返回STATEMENT,表示开启的Binlog格式为STATEMENT

    • 返回MIXED,表示开启的Binlog格式为MIXED

    重要

    DataWorks实时同步仅支持同步MySQL服务器Binlog配置格式为ROW。如果返回非ROW请修改Binlog Format。

  3. 查询Binlog完整日志是否开启。

    使用如下语句查询Binlog完整日志是否开启。

    show variables like "binlog_row_image";

    返回结果说明:

    • 返回FULL,表示Binlog开启了完整日志。

    • 返回MINIMAL,表示Binlog开启了最小日志,未开启完整日志。

    重要

    DataWorks实时同步,仅支持同步开启了Binlog完整日志的MySQL服务器数据。若查询结果返回非FULL,请修改binlog_row_image的配置。

OSS binlog读取授权配置

在添加MySQL数据源时,如果配置模式阿里云实例模式,且RDS MySQL实例地域与DataWorks项目空间在同一地域,您可以开启支持OSS binlog读取,开启后,在无法访问RDS binlog时,将会尝试从OSS获取binlog,以避免实时同步任务中断。

如果选择的OSS binlog访问身份阿里云RAM子账号阿里云RAM角色,您还需参考如下方式配置账号授权。

阿里云RAM子账号
  1. 登录RAM 访问控制-用户控制台,找到需要授权的子账号。具体操作:

  2. 单击操作列的添加权限

  3. 配置如下关键参数后,单击确认新增授权

    • 资源范围:账号级别。

    • 权限策略:系统策略。

    • 策略名称AliyunDataWorksAccessingRdsOSSBinlogPolicy

    image

阿里云RAM角色
  1. 登录RAM 访问控制-角色控制台,创建一个RAM角色。具体操作,请参见创建可信实体为阿里云账号的RAM角色

    关键参数:

    • 选择可信实体类型:阿里云账号。

    • 角色名称:自定义。

    • 选择信任的云账号:其他账号,填写DataWorks工作空间所属的云账号。

  2. 为创建好的RAM角色精确授权。具体操作,请参见为RAM角色授权

    关键参数:

    • 权限策略:系统策略。

    • 策略名称AliyunDataWorksAccessingRdsOSSBinlogPolicy

  3. 为创建好的RAM角色修改信任策略。具体操作,请参见修改RAM角色的信任策略

    {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "<DataWorks使用者主账号的云账号ID>@cdp.aliyuncs.com"
                    ]
                }
            }
        ],
        "Version": "1"
    }

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发:MySQL同步流程引导

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

单表实时同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置

整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步配置指导

操作流程请参见数据集成侧同步任务配置

常见问题

更多其他数据集成常见问题请参见数据集成常见问题

附录:MySQL脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

本文为您提供单库单表和分库分表的配置示例:

说明

本文JSON示例中的注释仅用于展示部分重要参数含义,实际配置时,请移除注释内容。

  • 配置单库单表

    {
      "type": "job",
      "version": "2.0",//版本号。
      "steps": [
        {
          "stepType": "mysql",//插件名。
          "parameter": {
            "column": [//列名。
              "id"
            ],
            "connection": [
              {
                "querySql": [
                  "select a,b from join1 c join join2 d on c.id = d.id;"
                ],
                "datasource": ""//数据源名称。
              }
            ],
            "where": "",//过滤条件。
            "splitPk": "",//切分键。
            "encoding": "UTF-8"//编码格式。
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "stream",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "errorLimit": {
          "record": "0"//错误记录数。
        },
        "speed": {
          "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
          "concurrent": 1,//作业并发数。
          "mbps": "12"//限流,此处1mbps = 1MB/s。
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • 配置分库分表

    说明

    分库分表是指在MySQL Reader端可以选择多个MySQL数据表,且表结构保持一致。此处的‘分库分表’是指多个MySQL写入同一个目标表,如想要支持整库级别配置分库分表,还请在数据集成站点建立任务并选择整库分库分表能力

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "indexes": [
              {
                "type": "unique",
                "column": [
                  "id"
                ]
              }
            ],
            "envType": 0,
            "useSpecialSecret": false,
            "column": [
              "id",
              "buyer_name",
              "seller_name",
              "item_id",
              "city",
              "zone"
            ],
            "tableComment": "测试订单表",
            "connection": [
              {
                "datasource": "rds_dataservice",
                "table": [
                  "rds_table"
                ]
              },
              {
                "datasource": "rds_workshop_log",
                "table": [
                  "rds_table"
                ]
              }
            ],
            "where": "",
            "splitPk": "id",
            "encoding": "UTF-8"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        },
        {
          "name": "Processor",
          "stepType": null,
          "category": "processor",
          "copies": 1,
          "parameter": {
            "nodes": [],
            "edges": [],
            "groups": [],
            "version": "2.0"
          }
        }
      ],
      "setting": {
        "executeMode": null,
        "errorLimit": {
          "record": ""
        },
        "speed": {
          "concurrent": 2,
          "throttle": false
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }

Reader脚本参数

脚本参数名

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

table

选取的需要同步的表名称。一个数据集成任务只能从一张表中读取数据。

table用于配置范围的高级用法示例如下:

  • 您可以通过配置区间读取分库分表,例如'table_[0-99]'表示读取'table_0''table_1''table_2'直到'table_99'

  • 如果您的表数字后缀的长度一致,例如'table_000''table_001''table_002'直到'table_999',您可以配置为'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]'

说明

任务会读取匹配到的所有表,具体读取这些表中column配置项指定的列。如果表不存在,或者读取的列不存在,会导致任务失败。

column

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息 。默认使用所有列配置,例如[ * ]。

  • 支持列裁剪:列可以挑选部分列进行导出。

  • 支持列换序:列可以不按照表schema信息顺序进行导出。

  • 支持常量配置:您需要按照MySQL SQL语法格式,例如["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id为普通列名。

    • table为包含保留字的列名。

    • 1为整型数字常量。

    • 'mingya.wmy'为字符串常量(注意需要加上一对单引号)。

    • 关于null

      • " "表示空。

      • null表示null。

      • 'null'表示null这个字符串。

    • to_char(a+1)为计算字符串长度函数。

    • 2.3为浮点数。

    • true为布尔值。

  • column必须显示指定同步的列集合,不允许为空。

splitPk

MySQL Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效能。

  • 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

  • 目前splitPk仅支持整型数据切分,不支持字符串、浮点和日期等其他类型 。如果您指定其他非支持类型,忽略splitPk功能,使用单通道进行同步。

  • 如果不填写splitPk,包括不提供splitPk或者splitPk值为空,数据同步视作使用单通道同步该表数据 。

where

筛选条件,在实际业务场景中,往往会选择当天的数据进行同步,将where条件指定为gmt_create>$bizdate

  • where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或value,数据同步均视作同步全量数据。

  • 不可以将where条件指定为limit 10,这不符合MySQL SQL WHERE子句约束。

querySql(高级模式,向导模式不支持此参数的配置)

在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置型来自定义筛选SQL。配置该项后,数据同步系统会忽略tables、columns和splitPk配置项,直接使用该项配置的内容对数据进行筛选。例如,需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。当您配置querySql时,MySQL Reader直接忽略table、column、where和splitPk条件的配置,querySql优先级大于tablecolumnwheresplitPk选项。datasource通过它解析出用户名和密码等信息。

说明

querySql需要区分大小写,例如,写为querysql会不生效。

useSpecialSecret

多来源数据源时,是否使用各自数据源的密码。取值包括:

  • true

  • false

如果您配置了多个来源数据源,且各个数据源使用的用户名密码不一致,您可以设置使用各自数据源的密码,即此参数设置为true

false

Writer脚本Demo

{
  "type": "job",
  "version": "2.0",//版本号。
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",//插件名。
      "parameter": {
        "postSql": [],//导入后的准备语句。
        "datasource": "",//数据源。
        "column": [//列名。
          "id",
          "value"
        ],
        "writeMode": "insert",//写入模式,您可以设置为insert、replace或update。
        "batchSize": 1024,//一次性批量提交的记录数大小。
        "table": "",//表名。
        "nullMode": "skipNull",//NULL值处理策略。
        "skipNullColumn": [//需要跳过NULL值的列。
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"//导入前的准备语句。
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {//错误记录数。
      "record": "0"
    },
    "speed": {
      "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
      "concurrent": 1,//作业并发数。
      "mbps": "12"//限流,控制同步的最高速率,防止对上游/下游数据库读取/写入压力过大,此处1mbps = 1MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Writer脚本参数

脚本参数名

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

table

选取的需要同步的表名称。

writeMode

选择导入模式,可以支持insert intoon duplicate key updatereplace into三种方式:

  • insert into:当主键/唯一性索引冲突时会写不进去冲突的行,以脏数据的形式体现。

    如果您通过脚本模式配置任务,请设置writeModeinsert

  • on duplicate key update:没有遇到主键/唯一性索引冲突时,与insert into行为一致。冲突时会用新行替换已经指定的字段的语句,写入数据至MySQL。

    如果您通过脚本模式配置任务,请设置writeModeupdate

  • replace into:没有遇到主键/唯一性索引冲突时,与insert into行为一致。冲突时会先删除原有行,再插入新行。即新行会替换原有行的所有字段。

    如果您通过脚本模式配置任务,请设置writeModereplace

insert

nullMode

NULL值处理策略,取值范围:

  • writeNull:当源端字段数据是NULL值时,给目标端字段写入NULL值。

  • skipNull:当源端字段数据是NULL值时,目标端不写入本字段,若目标端有默认值定义,该列值会使用目标端默认值,若目标端无默认值定义,该列值会是NULL。配置此参数时,需要同时配置skipNullColumn参数。

重要

配置为skipNull时,任务会动态拼接写数据的SQL语句以支持目标端默认值,会增多FLUSH次数,降低同步速度,最差情况下会每条数据FLUSH一次。

writeNull

skipNullColumn

nullMode配置为skipNull时,此参数配置的列不会被强制写为NULL,会优先使用对应列本身的默认值。

配置格式:["c1", "c2", ...],其中,c1c2需要配置为column参数的子集。

默认为本任务配置的所有列。

column

目标表需要写入数据的字段,字段之间用英文所逗号分隔,例如"column": ["id", "name", "age"]。如果要依次写入全部列,使用星号(*)表示, 例如"column": ["*"]

preSql

执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。例如,执行前清空表中的旧数据(truncate table tablename)。

说明

当有多条SQL语句时,不支持事务。

postSql

执行数据同步任务之后执行的SQL语句,目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。例如,加上某一个时间戳alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

说明

当有多条SQL语句时,不支持事务。

batchSize

一次性批量提交的记录数大小,该值可以极大减少数据同步系统与MySQL的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程OOM异常。

256

updateColumn

writeMode配置成update时,发生遇到主键/唯一性索引冲突时所更新的字段。字段之间用英文逗号所分隔,例如 "updateColumn":["name", "age"]