本文为您介绍如何使用OceanBase连接器。
背景信息
OceanBase数据库是一款原生分布式的HTAP数据库管理系统,详情请参见OceanBase官网。为了降低您从MySQL数据库或Oracle数据库迁移到OceanBase数据库时引发的业务系统改造成本,OceanBase数据库支持Oracle和MySQL两种兼容模式,两种模式下的数据类型、SQL功能、内部视图等与MySQL数据库或Oracle数据库保持一致。两种模式下建议使用的连接器如下:
- Oracle模式:只能使用OceanBase连接器。 
- MySQL模式:与原生MySQL语法保持高度兼容,支持使用OceanBase和MySQL两种连接器读写OceanBase。 重要- OceanBase连接器目前处于公测阶段。在OceanBase 3.2.4.4及以上版本,您可以使用MySQL连接器读写OceanBase,该功能也属于公测范围,请在使用前充分评估并谨慎使用。 
- 在使用MySQL连接器读取OceanBase增量数据时,请确保OceanBase Binlog已开启且被正确设置。有关OceanBase Binlog的更多信息,请参见概述或Binlog 相关操作。 
 
OceanBase连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 源表、维表和结果表 | 
| 运行模式 | 流模式和批模式 | 
| 数据格式 | 暂不适用 | 
| 特有监控指标 | 暂无 | 
| API种类 | SQL | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
连接的数据库和表都已被创建。具体操作可参考以下文档:
- MySQL模式 
- Oracle模式 
使用限制
- 维表和结果表 - Flink计算引擎VVR 8.0.1及以上版本支持OceanBase连接器。 
- 语义上可以保证At-Least-Once,在结果表有主键的情况下,幂等可以保证数据的正确性。 
 
- 结果表:OceanBase数据库没有部署数据库代理服务时,连接器会使用OCJ(OceanBase Connector Java)连接OceanBase数据库,该模式需要用到config url,要求OceanBase数据库已部署OceanBase云平台。该工作方式只能用于OceanBase数据库的MySQL兼容模式,不支持Oracle兼容模式。 说明- 数据库代理服务与OCJ实现了相同的路由功能,区别在于OCJ驱动集成于Java应用程序,而数据库代理是一个独立的代理服务。目前,OceanBase团队推荐通过数据库代理来连接OceanBase集群,OCJ驱动主要用于兼容一些历史集群和应用程序。 
语法结构
CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);连接器写入结果表原理:写入结果表时,会将接收到的每条数据拼接成一条SQL去执行。具体执行的SQL情况如下:
- 对于没有主键的结果表,会拼接成INSERT INTO语句。 
- 对于包含主键的结果表,会根据数据库的兼容模式拼接成UPSERT语句。 
WITH参数
- 通用 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - connector - 表类型。 - 是 - STRING - 无 - 作为CDC源表或维表时,固定值为 - oceanbase。
- 作为结果表时,参数取值如下: - 如果使用了OceanBase数据库代理,则表类型取值为 - oceanbase,
- 如果直连OceanBase集群,则表类型取值为 - oceanbase-ocj。
 
 - userName - 用户名。 - 是 - STRING - 无 - 无。 - password - 密码。 - 是 - STRING - 无 - 无。 
- 源表独有 说明- 连接器支持通过数据库名称(database-name)和表名(table-name)的正则匹配和表列表(table-list)的精确匹配两种模式来指定需要监听的表。当同时使用两种方式时,将会监听两种方式匹配的所有表。 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - logproxy.host - OceanBase日志代理服务器的IP地址或主机名。 - 是 - String - 无 - 无。 - logproxy.port - OceanBase日志代理服务器的端口号。 - 是 - Integer - 无 - 无。 - scan.startup.mode - OceanBase CDC的启动模式。 - 是 - String - 无 - 参数取值如下: - initial:从初始位点开始拉取全部数据。 
- latest-offset:从当前位点开始拉取变更数据。 
- timestamp:从指定的时间戳开始拉取变更数据。 
 - tenant-name - OceanBase数据库的租户名。 - 是 - String - 无 - 无。 - database-name - OceanBase数据库名称。 - 否 - String - 无 - 支持使用正则表达式指定数据库名称。 说明- 仅支持在启动模式为initial时,使用该参数。 - table-name - OceanBase数据库的表名称。 - 否 - String - 无 - 支持使用正则表达式指定表名称。 说明- 仅支持在启动模式为initial时,使用该参数。 - table-list - OceanBase数据库的全路径的表名列表。 - 否 - String - 无 - 可以使用英文逗号(,)分隔,例如 - db1.table1, db2.table2。- hostname - OceanBase数据库或 OceanBase代理的IP地址或主机名。 - 否 - String - 无 - 无。 - port - OceanBase数据库服务器的端口号。 - 否 - Integer - 无 - 可以是OceanBase服务器的SQL端口号(默认值为2881) - 或OceanBase代理服务器的端口号(默认值为2883)。 - connect.timeout - 连接到OceanBase数据库服务器之前的最长超时时间。 - 否 - Duration - 30s - 无。 - server-time-zone - 数据库服务器中的会话时区。 - 否 - String - +00:00 - 会话时区值的合法格式为 - ±hh:mm,表示与协调世界时(UTC)的时区偏移量。说明- 会话时区的设置会影响到时间类型的显示和存储方式。因此,如果您需要控制OceanBase的时间类型如何转换为字符串,则需要设置合理的会话时区信息,以确保显示正确的本地时间。 
- 如果您在MySQL数据库中已存在一个用于存储时区信息的表,则在设置时区时,可以使用这个表中已经创建的时区作为合法的值。 
 - logproxy.client.id - OceanBase日志代理服务器的客户端连接ID。 - 否 - String - 规则生成 - 如果您没有指定,则Flink会默认按照 - {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}规则生成。- rootserver-list - OceanBase根服务器列表。 - 否 - String - 无 - 服务器列表格式为 - ip:rpc_port:sql_port。您可以执行- SHOW PARAMETERS LIKE 'rootservice_list';SQL语句获取服务器列表信息。说明- OceanBase社区版本必填。 
- 多个服务器地址使用英文分号(;)分隔。 
 - config-url - 从配置服务器获取服务器信息的url。 - 否 - String - 无 - OceanBase企业版本必填。 - working-mode - 日志代理中libobcdc的工作模式。 - 否 - String - storage - 参数取值如下: - storage:表示数据将被存储在磁盘或其他持久性存储介质中。 
- memory:表示数据将被存储在内存中。 
 - compatible-mode - OceanBase的兼容模式。 - 否 - String - mysql - 参数取值如下: - mysql 
- oracle 
 - jdbc.driver - 全量读取源表数据时使用的jdbc驱动类名。 - 否 - String - com.mysql.jdbc.Driver - 无。 - jdbc.properties.* - 传递自定义的JDBC URL属性。 - 否 - String - 无 - 例如 - 'jdbc.properties.useSSL' = 'false'表示不使用SSL加密。- obcdc.properties.* - 将自定义的 OBCDC参数传递给libobcdc。 - 否 - String - 无 - 例如 - 'obcdc.properties.sort_trans_participants' = '1'。- 更多参数信息见OBCDC配置项说明。 
- 维表独有 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - url - JDBC url或config url。 - 是 - STRING - 无 - 当连接器类型为 - oceanbase时使用JDBC url,连接器类型为- oceanbase-ocj时,使用config url。
- url中需要包含MySQL database名或Oracle service名。 
 - cache - 缓存策略。 - 否 - STRING - ALL - 支持以下三种缓存策略: - ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。 - 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。 
- LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。使用该缓存策略时,必须配置cacheSize参数。 
- None:无缓存。 
 重要- 使用ALL缓存策略时,请注意节点内存大小,防止出现OOM。 
- 因为系统会异步加载维表数据,所以在使用ALL缓存策略时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。 
 - cacheSize - 最大缓存条数。 - 否 - INTEGER - 100000 - 当选择LRU缓存策略后,必须设置缓存大小。 
- 当选择ALL缓存策略后,可以不设置缓存大小。 
 - cacheTTLMs - 缓存超时时间。 - 否 - LONG - Long.MAX_VALUE - cacheTTLMs的配置和cache有关,详情如下: - 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。 
- 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。 
- 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。 
 - maxRetryTimeout - 最大重试时间。 - 否 - DURATION - 60s - 无。 
- 结果表独有 - 参数 - 说明 - 是否必填 - 数据类型 - 默认值 - 备注 - compatibleMode - OceanBase的兼容模式。 - 否 - STRING - mysql - 参数取值如下: - mysql 
- oracle 
 说明- oceanabse独有参数。 - databaseName - 数据库名。 - 是 - STRING - 无 - 应当与config url中保持一致。 说明- oceanbase-ocj独有参数。 - passwordEncrypted - 是否使用加密过的密码。 - 否 - Boolean - false - oceanbase-ocj独有参数。 - slowQueryThresholdMs - 慢查询等待阈值。 - 否 - INTEGER - 60000 - 单位毫秒。 说明- oceanbase-ocj独有参数。 - url - JDBC url或config url。 - 是 - STRING - 无 - 当连接器类型为 - oceanbase时使用JDBC url,连接器类型为- oceanbase-ocj时,使用config url。
- url中需要包含MySQL database名或Oracle service名。 
 - tableName - 表名。 - 是 - STRING - 无 - 无。 - maxRetryTimes - 最大重试次数。 - 否 - INTEGER - 3 - 无。 - poolInitialSize - 数据库连接池初始大小。 - 否 - INTEGER - 1 - 无。 - poolMaxActive - 数据库连接池最大连接数。 - 否 - INTEGER - 8 - 无。 - poolMaxWait - 从数据库连接池中获取连接的最大等待时间。 - 否 - INTEGER - 2000 - 单位毫秒。 - poolMinIdle - 数据库连接池中最小空闲连接数。 - 否 - INTEGER - 1 - 无。 - connectionProperties - jdbc的连接属性。 - 否 - STRING - 无 - 格式为"k1=v1;k2=v2;k3=v3"。 - ignoreDelete - 是否忽略数据Delete操作。 - 否 - Boolean - false - 无。 - excludeUpdateColumns - 指定要排除的列名。在执行更新操作时,这些列将不会被更新。 - 否 - STRING - 无 - 如果忽略指定的字段为多个时,则需要使用英文逗号(,)分隔。例如 - excludeUpdateColumns=column1,column2。说明- 该值始终会包含主键列,也就是实际生效的列名为您指定的列名和主键列。 - partitionKey - 分区键。 - 否 - STRING - 无 - 当设置分区键时,连接器会先将数据按照分区键进行分组,各个分组将分别写入数据库。这里的分组处理早于modRule的处理。 - modRule - 分组规则。 - 否 - STRING - 无 - 分组规则格式需要为"列名mod数字",列类型必须为数字类型。当设置分组规则时,数据会根据计算所得结果进行分组,各个分组将分别写入数据库。这里的分组处理晚于partitionKey的处理。 - bufferSize - 数据缓冲区大小。 - 否 - INTEGER - 1000 - 无。 - flushIntervalMs - 清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 - 否 - LONG - 1000 - 无。 - retryIntervalMs - 最大重试时间。 - 否 - INTEGER - 5000 - 单位毫秒。 
类型映射
- MySQL兼容模式 - OceanBase字段类型 - Flink字段类型 - TINYINT - TINYINT - SMALLINT - SMALLINT - TINYINT UNSIGNED - INT - INT - MEDIUMINT - SMALLINT UNSIGNED - BIGINT - BIGINT - INT UNSIGNED - BIGINT UNSIGNED - DECIMAL(20, 0) - REAL - FLOAT - FLOAT - DOUBLE - DOUBLE - NUMERIC(p, s) - DECIMAL(p, s) 说明- 其中p <= 38。 - DECIMAL(p, s) - BOOLEAN - BOOLEAN - TINYINT(1) - DATE - DATE - TIME [(p)] - TIME [(p)] [WITHOUT TIME ZONE] - DATETIME [(p)] - TIMESTAMP [(p)] [WITHOUT TIME ZONE] - TIMESTAMP [(p)] - CHAR(n) - CHAR(n) - VARCHAR(n) - VARCHAR(n) - BIT(n) - BINARY(⌈n/8⌉) - BINARY(n) - BINARY(n) - VARBINARY(N) - VARBINARY(N) - TINYTEXT - STRING - TEXT - MEDIUMTEXT - LONGTEXT - TINYBLOB - BYTES 重要- Flink仅支持小于等于2,147,483,647(2^31 - 1)的BLOB类型的记录。 - BLOB - MEDIUMBLOB - LONGBLOB 
- Oracle兼容模式 - OceanBase字段类型 - Flink字段类型 - NUMBER(p, s <= 0), p - s < 3 - TINYINT - NUMBER(p, s <= 0), p - s < 5 - SMALLINT - NUMBER(p, s <= 0), p - s < 10 - INT - NUMBER(p, s <= 0), p - s < 19 - BIGINT - NUMBER(p, s <= 0), 19 <= p - s <= 38 - DECIMAL(p - s, 0) - NUMBER(p, s > 0) - DECIMAL(p, s) - NUMBER(p, s <= 0), p - s > 38 - STRING - FLOAT - FLOAT - BINARY_FLOAT - BINARY_DOUBLE - DOUBLE - NUMBER(1) - BOOLEAN - DATE - TIMESTAMP [(p)] [WITHOUT TIMEZONE] - TIMESTAMP [(p)] - CHAR(n) - STRING - NCHAR(n) - NVARCHAR2(n) - VARCHAR(n) - VARCHAR2(n) - CLOB - BLOB - BYTES - ROWID 
使用示例
- 源表&结果表 - CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'scan.startup.mode' = 'initial', 'username' = 'user', 'password' = 'password', 'tenant-name' = 'tenant', 'database-name' = '^test_db$', 'table-name' = '^orders$', 'hostname' = '11.22.xx.xx', 'port' = '2883', 'config-url' = 'http://11.22.xx.xx:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx', 'logproxy.host' = '11.22.xx.xx', 'logproxy.port' = '2983', 'working-mode' = 'memory' ); -- oceanbase结果表 CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); --oceanbase-ocj结果表 CREATE TEMPORARY TABLE oceanbase_ocj_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase-ocj', 'url' = '<yourConfigUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'databaseName' = '<yourDatabaseName>', 'tableName' = '<yourTableName>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; INSERT INTO oceanbase_ocj_sink SELECT * FROM oceanbase_source; END;
- 维表 - CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
相关文档
Flink支持的连接器,请参见支持的连接器。