本文为您介绍Dataphin中SAP HAHA Connector的语法结构、WITH参数和使用示例等。
背景信息
SAP HANA是一种多模型数据库,它将数据存储在内存中,而不是将其保存在磁盘上。面向列的内存数据库设计允许您在单个系统中运行高级分析和高速事务。更多信息,请参见SAP HANA官网。
使用限制
仅支持单表的数据采集,不支持多表、整库。
系统按照固定频率query,定时请求时间需要自行配置,如果配置的时间较长,将会导致实时性降低;如果配置的时间很短,将会增加数据库查询压力。
来源表上必须配置
update_time
字段(记录更新时间),并且每次记录更新必须要更新这个字段值(系统需要依据这个字段判断记录是否更新过)。在数据量大的表上(>10000条),会有比较大的性能问题。建议数据量大的表在
update_time
上建索引。Flink SQL任务读取到的数据的变更类型默认为
Insert
类型,如果需要处理成ChangeLog
数据,您需要自行处理(按主键聚合或按更新时间排序取最新一条数据),这种情况下如果更新主键,数据可能会出错,请谨慎操作。不支持采集
Delete
的数据。如果您必须要删除,并且采集到下游,可以配置删除字段(加一个标记该条数据是否已删除)做软删除。
Connector相关说明
Connector支持的类别详情如下。
类别 | 详情 |
支持类型 | 源表 |
原生DDL访问 | 支持 |
数据源编码方式访问 | 不支持 |
元表 | 支持 |
运行模式 | 流模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
语法结构
CREATE TABLE input_table (
id BIGINT,
name STRING,
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'dp-jdbc',
'driver' = '<DriverName>',
'url' = '<JDBC URL>',
'table-name' = '<yourTableName>,
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'scan.table.time.column' = '<'UpdateTime'>'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
scan-jdbc
。driver
驱动。
String
是
无
固定值为
com.sap.db.jdbc.Driver
。url
URL地址。
String
是
无
格式为
jdbc:mysql://host:port/dbname
。table-name
表名。
String
是
无
读取的表名称。
username
用户名。
String
是
无
填写访问SAP HAHA的用户名。
password
密码。
String
是
无
填写访问SAP HAHA的密码。
源表
参数
说明
数据类型
是否必填
默认值
备注
scan.period.seconds
扫描的时间周期。
Integer
否
10
单位秒。
scan.start.time
读取更新的起始时间。
Date
否
当前时间
格式为
yyyy-MM-dd hh-mm-ss
格式;或者带时间区的yyyy-MM-dd hh-mm-ss Z
的格式。说明可配置为earliest,表示全量读取后,再增量读取。
参数示例如下:
earliest
2024-03-01 23:55:55
2024-03-01 23:55:55 +0800
scan.table.time.column
更新时间列。
Timestamp
是
无
指定表中那个列作为更新时间列,用于判断改行数据是否更新。必须为
timestamp
类型。scan.delay.seconds
延迟查询。
Integer
否
5
指延迟多少秒查询数据库。比如要查询update_time<'2024-03-08 12:00:00'的数据,会在2024-03-08 12:00:05的时候触发查询。可以避免数据库时间偏差和数据延迟。
类型映射
SAP HAHA类型 | Flink SQL类型 |
DATE | DATE |
TIME | TIME |
SECONDDTE | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
SMALLDECIMAL | 不支持 |
DECIMAL | DECIMAL |
REAL | FLOAT |
DOUBLE | DOUBLE |
FLOAT(n) | DOUBLE |
BOOLEAN | BOOLEAN |
VARCHAR | STRING |
NVARCHAR | STRING |
ALPHANUM | STRING |
SHORTTEXT | STRING |
VARBINARY | VARBINARY |
BLOB | 不支持 |
CLOB | 不支持 |
NCLOB | 不支持 |
TEXT | STRING |
ARRAY | 不支持 |
ST_GEOMETRY | 不支持 |
ST_POINT | 不支持 |
使用示例
源表
CREATE TABLE input_table ( id BIGINT, name STRING, update_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'dp-jdbc', 'driver' = 'com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://localhost:3306/pf', 'table-name' = 'pf_id_name_time', 'username' = 'root', 'password' = 'root', 'scan.period.seconds' = '20', 'scan.start.time' = '2024-02-21 02:30:00', 'scan.table.time.column' = 'update_time' );