本文为您介绍如何通过实时计算控制台快速构建从Kafka到Hologres的数据同步作业,实现日志数据的实时入仓部署。
前提条件
-
如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。
-
已创建Flink工作空间,详情请参见开通实时计算Flink版。
-
上下游存储
-
已创建消息队列Kafka实例,详情请参见步骤二:购买和部署实例。
-
已创建Hologres实例,详情请参见购买Hologres实例。
说明消息队列Kafka和Hologres需要与实时计算Flink版工作空间在相同地域相同VPC下,否则需要打通网络,详情请参见如何访问跨VPC的其他服务?或如何访问公网?。
-
步骤一:配置IP白名单
为了让Flink能访问Kafka和Hologres实例,您需要将Flink工作空间的网段添加到Kafka和Hologres的白名单中。
-
获取Flink工作空间的VPC网段。
-
登录实时计算控制台。
-
在目标工作空间右侧操作列,选择。
-
在工作空间详情对话框,查看虚拟交换机的网段信息。
对话框中展示工作空间基本信息及虚拟交换机列表,在列表的网段列可查看各可用区对应的VPC网段,将此网段记录下来用于后续白名单配置。
-
-
在消息队列Kafka的IP白名单中,添加Flink工作空间的网段信息。
您需要为网络类型为VPC的接入点配置白名单,操作步骤请参见配置白名单。在对应的白名单编辑对话框中,单击添加白名单IP添加网段。
-
在Hologres的IP白名单中,添加Flink工作空间的网段信息。
登录Hologres实例后配置IP白名单,操作步骤请参见IP白名单。在HoloWeb安全中心的白名单配置页面,在编辑IP白名单对话框的IP地址栏填入网段信息并单击确认。
步骤二:准备Kafka测试数据
使用实时计算Flink版的模拟数据生成Faker作为数据生成器,将数据写入到Kafka中。请按以下步骤使用实时计算开发控制台将数据写入至消息队列Kafka。
-
在Kafka控制台创建一个名称为users的Topic。
操作详情请参见步骤一:创建Topic。
-
创建将数据写入到Kafka的作业。
-
登录实时计算管理控制台。
-
单击目标工作空间操作列下的控制台。
-
在左侧导航栏,单击。
-
单击
后,单击新建流作业,填写文件名称并选择引擎版本。Flink也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板和数据同步模板。
作业参数
说明
示例
文件名称
作业的名称。
说明作业名称在当前项目中必须保持唯一。
flink-test
引擎版本
当前作业使用的Flink引擎版本。
vvr-8.0.8-flink-1.17
-
单击创建。
-
编写SQL作业。
将以下作业代码拷贝到作业文本编辑区,然后根据实际配置,修改参数配置信息。
CREATE TEMPORARY TABLE source ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, event_time TIMESTAMP ) WITH ( 'connector' = 'faker', 'number-of-rows' = '100', 'rows-per-second' = '10', 'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', 'fields.first_name.expression' = '#{name.firstName}', 'fields.last_name.expression' = '#{name.lastName}', 'fields.address.country.expression' = '#{Address.country}', 'fields.address.state.expression' = '#{Address.state}', 'fields.address.city.expression' = '#{Address.city}', 'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' ); CREATE TEMPORARY TABLE sink ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, `timestamp` TIMESTAMP METADATA ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'properties.enable.idempotence'='false' ); INSERT INTO sink SELECT * FROM source;需要修改的参数配置信息如下:
参数
示例值
说明
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Kafka Broker地址。
格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC的域名接入点作为该参数的值。
topic
users
Kafka Topic名称。
-
-
启动作业。
-
在页面,单击部署。
-
在部署新版本对话框中,单击确定。
-
配置作业资源,资源设置填写详情请参见配置作业资源。
-
在页面,单击目标作业名称操作列中的启动。关于作业启动的配置说明,请参见作业启动。
-
您可以在作业运维页面观察作业的运行信息和状态。
由于faker数据源是一个有限流,因此在作业处于运行状态后,大约1分钟左右后,作业就会处于完成状态。当作业结束运行代表作业已经将相关的数据写入到Kafka的users中。其中,写入到消息队列Kafka的JSON数据格式大致如下。
{ "id": 765, "first_name": "Barry", "last_name": "Pollich", "address": { "country": "United Arab Emirates", "state": "Nevada", "city": "Powlowskifurt" } }
-
步骤三:创建并启动数据同步作业
通过Flink CDC同步
-
登录实时计算开发控制台,创建数据同步作业。
-
编写Flink CDC作业。将以下作业代码拷贝到作业文本编辑区,然后根据实际配置,修改参数配置信息。
假设Kafka的topic users中存有JSON格式的表数据,下面的作业可以将表的数据同步到Hologres的flink_test_db数据库下模式test_schema的表users中。
source: type: kafka name: Kafka Source properties.bootstrap.servers: alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092 topic: users scan.startup.mode: earliest-offset value.format: json json.infer-schema.flatten-nested-columns.enable: true sink: type: hologres name: Hologres Sink endpoint: hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80 dbname: flink_test_db username: ****** password: ** sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT transform: - source-table: \.*.\.* projection: \* primary-keys: id route: - source-table: users sink-table: test_schema.users需要修改的参数配置信息如下:
参数
示例值
说明
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Kafka Broker地址。
格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC的域名接入点作为该参数的值。
topic
users
Kafka Topic名称。
endpoint
hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
Hologres端点。
格式为<ip>:<port>。您可以在Hologres实例详情页面获取网络类型为指定VPC的域名信息作为该参数的值。
username
**
Hologres用户名和密码,请填写阿里云账号的AccessKey ID和AccessKey Secret。
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。
password
**
dbname
flink_test_db
Hologres数据库名称。
source-table
users
定义来自于上游哪个表,默认是topic名称。
sink-table
test_schema.users
定义写入下游哪张表,使用逗号连接Schema和表名。
-
单击保存。
-
在页面,单击部署。
-
在页面,单击目标作业名称操作列中的启动。关于作业启动的配置说明,请参见作业启动。
作业启动后,您可以在作业运维界面观察作业的运行信息和状态。页面中显示作业列表,包含状态、健康分、CPU、内存等运行指标,以及启动、停止等操作按钮。
通过SQL同步
-
登录实时计算开发控制台,创建数据同步作业。
-
编写SQL作业。将以下作业代码拷贝到作业文本编辑区,然后根据实际配置,修改参数配置信息。
将消息队列Kafka中名称为users的Topic数据同步至Hologres的flink_test_db数据库的users表中。您可以通过以下INSERT INTO方式完成数据同步。
考虑到Hologres中对于JSON和JSONB类型的数据会进行特殊的优化,您也可以通过INSERT INTO语句将嵌套JSON写入到Hologres中。
该方式需要您手动在Hologres中创建users表,然后通过下文的SQL将数据写入到Hologres的表中。
CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, -- 该列对应的数据为嵌套JSON。 `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country') ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE holo ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT, `partition` BIGINT, `timestamp` TIMESTAMP, `date` DATE, `country` STRING ) WITH ( 'connector' = 'hologres', 'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80', 'username' = '******', 'password' = '******', 'dbname' = 'flink_test_db', 'tablename' = 'users' ); INSERT INTO holo SELECT * FROM kafka_users;需要修改的参数配置信息如下:
参数
示例值
说明
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
Kafka Broker地址。
格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC的域名接入点作为该参数的值。
topic
users
Kafka Topic名称。
endpoint
hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
Hologres端点。
格式为<ip>:<port>。您可以在Hologres实例详情页面获取网络类型为指定VPC的域名信息作为该参数的值。
username
******
Hologres用户名和密码,请填写阿里云账号的AccessKey ID和AccessKey Secret。
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。
password
******
dbname
flink_test_db
Hologres数据库名称。
tablename
users
Hologres表名称。
说明-
如果您通过INSERT INTO方式同步数据,则需要提前在目标实例的数据库中创建users表和字段。
-
如果Schema不为Public时,则tablename需要填写为schema.tableName。
-
-
单击保存。
-
在页面,单击部署。
-
在页面,单击目标作业名称操作列中的启动。关于作业启动的配置说明,请参见作业启动。
作业启动后,您可以在作业运维界面观察作业的运行信息和状态。页面中显示作业列表,包含状态、健康分、CPU、内存等运行指标,以及启动、停止等操作按钮。
步骤四:观察全量同步结果
-
在实例列表页面,单击目标实例名称。
-
在页面右上角,单击登录实例。
-
在元数据管理页签,查看users数据库中同步的users表结构和数据。
在左侧导航树中,依次展开目标实例名称 > flink_test_db > test_schema > 表,可以看到已同步的users表。
同步后的表结构和数据如下所示。
-
表结构
双击users表名称,查看表结构。
users表结构包含以下字段:id(int8,主键)、first_name(text)、last_name(text)、address.country(text)、address.state(text)、address.city(text)。
说明在同步过程中,建议声明Kafka的Metadata partition和offset作为Hologres表中的主键。这样可以避免由于作业Failover,数据重发导致下游存储多份相同数据。
-
表数据
在users表信息页面右上角,单击查询表后,输入如下命令,单击运行。
SELECT * FROM test_schema.users;表数据结果如下所示。
查询结果显示users表中已成功同步多行数据,包含id、first_name、last_name、address.country、address.state、address.city列的完整记录。
-
步骤五:观察自动同步表结构变更
-
在Kafka控制台手动发送一条包含新增列的消息。
-
在实例列表页面,单击目标实例名称。
-
在Topic管理页面,单击目标Topic名称users。
-
单击体验发送消息。
-
填写消息内容。
在快速体验消息收发对话框中,参照以下配置填写各项参数。
配置项
示例
发送方式
选中控制台。
消息Key
填写为flinktest。
消息内容
将以下JSON内容复制粘贴到消息内容中。
{ "id": 100001, "first_name": "Dennise", "last_name": "Schuppe", "address": { "country": "Isle of Man", "state": "Montana", "city": "East Coleburgh" }, "house-points": { "house": "Pukwudgie", "points": 76 } }说明该示例中house-points是一个新增的嵌套列。
发送到指定分区
选中是。
分区ID
填写为0。
-
单击确定。
-
在Hologres控制台,查看users表结构和数据的变化。
-
在实例列表页面,单击目标实例名称。
-
在页面右上角,单击登录实例。
-
在元数据管理页签,双击users表名称。
-
单击查询表后,输入如下命令,单击运行。
SELECT * FROM test_schema.users; -
查看表数据结果。
表数据结果如下所示。
可以观察到id为100001的数据已经成功地写入到了Hologres中。同时,Hologres中多了house-points.house和house-points.points 两列。
说明虽然插入到Kafka中的数据只有一个嵌套列house-points,但是由于在users表的WITH参数内声明要求json.infer-schema.flatten-nested-columns.enable,那么Flink 就会自动展平新增的嵌套列,并用访问该列的路径作为展开后的列的名字。
相关文档
-
CREATE TABLE AS (CTAS) 语法功能介绍,请参见CREATE TABLE AS(CTAS)语句。
-
消息队列Kafka作为源表或者结果表使用的功能介绍,请参见消息队列Kafka。
-
如果您需要调整节点的并发度和资源来提升作业性能,请参见配置作业部署信息。