PostgreSQLWriter

更新时间:2017-06-07 13:26:11

1 快速介绍

PostgreSQLWriter 插件实现了写入数据到 PostgreSQL 主库的目的表的功能。在底层实现上, PostgreSQLReader 通过 JDBC 连接远程 PostgreSQL 数据库,并执行相应的 insert into ...的 sql 语句将数据写入 PostgreSQL,内部会分批次提交入库。

PostgreSQLWriter 面向ETL开发工程师,他们使用 PostgreSQLWriter 从数仓导入数据到 PostgreSQL。同时 PostgreSQLWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

2 实现原理

PostgreSQLWriter 通过 CDP 框架获取 Reader 生成的协议数据,生成 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行),性能考虑,采用了 PreparedStatement + Batch,并且设置了:rewriteBatchedStatements=true,将数据缓冲到线程上下文 Buffer 中,当 Buffer 累计到预定阈值时,才发起写入请求。

注意:目的表所在数据库必须是主库才能写入数据;整个任务至少需要具备 insert into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。

3 功能说明

3.1 配置样例
  • 使用RDS的Instance配置一个写入PostgreSQL的作业。
{
    "type": "job",
    "traceId": "您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "sqlserver",
            "parameter": {
                "instanceName": "rds.aliyun.com",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "where": "1 = 1"
            }
        },
        "writer": {
            "plugin": "postgresql",
            "parameter": {
                "instanceName": "instanceName",
                "database": "database",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "preSql": [
                    "delete from XXX;"
                ]
            }
        }
    }
}
  • 使用JDBC配置一个向RDS写入数据的作业
{
    "type": "job",
    "traceId": "您可以在这里填写您作业的追踪ID,建议使用业务名+您的作业ID",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "sqlserver",
            "parameter": {
                "instanceName": "rds.aliyun.com",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "where": "1 = 1"
            }
        },
        "writer": {
            "plugin": "PostgreSQL",
            "parameter": {
                "jdbcUrl": "jdbc:postgresql://ip:port/database",
                "username": "username",
                "password": "password",
                "table": "table",
                "column": [
                    "*"
                ],
                "preSql": [
                    "delete from XXX;"
                ]
            }
        }
    }
}
3.2 参数说明
  • instanceName

    • 描述: 阿里云RDS实例名称(Instance名称)。用户使用该配置指定RDS的Instance名称,CDP将翻译为底层执行的jdbc连接串连接。

      instanceName指定的是RDS实例,类似PostgreSQL实例,需要和database配合使用。

    • 必选:是

    • 默认值:无

  • jdbcUrl

    • 描述:对于CDP部分私有云场景,数据库迁移场景等,其本身数据源是普通PostgreSQL数据库不是RDS,对于这类场景,用户可以指定jdbc信息直连。

      jdbcUrl和instanceName/database两类信息概念上是等同的,因此只能配置其一。如果两者均配置,CDP默认将使用jdbc信息。

    • 必选:是

    • 默认值:无

  • database

    • 描述: 阿里云RDS数据库名称。
    • 必选:是
    • 默认值:无
  • username

    • 描述:数据源的用户名
    • 必选:是
    • 默认值:无
  • password

    • 描述:数据源指定用户名的密码
    • 必选:是
    • 默认值:无
  • table

    • 描述:所选取的需要同步的表。
    • 必选:是
    • 默认值:无
  • column

    • 描述:所配置的表中需要同步的列名集合。以英文逗号(,)进行分隔。我们强烈不推荐用户使用默认列情况
    • 必选:是
    • 默认值:无
  • preSql

    • 描述:执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据。
    • 必选:否
    • 默认值:无
  • postSql

    • 描述:执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳。
    • 必选:否
    • 默认值:无
  • batchSize

    • 描述:一次性批量提交的记录数大小,该值可以极大减少CDP与PostgreSQL的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成CDP运行进程OOM情况。
    • 必选:否
    • 默认值:512
3.3 类型转换

目前 PostgresqlWriter支持大部分 PostgreSQL类型,但也存在部分没有支持的情况,请注意检查你的类型。

下面列出 PostgresqlWriter针对 PostgreSQL类型转换列表:

CDP 内部类型 PostgreSQL 数据类型
Long bigint, bigserial, integer, smallint, serial
Double double precision, money, numeric, real
String varchar, char, text, bit
Date date, time, timestamp
Boolean bool
Bytes bytea

4 性能报告

4.1 环境准备
4.1.1 数据特征

建表语句:

 create table pref_test(
 id serial, 
 a_bigint bigint, 
 a_bit bit(10), 
 a_boolean boolean, 
 a_char character(5), 
 a_date date, 
 a_double double precision, 
 a_integer integer, 
 a_money money, 
 a_num numeric(10,2),
 a_real real, 
 a_smallint smallint, 
 a_text text, 
 a_time time, 
 a_timestamp timestamp
)
4.1.2 机器参数
  • 执行CDP的机器参数为:

    1. cpu: 16核 Intel(R) Xeon(R) CPU E5620  @ 2.40GHz
    2. mem: MemTotal: 24676836kB    MemFree: 6365080kB
    3. net: 百兆双网卡
    
  • PostgreSQL数据库机器参数为:

    D12 24逻辑核  192G内存 12*480G SSD 阵列
    
4.2 测试报告
4.2.1 单表测试报告
通道数 批量提交batchSize CDP速度(Rec/s) CDP流量(M/s) CDP机器运行负载
1 128 9259 0.55 0.3
1 512 10869 0.653 0.3
1 2048 9803 0.589 0.8
4 128 30303 1.82 1
4 512 36363 2.18 1
4 2048 36363 2.18 1
8 128 57142 3.43 2
8 512 66666 4.01 1.5
8 2048 66666 4.01 1.1
16 128 88888 5.34 1.8
16 2048 94117 5.65 2.5
32 512 76190 4.58 3
4.2.2 性能测试小结
  1. channel数对性能影响很大
  2. 通常不建议写入数据库时,通道个数 > 32

5 FAQ


Q: PostgreSQLWriter 执行 postSql 语句报错,那么数据导入到目标数据库了吗?

A: CDP 导入过程存在三块逻辑,pre 操作、导入操作、post 操作,其中任意一环报错,CDP 作业报错。由于 CDP 不能保证在同一个事务完成上述几个操作,因此有可能数据已经落入到目标端。


Q: 按照上述说法,那么有部分脏数据导入数据库,如果影响到线上数据库怎么办?

A: 目前有两种解法,第一种配置 pre 语句,该 sql 可以清理当天导入数据, CDP 每次导入时候可以把上次清理干净并导入完整数据。第二种,向临时表导入数据,完成后再 rename 到线上表。


Q: 上面第二种方法可以避免对线上数据造成影响,那我具体怎样操作?

A: 可以配置临时表导入