本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文为您介绍如何使用JDBC连接器。
背景信息
此连接器为开源Flink的JDBC连接器,JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持。JDBC连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 源表、维表和结果表 | 
| 运行模式 | 流模式和批模式 | 
| 数据格式 | 暂不适用 | 
| 特有监控指标 | 暂无 | 
| API种类 | SQL | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
连接的数据库和表都已被创建。
使用限制
- JDBC源表为有界数据源。数据读取完成后,任务自动结束。如需捕获实时变更,请使用CDC连接器,详情请参见MySQL的CDC源表和Postgres的CDC源表(公测中)。 
- 写入PostgreSQL结果表时,数据库版本必须为9.5及以上。否则ON CONFLICT语法不支持,写入失败。 
- Flink不内置数据库Driver。您必须手动上传对应Driver的JAR包作为附加依赖文件。目前支持的Driver如下表所示: - Driver - Group Id - Artifact Id - MySQL - mysql - Oracle - com.oracle.database.jdbc - PostgreSQL - org.postgresql - 如果您采用非列表中的JDBC Driver,则其正确性和可用性需要您自行充分测试并保证。 
- JDBC连接器在向MySQL结果表写入数据时,会将接收到的每条数据拼接成一条SQL去执行。对于包含主键的MySQL结果表,会拼接执行 - INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;语句。警告- 若表存在唯一索引(非主键),插入主键不同但唯一索引相同的记录,将因冲突导致数据覆盖,引发数据丢失。 
语法结构
CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);WITH参数
- 通用 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - connector - 表类型。 - String - 是 - 无 - 固定值为jdbc。 - url - 数据库的URL。 - String - 是 - 无 - 无。 - table-name - JDBC表的名称。 - String - 是 - 无 - 无。 - username - JDBC用户名称。 - String - 否 - 无 - 如果指定了username和password中的任一参数,则两者必须都被指定。 - password - JDBC用户密码。 - String - 否 - 无 
- 源表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - scan.partition.column - 对输入进行分区的列名。 - String - 否 - 无 - 该列必须是数值类型或时间戳类型,且该类型在数据库中需要支持与数值类型进行比较。关于分区扫描的详情请参见Partitioned Scan。 - scan.partition.num - 分区数。 - Integer - 否 - 无 - 无。 - scan.partition.lower-bound - 第一个分区的最小值。 - Long - 否 - 无 - 无。 - scan.partition.upper-bound - 最后一个分区的最大值。 - Long - 否 - 无 - 无。 - scan.fetch-size - 每次循环读取时,从数据库中获取的行数。 - Integer - 否 - 0 - 如果指定的值为0,则该配置项会被忽略。 - scan.auto-commit - 是否开启auto-commit。 - Boolean - 否 - true - 无。 
- 结果表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - sink.buffer-flush.max-rows - flush数据前,缓存记录的最大值。 - Integer - 否 - 100 - 您可以设置为0来禁用它,即不再缓存记录,直接flush数据。 - sink.buffer-flush.interval - flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。 - Duration - 否 - 1 s - 您可以设置为0来禁用它,即不再缓存记录,直接flush数据。 说明- 如果您需要完全异步地处理缓存的flush事件,则可以将sink.buffer-flush.max-rows设置为0,并配置适当的flush时间间隔。 - sink.max-retries - 写入记录到数据库失败后的最大重试次数。 - Integer - 否 - 3 - 无。 
- 维表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - lookup.cache.max-rows - 指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。 - Integer - 否 - 无 - 默认情况下,维表Cache是未开启的。您可以设置lookup.cache.max-rows和lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。 - lookup.cache.ttl - 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。 - Duration - 否 - 无 - lookup.cache.caching-missing-key - 是否缓存空的查询结果。 - Boolean - 否 - true - 参数取值如下: - true(默认值):缓存空的查询结果。 
- false:不缓存空的查询结果。 
 - lookup.max-retries - 查询数据库失败的最大重试次数。 - Integer - 否 - 3 - 无。 
- PostgreSQL独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - source.extend-type.enabled - 作为源表和维表时,是否允许读取JSONB和UUID拓展类型,并映射到Flink支持的类型。 - Boolean - 否 - false - 参数取值如下: - true:支持读取和映射拓展类型。 
- false(默认值):不支持读取和映射拓展类型。 
 
类型映射
| MySQL类型 | Oracle类型 | PostgreSQL类型 | FlinkSQL类型 | 
| TINYINT | 无 | 无 | TINYINT | 
| 
 | 无 | 
 | SMALLINT | 
| 
 | 无 | 
 | INT | 
| 
 | 无 | 
 | BIGINT | 
| BIGINT UNSIGNED | 无 | 无 | DECIMAL(20, 0) | 
| BIGINT | 无 | BIGINT | BIGINT | 
| FLOAT | BINARY_FLOAT | 
 | FLOAT | 
| 
 | BINARY_DOUBLE | 
 | DOUBLE | 
| 
 | 
 | 
 | DECIMAL(p, s) | 
| 
 | 无 | BOOLEANcan | BOOLEAN | 
| DATE | DATE | DATE | DATE | 
| TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] | 
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | 
| 
 | 
 | 
 | STRING | 
| 
 | 
 | BYTEA | BYTES | 
| 无 | 无 | ARRAY | ARRAY | 
使用示例
- 源表 - CREATE TEMPORARY TABLE jdbc_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
- 结果表 - CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); INSERT INTO jdbc_sink SELECT * FROM datagen_source;
- 维表 - CREATE TEMPORARY TABLE datagen_source( `id` INT, `data` BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_dim ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `data` BIGINT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.`id`,T.`data`, H.`name` FROM datagen_source AS T JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;