本文为您介绍如何使用JDBC连接器。

背景信息

此连接器为开源Flink的JDBC连接器,JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持。JDBC连接器支持的信息如下。
类别详情
支持类型源表、维表和结果表
运行模式流模式和批模式
数据格式暂不适用
特有监控指标暂无
API种类SQL
是否支持更新或删除结果表数据

前提条件

连接的数据库和表都已被创建。

使用限制

  • 仅实时计算引擎VVR 6.0.1及以上版本支持JDBC连接器。
  • JDBC源表为Bounded Source,表中数据读取完,对应的Task就会结束。如果需要捕获实时变更数据,则请使用CDC连接器,详情请参见MySQL的CDC源表Postgres的CDC源表(公测中)
  • 使用JDBC结果表连接PostgreSQL数据库时,需要数据库版本为PostgreSQL 9.5及以上。因为DDL中定义主键的情况下,PostgreSQL采用ON CONFLICT语法进行插入或更新,此语法需要PostgreSQL 9.5及以上版本才支持。
  • Flink全托管中只提供了开源JDBC连接器的实现,不包含具体的数据库的Driver。在使用JDBC连接器时,需要手动上传目标数据库Driver的JAR包。目前支持的Driver如下表所示。
    DriverGroup IdArtifact Id
    MySQLmysqlmysql-connector-java
    Oraclecom.oracle.database.jdbcojdbc8
    PostgreSQLorg.postgresqlpostgresql
    说明 如果您采用非列表中的JDBC Driver,则其正确性和可用性需要您自行充分测试并保证。

语法结构

CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connector表类型。String固定值为jdbc。
    url数据库的URL。String无。
    table-nameJDBC表的名称。String无。
    usernameJDBC用户名称。String如果指定了username和password中的任一参数,则两者必须都被指定。
    passwordJDBC用户密码。String
  • 源表独有
    参数说明数据类型是否必填默认值备注
    scan.partition.column对输入进行分区的列名。String该列必须是数值类型、日期类型和时间戳类型等。关于分区扫描的详情请参见Partitioned Scan
    scan.partition.num分区数。Integer无。
    scan.partition.lower-bound第一个分区的最小值。Integer无。
    scan.partition.upper-bound最后一个分区的最大值。Integer无。
    scan.fetch-size每次循环读取时,从数据库中获取的行数。Integer0如果指定的值为0,则该配置项会被忽略。
    scan.auto-commit是否开启auto-commitBooleantrue无。
  • 结果表独有
    参数说明数据类型是否必填默认值备注
    sink.buffer-flush.max-rowsflush数据前,缓存记录的最大值。Integer100您可以设置为0来禁用它,即不再缓存记录,直接flush数据。
    sink.buffer-flush.intervalflush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。Duration1 s您可以设置为0来禁用它,即不再缓存记录,直接flush数据。
    说明 如果您需要完全异步地处理缓存的flush事件,则可以将sink.buffer-flush.max-rows设置为0,并配置适当的flush时间间隔。
    sink.max-retries写入记录到数据库失败后的最大重试次数。Integer3无。
  • 维表独有
    参数说明数据类型是否必填默认值备注
    lookup.cache.max-rows指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。Integer默认情况下,维表Cache是未开启的。您可以设置lookup.cache.max-rowslookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。
    lookup.cache.ttl指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。Duration
    lookup.cache.caching-missing-key是否缓存空的查询结果。Booleantrue参数取值如下:
    • true(默认值):缓存空的查询结果。
    • false:不缓存空的查询结果。
    lookup.max-retries查询数据库失败的最大重试次数。Integer3无。

类型映射

MySQL类型Oracle类型PostgreSQL类型FlinkSQL类型
TINYINTTINYINT
  • SMALLINT
  • TINYINT UNSIGNED
  • SMALLINT
  • INT2
  • SMALLSERIAL
  • SERIAL2
SMALLINT
  • INT
  • MEDIUMINT
  • SMALLINT UNSIGNED
  • INTEGER
  • SERIAL
INT
  • BIGINT
  • INT UNSIGNED
  • BIGINT
  • BIGSERIAL
BIGINT
BIGINT UNSIGNEDDECIMAL(20, 0)
BIGINTBIGINTBIGINT
FLOATBINARY_FLOAT
  • REAL
  • FLOAT4
FLOAT
  • DOUBLE
  • DOUBLE PRECISION
BINARY_DOUBLE
  • FLOAT8
  • DOUBLE PRECISION
DOUBLE
  • NUMERIC(p, s)
  • DECIMAL(p, s)
  • SMALLINT
  • FLOAT(s)
  • DOUBLE PRECISION
  • REAL
  • NUMBER(p, s)
  • NUMERIC(p, s)
  • DECIMAL(p, s)
DECIMAL(p, s)
  • BOOLEAN
  • TINYINT(1)
BOOLEANcanBOOLEAN
DATEDATEDATEDATE
TIME [(p)]DATETIME [(p)] [WITHOUT TIMEZONE]TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)]TIMESTAMP [(p)] [WITHOUT TIMEZONE]TIMESTAMP [(p)] [WITHOUT TIMEZONE]TIMESTAMP [(p)] [WITHOUT TIMEZONE]
  • CHAR(n)
  • VARCHAR(n)
  • TEXT
  • CHAR(n)
  • VARCHAR(n)
  • CLOB
  • CHAR(n)
  • CHARACTER(n)
  • VARCHAR(n)
  • CHARACTER VARYING(n)
  • TEXT
STRING
  • BINARY
  • VARBINARY
  • BLOB
  • RAW(s)
  • BLOB
BYTEABYTES
ARRAYARRAY

使用示例

  • 源表
    CREATE TEMPORARY TABLE jdbc_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
  • 结果表
    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO jdbc_sink
    SELECT * FROM datagen_source;
  • 维表
    CREATE TEMPORARY TABLE datagen_source(
     `id` INT,
     `data` BIGINT,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_dim (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `data` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.`id`,T.`data`, H.`name`
    FROM datagen_source AS T
    JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;