OceanBase(公测中)

本文为您介绍如何使用OceanBase连接器。

背景信息

OceanBase数据库是一款原生分布式的HTAP数据库管理系统,详情请参见OceanBase官网。为了降低您从MySQL数据库或Oracle数据库迁移到OceanBase数据库时引发的业务系统改造成本,OceanBase数据库支持OracleMySQL两种兼容模式,两种模式下的数据类型、SQL功能、内部视图等与MySQL数据库或Oracle数据库保持一致。两种模式下建议使用的连接器如下:

  • Oracle模式:只能使用OceanBase连接器。

  • MySQL模式:与原生MySQL语法保持高度兼容,支持使用OceanBaseMySQL两种连接器读写OceanBase。

    重要
    • OceanBase连接器目前处于公测阶段。在OceanBase 3.2.4.4及以上版本,您可以使用MySQL连接器读写OceanBase,该功能也属于公测范围,请在使用前充分评估并谨慎使用。

    • 在使用MySQL连接器读取OceanBase增量数据时,请确保OceanBase Binlog已开启且被正确设置。有关OceanBase Binlog的更多信息,请参见概述Binlog 相关操作

OceanBase连接器支持的信息如下。

类别

详情

支持类型

源表、维表和结果表

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • Flink计算引擎VVR 8.0.1及以上版本支持OceanBase连接器。

  • 语义上可以保证At-Least-Once,在结果表有主键的情况下,幂等可以保证数据的正确性。

语法结构

CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
说明

连接器写入结果表原理:写入结果表时,会将接收到的每条数据拼接成一条SQL去执行。具体执行的SQL情况如下:

  • 对于没有主键的结果表,会拼接成INSERT INTO语句。

  • 对于包含主键的结果表,会根据数据库的兼容模式拼接成UPSERT语句。

WITH参数

  • 通用

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    connector

    表类型。

    STRING

    固定值为oceanbase

    password

    密码。

    STRING

    无。

  • 源表独有

    重要

    注意事项:自Flink计算引擎VVR 11.4.0版本起,OceanBase CDC连接器进行了重大架构升级与功能调整。为确保用户准确理解变更内容并顺利完成版本迁移,现将核心变更说明如下:

    • 原基于OceanBase LogProxy服务实现的 CDC 连接器已正式下线并从发行版本中移除。自 VVR-11.4.0 版本起,OceanBase CDC连接器仅支持通过OceanBase Binlog服务进行增量日志的捕获与数据同步。

    • OceanBase CDC连接器增强了对 OceanBase Binlog 服务的协议兼容性、连接稳定性,建议用户优先使用 OceanBase CDC 连接器。

      OceanBase Binlog服务在协议层完全兼容 MySQL 复制协议,也可使用标准MySQL CDC连接器连接至OceanBase Binlog服务以实现数据订阅,但不作推荐。

    • Flink计算引擎VVR 11.4.0版本起,OceanBase CDC 连接器不再支持在 Oracle 兼容模式下进行增量数据订阅。Oracle 兼容模式下的增量数据订阅请联系 OceanBase 企业技术支持。

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    hostname

    OceanBase数据库的IP地址或者Hostname。

    STRING

    建议填写专有网络VPC地址。

    说明

    如果OceanBase与实时计算Flink版不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见空间管理与操作Flink全托管集群如何访问公网?

    username

    OceanBase数据库服务的用户名。

    STRING

    无。

    database-name

    OceanBase数据库名称。

    STRING

    • 作为源表时,数据库名称支持正则表达式以读取多个数据库的数据。

    • 使用正则表达式时,尽量不要使用^$符号匹配开头和结尾。具体原因详见table-name备注的说明。

    table-name

    OceanBase表明。

    STIRNG

    • 作为源表时,表名支持正则表达式以读取多个表的数据。

    • 使用正则表达式时,尽量不要使用^$符号匹配开头和结尾。具体原因详见以下说明。

    说明

    OceanBase 源表在正则匹配表名时,会将您填写的 database-nametable-name 通过字符串 \\.(VVR 8.0.1前使用字符.)连接成为一个全路径的正则表达式,然后使用该正则表达式和OceanBase数据库中表的全限定名进行正则匹配。

    例如:当配置'database-name'='db_.*'且'table-name'='tb_.+'时,连接器将会使用正则表达式db_.*\\.tb_.+(8.0.1版本前为db_.*.tb_.+)去匹配表的全限定名来确定需要读取的表。

    port

    OceanBase数据库服务的端口号。

    INTEGER

    3306

    无。

    server-id

    数据库客户端的一个数字ID。

    STRING

    默认会随机生成一个5400~6400的值。

    ID必须是全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。

    该参数也支持ID范围的格式,例如5400-5408。在开启增量读取模式时支持多并发读取,此时推荐设定为ID范围,使得每个并发使用不同的ID。详情请参见Server ID使用

    scan.incremental.snapshot.chunk.size

    每个chunk的大小(包含的行数)。

    INTEGER

    8096

    当开启增量快照读取时,表会被切分成多个chunk读取。在读完chunk的数据之前,chunk的数据会先缓存在内存中。

    每个chunk包含的行数越少,则表中的chunk的总数量越大,尽管这会降低故障恢复的粒度,但可能导致内存OOM和整体的吞吐量降低。因此,您需要进行权衡,并设置合理的chunk大小。

    scan.snapshot.fetch.size

    当读取表的全量数据时,每次最多拉取的记录数。

    INTEGER

    1024

    无。

    scan.startup.mode

    消费数据时的启动模式。

    STRING

    initial

    参数取值如下:

    • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。

    • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。

    • earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。

    • specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置scan.startup.specific-offset.filescan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动,也可以只配置scan.startup.specific-offset.gtid-set来指定从某个GTID集合启动。

    • timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。

    重要

    使用earliest-offsetspecific-offsettimestamp启动模式时,确保在指定的Binlog消费位置到作业启动的时间之间,对应表的结构不发生变化,避免因表结构不同而报错。

    scan.startup.specific-offset.file

    使用指定位点模式启动时,启动位点的Binlog文件名。

    STRING

    使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如mysql-bin.000003

    scan.startup.specific-offset.pos

    使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。

    INTEGER

    使用该配置时,scan.startup.mode必须配置为specific-offset

    scan.startup.specific-offset.gtid-set

    使用指定位点模式启动时,启动位点的GTID集合。

    STRING

    使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

    scan.startup.timestamp-millis

    使用指定时间模式启动时,启动位点的毫秒时间戳。

    LONG

    使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。

    重要

    在使用指定时间时,OceanBase CDC会尝试读取每个Binlog文件的初始事件以确定其时间戳,最终定位至指定时间对应的Binlog文件。请保证指定的时间戳对应的Binlog文件在数据库上没有被清理且可以被读取到。

    server-time-zone

    数据库在使用的会话时区。

    STRING

    如果您没有指定该参数,则系统默认使用Flink作业运行时的环境时区作为数据库服务器时区,即您选择的可用区所在的时区。

    例如Asia/Shanghai,该参数控制了TIMESTAMP类型如何转成STRING类型。更多信息请参见Debezium时间类型

    debezium.min.row.count.to.stream.results

    当表的条数大于该值时,会使用分批读取模式。

    INTEGER

    1000

    Flink采用以下方式读取OceanBase源表数据:

    • 全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有OOM风险。

    • 分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有OOM风险,缺点是读取速度相对较慢。

    connect.timeout

    连接OceanBase数据库服务器超时时,重试连接之前等待超时的最长时间。

    DURATION

    30s

    无。

    connect.max-retries

    连接OceanBase数据库服务时,连接失败后重试的最大次数。

    INTEGER

    3

    无。

    connection.pool.size

    数据库连接池大小。

    INTEGER

    20

    数据库连接池用于复用连接,可以降低数据库连接数量。

    jdbc.properties.*

    JDBC URL中的自定义连接参数。

    STRING

    您可以传递自定义的连接参数,例如不使用SSL协议,则可配置为'jdbc.properties.useSSL' = 'false'

    支持的连接参数请参见MySQL Configuration Properties

    debezium.*

    Debezium读取Binlog的自定义参数。

    STRING

    您可以传递自定义的Debezium参数,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'来指定解析错误时的处理逻辑。

    heartbeat.interval

    Source通过心跳事件推动Binlog位点前进的时间间隔。

    DURATION

    30s

    心跳事件用于推动Source中的Binlog位点前进,这对OceanBase中更新缓慢的表非常有用。对于更新缓慢的表,Binlog位点无法自动前进,通过够心跳事件可以推到Binlog位点前进,可以避免Binlog位点不前进引起Binlog位点过期问题,Binlog位点过期会导致作业失败无法恢复,只能无状态重启。

    scan.incremental.snapshot.chunk.key-column

    可以指定某一列作为快照阶段切分分片的切分列。

    见备注列。

    STRING

    • 无主键表必填,选择的列必须是非空类型(NOT NULL)。

    • 有主键的表为选填,仅支持从主键中选择一列。

    scan.incremental.close-idle-reader.enabled

    是否在快照结束后关闭空闲的 Reader。

    BOOLEAN

    false

    • Flink计算引擎VVR 8.0.1及以上版本支持。

    • 该配置生效需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue。

    scan.read-changelog-as-append-only.enabled

    是否将changelog数据流转换为append-only数据流。

    BOOLEAN

    false

    参数取值如下:

    • true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成INSERT类型的消息。仅在需要保存上游表删除消息等特殊场景下开启使用。

    • false(默认):所有类型的消息都保持原样下发。

    说明

    Flink计算引擎VVR 8.0.8及以上版本支持。

    scan.only.deserialize.captured.tables.changelog.enabled

    在增量阶段,是否仅对指定表的变更事件进行反序列化。

    BOOLEAN

    • VVR 8.x版本中默认值为false。

    • VVR 11.1及以上版本默认值为true。

    参数取值如下:

    • true:仅对目标表的变更数据进行反序列化,加快Binlog读取速度。

    • false(默认):对所有表的变更数据进行反序列化。

    说明
    • Flink计算引擎VVR 8.0.7及以上版本支持。

    • Flink计算引擎VVR 8.0.8及以下版本使用时,参数名需要修改为debezium.scan.only.deserialize.captured.tables.changelog.enable

    scan.parse.online.schema.changes.enabled

    在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。

    BOOLEAN

    false

    参数取值如下:

    • true:解析 RDS 无锁变更 DDL 事件。

    • false(默认):不解析 RDS 无锁变更 DDL 事件。

    实验性功能。建议在执行线上无锁变更前,先对Flink作业执行一次快照以便恢复。

    说明

    Flink计算引擎VVR 11.1及以上版本支持。

    scan.incremental.snapshot.backfill.skip

    是否在快照读取阶段跳过backfill。

    BOOLEAN

    false

    参数取值如下:

    • true:快照读取阶段跳过backfill。

    • false(默认):快照读取阶段不跳过backfill。

    如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。

    重要

    跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。

    说明

    Flink计算引擎VVR 11.1及以上版本支持。

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    快照读取阶段是否先分发无界的分片。

    BOOELEAN

    false

    参数取值如下:

    • true:快照读取阶段优先分发无界的分片。

    • false(默认):快照读取阶段不优先分发无界的分片。

    实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。

    说明

    Flink计算引擎VVR 11.1及以上版本支持。

  • 维表独有

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    url

    JDBC url。

    STRING

    • url中需要包含MySQL database名或Oracle service名。

    userName

    用户名。

    STRING

    无。

    cache

    缓存策略。

    STRING

    ALL

    支持以下三种缓存策略:

    • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

      适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。使用该缓存策略时,必须配置cacheSize参数。

    • None:无缓存。

    重要
    • 使用ALL缓存策略时,请注意节点内存大小,防止出现OOM。

    • 因为系统会异步加载维表数据,所以在使用ALL缓存策略时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    cacheSize

    最大缓存条数。

    INTEGER

    100000

    • 当选择LRU缓存策略后,必须设置缓存大小。

    • 当选择ALL缓存策略后,可以不设置缓存大小。

    cacheTTLMs

    缓存超时时间。

    LONG

    Long.MAX_VALUE

    cacheTTLMs的配置和cache有关,详情如下:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

    maxRetryTimeout

    最大重试时间。

    DURATION

    60s

    无。

  • 结果表独有

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    userName

    用户名。

    STRING

    无。

    compatibleMode

    OceanBase的兼容模式。

    STRING

    mysql

    参数取值如下:

    • mysql

    • oracle

    说明

    oceanabse独有参数。

    url

    JDBC url。

    STRING

    • url中需要包含MySQL database名或Oracle service名。

    tableName

    表名。

    STRING

    无。

    maxRetryTimes

    最大重试次数。

    INTEGER

    3

    无。

    poolInitialSize

    数据库连接池初始大小。

    INTEGER

    1

    无。

    poolMaxActive

    数据库连接池最大连接数。

    INTEGER

    8

    无。

    poolMaxWait

    从数据库连接池中获取连接的最大等待时间。

    INTEGER

    2000

    单位毫秒。

    poolMinIdle

    数据库连接池中最小空闲连接数。

    INTEGER

    1

    无。

    connectionProperties

    jdbc的连接属性。

    STRING

    格式为"k1=v1;k2=v2;k3=v3"。

    ignoreDelete

    是否忽略数据Delete操作。

    Boolean

    false

    无。

    excludeUpdateColumns

    指定要排除的列名。在执行更新操作时,这些列将不会被更新。

    STRING

    如果忽略指定的字段为多个时,则需要使用英文逗号(,)分隔。例如excludeUpdateColumns=column1,column2

    说明

    该值始终会包含主键列,也就是实际生效的列名为您指定的列名和主键列。

    partitionKey

    分区键。

    STRING

    当设置分区键时,连接器会先将数据按照分区键进行分组,各个分组将分别写入数据库。这里的分组处理早于modRule的处理。

    modRule

    分组规则。

    STRING

    分组规则格式需要为"列名mod数字",如user_id mod 8,列类型必须为数字类型。

    当设置分组规则时,数据先按partitionKey分区;在每个分区内,再根据 modRule 计算结果分组;

    bufferSize

    数据缓冲区大小。

    INTEGER

    1000

    无。

    flushIntervalMs

    清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

    LONG

    1000

    无。

    retryIntervalMs

    最大重试时间。

    INTEGER

    5000

    单位毫秒。

类型映射

  • MySQL兼容模式

    OceanBase字段类型

    Flink字段类型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    说明

    其中p <= 38。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink仅支持小于等于2,147,483,647(2^31 - 1)的BLOB类型的记录。

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Oracle兼容模式

    OceanBase字段类型

    Flink字段类型

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIMEZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

使用示例

  • 源表&结果表

    -- oceanbase cdc 源表
    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    -- oceanbase结果表
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    END; 

  • 维表

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

相关文档

Flink支持的连接器,请参见支持的连接器