JDBC

本文为您介绍如何使用JDBC连接器。

背景信息

此连接器为开源Flink的JDBC连接器,JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持。JDBC连接器支持的信息如下。

类别

详情

支持类型

源表、维表和结果表

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL

是否支持更新或删除结果表数据

前提条件

连接的数据库和表都已被创建。

使用限制

  • 仅实时计算引擎VVR 6.0.1及以上版本支持JDBC连接器。

  • JDBC源表为Bounded Source,表中数据读取完,对应的Task就会结束。如果需要捕获实时变更数据,则请使用CDC连接器,详情请参见MySQL的CDC源表Postgres的CDC源表(公测中)

  • 使用JDBC结果表连接PostgreSQL数据库时,需要数据库版本为PostgreSQL 9.5及以上。因为DDL中定义主键的情况下,PostgreSQL采用ON CONFLICT语法进行插入或更新,此语法需要PostgreSQL 9.5及以上版本才支持。

  • Flink中只提供了开源JDBC连接器的实现,不包含具体的数据库的Driver。在使用JDBC连接器时,需要手动上传目标数据库Driver的JAR包作为附加依赖文件,具体操作请参见步骤三:进行更多配置。目前支持的Driver如下表所示。

    Driver

    Group Id

    Artifact Id

    MySQL

    mysql

    mysql-connector-java

    Oracle

    com.oracle.database.jdbc

    ojdbc8

    PostgreSQL

    org.postgresql

    postgresql

    • 如果您采用非列表中的JDBC Driver,则其正确性和可用性需要您自行充分测试并保证。

    • JDBC连接器在向MySQL结果表写入数据时,会将接收到的每条数据拼接成一条SQL去执行。对于包含主键的MySQL结果表,会拼接执行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;语句。需要注意的是,如果物理表存在除主键外的唯一索引约束,当插入两条主键不同但唯一索引相同的记录时,下游数据会因为唯一索引冲突导致数据覆盖引发数据丢失。

语法结构

CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为jdbc。

    url

    数据库的URL。

    String

    无。

    table-name

    JDBC表的名称。

    String

    无。

    username

    JDBC用户名称。

    String

    如果指定了username和password中的任一参数,则两者必须都被指定。

    password

    JDBC用户密码。

    String

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    scan.partition.column

    对输入进行分区的列名。

    String

    该列必须是数值类型或时间戳类型,且该类型在数据库中需要支持与数值类型进行比较。关于分区扫描的详情请参见Partitioned Scan

    scan.partition.num

    分区数。

    Integer

    无。

    scan.partition.lower-bound

    第一个分区的最小值。

    Long

    无。

    scan.partition.upper-bound

    最后一个分区的最大值。

    Long

    无。

    scan.fetch-size

    每次循环读取时,从数据库中获取的行数。

    Integer

    0

    如果指定的值为0,则该配置项会被忽略。

    scan.auto-commit

    是否开启auto-commit

    Boolean

    true

    无。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sink.buffer-flush.max-rows

    flush数据前,缓存记录的最大值。

    Integer

    100

    您可以设置为0来禁用它,即不再缓存记录,直接flush数据。

    sink.buffer-flush.interval

    flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。

    Duration

    1 s

    您可以设置为0来禁用它,即不再缓存记录,直接flush数据。

    说明

    如果您需要完全异步地处理缓存的flush事件,则可以将sink.buffer-flush.max-rows设置为0,并配置适当的flush时间间隔。

    sink.max-retries

    写入记录到数据库失败后的最大重试次数。

    Integer

    3

    无。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    lookup.cache.max-rows

    指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。

    Integer

    默认情况下,维表Cache是未开启的。您可以设置lookup.cache.max-rowslookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。

    lookup.cache.ttl

    指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。

    Duration

    lookup.cache.caching-missing-key

    是否缓存空的查询结果。

    Boolean

    true

    参数取值如下:

    • true(默认值):缓存空的查询结果。

    • false:不缓存空的查询结果。

    lookup.max-retries

    查询数据库失败的最大重试次数。

    Integer

    3

    无。

  • PostgreSQL独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    source.extend-type.enabled

    作为源表和维表时,是否允许读取JSONB和UUID拓展类型,并映射到Flink支持的类型。

    Boolean

    false

    参数取值如下:

    • true:支持读取和映射拓展类型。

    • false(默认值):不支持读取和映射拓展类型。

类型映射

MySQL类型

Oracle类型

PostgreSQL类型

FlinkSQL类型

TINYINT

TINYINT

  • SMALLINT

  • TINYINT UNSIGNED

  • SMALLINT

  • INT2

  • SMALLSERIAL

  • SERIAL2

SMALLINT

  • INT

  • MEDIUMINT

  • SMALLINT UNSIGNED

  • INTEGER

  • SERIAL

INT

  • BIGINT

  • INT UNSIGNED

  • BIGINT

  • BIGSERIAL

BIGINT

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT

BIGINT

BIGINT

FLOAT

BINARY_FLOAT

  • REAL

  • FLOAT4

FLOAT

  • DOUBLE

  • DOUBLE PRECISION

BINARY_DOUBLE

  • FLOAT8

  • DOUBLE PRECISION

DOUBLE

  • NUMERIC(p, s)

  • DECIMAL(p, s)

  • SMALLINT

  • FLOAT(s)

  • DOUBLE PRECISION

  • REAL

  • NUMBER(p, s)

  • NUMERIC(p, s)

  • DECIMAL(p, s)

DECIMAL(p, s)

  • BOOLEAN

  • TINYINT(1)

BOOLEANcan

BOOLEAN

DATE

DATE

DATE

DATE

TIME [(p)]

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

  • CHAR(n)

  • VARCHAR(n)

  • TEXT

  • CHAR(n)

  • VARCHAR(n)

  • CLOB

  • CHAR(n)

  • CHARACTER(n)

  • VARCHAR(n)

  • CHARACTER VARYING(n)

  • TEXT

  • JSONB

  • UUID

STRING

  • BINARY

  • VARBINARY

  • BLOB

  • RAW(s)

  • BLOB

BYTEA

BYTES

ARRAY

ARRAY

使用示例

  • 源表

    CREATE TEMPORARY TABLE jdbc_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
  • 结果表

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO jdbc_sink
    SELECT * FROM datagen_source;
  • 维表

    CREATE TEMPORARY TABLE datagen_source(
     `id` INT,
     `data` BIGINT,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_dim (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `data` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.`id`,T.`data`, H.`name`
    FROM datagen_source AS T
    JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;