通过Flink访问宽表引擎

更新时间:
复制为 MD 格式

您可以使用社区版Flink或阿里云实时计算版Flink访问云原生多模数据库 Lindorm宽表。本文介绍同时适用于阿里云Flink和社区版Flink访问Lindorm宽表的方法。

背景信息

您可以将云原生多模数据库 Lindorm宽表作为Flink中的维表或者结果表,通过Flink SQL或者Flink DataStream访问Lindorm宽表。

选择连接方式

Lindorm宽表中,表根据创建方式的不同分成HBase表(使用HBase API创建并写入数据的表)和SQL表(使用Lindorm SQL创建并写入数据的表)两种形态,访问它们需要用不同的接口。因此在开始创建Flink任务之前,需要先确定连接器的种类,并进而根据连接器种类确定宽表的连接地址。

确定要访问的宽表形态

Lindorm中,可以使用Lindorm SQL来确定开发中的Flink任务所要访问的宽表种类:

  1. 使用lindorm-cli(或者直接使用 Lindorm Insight 或 DMS)连接Lindorm宽表引擎。

  2. 执行下述SQL语句查看表的IS_HBASE_LIKE属性。

    • 如果查得的属性值为TRUE,则表示该表是一张HBase

    • 如果查得的属性值为FALSE,则表示该表是一张SQL

    SHOW TABLE VARIABLES FROM table_name LIKE 'IS_HBASE_LIKE';
说明

明确需要使用的连接器种类

用户需要基于选定的Flink的产品形态,决定具体使用哪种连接器来访问Lindorm宽表。

宽表形态

社区版Flink

阿里云实时计算Flink

HBase

开源HBase 连接器

(支持作为维表结果表

云原生多模数据库Lindorm连接器

(支持作为维表结果表

SQL

开源JDBC 连接器

(支持作为结果表

云原生多模数据库Lindorm连接器

(支持作为维表结果表

说明

阿里云实时计算Flink指的是阿里云上托管的Flink产品形态,详情可参考文档实时计算Flink。请注意,在阿里云ECS上使用开源Flink搭建的Flink集群,仍然归属于上图中的社区版Flink。

获取宽表连接信息

  • 场景一:使用开源HBase连接器云原生多模数据库Lindorm连接器
    在此场景下,用于连接的地址需要使用宽表引擎的HBase Java API访问地址(专有网络)。
    image.png

  • 场景二:使用JDBC连接器
    在此场景下,用于连接的地址需要使用宽表引擎的MySQL兼容地址(专有网络)。
    image.png

说明
  • 如果Flink任务中使用新创建的Lindorm用户来访问宽表,请确保该用户有访问Flink表的读写权限,赋予权限的具体操作请参见为指定用户赋予权限

  • Lindorm宽表的各种连接地址的详细说明,可参考文档查看宽表引擎连接地址

Flink作业访问宽表的方法

用户可根据所选择的实时计算框架的通用方式进行开发。对于作业中访问Lindorm宽表的需求,则可基于上文所选择的连接器对照下面的文档实现计算作业对Lindorm宽表的访问。

前提条件

  • 使用社区版Flink开发访问Lindorm宽表的作业

  • 使用阿里云实时计算Flink开发访问Lindorm宽表的作业,则对宽表引擎版本无限制。

  • 确保 Flink 集群所属环境已与Lindorm实例实现了网络打通,且已将客户端IP地址添加至Lindorm白名单。如何添加,请参见设置白名单

社区版Flink作业开发

开源HBase连接器

重要

使用社区版Flink开发访问HBase表的作业场景下,如果想要通过公网访问或访问目标的Lindorm实例类型为Lindorm单节点,那么在执行后续操作前,必须先升级SDK并更改配置

具体操作,请参见通过HBase Java API连接并使用宽表引擎章节中的步骤1。

使用开源HBase连接器创建维表与结果表的具体操作方法请参见HBase连接器使用文档

开源JDBC连接器

使用开源JDBC连接器访问Lindorm宽表时,目前只支持将宽表作为结果表。整体的使用方法可以参考JDBC连接器官方文档,不过其中存在一些需要特别注意的地方,如下所示:

  • 依赖要求
    相对官方文档中比较宽泛的依赖包版本,使用JDBC连接器访问Lindorm宽表目前对于相关依赖包额版本限定在以下列表:image.png

    MySQL JDBC驱动的依赖包可从社区官方下载

  • JDBC连接器参数
    由于不支持作为源表和维表,因此与源表、维表功能相关的参数都不支持(如类似于scan.fetch-size之类以 scan 作为前缀的参数以及lookup.cache之类以 lookup 作为前缀的参数等)。
    JDBC的部分连接器参数使用建议如下:

    • url:建议遵循文档基于Java JDBC接口的应用开发进行配置。

    • username:使用 Lindorm 实例中创建的用户名。

    • password:上述用户名对应的密码。

    • connector,table-name:遵循 JDBC 连接器社区的建议。

    • sink 相关参数:结合作业实际情况微调。

  • 数据类型映射
    Flink数据类型与Lindorm数据类型的映射原则上可参照MySQL的数据类型进行(可参照JDBC连接器章节数据类型映射),但有些数据类型Lindorm并没有与MySQL对齐。比如在JDBC连接器中宣称支持映射的MySQL类型中,下述类型在Lindorm中并不支持:

    • MEDIUMINT类型

    • DATETIME类型

    • BIGINT UNSIGNED以外的UNSIGNED类型

    关于Lindorm SQLMySQL的语法功能兼容性对比,可参见Lindorm SQLMySQL的兼容性对比

以下示例通过Flink SQL定义了一个基于JDBC连接器访问Lindorm宽表的作业。其中,假定在Lindorm宽表引擎中已经定义了一张表名为testflink的表。

# Flink建表和启动任务
CREATE TABLE source_table(
    c1 INT,
    c2 STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2',
  'fields.c2.length' = '5',
  'fields.c1.min' = '1',
  'fields.c1.max' = '100'
);


CREATE TABLE sink_table(
    c1 INT,
    c2 STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ld-xxxxx-proxy-lindorm.lindorm.rds.aliyuncs.com:33060/default?sslMode=disabled&allowPublicKeyRetrieval=true&useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSize=300&prepStmtCacheSqlLimit=50000000',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'testflink'
);


INSERT INTO sink_table SELECT * FROM source_table;

阿里云实时计算Flink作业开发

云原生多模数据库Lindorm连接器