使用Flink SQL消费实时变更数据示例

云原生多模数据库 Lindorm支持实时数据订阅功能,对实时变更数据进行计算,可以为实时监控,实时报表和流数据分析功能提供有效数据。通过Flink Kafka Connector可以从底层存储消费订阅数据,同时Flink Kafka Connector对订阅数据进行解析时支持Debezium格式。

背景信息

云原生多模数据库 Lindorm的实时数据订阅功能包括Push模式和Pull模式。Push模式支持将实时变更数据投递至下游系统(例如TT、DD或MetaQ),同时可以结合其他计算平台对增量数据进行计算,构建一套完整的实时数据系统,但是这种模式存在日志堆积、开发周期长等问题。Pull模式的实时数据订阅功能底层使用Lindorm Streams Storage存储订阅消息,读写方式兼容Kafka客户端,因此可以支持Flink通过Kafka Connector的方式订阅消费。

前提条件

已创建数据订阅通道,具体操作,请参见通过Pull模式创建数据订阅通道

Table API接入方式

Table API是Flink提供的API接口,支持使用Flink SQL进行数据查询。

  • Lindorm CDC的变更消息采用Debezium Format格式。

  • Table API支持Debezium-Json格式,相关内容请参见解析说明

如果您在开源Flink上消费数据,需要配置以下Maven依赖。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.12</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table</artifactId>
  <version>1.13.2</version>
  <type>pom</type>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.12</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.13.2</version>
</dependency>

消费数据示例

  1. 使用以下语句创建Lindorm源表格。

    CREATE TABLE `lindorm_table` (
        `c1` INT,
      `col1` VARCHAR,
      `col2` VARCHAR,
      `col3` VARCHAR,
      PRIMARY KEY ( `c1` )
    )
  2. 创建数据订阅通道后,通过以下Java代码创建表格消费数据。

    说明

    创建表格消费数据需要获取以下配置内容。

    • properties.bootstrap.servers:表示云原生多模数据库 Lindorm宽表提供的消费接入点,您可以通过云原生多模数据库 Lindorm控制台获取,在实例列表页面,左侧导航栏中选择宽表引擎 > 数据订阅,在链接信息区域中获取Lindorm宽表提供的消费接入点。

    • properties.group.id:表示消费组名称,您可以自定义。

    • topic:表示创建数据订阅通道时自定义的主题名。

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    public class TestDebeziumFlink {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
    
        // topic created by subscription
        String topicName = "test-topic";
        // replace with cdc endpoint
        String bootstrapServer = "localhost:9092";
        // consumer group id
        String groupID = "test-flink";
    
        // create table connected to source
        tableEnvironment.executeSql("CREATE TABLE test_table (\n" +
            "    c1 INT,\n" +
            "    `f:col1` STRING,\n" +
            "    `f:col2` STRING,\n" +
            "    `f:col3` STRING\n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic'     = '"+ topicName + "',\n" +
            "    'properties.bootstrap.servers' = '" + bootstrapServer + "',\n" +
            "    'properties.group.id' = '" + groupID + "',\n" +
            "    'debezium-json.schema-include' = 'true',\n" +
            "    'format'    = 'debezium-json'\n" +
            ")");
    
        // create print table
        tableEnvironment.executeSql("CREATE TABLE test_table_copy (\n" +
            "    c1 INT,\n" +
            "    `f:col1` STRING,\n" +
            "    `f:col2` STRING,\n" +
            "    `f:col3` STRING\n" +
            ") WITH (\n" +
            "    'connector' = 'print'" +
            ")");
    
        Table testTable = tableEnvironment.from("test_table");
        testTable.select($("c1"), $("f:col1"), $("f:col2"), $("f:col3")).executeInsert("test_table_copy");
    
        tableEnvironment.executeSql("SELECT * FROM test_table").print();
      }
    }
  3. 结果显示示例。消费示例