协同消费

更新时间: 2023-09-07 13:03:25

协同消费

概念

点位服务

点位服务是提供将消费的点位保存在服务端的功能,点位由sequence和timestamp组成,sequence是递增的对应唯一记录的序列,timestamp是记录写入datahub的单位为ms的时间戳。

为Topic创建订阅,并在完成消费一部分数据后,将点位提交至服务端。下次启动任务时,可以从服务端获取上次提交的点位,从指定点位的下一条记录开始消费。将点位保存在服务端才能够实现shard重新分配后,能够从上次提交的点位之后消费,是协同消费功能的前提。

在Consumer中不需要手动处理点位,在config中设置点位提交的间隔,在读取记录时,认为之前的记录已经完成处理,若距离上次提交点位已经超过提交间隔,则尝试提交。在提交失败并且同时任务强制停止时,有一定可能造成点位提交不及时,重复消费一部分数据。

协同消费

协同消费是为了解决多个消费者同时消费一个topic时,自动分配shard的问题。能够简化消费的客户端处理,多个消费者可能是在不同机器上,通过自己协调分配shard是困难的。使用同一个Sub Id的Consummer在同一个Consumer Group中,同一个shard在一个Consumer Group中只会被分配给1个Consumer。

协同消费示意图

场景

现有3个消费者实例A,B,C,Topic共有10个shard

  1. 实例A启动,分配10个shard

  2. 实例B,C启动,shard分配为4,3,3

  3. 将1个shard进行split操作,在父节点消费完后,客户端主动释放,2个子节点加入后,shard分配为4,4,3

  4. 实例C停止后,shard分配为6,5

心跳

要实现协同消费的功能,需要通过心跳机制来通知让服务端消费者实例的状态,当前分配的shard和需要释放的shard,超过时间间隔没有收到心跳,则认为消费者实例已经停止。当消费者实例的状态发生改变,服务端会重新分配shard,新的分配计划也是通过心跳请求来返回,所以客户端感知shard变化是有时间间隔的。

版本

Maven依赖以及JDK:

maven pom

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.18.0-public</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.1.12-public</version>
</dependency>

jdk

jdk: >= 1.8

示例

Producer 代码示例

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DatahubWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO:自行处理异常
        }
    }

    private static List<RecordEntry> genRecords(RecordSchema schema) {
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int cnt = 0; cnt < 10; ++cnt) {
            RecordEntry entry = new RecordEntry();
            entry.addAttribute("key1", "value1");
            entry.addAttribute("key2", "value2");

            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "testValue");
            data.setField("field2", 1);

            entry.setRecordData(data);
            recordEntries.add(entry);
        }
        return recordEntries;
    }

    private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
        int maxRetry = 3;
        while (true) {
            try {
                // 自动选择shard写入
                producer.send(recordEntries, maxRetry);

                // 指定写入shard "0"
                // producer.send(recordEntries, "0", maxRetry);
                LOG.error("send records: {}", recordEntries.size());
                break;
            } catch (MalformedRecordException e) {
                // record 格式非法,根据业务场景选择忽略或直接抛异常
                LOG.error("write fail", e);
                throw e;
            } catch (InvalidParameterException |
                    AuthorizationFailureException |
                    NoPermissionException e) {
                // 请求参数非法
                // 签名不正确
                // 没有权限
                LOG.error("write fail", e);
                throw e;
            } catch (ShardNotFoundException e) {
                // shard 不存在, 如果不是写入自己指定的shard,可以不用处理
                LOG.error("write fail", e);
                sleep(1000);
            } catch (ResourceNotFoundException e) {
                // project, topic 或 shard 不存在
                LOG.error("write fail", e);
                throw e;
            } catch (DatahubClientException e) {
                // 基类异常,包含网络问题等,可以选择重试
                LOG.error("write fail", e);
                sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        // Endpoint以Region: 华东1为例,其他Region请按实际情况填写

        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";

        RecordSchema schema = new RecordSchema();
        schema.addField(new Field("field1", FieldType.STRING));
        schema.addField(new Field("field2", FieldType.BIGINT));

        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // 根据场景控制循环
        boolean stop = false;
        try {
            while (!stop) {
                List<RecordEntry> recordEntries = genRecords(schema);
                sendRecords(producer, recordEntries);
            }
        } finally {
            // 确保资源正确释放
            producer.close();
        }
    }
}

初始化Consumer

配置

名称

描述

autoCommit

是否自动提交点位,默认为true。点位的提交会在后台线程按配置的时间间隔执行,自动提交的逻辑是当read接口被调用时,认为之前读的数据已经处理完毕。如果设置为false,那么每条record处理完必须ack,后台提交点位会保证该点位之前的record全部被ack。

offsetCommitTimeoutMs

点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]

sessionTimeoutMs

会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]

fetchSize

单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0

您还需要在工程中配置相应的Access Key和Secret Key,推荐使用环境变量的形式在配置文件中配置。

datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
重要

阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。

强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

// Endpoint以Region: 华东1为例,其他Region请按实际情况填写
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubscriptionId>";

// 1. 使用协同消费,subId
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, config);

// 2. 不使用协同消费,使用点位服务,提供subId和Consumer读取的shard列表
List<String> assignment = Arrays.asList("0", "1", "2");
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, assignment, config);

// 3. 不使用协同消费,不使用点位服务记录的点位,提供subId,Consumer读取的shard和初始点位
Map<String, Offset> offsetMap = new HashMap<>();

// 提供sequence和timestamp,若sequence超出范围则使用timestamp获取Cursor
offsetMap.put("0", new Offset(100, 1548573440756L));
// 只提供sequence,按照sequence获取Cursor
offsetMap.put("1", new Offset().setSequence(1));
// 只提供timestamp,按照timestamp获取Cursor
offsetMap.put("2", new Offset().setTimestamp(1548573440756L));

ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);

协同代码示例

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class DatahubReader {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO:自行处理异常
        }
    }

    public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
    {
        return new Consumer(project, topic, subId, config);
    }

    public static void main(String[] args) {
 
         String projectName = "<YourProjectName>";
         String topicName = "<YourTopicName>";
         String subId = "<YourSubscriptionId>";

        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = createConsumer(config, projectName, topicName, subId);

        int maxRetry = 3;
        boolean stop = false;
        try {
            while (!stop) {
                try {
                    while (true) {
                        // 协同消费刚初始化,需要等待服务端分配shard,约40秒,期间只能返回null
                        // 自动提交模式,每次调用read,认为之前读的数据都已处理完成,自动ack
                        RecordEntry record = consumer.read(maxRetry);

                        // 处理数据
                        if (record != null) {
                            TupleRecordData data = (TupleRecordData) record.getRecordData();
                                                        // 根据自己的schema来处理数据,此处打印第一列的内容
                            LOG.info("field1: {}", data.getField(0));
                                                        
                                                        // 根据列名取数据
                            // LOG.info("field2: {}", data.getField("field2"));

                            // 非自动提交模式,每条record处理完后都需要ack
                            // 自动提交模式,ack不会做任何操作
                            // 1.1.7版本及以上
                            record.getKey().ack();
                        } else {
                            LOG.info("read null");
                        }
                    }
                } catch (SubscriptionOffsetResetException e) {
                    // 点位被重置,重新初始化consumer
                    try {
                        consumer.close();
                        consumer = createConsumer(config, projectName, topicName, subId);
                    } catch (DatahubClientException e1) {
                        // 初始化失败,重试或直接抛异常
                        LOG.error("create consumer failed", e);
                        throw e;
                    }
                } catch (InvalidParameterException |
                        SubscriptionOfflineException |
                        SubscriptionSessionInvalidException |
                        AuthorizationFailureException |
                        NoPermissionException e) {
                    // 请求参数非法
                    // 订阅被下线
                    // 订阅下相同shard被其他客户端占用
                    // 签名不正确
                    // 没有权限
                    LOG.error("read failed", e);
                    throw e;
                } catch (DatahubClientException e) {
                    // 基类异常,包含网络问题等,可以选择重试
                    LOG.error("read failed, retry", e);
                    sleep(1000);
                }
            }
        } catch (Throwable e) {
            LOG.error("read failed", e);
        } finally {
            // 确保资源正确释放
            // 会提交已ack的点位
            consumer.close();
        }
    }
}

注意事项

Consumer和Producer都不支持多线程访问,如果需要使用多线程,则在每个线程都使用不同的Consumer或Producer对象。

阿里云首页 DataHub 相关技术圈