通过Pull模式创建数据订阅通道

本文介绍通过Pull模式创建数据订阅功能,创建后订阅通道会实时拉取数据库实例的增量数据,并将增量数据保存在订阅通道中,您可以使用Lindorm提供的SDK从订阅通道中订阅增量数据并进行消费。同时,您可以在LTS页面进行订阅通道的创建、查看及删除等操作。

前提条件

已将客户端IP添加至白名单中,具体操作请参见设置白名单

已开通数据订阅功能,具体操作,请参见开通数据订阅

操作步骤

  1. 进入LTS(原BDS)页面,在左侧导航栏中,选择数据订阅 > Pull模式

    streamone
  2. 单击创建数据订阅通道,并配置以下参数。

    创建订阅通道

    名称

    描述

    源集群

    填写Lindorm实例ID。

    Lindorm表名

    选择需要创建数据订阅通道的Lindorm实例表,一条通道只能选择一张表格。

    主题名

    用于消费数据的主题名称。

    数据过期时间(天)

    表示数据可以保存的天数,默认为7天。

    主题分区数

    表示Kafka客户端为该主题设置多个分区,多分区可以并发消费数据,默认为4个分区。

  3. 单击提交

  4. (可选)找到目标订阅通道,单击操作列的详情,可以查看数据订阅通道详情、消费详情和存储详情。

    详细信息
  5. (可选)您可以通过以下代码在Kafka客户端对订阅数据进行消费。

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // 创建订阅通道时填写的topic名称
        String topic = "test-topic";
    
        // 链接endpoint的配置项
        Properties props = new Properties();
        // 指定链接endpoint地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // 指定Key序列化器,不可更改
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定Value序列化器,不可更改
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定消费组名称,在消费时会自动创建
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
    
        // 创建消费者
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(Arrays.asList(topic));
    
        // 用消费者拉取数据
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // 查看数据内容
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // 提交当前消费位移
        consumer.commitSync();
        // 关闭消费者
        consumer.close();
      }
    }
    说明

    数据消费格式说明请参见数据消费格式