通过Pull模式创建数据订阅通道

更新时间:
复制 MD 格式

本文介绍通过Pull模式创建数据订阅功能,创建后订阅通道会实时拉取数据库实例的增量数据,并将增量数据保存在订阅通道中,您可以使用Lindorm提供的SDK从订阅通道中订阅增量数据并进行消费。同时,您可以在LTS页面进行订阅通道的创建、查看及删除等操作。

前提条件

已将客户端IP添加至白名单中,具体操作请参见设置白名单

已开通数据订阅功能,具体操作,请参见开通数据订阅

操作步骤

  1. 进入LTS(原BDS)页面,在左侧导航栏中,选择数据订阅 > Pull模式

    进入 Lindorm CDC订阅通道列表 页面,页面右上角提供 创建数据订阅通道 按钮,列表展示已有通道的订阅通道 Id、Lindorm 表名、主题名及操作(详情删除)。

  2. 单击创建数据订阅通道,并配置以下参数。

    名称

    描述

    源集群

    填写Lindorm实例ID。

    Lindorm表名

    选择需要创建数据订阅通道的Lindorm实例表,一条通道只能选择一张表格。

    主题名

    用于消费数据的主题名称。

    数据过期时间(天)

    表示数据可以保存的天数,默认为7天。

    主题分区数

    表示Kafka客户端为该主题设置多个分区,多分区可以并发消费数据,默认为4个分区。

  3. 单击提交

  4. (可选)找到目标订阅通道,单击操作列的详情,可以查看数据订阅通道详情、消费详情和存储详情。

  5. (可选)您可以通过以下代码在Kafka客户端对订阅数据进行消费。

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // 创建订阅通道时填写的topic名称
        String topic = "test-topic";
        // 链接endpoint的配置项
        Properties props = new Properties();
        // 指定链接endpoint地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // 指定Key序列化器,不可更改
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定Value序列化器,不可更改
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定消费组名称,在消费时会自动创建
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
        // 创建消费者
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(Arrays.asList(topic));
        // 用消费者拉取数据
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // 查看数据内容
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // 提交当前消费位移
        consumer.commitSync();
        // 关闭消费者
        consumer.close();
      }
    }
    说明

    数据消费格式说明请参见数据消费格式