全部产品
阿里云办公

MySQLWriter

更新时间:2017-06-07 13:26:11

1 快速介绍

MySQLWriter 插件实现了写入数据到 MySQL 主库的目的表的功能。在底层实现上, MySQLReader 通过 JDBC 连接远程 MySQL 数据库,并执行相应的 insert into ... 或者 ( replace into ...) 的 sql 语句将数据写入 MySQL,内部会分批次提交入库,需要数据库本身采用 innodb 引擎。

MySQLWriter 面向ETL开发工程师,他们使用 MySQLWriter 从数仓导入数据到 MySQL。同时 MySQLWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

2 实现原理

MySQLWriter 通过 CDP 框架获取 Reader 生成的协议数据,根据你配置的 writemode 生成

  • insert into...(当主键/唯一性索引冲突时会写不进去冲突的行)
  • replace into...(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 MySQL。处于性能考虑,采用了 PreparedStatement + Batch,并且设置了:rewriteBatchedStatements=true,将数据缓冲到线程上下文 Buffer 中,当 Buffer 累计到预定阈值时,才发起写入请求。

注意:目的表所在数据库必须是主库才能写入数据;整个任务至少需要具备 insert/replace into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。

3 功能说明

3.1 配置样例
  • 使用RDS的Instance配置一个写入MySQL的作业。
{
    "type": "job",
    "traceId": "您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "sqlserver",
            "parameter": {
                "instanceName": "rds.aliyun.com",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "where": "1 = 1"
            }
        },
        "writer": {
            "plugin": "MySQL",
            "parameter": {
                "writeMode": "insert",
                "instanceName": "instanceName",
                "database": "database",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "preSql": [
                    "delete from XXX;"
                ]
            }
        }
    }
}
  • 使用JDBC配置一个向RDS写入数据的作业
{
    "type": "job",
    "traceId": "您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "sqlserver",
            "parameter": {
                "instanceName": "rds.aliyun.com",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "where": "1 = 1"
            }
        },
        "writer": {
            "plugin": "MySQL",
            "parameter": {
                "writeMode": "insert",
                "jdbcUrl": "jdbc:MySQL://ip:port/database",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "preSql": [
                    "delete from XXX;"
                ]
            }
        }
    }
}
3.2 参数说明
  • instanceName

    • 描述: 阿里云RDS实例名称(Instance名称)。用户使用该配置指定RDS的Instance名称,CDP将翻译为底层执行的jdbc连接串连接。

      instanceName指定的是RDS实例,类似MySQL实例,需要和database配合使用。

    • 必选:是

    • 默认值:无

  • jdbcUrl

    • 描述:对于CDP部分私有云场景,数据库迁移场景等,其本身数据源是普通MySQL数据库不是RDS,对于这类场景,用户可以指定jdbc信息直连。

      jdbcUrl和instanceName/database两类信息概念上是等同的,因此只能配置其一。如果两者均配置,CDP默认将使用jdbc信息。

    • 必选:是

    • 默认值:无

  • database

    • 描述: 阿里云RDS数据库名称。
    • 必选:是
    • 默认值:无
  • username

    • 描述:数据源的用户名
    • 必选:是
    • 默认值:无
  • password

    • 描述:数据源指定用户名的密码
    • 必选:是
    • 默认值:无
  • table

    • 描述:所选取的需要同步的表。
    • 必选:是
    • 默认值:无
  • writeMode

    • 描述: 选择导入模式,可以支持insert/replace/insert ignore方式,
      • insert指当主键/唯一性索引冲突,CDP视为脏数据进行处理。
      • replace指没有遇到主键/唯一性索引冲突时,与 insert行为一致,当主键/唯一性索引冲突时会用新行替换原有行所有字段。
      • insert ignore指当主键/唯一性索引冲突,CDP将直接忽略更新丢弃,并且不记录!
    • 必选:否
    • 默认值:insert
  • column

    • 描述:所配置的表中需要同步的列名集合。以英文逗号(,)进行分隔。我们强烈不推荐用户使用默认列情况
    • 必选:是
    • 默认值:无
  • encoding

    • 描述:写入MySQL时使用的编码流,默认情况下使用UTF-8。注意,该字段描述的不是对端数据存储的编码,只是数据传输的编码类型。建议大部分用户可以忽略该配置。
    • 必选:否
    • 默认值:UTF-8
  • preSql

    • 描述:执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据。
    • 必选:否
    • 默认值:无
  • postSql

    • 描述:执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳。
    • 必选:否
    • 默认值:无
  • batchSize

    • 描述:一次性批量提交的记录数大小,该值可以极大减少CDP与MySQL的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成CDP运行进程OOM情况。
    • 必选:否
    • 默认值:1024
3.3 类型转换

类似 MySQLReader ,目前 MySQLWriter 支持大部分 MySQL 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出 MySQLWriter 针对 MySQL 类型转换列表:

CDP 内部类型 MySQL 数据类型
Long int, tinyint, smallint, mediumint, int, bigint, year
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

4 性能报告

4.1 环境准备
4.1.1 数据特征

建表语句:

  CREATE TABLE `CDP_MySQLWriter_perf_00` (
`biz_order_id` bigint(20) NOT NULL AUTO_INCREMENT  COMMENT 'id',
`key_value` varchar(4000) NOT NULL COMMENT 'Key-value的内容',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`attribute_cc` int(11) DEFAULT NULL COMMENT '防止并发修改的标志',
`value_type` int(11) NOT NULL DEFAULT '0' COMMENT '类型',
`buyer_id` bigint(20) DEFAULT NULL COMMENT 'buyerid',
`seller_id` bigint(20) DEFAULT NULL COMMENT 'seller_id',
PRIMARY KEY (`biz_order_id`,`value_type`),
KEY `idx_biz_vertical_gmtmodified` (`gmt_modified`)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='CDP perf test'

单行记录类似于:

   key_value: ;orderIds:20148888888,2014888888813800;
  gmt_create: 2011-09-24 11:07:20
  gmt_modified: 2011-10-24 17:56:34
  attribute_cc: 1
  value_type: 3
  buyer_id: 8888888
   seller_id: 1
4.1.2 机器参数
  • 执行CDP的机器参数为:
    1. cpu: 24核 Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz
    2. mem: 48GB
    3. net: 千兆双网卡
    4. disc: CDP 数据不落磁盘,不统计此项
  • MySQL数据库机器参数为:
    1. cpu: 32核 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
    2. mem: 256GB
    3. net: 千兆双网卡
    4. disc: BTWL419303E2800RGN INTEL SSDSC2BB800G4 D2010370
4.1.3 CDP jvm 参数

-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError

4.2 测试报告
4.2.1 单表测试报告
通道数 批量提交行数 CDP速度(Rec/s) CDP流量(MB/s) CDP机器网卡流出流量(MB/s) CDP机器运行负载 DB网卡进入流量(MB/s) DB运行负载 DB TPS
1 128 5319 0.260 0.580 0.05 0.620 0.5 50
1 512 14285 0.697 1.6 0.12 1.6 0.6 28
1 1024 17241 0.842 1.9 0.20 1.9 0.6 16
1 2048 31250 1.49 2.8 0.15 3.0 0.8 15
1 4096 31250 1.49 3.5 0.20 3.6 0.8 8
4 128 11764 0.574 1.5 0.21 1.6 0.8 112
4 512 30769 1.47 3.5 0.3 3.6 0.9 88
4 1024 50000 2.38 5.4 0.3 5.5 1.0 66
4 2048 66666 3.18 7.0 0.3 7.1 1.37 46
4 4096 80000 3.81 7.3 0.5 7.3 1.40 26
8 128 17777 0.868 2.9 0.28 2.9 0.8 200
8 512 57142 2.72 8.5 0.5 8.5 0.70 159
8 1024 88888 4.24 12.2 0.9 12.4 1.0 108
8 2048 133333 6.36 14.7 0.9 14.7 1.0 81
8 4096 166666 7.95 19.5 0.9 19.5 3.0 45
16 128 32000 1.53 3.3 0.6 3.4 0.88 401
16 512 106666 5.09 16.1 0.9 16.2 2.16 260
16 1024 173913 8.29 22.1 1.5 22.2 4.5 200
16 2048 228571 10.90 28.6 1.61 28.7 4.60 128
16 4096 246153 11.74 31.1 1.65 31.2 4.66 57
32 1024 246153 11.74 30.5 3.17 30.7 12.10 270

说明:

  1. 这里的单表,主键类型为 bigint(20),自增。
  2. batchSize 和 通道个数,对性能影响较大。
  3. 16通道,4096批量提交时,出现 full gc 2次。
4.2.2 性能测试小结
  1. 批量提交行数(batchSize)对性能影响很大,当 batchSize>=512 之后,单线程写入速度能达到每秒写入一万行
  2. batchSize>=512 的基础上,随着通道数的增加(通道数<32),速度呈线性比增加。
  3. 通常不建议写入数据库时,通道个数 >32

5 FAQ


Q: MySQLWriter 执行 postSql 语句报错,那么数据导入到目标数据库了吗?

A: CDP 导入过程存在三块逻辑,pre 操作、导入操作、post 操作,其中任意一环报错,CDP 作业报错。由于 CDP 不能保证在同一个事务完成上述几个操作,因此有可能数据已经落入到目标端。


Q: 按照上述说法,那么有部分脏数据导入数据库,如果影响到线上数据库怎么办?

A: 目前有两种解法,第一种配置 pre 语句,该 sql 可以清理当天导入数据, CDP 每次导入时候可以把上次清理干净并导入完整数据。第二种,向临时表导入数据,完成后再 rename 到线上表。


Q: 上面第二种方法可以避免对线上数据造成影响,那我具体怎样操作?

A: 可以配置临时表导入