协同消费组(ConsumerGroup) 是实时消费数据高级模式,能够提供多个消费实例对日志库消费自动负载均衡。Spark Streaming、Storm 都以 ConsumerGroup 作为基础模式。

通过控制台查看消费进度

  1. 登录日志服务控制台。
  2. 选择所需的项目,单击项目名称。
  3. 单击左侧导航栏中的 LogHub - 实时消费 > 协同消费
  4. 协同消费 功能页面,选择日志库(Logstore)后即可查看目前是否启用协同消费功能。
    图 1. 协同消费


  5. 选择指定的 ConsumerGroup 之后,单击 消费状态,即可查看当前每个 shard 消费数据的进度。
    图 2. 消费状态


    如上图所示,页面上展示该日志库包含 4 个 Shard,对应 4 个消费者,其中每个消费者最近消费的数据时间如第二列显示。通过消费数据时间可以判断出目前数据处理是否能满足数据产生速度,如果已经严重落后于当前时间(即数据消费速率小于数据产生速率),可以考虑增加消费者数目。

通过 API/SDK 查看消费进度

以 Java SDK 作为例子,演示如何通过 API 获得消费状态。

package test;
import java.util.ArrayList;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = "";
    static String accesskey = "";
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        //获取这个 logstore 下的所有 consumer group,可能不存在,此时 consumerGroups 的长度是 0
        ArrayList<ConsumerGroup>  consumerGroups;
        try{
            consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        }
        catch(LogException e){
            if(e.GetErrorCode() == "LogStoreNotExist")
                System.out.println("this logstore does not have any consumer group");
            else{
                //internal server error branch
            }
            return;
        }
        for(ConsumerGroup c: consumerGroups){
            //打印 consumer group 的属性,包括名称、心跳超时时间、是否按序消费
            System.out.println("名称: " + c.getConsumerGroupName());
            System.out.println("心跳超时时间: " + c.getTimeout());
            System.out.println("按序消费: " + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                //请格式化下,这个时间返回精确到毫秒的时间,长整型
                System.out.println("最后一次消费数据的时间: " + cp.getUpdateTime());
                System.out.println("消费者名称: " + cp.getConsumer());
                String consumerPrg = "";
                if(cp.getCheckPoint().isEmpty())
                    consumerPrg = "尚未开始消费";
                else{
                    //unix 时间戳,单位是秒,输出的时候请注意格式化
                    try{
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    }
                    catch(LogException e){
                        if(e.GetErrorCode() == "InvalidCursor")
                            consumerPrg = "非法,前一次消费时刻已经超出了logstore中数据的生命周期";
                        else{
                            //internal server error
                            throw e;
                        }
                    }
                }
                System.out.println("消费进度: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try{
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                }
                catch(LogException e){
                    //do nothing
                }
                //unix 时间戳,单位是秒,输出的时候请注意格式化
                System.out.println("最后一条数据到达时刻: " + endPrg);
            }
        }
    }
}