本文为您介绍Dataphin中SAP HAHA Connector的语法结构、WITH参数和使用示例等。
背景信息
SAP HANA是一种多模型数据库,它将数据存储在内存中,而不是将其保存在磁盘上。面向列的内存数据库设计允许您在单个系统中运行高级分析和高速事务。更多信息,请参见SAP HANA官网。
支持的版本
Flink版本 | SAP HANA版本 | SAP HANA Driver版本 |
1.15.3 | 不限制 | 2.19.16 |
SAP HANA Connector没有使用特别的JDBC Driver特性。所以,如果Driver能连通访问SAP HANA数据库,基本都可以运行。
使用限制
全量读取和增量读取都是单并发,在大数据量下,全量读取会有性能瓶颈,建议通过其他方案解决。例如,离线方案。
仅支持单表的数据采集,不支持多表、整库。
系统按照固定频率
query
,定时请求时间需要自行配置,如果配置的时间较长,将会导致实时性降低;如果配置的时间很短,将会增加数据库查询压力。系统采集到的数据都是
INSERT
类型的,如果想获取类似CDC
的数据,需要通过其他技术方案解决。例如,通过基于主键的GROUP BY
生成CDC
数据。来源表上必须配置
update_time
字段(记录更新时间),并且每次记录更新必须要更新这个字段值(系统需要依据这个字段判断记录是否更新过)。系统按照
update_time
时间位点获取更新数据。如果update_time
的值比实际写入库的时间小,那么系统可能会遗漏这条数据。(系统默认有5秒的等待)。数据中的数据的
update_time
是有序发生的。A数据的update_time
小于B数据,那么A数据总是先于B数据对查询可见。否则可能导致数据丢失。在数据量大的表上(>10000条),会有比较大的性能问题。建议数据量大的表在
update_time
上建索引。Flink SQL任务读取到的数据的变更类型默认为
INSERT
类型,如果需要处理成ChangeLog
数据,您需要自行处理(按主键聚合或按更新时间排序取最新一条数据),这种情况下如果更新主键,数据可能会出错,请谨慎操作。不支持采集
Delete
的数据。如果您必须要删除,并且采集到下游,可以配置删除字段(加一个标记该条数据是否已删除)做软删除。
性能参考
测试环境下,使用1CU的Yarn资源(1cpu 4G Memoery,单并发),能够产生3000RPS的消费速率。
实际的任务消费速率和具体的任务逻辑、输入输出的网络带宽、字段的大小等有关系。该参数值仅供参考。
Connector相关说明
Connector支持的类别详情如下。
类别 | 详情 |
支持类型 | 源表 |
原生DDL访问 | 支持 |
数据源编码方式访问 | 不支持 |
元表 | 支持 |
运行模式 | 流模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
语法结构
CREATE TABLE input_table (
id BIGINT
name STRING,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'scan-jdbc',
'driver' = 'com.sap.db.jdbc.Driver’,
'url' = '<JDBC URL>',
'table-name' = '<yourTableName>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'scan-period.seconds' = '<PeriodSeconds>',
'scan.start.time' = '<StartTime>',
'scan.table.time.column' = '<UpdateTime>'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
scan-jdbc
。driver
驱动。
String
是
无
固定值为
com.sap.db.jdbc.Driver
。url
URL地址。
String
是
无
格式为
jdbc:sap://host:port/dbname
。table-name
表名。
String
是
无
读取的表名称。
username
用户名。
String
是
无
填写访问SAP HAHA的用户名。
password
密码。
String
是
无
填写访问SAP HAHA的密码。
源表
参数
说明
数据类型
是否必填
默认值
备注
scan.period.seconds
扫描的时间周期。
Integer
否
10
单位秒。
scan.start.time
读取更新的起始时间。
Date
否
当前时间
格式为
yyyy-MM-dd hh-mm-ss
格式;或者带时间区的yyyy-MM-dd hh-mm-ss Z
的格式。说明可配置为earliest,表示全量读取后,再增量读取。
参数示例如下:
earliest
2024-03-01 23:55:55
2024-03-01 23:55:55 +0800
scan.table.time.column
更新时间列。
Timestamp
是
无
指定表中那个列作为更新时间列,用于判断改行数据是否更新。必须为
timestamp
类型。scan.delay.seconds
延迟查询。
Integer
否
5
指延迟多少秒查询数据库。比如要查询
update_time<'2024-03-08 12:00:00'
的数据,会在2024-03-08 12:00:05
的时候触发查询。可以避免数据库时间偏差和数据延迟。
类型映射
SAP HAHA类型 | Flink SQL类型 |
DATE | DATE |
TIME | TIME |
SECONDDTE | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
SMALLDECIMAL | 不支持 |
DECIMAL | DECIMAL |
REAL | FLOAT |
DOUBLE | DOUBLE |
FLOAT(n) | DOUBLE |
BOOLEAN | BOOLEAN |
VARCHAR | STRING |
NVARCHAR | STRING |
ALPHANUM | STRING |
SHORTTEXT | STRING |
VARBINARY | VARBINARY |
BLOB | 不支持 |
CLOB | 不支持 |
NCLOB | 不支持 |
TEXT | STRING |
ARRAY | 不支持 |
ST_GEOMETRY | 不支持 |
ST_POINT | 不支持 |
使用示例
源表
CREATE TABLE input_table ( id BIGINT name STRING, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'scan-jdbc', 'driver' = 'com.sap.db.jdbc.Driver’, 'url' = jdbc:sap://127.***.***.1:39041?databaseName=HXE', 'table-name' = 'table_id_name_time', 'username' = 'root', 'password' = 'root', 'scan-period.seconds' = '20', 'scan.start.time' = '2024-02-20 18:30:00', 'scan.table.time.column' = 'update_time' );
时区设置
如果SAP HANA数据库的时区和Flink任务的时区不同,您需要进行时区的设置。否则将导致读取不到变化数据,或者重复读取数据。SAP HANA的JDBC本身无法设置时区参数,默认读取的是JVM的时区配置 (user.timezone),您可以在Flink SQL任务的Flink Config参数中增加如下参数:
设置JVM的时区后,会影响整个Fink任务的时间处理,请谨慎设置。
env.java.opts: -Duser.timezone=xxx
-- 如果设置utc时间:注意UTC必须大写
env.java. opts: -Duser.timezone=UTC
-- 如果设置北京时间:
env.java.opts: -Duser.timezone=GMT+8