云原生多模数据库 Lindorm支持实时数据订阅功能,对实时变更数据进行计算,可以为实时监控,实时报表和流数据分析功能提供有效数据。通过Flink Kafka Connector可以从底层存储消费订阅数据,同时Flink Kafka Connector对订阅数据进行解析时支持Debezium格式。
背景信息
云原生多模数据库 Lindorm的实时数据订阅功能包括Push模式和Pull模式。Push模式支持将实时变更数据投递至下游系统(例如TT、DD或MetaQ),同时可以结合其他计算平台对增量数据进行计算,构建一套完整的实时数据系统,但是这种模式存在日志堆积、开发周期长等问题。Pull模式的实时数据订阅功能底层使用Lindorm Streams Storage存储订阅消息,读写方式兼容Kafka客户端,因此可以支持Flink通过Kafka Connector的方式订阅消费。
前提条件
已创建数据订阅通道,具体操作,请参见通过Pull模式创建数据订阅通道。
Table API接入方式
Table API是Flink提供的API接口,支持使用Flink SQL进行数据查询。
Lindorm CDC的变更消息采用Debezium Format格式。
Table API支持Debezium-Json格式,相关内容请参见解析说明。
如果您在开源Flink上消费数据,需要配置以下Maven依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>1.13.2</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.2</version>
</dependency>
消费数据示例
使用以下语句创建Lindorm源表格。
CREATE TABLE `lindorm_table` ( `c1` INT, `col1` VARCHAR, `col2` VARCHAR, `col3` VARCHAR, PRIMARY KEY ( `c1` ) )
创建数据订阅通道后,通过以下Java代码创建表格消费数据。
说明创建表格消费数据需要获取以下配置内容。
properties.bootstrap.servers:表示云原生多模数据库 Lindorm宽表提供的消费接入点,您可以通过云原生多模数据库 Lindorm控制台获取,在实例列表页面,左侧导航栏中选择 ,在链接信息区域中获取Lindorm宽表提供的消费接入点。
properties.group.id:表示消费组名称,您可以自定义。
topic:表示创建数据订阅通道时自定义的主题名。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class TestDebeziumFlink { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env); // topic created by subscription String topicName = "test-topic"; // replace with cdc endpoint String bootstrapServer = "localhost:9092"; // consumer group id String groupID = "test-flink"; // create table connected to source tableEnvironment.executeSql("CREATE TABLE test_table (\n" + " c1 INT,\n" + " `f:col1` STRING,\n" + " `f:col2` STRING,\n" + " `f:col3` STRING\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = '"+ topicName + "',\n" + " 'properties.bootstrap.servers' = '" + bootstrapServer + "',\n" + " 'properties.group.id' = '" + groupID + "',\n" + " 'debezium-json.schema-include' = 'true',\n" + " 'format' = 'debezium-json'\n" + ")"); // create print table tableEnvironment.executeSql("CREATE TABLE test_table_copy (\n" + " c1 INT,\n" + " `f:col1` STRING,\n" + " `f:col2` STRING,\n" + " `f:col3` STRING\n" + ") WITH (\n" + " 'connector' = 'print'" + ")"); Table testTable = tableEnvironment.from("test_table"); testTable.select($("c1"), $("f:col1"), $("f:col2"), $("f:col3")).executeInsert("test_table_copy"); tableEnvironment.executeSql("SELECT * FROM test_table").print(); } }
结果显示示例。