本文为您介绍PostgreSQL Writer支持的数据类型、写入方式、字段映射和数据源等参数及配置示例。

PostgreSQL Writer插件实现了向PostgreSQL写入数据。在底层实现上,PostgreSQL Writer通过JDBC连接远程PostgreSQL数据库,并执行相应的SQL语句,将数据写入PostgreSQL。
说明 开始配置PostgreSql Writer插件前,请首先配置好数据源,详情请参见配置PostgreSQL数据源
  • 对于您配置的tablecolumnwhere等信息,PostgreSQL Writer将其拼接为SQL语句发送至PostgreSQL数据库。
  • 对于您配置的querySql信息,PostgreSQL直接将其发送至PostgreSQL数据库。

注意事项

当PostgreSQL中表名称、字段名称是以数字开头,或者名称中包含大小写英文字母、中划线(-)时需要使用双引号("")进行转义,不进行转义会导致PostgreSQL Writer插件写入数据至PostgreSQL失败。但是在PostgreSQL Writer插件中,双引号("")为JSON关键字,因此,您需要使用反斜线(\)再次对双引号("")进行转义。例如,表名称为123Test,则转义后表名称为\"123Test\"
说明
  • 双引号("")中,前引号(")和后引号(")均需使用反斜线(\)进行转义。
  • 向导模式不支持转义,您需要转换为脚本模式进行转义。
使用脚本模式进行转义的代码示例如下。
"parameter": {
    "datasource": "abc",
    "column": [
        "id",
        "\"123Test\"", //添加转义符
],
"where": "",
"splitPk": "id",
"table": "public.wpw_test"
},

类型转换列表

PostgreSQL Writer支持大部分PostgreSQL类型,请注意检查您的数据类型。

PostgreSQL Writer针对PostgreSQL的类型转换列表,如下所示。
数据集成内部类型 PostgreSQL数据类型
LONG BIGINT、BIGSERIAL、INTEGER、SMALLINT和SERIAL
DOUBLE DOUBLE、PRECISION、MONEY、NUMERIC和REAL
STRING VARCHAR、CHAR、TEXT、BIT和INET
DATE DATE、TIME和TIMESTAMP
BOOLEAN BOOL
BYTES BYTEA
说明
  • 除上述罗列字段类型外,其它类型均不支持。
  • MONEY、INET和BIT需要您使用a_inet::varchar类似的语法进行转换。

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须要与添加的数据源名称保持一致。
table 选取的需要同步的表名称。
writeMode 选择导入模式,目前支持insert和copy两种方式:
  • insert:执行PostgreSQL的insert into...values... 语句,将数据写入PostgreSQL中。当数据出现主键/唯一性索引冲突时,待同步的数据行写入PostgreSQL失败,当前记录行成为脏数据。建议您优先选择insert模式。
  • copy:PostgreSQL提供copy命令,用于表与文件(标准输出,标准输入)之间的相互复制。数据集成支持使用copy from将数据加载到表中。建议您在遇到性能问题时再尝试使用该模式。
insert
column 目标表需要写入数据的字段,字段之间用英文逗号分隔。例如"column":["id","name","age"]。如果要依次写入全部列,使用(*)表示,例如"column":["*"]
preSql 执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如清除旧数据。
postSql 执行数据同步任务之后执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如加上某一个时间戳。
batchSize 一次性批量提交的记录数大小,该值可以极大减少数据集成与PostgreSQL的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成数据集成运行进程OOM情况。 1,024
pgType PostgreSQL特有类型的转化配置,支持bigint[]、double[]、text[]、Jsonb和JSON类型。配置示例如下。
{
    "job":
    {
        "content":
        [{
            "reader": {...},
            "writer":
            {
                "parameter":
                {
                    "column":
                    [
                        // 目标表字段列表
                        "bigint_arr",
                        "double_arr",
                        "text_arr",
                        "jsonb_obj",
                        "json_obj"
                    ],
                    "pgType":
                    {
                        // 特殊的类型设置,key为目标表的字段名,value为字段类型。
                        "bigint_arr": "bigint[]",
                        "double_arr": "double[]",
                        "text_arr": "text[]",
                        "jsonb_obj": "jsonb",
                        "json_obj": "json"
                    }

                }
            }
        }]
    }
}

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据来源
    参数 描述
    数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。
    即上述参数说明中的table
    导入前准备语句 即上述参数说明中的preSql,输入执行数据同步任务之前率先执行的SQL语句。
    导入后完成语句 即上述参数说明中的postSql,输入执行数据同步任务之后执行的SQL语句。
    导入模式 即上述参数说明中的writeMode,包括insertcopy两种模式。
  2. 字段映射,即上述参数说明中的column,左侧的源头表字段和右侧的目标表字段为一一对应的关系。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务

脚本配置示例如下,详情请参见上述参数说明。
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[ 
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"postgresql",//插件名。
            "parameter":{
                "postSql":[],//执行数据同步任务之后率先执行的SQL语句。
                "datasource":"//数据源。
                    "col1",
                    "col2"
                ],
                "table":"",//表名。
                "preSql":[]//执行数据同步任务之前率先执行的SQL语句。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数
        },
        "speed":{
            "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}