本文为您介绍如何使用OceanBase连接器。
背景信息
OceanBase数据库是一款原生分布式的HTAP数据库管理系统,详情请参见OceanBase官网。为了降低您从MySQL数据库或Oracle数据库迁移到OceanBase数据库时引发的业务系统改造成本,OceanBase数据库支持Oracle和MySQL两种兼容模式,两种模式下的数据类型、SQL功能、内部视图等与MySQL数据库或Oracle数据库保持一致。两种模式下建议使用的连接器如下:
Oracle模式:只能使用OceanBase连接器。
MySQL模式:与原生MySQL语法保持高度兼容,支持使用OceanBase和MySQL两种连接器读写OceanBase。
重要OceanBase连接器目前处于公测阶段。在OceanBase 3.2.4.4及以上版本,您可以使用MySQL连接器读写OceanBase,该功能也属于公测范围,请在使用前充分评估并谨慎使用。
在使用MySQL连接器读取OceanBase增量数据时,请确保OceanBase Binlog已开启且被正确设置。有关OceanBase Binlog的更多信息,请参见概述或Binlog 相关操作。
OceanBase连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
连接的数据库和表都已被创建。
已设置IP白名单,详情请参见设置白名单分组。
如需采集OceanBase增量(CDC)数据,还需开启 OceanBase Binlog服务,参考Binlog 相关操作。
使用限制
Flink计算引擎VVR 8.0.1及以上版本支持OceanBase连接器。
语义上可以保证At-Least-Once,在结果表有主键的情况下,幂等可以保证数据的正确性。
语法结构
CREATE TABLE oceanabse_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);连接器写入结果表原理:写入结果表时,会将接收到的每条数据拼接成一条SQL去执行。具体执行的SQL情况如下:
对于没有主键的结果表,会拼接成INSERT INTO语句。
对于包含主键的结果表,会根据数据库的兼容模式拼接成UPSERT语句。
WITH参数
通用
参数
说明
是否必填
数据类型
默认值
备注
connector
表类型。
是
STRING
无
固定值为
oceanbase。password
密码。
是
STRING
无
无。
源表独有
重要注意事项:自Flink计算引擎VVR 11.4.0版本起,OceanBase CDC连接器进行了重大架构升级与功能调整。为确保用户准确理解变更内容并顺利完成版本迁移,现将核心变更说明如下:
原基于OceanBase LogProxy服务实现的 CDC 连接器已正式下线并从发行版本中移除。自 VVR-11.4.0 版本起,OceanBase CDC连接器仅支持通过OceanBase Binlog服务进行增量日志的捕获与数据同步。
OceanBase CDC连接器增强了对 OceanBase Binlog 服务的协议兼容性、连接稳定性,建议用户优先使用 OceanBase CDC 连接器。
OceanBase Binlog服务在协议层完全兼容 MySQL 复制协议,也可使用标准MySQL CDC连接器连接至OceanBase Binlog服务以实现数据订阅,但不作推荐。
自Flink计算引擎VVR 11.4.0版本起,OceanBase CDC 连接器不再支持在 Oracle 兼容模式下进行增量数据订阅。Oracle 兼容模式下的增量数据订阅请联系 OceanBase 企业技术支持。
参数
说明
是否必填
数据类型
默认值
备注
hostname
OceanBase数据库的IP地址或者Hostname。
是
STRING
否
建议填写专有网络VPC地址。
说明如果OceanBase与实时计算Flink版不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见空间管理与操作和Flink全托管集群如何访问公网?。
username
OceanBase数据库服务的用户名。
是
STRING
否
无。
database-name
OceanBase数据库名称。
是
STRING
无
作为源表时,数据库名称支持正则表达式以读取多个数据库的数据。
使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见table-name备注的说明。
table-name
OceanBase表明。
是
STIRNG
无
作为源表时,表名支持正则表达式以读取多个表的数据。
使用正则表达式时,尽量不要使用^和$符号匹配开头和结尾。具体原因详见以下说明。
说明OceanBase 源表在正则匹配表名时,会将您填写的 database-name,table-name 通过字符串 \\.(VVR 8.0.1前使用字符.)连接成为一个全路径的正则表达式,然后使用该正则表达式和OceanBase数据库中表的全限定名进行正则匹配。
例如:当配置'database-name'='db_.*'且'table-name'='tb_.+'时,连接器将会使用正则表达式db_.*\\.tb_.+(8.0.1版本前为db_.*.tb_.+)去匹配表的全限定名来确定需要读取的表。
port
OceanBase数据库服务的端口号。
否
INTEGER
3306
无。
server-id
数据库客户端的一个数字ID。
否
STRING
默认会随机生成一个5400~6400的值。
该ID必须是全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。
该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。详情请参见Server ID使用。
scan.incremental.snapshot.chunk.size
每个chunk的大小(包含的行数)。
否
INTEGER
8096
当开启增量快照读取时,表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中。
每个chunk包含的行数越少,则表中的chunk的总数量越大,尽管这会降低故障恢复的粒度,但可能导致内存OOM和整体的吞吐量降低。因此,您需要进行权衡,并设置合理的chunk大小。
scan.snapshot.fetch.size
当读取表的全量数据时,每次最多拉取的记录数。
否
INTEGER
1024
无。
scan.startup.mode
消费数据时的启动模式。
否
STRING
initial
参数取值如下:
initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。
latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。
earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。
specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置scan.startup.specific-offset.file和scan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动,也可以只配置scan.startup.specific-offset.gtid-set来指定从某个GTID集合启动。
timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。
重要使用earliest-offset,specific-offset和timestamp启动模式时,确保在指定的Binlog消费位置到作业启动的时间之间,对应表的结构不发生变化,避免因表结构不同而报错。
scan.startup.specific-offset.file
使用指定位点模式启动时,启动位点的Binlog文件名。
否
STRING
无
使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如
mysql-bin.000003。scan.startup.specific-offset.pos
使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。
否
INTEGER
无
使用该配置时,scan.startup.mode必须配置为specific-offset。
scan.startup.specific-offset.gtid-set
使用指定位点模式启动时,启动位点的GTID集合。
否
STRING
无
使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。scan.startup.timestamp-millis
使用指定时间模式启动时,启动位点的毫秒时间戳。
否
LONG
无
使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。
重要在使用指定时间时,OceanBase CDC会尝试读取每个Binlog文件的初始事件以确定其时间戳,最终定位至指定时间对应的Binlog文件。请保证指定的时间戳对应的Binlog文件在数据库上没有被清理且可以被读取到。
server-time-zone
数据库在使用的会话时区。
否
STRING
如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。
例如Asia/Shanghai,该参数控制了TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型。
debezium.min.row.count.to.stream.results
当表的条数大于该值时,会使用分批读取模式。
否
INTEGER
1000
Flink采用以下方式读取OceanBase源表数据:
全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有OOM风险。
分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有OOM风险,缺点是读取速度相对较慢。
connect.timeout
连接OceanBase数据库服务器超时时,重试连接之前等待超时的最长时间。
否
DURATION
30s
无。
connect.max-retries
连接OceanBase数据库服务时,连接失败后重试的最大次数。
否
INTEGER
3
无。
connection.pool.size
数据库连接池大小。
否
INTEGER
20
数据库连接池用于复用连接,可以降低数据库连接数量。
jdbc.properties.*
JDBC URL中的自定义连接参数。
否
STRING
无
您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'。
支持的连接参数请参见MySQL Configuration Properties。
debezium.*
Debezium读取Binlog的自定义参数。
否
STRING
无
您可以传递自定义的Debezium参数,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'来指定解析错误时的处理逻辑。
heartbeat.interval
Source通过心跳事件推动Binlog位点前进的时间间隔。
否
DURATION
30s
心跳事件用于推动Source中的Binlog位点前进,这对OceanBase中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。
scan.incremental.snapshot.chunk.key-column
可以指定某一列作为快照阶段切分分片的切分列。
见备注列。
STRING
无
无主键表必填,选择的列必须是非空类型(NOT NULL)。
有主键的表为选填,仅支持从主键中选择一列。
scan.incremental.close-idle-reader.enabled
是否在快照结束后关闭空闲的 Reader。
否
BOOLEAN
false
仅Flink计算引擎VVR 8.0.1及以上版本支持。
该配置生效需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。
scan.read-changelog-as-append-only.enabled
是否将changelog数据流转换为append-only数据流。
否
BOOLEAN
false
参数取值如下:
true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成INSERT类型的消息。仅在需要保存上游表删除消息等特殊场景下开启使用。
false(默认):所有类型的消息都保持原样下发。
说明仅Flink计算引擎VVR 8.0.8及以上版本支持。
scan.only.deserialize.captured.tables.changelog.enabled
在增量阶段,是否仅对指定表的变更事件进行反序列化。
否
BOOLEAN
VVR 8.x版本中默认值为false。
VVR 11.1及以上版本默认值为true。
参数取值如下:
true:仅对目标表的变更数据进行反序列化,加快Binlog读取速度。
false(默认):对所有表的变更数据进行反序列化。
说明仅Flink计算引擎VVR 8.0.7及以上版本支持。
在Flink计算引擎VVR 8.0.8及以下版本使用时,参数名需要修改为debezium.scan.only.deserialize.captured.tables.changelog.enable。
scan.parse.online.schema.changes.enabled
在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。
否
BOOLEAN
false
参数取值如下:
true:解析 RDS 无锁变更 DDL 事件。
false(默认):不解析 RDS 无锁变更 DDL 事件。
实验性功能。建议在执行线上无锁变更前,先对Flink作业执行一次快照以便恢复。
说明仅Flink计算引擎VVR 11.1及以上版本支持。
scan.incremental.snapshot.backfill.skip
是否在快照读取阶段跳过backfill。
否
BOOLEAN
false
参数取值如下:
true:快照读取阶段跳过backfill。
false(默认):快照读取阶段不跳过backfill。
如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。
重要跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。
说明仅Flink计算引擎VVR 11.1及以上版本支持。
scan.incremental.snapshot.unbounded-chunk-first.enabled
快照读取阶段是否先分发无界的分片。
否
BOOELEAN
false
参数取值如下:
true:快照读取阶段优先分发无界的分片。
false(默认):快照读取阶段不优先分发无界的分片。
实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。
说明仅Flink计算引擎VVR 11.1及以上版本支持。
维表独有
参数
说明
是否必填
数据类型
默认值
备注
url
JDBC url。
是
STRING
无
url中需要包含MySQL database名或Oracle service名。
userName
用户名。
是
STRING
无
无。
cache
缓存策略。
否
STRING
ALL
支持以下三种缓存策略:
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。使用该缓存策略时,必须配置cacheSize参数。
None:无缓存。
重要使用ALL缓存策略时,请注意节点内存大小,防止出现OOM。
因为系统会异步加载维表数据,所以在使用ALL缓存策略时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize
最大缓存条数。
否
INTEGER
100000
当选择LRU缓存策略后,必须设置缓存大小。
当选择ALL缓存策略后,可以不设置缓存大小。
cacheTTLMs
缓存超时时间。
否
LONG
Long.MAX_VALUE
cacheTTLMs的配置和cache有关,详情如下:
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
maxRetryTimeout
最大重试时间。
否
DURATION
60s
无。
结果表独有
参数
说明
是否必填
数据类型
默认值
备注
userName
用户名。
是
STRING
无
无。
compatibleMode
OceanBase的兼容模式。
否
STRING
mysql
参数取值如下:
mysql
oracle
说明oceanabse独有参数。
url
JDBC url。
是
STRING
无
url中需要包含MySQL database名或Oracle service名。
tableName
表名。
是
STRING
无
无。
maxRetryTimes
最大重试次数。
否
INTEGER
3
无。
poolInitialSize
数据库连接池初始大小。
否
INTEGER
1
无。
poolMaxActive
数据库连接池最大连接数。
否
INTEGER
8
无。
poolMaxWait
从数据库连接池中获取连接的最大等待时间。
否
INTEGER
2000
单位毫秒。
poolMinIdle
数据库连接池中最小空闲连接数。
否
INTEGER
1
无。
connectionProperties
jdbc的连接属性。
否
STRING
无
格式为"k1=v1;k2=v2;k3=v3"。
ignoreDelete
是否忽略数据Delete操作。
否
Boolean
false
无。
excludeUpdateColumns
指定要排除的列名。在执行更新操作时,这些列将不会被更新。
否
STRING
无
如果忽略指定的字段为多个时,则需要使用英文逗号(,)分隔。例如
excludeUpdateColumns=column1,column2。说明该值始终会包含主键列,也就是实际生效的列名为您指定的列名和主键列。
partitionKey
分区键。
否
STRING
无
当设置分区键时,连接器会先将数据按照分区键进行分组,各个分组将分别写入数据库。这里的分组处理早于modRule的处理。
modRule
分组规则。
否
STRING
无
分组规则格式需要为"列名mod数字",如
user_id mod 8,列类型必须为数字类型。当设置分组规则时,数据先按partitionKey分区;在每个分区内,再根据 modRule 计算结果分组;
bufferSize
数据缓冲区大小。
否
INTEGER
1000
无。
flushIntervalMs
清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
否
LONG
1000
无。
retryIntervalMs
最大重试时间。
否
INTEGER
5000
单位毫秒。
类型映射
MySQL兼容模式
OceanBase字段类型
Flink字段类型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
说明其中p <= 38。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink仅支持小于等于2,147,483,647(2^31 - 1)的BLOB类型的记录。
BLOB
MEDIUMBLOB
LONGBLOB
Oracle兼容模式
OceanBase字段类型
Flink字段类型
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
使用示例
源表&结果表
-- oceanbase cdc 源表 CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); -- oceanbase结果表 CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; END;维表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
相关文档
Flink支持的连接器,请参见支持的连接器。