MySQL数据源为您提供读取和写入MySQL的双向通道,本文为您介绍DataWorks的MySQL数据同步的能力支持情况。
支持的MySQL版本
- 离线读写: - 支持MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、MySQL 8.0.x,兼容Amazon RDS for MySQL、Azure MySQL、Amazon Aurora MySQL。 - 离线同步支持读取视图表。 
- 实时读取: - 数据集成实时读取MySQL数据是基于实时订阅MySQL实现的,当前仅支持实时同步MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、MySQL 8.0.x(非8.0新特性,比如functional index,仅兼容原有功能)版本的MySQL数据,兼容Amazon RDS for MySQL、Azure MySQL、Amazon Aurora MySQL。 重要- 如果需要同步DRDS的MySQL,请不要将DRDS的MySQL配置为MySQL数据源,您可以参考配置DRDS数据源文档直接将其配置为DRDS数据源。 
使用限制
实时读
- 不支持同步MySQL只读库实例的数据。 
- 不支持同步含有Functional index的表。 
- 不支持XA ROLLBACK。 - 针对已经XA PREPARE的事务数据,实时同步会将其同步到目标端,如果XA ROLLBACK,实时同步不会针对XA PREPARE的数据做回滚写入的操作。若要处理XA ROLLBACK场景,需要手动将XA ROLLBACK的表从实时同步任务中移除,再添加表后重新进行同步。 
- 仅支持同步MySQL服务器Binlog配置格式为ROW。 
- 实时同步不会同步被级联删除的关联表记录。 
- 对于Amazon Aurora MySQL数据库,需要连接到您的主/写数据库,因为AWS不允许在Aurora MySQL的只读副本上激活Binlog功能。实时同步任务需要Binlog来执行增量更新。 
- 实时同步在线DDL变更仅支持通过数据管理DMS对MySQL表进行加列(Add Column)在线DDL变更。 
- 不支持读取MySQL中的存储过程。 
离线读
- MySQL Reader插件在进行分库分表等多表同步时,若要对单表进行切分,则需要满足任务并发数大于表个数这一条件,否则切分的Task数目等于表的个数。 
- 不支持读取MySQL中的存储过程。 
支持的字段类型
各版本MySQL的全量字段类型请参见MySQL官方文档。以下以MySQL 8.0.x为例,为您罗列当前主要字段的支持情况。
| 字段类型 | 离线读(MySQL Reader) | 离线写(MySQL Writer) | 实时读 | 实时写 | 
| TINYINT | 
 | 
 | 
 | 
 | 
| SMALLINT | 
 | 
 | 
 | 
 | 
| INTEGER | 
 | 
 | 
 | 
 | 
| BIGINT | 
 | 
 | 
 | 
 | 
| FLOAT | 
 | 
 | 
 | 
 | 
| DOUBLE | 
 | 
 | 
 | 
 | 
| DECIMAL/NUMBERIC | 
 | 
 | 
 | 
 | 
| REAL | 
 | 
 | 
 | 
 | 
| VARCHAR | 
 | 
 | 
 | 
 | 
| JSON | 
 | 
 | 
 | 
 | 
| TEXT | 
 | 
 | 
 | 
 | 
| MEDIUMTEXT | 
 | 
 | 
 | 
 | 
| LONGTEXT | 
 | 
 | 
 | 
 | 
| VARBINARY | 
 | 
 | 
 | 
 | 
| BINARY | 
 | 
 | 
 | 
 | 
| TINYBLOB | 
 | 
 | 
 | 
 | 
| MEDIUMBLOB | 
 | 
 | 
 | 
 | 
| LONGBLOB | 
 | 
 | 
 | 
 | 
| ENUM | 
 | 
 | 
 | 
 | 
| SET | 
 | 
 | 
 | 
 | 
| BOOLEAN | 
 | 
 | 
 | 
 | 
| BIT | 
 | 
 | 
 | 
 | 
| DATE | 
 | 
 | 
 | 
 | 
| DATETIME | 
 | 
 | 
 | 
 | 
| TIMESTAMP | 
 | 
 | 
 | 
 | 
| TIME | 
 | 
 | 
 | 
 | 
| YEAR | 
 | 
 | 
 | 
 | 
| LINESTRING | 
 | 
 | 
 | 
 | 
| POLYGON | 
 | 
 | 
 | 
 | 
| MULTIPOINT | 
 | 
 | 
 | 
 | 
| MULTILINESTRING | 
 | 
 | 
 | 
 | 
| MULTIPOLYGON | 
 | 
 | 
 | 
 | 
| GEOMETRYCOLLECTION | 
 | 
 | 
 | 
 | 
准备工作
配置DataWorks数据源前,请按本文指引完成MySQL环境准备,确保后续任务正常运行。
以下为您介绍MySQL同步前的相关环境准备。
确认MySQL版本
数据集成对MySQL版本有要求,您可参考上文支持的MySQL版本章节,查看当前待同步的MySQL是否符合版本要求。您可以在MySQL数据库通过如下语句查看当前MySQL数据库版本。
SELECT version();配置账号权限
建议您提前规划并创建一个专用于DataWorks访问数据源的MySQL账号,操作如下。
- 可选:创建账号。 - 操作详情请参见创建MySQL账号。 
- 配置权限。 - 离线 - 在离线同步场景下: - 在离线读MySQL数据时,此账号需拥有同步表的读( - SELECT)权限。
- 在离线写MySQL数据时,此账号需拥有同步表的写( - INSERT、- DELETE、- UPDATE)权限。
 
- 实时 - 在实时同步场景下,此账号需要拥有数据库的 - SELECT、- REPLICATION SLAVE、- REPLICATION CLIENT权限。
 - 您可以参考以下命令为账号添加权限,或直接给账号赋予 - SUPER权限。如下执行语句在实际使用时,请替换- '同步账号'为上述创建的账号。- -- CREATE USER '同步账号'@'%' IDENTIFIED BY '密码'; //创建同步账号并设置密码,使其可以通过任意主机登录数据库。%表示任意主机。 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '同步账号'@'%'; //授权同步账号数据库的SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限。- *.*表示授权同步账号对所有数据库的所有表拥有上述权限。您也可以指定授权同步账号对目标数据库的指定表拥有上述权限。例如,授权同步账号对test数据库的user表拥有上述权限,则可以使用- GRANT SELECT, REPLICATION CLIENT ON test.user TO '同步账号'@'%';语句。说明- REPLICATION SLAVE语句为全局权限,不能指定授权同步账号对目标数据库的指定表拥有相关权限。
开启MySQL Binlog(仅实时同步需要)
数据集成通过实时订阅MySQL Binlog实现增量数据实时同步,您需要在DataWorks配置同步前,先开启MySQL Binlog服务。操作如下:
- 如果Binlog在消费中,则无法被数据库删除。如果实时同步任务运行延迟将可能导致源端Binlog长时间被消费,请合理配置任务的延迟告警,并及时关注数据库的磁盘空间。 
- Binlog至少保留72小时以上,避免任务失败后因Binlog已经消失,再启动无法重置位点到故障发生前而导致的数据丢失(此时只能使用全量离线同步来补齐数据)。 
- 检查Binlog是否开启。 - 使用如下语句检查Binlog是否开启。 - SHOW variables LIKE "log_bin";- 返回结果为ON时,表明已开启Binlog。 
- 如果您使用备用库同步数据,则还可以通过如下语句检查Binlog是否开启。 - SHOW variables LIKE "log_slave_updates";- 返回结果为ON时,表明备用库已开启Binlog。 
 - 如果返回的结果与上述结果不符: - 开源MySQL请参考MySQL官方文档开启Binlog。 
- 阿里云RDS MySQL请参考RDS MySQL日志备份开启Binlog。 
- 阿里云PolarDB MySQL请参考开启Binlog开启Binlog。 
 
- 查询Binlog的使用格式。 - 使用如下语句查询Binlog的使用格式。 - SHOW variables LIKE "binlog_format";- 返回结果说明: - 返回ROW,表示开启的Binlog格式为ROW。 
- 返回STATEMENT,表示开启的Binlog格式为STATEMENT。 
- 返回MIXED,表示开启的Binlog格式为MIXED。 
 重要- DataWorks实时同步仅支持同步MySQL服务器Binlog配置格式为ROW。如果返回非ROW请修改Binlog Format。 
- 查询Binlog完整日志是否开启。 - 使用如下语句查询Binlog完整日志是否开启。 - SHOW variables LIKE "binlog_row_image";- 返回结果说明: - 返回FULL,表示Binlog开启了完整日志。 
- 返回MINIMAL,表示Binlog开启了最小日志,未开启完整日志。 
 重要- DataWorks实时同步,仅支持同步开启了Binlog完整日志的MySQL服务器数据。若查询结果返回非FULL,请修改binlog_row_image的配置。 
OSS Binlog读取授权配置
在添加MySQL数据源时,如果配置模式为阿里云实例模式,且RDS MySQL实例地域与DataWorks项目空间在同一地域,您可以开启支持OSS binlog读取,开启后,在无法访问RDS Binlog时,将会尝试从OSS获取Binlog,以避免实时同步任务中断。
如果选择的OSS binlog访问身份为阿里云RAM子账号或阿里云RAM角色,您还需参考如下方式配置账号授权。
- 阿里云RAM子账号。 - 登录RAM 访问控制-用户控制台,找到需要授权的子账号。具体操作: 
- 单击操作列的添加权限。 
- 配置如下关键参数后,单击确认新增授权。 - 资源范围:账号级别。 
- 权限策略:系统策略。 
- 策略名称: - AliyunDataWorksAccessingRdsOSSBinlogPolicy。
  
 
- 阿里云RAM角色。 - 登录RAM 访问控制-角色控制台,创建一个RAM角色。具体操作,请参见创建可信实体为阿里云账号的RAM角色。 - 关键参数: - 信任主体类型:云账号。 
- 信任主体名称:其他云账号,需要填写DataWorks工作空间所属的云账号。 
- 角色名称:自定义。 
 
- 为创建好的RAM角色精确授权。具体操作,请参见为RAM角色授权。 - 关键参数: - 权限策略:系统策略。 
- 策略名称: - AliyunDataWorksAccessingRdsOSSBinlogPolicy。
 
- 为创建好的RAM角色修改信任策略。具体操作,请参见修改RAM角色的信任策略。 - { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "<DataWorks使用者主账号的云账号ID>@di.dataworks.aliyuncs.com", "<DataWorks使用者主账号的云账号ID>@dataworks.aliyuncs.com" ] } } ], "Version": "1" }
 
创建数据源
阿里云实例模式
若您的MySQL为阿里云RDS实例,建议您选择阿里云实例模式创建数据源。参数配置方式如下:
| 参数 | 说明 | 
| 数据源名称 | 数据源名称在工作空间内唯一。建议使用可清晰识别业务和环境的命名,例如 | 
| 配置模式 | 此处选择阿里云实例模式。配置模式说明见:场景1:实例模式(当前云账号),场景2:实例模式(其他云账号)。 | 
| 所属云账号 | 根据实例的归属选择,若选择其他云账号,需要注意跨账号权限配置:跨账号授权(RDS、Hive或Kafka)。 若选择其他云账号,需填写以下信息: 
 | 
| 地域 | 实例所在的地域。 | 
| 实例 | 选择要连接的实例名称。 | 
| 备库设置 | 如果此数据源具备只读实例(备库),可以在配置任务时选择备库读取。优势是防止干扰主库,不影响主库性能。 | 
| 实例地址 | 选择正确的实例后,点击获取最新地址,可以查看实例的公/私网地址、VPC和交换机等信息。 | 
| 数据库 | 数据源要访问的数据库名称。请确保指定的用户具备对该数据库的访问权限。 | 
| 用户名/密码 | MySQL数据库创建的用户名和密码。若使用RDS实例,可在该实例的账号管理中创建和维护。 | 
| 支持OSS binlog读取 | 开启后,在访问不到RDS binlog时,会尝试从OSS获取binlog,避免实时同步任务中断。配置详情参见:OSS Binlog读取授权配置。并根据授权配置,设置OSS binlog访问身份。 | 
| 认证选项 | 可选择无认证或SSL认证。若选择SSL认证,需要实例本身开启SSL认证。准备好证书文件后上传至认证文件管理。 | 
| 版本 | 可以登录MySQL服务器后,使用SELECT VERSION()查询来查看版本号。 | 
连接串模式
你也可以选择连接串模式创建数据源,方式更为灵活。连接串模式的参数配置方式如下:
| 参数 | 说明 | 
| 数据源名称 | 数据源名称在工作空间内唯一。建议使用可清晰识别业务和环境的命名,例如 | 
| 配置模式 | 此处选择连接串模式。即通过JDBC URL的方式连接数据库。 | 
| 连接串预览 | 填写完下文的连接地址和数据库名称后,DataWorks会自动拼接将其拼接成JDBC URL供您预览。 | 
| 连接地址 | 主机地址IP:填写数据库所在的IP或域名。若该数据库为阿里云RDS实例,可在该实例的详情,点击数据库连接查看地址。 端口号:数据库端口,默认为3306。 | 
| 数据库名称 | 数据源要访问的数据库名称。请确保指定的用户具备对该数据库的访问权限。 | 
| 用户名/密码 | MySQL数据库创建的用户名和密码。若使用RDS实例,可在该实例的账号管理中创建和维护。 | 
| 版本 | 可以登录MySQL服务器后,使用SELECT VERSION()查询来查看版本号。 | 
| 认证选项 | 可选择无认证或SSL认证。若选择SSL认证,需要实例本身开启SSL认证。准备好证书文件后上传至认证文件管理。 | 
| 高级参数 | 参数:点击参数下拉框,选择已支持的参数名称。如:connectTimeout。 值:根据选择的参数填入合适的值。如:3000。 则URL会自动拼接为:jdbc:mysql://192.168.90.28:3306/test?connectTimeout=50000 | 
请确保数据源与资源组的网络联通,否则后续任务将运行失败。根据数据源的网络环境和连接模式配置网络联通。详情参见:测试连通性。
数据同步任务开发:MySQL同步流程引导
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
- 脚本模式配置的全量参数和脚本Demo请参见下文的附录:MySQL脚本Demo与参数说明。 
单表实时同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置。
整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步配置指导
操作流程请参见整库实时同步任务配置。
常见问题
更多其他数据集成常见问题请参见数据集成常见问题。
附录:MySQL脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
本文为您提供单库单表和分库分表的配置示例:
本文JSON示例中的注释仅用于展示部分重要参数含义,实际配置时,请移除注释内容。
- 配置单库单表 - { "type": "job", "version": "2.0",//版本号。 "steps": [ { "stepType": "mysql",//插件名。 "parameter": { "column": [//列名。 "id" ], "connection": [ { "querySql": [ "select a,b from join1 c join join2 d on c.id = d.id;" ], "datasource": ""//数据源名称。 } ], "where": "",//过滤条件。 "splitPk": "",//切分键。 "encoding": "UTF-8"//编码格式。 }, "name": "Reader", "category": "reader" }, { "stepType": "stream", "parameter": {}, "name": "Writer", "category": "writer" } ], "setting": { "errorLimit": { "record": "0"//错误记录数。 }, "speed": { "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。 "concurrent": 1,//作业并发数。 "mbps": "12"//限流,此处1mbps = 1MB/s。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
- 配置分库分表 说明- 分库分表是指在MySQL Reader端可以选择多个MySQL数据表,且表结构保持一致。此处的‘分库分表’是指多个MySQL写入同一个目标表,如想要支持整库级别配置分库分表,还请在数据集成站点建立任务并选择整库分库分表能力 - { "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "indexes": [ { "type": "unique", "column": [ "id" ] } ], "envType": 0, "useSpecialSecret": false, "column": [ "id", "buyer_name", "seller_name", "item_id", "city", "zone" ], "tableComment": "测试订单表", "connection": [ { "datasource": "rds_dataservice", "table": [ "rds_table" ] }, { "datasource": "rds_workshop_log", "table": [ "rds_table" ] } ], "where": "", "splitPk": "id", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": {}, "name": "Writer", "category": "writer" }, { "name": "Processor", "stepType": null, "category": "processor", "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" } } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader脚本参数
| 脚本参数名 | 描述 | 是否必选 | 默认值 | 
| datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 | 
| table | 选取的需要同步的表名称。一个数据集成任务只能从一张表中读取数据。 table用于配置范围的高级用法示例如下: 
 说明  任务会读取匹配到的所有表,具体读取这些表中column配置项指定的列。如果表不存在,或者读取的列不存在,会导致任务失败。 | 是 | 无 | 
| column | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。默认使用所有列配置,例如[*]。 
 | 是 | 无 | 
| splitPk | MySQL Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效率。 
 | 否 | 无 | 
| splitFactor | 切分因子,可以配置同步数据的切分份数,如果配置了多并发,会按照并发数 * splitFactor份来切分。例如,并发数=5,splitFactor=5,则会按照5*5=25份来切分,在5个并发线程上执行。 说明  建议取值范围:1~100,过大会导致内存溢出。 | 否 | 5 | 
| where | 筛选条件,在实际业务场景中,往往会选择当天的数据进行同步,将where条件指定为 
 | 否 | 无 | 
| querySql(高级模式,向导模式不支持此参数的配置) | 在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置项来自定义筛选SQL。配置该项后,数据同步系统会忽略tables、columns和splitPk配置项,直接使用该项配置的内容对数据进行筛选。例如,需要进行多表join后同步数据,使用 说明  querySql需要区分大小写,例如,写为querysql会不生效。 | 否 | 无 | 
| useSpecialSecret | 多来源数据源时,是否使用各自数据源的密码。取值包括: 
 如果您配置了多个来源数据源,且各个数据源使用的用户名密码不一致,您可以设置使用各自数据源的密码,即此参数设置为true。 | 否 | false | 
Writer脚本Demo
{
  "type": "job",
  "version": "2.0",//版本号。
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",//插件名。
      "parameter": {
        "postSql": [],//导入后的准备语句。
        "datasource": "",//数据源。
        "column": [//列名。
          "id",
          "value"
        ],
        "writeMode": "insert",//写入模式,您可以设置为insert、replace或update。
        "batchSize": 1024,//一次性批量提交的记录数大小。
        "table": "",//表名。
        "nullMode": "skipNull",//NULL值处理策略。
        "skipNullColumn": [//需要跳过NULL值的列。
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"//导入前的准备语句。
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {//错误记录数。
      "record": "0"
    },
    "speed": {
      "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
      "concurrent": 1,//作业并发数。
      "mbps": "12"//限流,控制同步的最高速率,防止对上游/下游数据库读取/写入压力过大,此处1mbps = 1MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}Writer脚本参数
| 脚本参数名 | 描述 | 是否必选 | 默认值 | 
| datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 | 
| table | 选取的需要同步的表名称。 | 是 | 无 | 
| writeMode | 选择导入模式,可以支持insert into、on duplicate key update和replace into三种方式: 
 | 否 | insert | 
| nullMode | NULL值处理策略,取值范围: 
 重要  配置为skipNull时,任务会动态拼接写数据的SQL语句以支持目标端默认值,会增加FLUSH次数,降低同步速度,最差情况下会每条数据FLUSH一次。 | 否 | writeNull | 
| skipNullColumn | nullMode配置为skipNull时,此参数配置的列不会被强制写为 配置格式: | 否 | 默认为本任务配置的所有列。 | 
| column | 目标表需要写入数据的字段,字段之间用英文所逗号分隔,例如 | 是 | 无 | 
| preSql | 执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。例如,执行前清空表中的旧数据(truncate table tablename)。 说明  当有多条SQL语句时,不支持事务。 | 否 | 无 | 
| postSql | 执行数据同步任务之后执行的SQL语句,目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。例如,加上某一个时间戳 说明  当有多条SQL语句时,不支持事务。 | 否 | 无 | 
| batchSize | 一次性批量提交的记录数大小,该值可以极大减少数据同步系统与MySQL的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程OOM异常。 | 否 | 256 | 
| updateColumn | 当writeMode配置成update时,发生主键/唯一性索引冲突时所更新的字段。字段之间用英文逗号所分隔,例如 | 否 | 无 | 

