本文为您介绍Dataphin中SAP HAHA Connector的语法结构、WITH参数和使用示例等。

背景信息

SAP HANA是一种多模型数据库,它将数据存储在内存中,而不是将其保存在磁盘上。面向列的内存数据库设计允许您在单个系统中运行高级分析和高速事务。

支持的版本

Flink版本

SAP HANA版本

SAP HANA Driver版本

1.15.3

不限制

2.19.16

说明

SAP HANA Connector没有使用特别的JDBC Driver特性。所以,如果Driver能连通访问SAP HANA数据库,基本都可以运行。

使用限制

  • 全量读取和增量读取都是单并发,在大数据量下,全量读取会有性能瓶颈,建议通过其他方案解决。例如,离线方案。

  • 仅支持单表的数据采集,不支持多表、整库。

  • 系统按照固定频率query定时请求时间需要自行配置,如果配置的时间较长,将会导致实时性降低;如果配置的时间很短,将会增加数据库查询压力。

  • 系统采集到的数据都是INSERT类型的,如果想获取类似CDC的数据,需要通过其他技术方案解决。例如,通过基于主键的GROUP BY生成CDC数据。

  • 来源表上必须配置update_time字段(记录更新时间),并且每次记录更新必须要更新这个字段值(系统需要依据这个字段判断记录是否更新过)。

  • 系统按照update_time时间位点获取更新数据。如果update_time的值比实际写入库的时间小,那么系统可能会遗漏这条数据。(系统默认有5秒的等待)。

  • 数据中的数据的update_time是有序发生的。A数据的update_time小于B数据,那么A数据总是先于B数据对查询可见。否则可能导致数据丢失。

  • 在数据量大的表上(>10000条),会有比较大的性能问题。建议数据量大的表在update_time上建索引。

  • Flink SQL任务读取到的数据的变更类型默认为INSERT类型,如果需要处理成ChangeLog数据,您需要自行处理(按主键聚合或按更新时间排序取最新一条数据),这种情况下如果更新主键,数据可能会出错,请谨慎操作

  • 不支持采集Delete的数据。如果您必须要删除,并且采集到下游,可以配置删除字段(加一个标记该条数据是否已删除)做软删除。

性能参考

测试环境下,使用1CU的Yarn资源(1cpu 4G Memoery,单并发),能够产生3000RPS的消费速率。

重要

实际的任务消费速率和具体的任务逻辑、输入输出的网络带宽、字段的大小等有关系。该参数值仅供参考。

Connector相关说明

Connector支持的类别详情如下。

类别

详情

支持类型

源表

原生DDL访问

支持

数据源编码方式访问

不支持

元表

支持

运行模式

流模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL

语法结构

CREATE TABLE input_table (
  id BIGINT
  name STRING,
  update_time TIMESTAMP(3)
) WITH (
    'connector' = 'scan-jdbc',
    'driver' = 'com.sap.db.jdbc.Driver’,
    'url' = '<JDBC URL>',
    'table-name' = '<yourTableName>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    'scan-period.seconds' = '<PeriodSeconds>',
    'scan.start.time' = '<StartTime>',
    'scan.table.time.column' = '<UpdateTime>'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为scan-jdbc

    driver

    驱动。

    String

    固定值为com.sap.db.jdbc.Driver

    url

    URL地址。

    String

    格式为jdbc:sap://host:port/dbname

    table-name

    表名。

    String

    读取的表名称。

    username

    用户名。

    String

    填写访问SAP HAHA的用户名。

    password

    密码。

    String

    填写访问SAP HAHA的密码。

  • 源表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    scan.period.seconds

    扫描的时间周期。

    Integer

    10

    单位秒。

    scan.start.time

    读取更新的起始时间。

    Date

    当前时间

    格式为yyyy-MM-dd hh-mm-ss格式;或者带时间区的yyyy-MM-dd hh-mm-ss Z的格式。

    说明

    可配置为earliest,表示全量读取后,再增量读取。

    参数示例如下:

    • earliest

    • 2024-03-01 23:55:55

    • 2024-03-01 23:55:55 +0800

    scan.table.time.column

    更新时间列。

    Timestamp

    指定表中那个列作为更新时间列,用于判断改行数据是否更新。必须为timestamp类型。

    scan.delay.seconds

    延迟查询。

    Integer

    5

    指延迟多少秒查询数据库。比如要查询update_time<'2024-03-08 12:00:00'的数据,会在2024-03-08 12:00:05的时候触发查询。可以避免数据库时间偏差和数据延迟。

类型映射

SAP HAHA类型

Flink SQL类型

DATE

DATE

TIME

TIME

SECONDDTE

TIMESTAMP

TIMESTAMP

TIMESTAMP

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

SMALLDECIMAL

不支持

DECIMAL

DECIMAL

REAL

FLOAT

DOUBLE

DOUBLE

FLOAT(n)

DOUBLE

BOOLEAN

BOOLEAN

VARCHAR

STRING

NVARCHAR

STRING

ALPHANUM

STRING

SHORTTEXT

STRING

VARBINARY

VARBINARY

BLOB

不支持

CLOB

不支持

NCLOB

不支持

TEXT

STRING

ARRAY

不支持

ST_GEOMETRY

不支持

ST_POINT

不支持

使用示例

  • 源表

    CREATE TABLE input_table (
      id BIGINT
      name STRING,
      update_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'scan-jdbc',
        'driver' = 'com.sap.db.jdbc.Driver’,
        'url' = jdbc:sap://127.***.***.1:39041?databaseName=HXE',
        'table-name' = 'table_id_name_time',
        'username' = 'root',
        'password' = 'root',
        'scan-period.seconds' = '20',
        'scan.start.time' = '2024-02-20 18:30:00',
        'scan.table.time.column' = 'update_time'
    );

时区设置

如果SAP HANA数据库的时区和Flink任务的时区不同,您需要进行时区的设置。否则将导致读取不到变化数据,或者重复读取数据。SAP HANA的JDBC本身无法设置时区参数,默认读取的是JVM的时区配置 (user.timezone),您可以在Flink SQL任务的Flink Config参数中增加如下参数:

重要

设置JVM的时区后,会影响整个Fink任务的时间处理,请谨慎设置

env.java.opts: -Duser.timezone=xxx

-- 如果设置utc时间:注意UTC必须大写
env.java. opts: -Duser.timezone=UTC

-- 如果设置北京时间:
env.java.opts: -Duser.timezone=GMT+8