当您使用第三方软件、多语言应用、云产品、流式计算框架等通过SDK实时消费日志服务的数据时,SDK消费无法满足日志服务的实现细节及消费者之间的负载均衡、故障转移(Failover)等,您可以通过消费组(ConsumerGroup)消费日志,消费组(ConsumerGroup)消费的实时性较强,通常为秒级。本文为您介绍通过消费组消费数据的操作步骤。
工作流程
一个Logstore中包含多个Shard,通过消费组消费数据就是将Shard分配给一个消费组下面的消费者,分配方式遵循以下原则。
在一个消费组中,一个Shard只会分配到一个消费者。
在一个消费组中,一个消费者可以被分配多个Shard。
新的消费者加入消费组后,这个消费组下面的Shard从属关系会调整,以实现消费的负载均衡,但是仍遵循上述分配原则。
通过消费组消费,程序发生故障时,会默认保存Checkpoint。在程序故障恢复时,能够从断点处继续消费,从而保证数据不会被重复消费。
前提条件
已开通日志服务。更多信息,请参见开通日志服务。
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量。
阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
已安装SDK开发环境。具体操作,请参见SDK参考概述。
基本概念
概念 | 说明 |
概念 | 说明 |
消费组 | 日志服务支持通过消费组消费数据。一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个Logstore中的数据,各个消费者不会重复消费数据。 每个Logstore中,最多创建30个消费组。 |
消费者 | 消费组的构成单元,实际承担消费任务。 同一个消费组中的消费者名称必须不同。 |
Logstore | 数据采集、存储和查询单元。更多信息,请参见日志库(Logstore)。 |
Shard | 用于控制Logstore的读写能力,数据必定保存在某一个Shard中。更多信息,请参见分区(Shard)。 |
Checkpoint | 消费位点,是程序消费到的最新位置。程序重启后,可以通过Checkpoint恢复消费进度。 |
步骤一:创建消费组
API创建消费组,请参见CreateConsumerGroup - 创建消费组。
查询消费组是否创建成功,请参见ListConsumerGroup - 查询消费组。
管理消费组的代码示例,请参见使用Java SDK管理消费组、使用Python SDK管理消费组。
CLI创建消费组,请参见create_consumer_group。
查询消费组是否创建成功,请参见list_consumer_group。
步骤二:消费数据
您可以通过Java、C++、Python及Go SDK实现消费组消费数据。此处,以Java SDK为例。
消费原理
消费组SDK的消费者在首次启动时,当消费组不存在时会创建消费组。起始消费位点是指创建消费组时的数据起始消费位点,该消费位点仅在第一次创建时有效。后续重启消费者时,消费者会从上次服务端保存的消费位点处继续消费。以本示例为例:
LogHubConfig.ConsumePosition.BEGIN_CURSOR
:消费组从头开始消费日志,起始消费位点为Logstore中的第一条日志。LogHubConfig.ConsumePosition.END_CURSOR
:此消费位点记录Logstore日志的最后一条日志之后。
添加Maven依赖。
在Java项目的根目录下,打开pom.xml文件,添加以下代码:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>loghub-client-lib</artifactId> <version>0.6.47</version> </dependency>
创建消费者逻辑代码
SampleLogHubProcessor.java
。import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SampleLogHubProcessor implements ILogHubProcessor { private int shardId; // 记录上次持久化Checkpoint的时间。 private long mLastSaveTime = 0; // initialize 方法会在 processor 对象初始化时被调用一次 public void initialize(int shardId) { this.shardId = shardId; } // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 打印已获取的数据。 for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。 try { if (curTime - mLastSaveTime > 30 * 1000) { // 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。 checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。 checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // 将Checkpoint立即保存到服务端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }
创建消费者实体
SampleLogHubProcessorFactory.java
。class SampleLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SampleLogHubProcessor 对象。 return new SampleLogHubProcessor(); } }
创建Main.java文件。创建一个消费者并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class Main { // 日志服务的服务接入点,请您根据实际情况填写。 private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。 private static String Project = "ali-cn-hangzhou-sls-admin"; // 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。 private static String Logstore = "sls_operation_log"; // 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。 private static String ConsumerGroup = "consumerGroupX"; // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。。 private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。 // maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。 LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000); ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。 thread.start(); Thread.sleep(60 * 60 * 1000); // 调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。 worker.shutdown(); // ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep为30秒。 Thread.sleep(30 * 1000); } }
运行Main.java。
以模拟消费Nginx日志为例,打印日志如下:
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
您可以通过Java、C++、Python及Go SDK实现消费组消费数据。此处,以Java SDK为例。
消费原理
消费组SDK的消费者在首次启动时,当消费组不存在时会创建消费组。起始消费位点是指创建消费组时的数据起始消费位点,该消费位点仅在第一次创建时有效。后续重启消费者时,消费者会从上次服务端保存的消费位点处继续消费。以本示例为例:
LogHubConfig.ConsumePosition.BEGIN_CURSOR
:消费组从头开始消费日志,起始消费位点为Logstore中的第一条日志。LogHubConfig.ConsumePosition.END_CURSOR
:此消费位点记录Logstore日志的最后一条日志之后。
添加Maven依赖。
在Java项目的根目录下,打开pom.xml文件,添加以下代码:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.99</version> </dependency>
创建SPLLogHubProcessor.java文件。
import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SPLLogHubProcessor implements ILogHubProcessor { private int shardId; // 记录上次持久化Checkpoint的时间。 private long mLastSaveTime = 0; // initialize 方法会在 processor 对象初始化时被调用一次 public void initialize(int shardId) { this.shardId = shardId; } // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 打印已获取的数据。 for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。 try { if (curTime - mLastSaveTime > 30 * 1000) { // 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。 checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。 checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // 将Checkpoint立即保存到服务端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }
创建 SPLLogHubProcessorFactory.java 文件。
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory; class SPLLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SPLLogHubProcessor 对象。 return new SPLLogHubProcessor(); } }
创建Main.java文件。创建一个消费组并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。管理消费组的代码示例,请参见使用Java SDK管理消费组、使用Python SDK管理消费组。
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class SPLConsumer { // 日志服务的服务接入点,请您根据实际情况填写。 private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。 private static String Project = "your_project"; // 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。 private static String Logstore = "your_logstore"; // 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。 private static String ConsumerGroup = "consumerGroupX"; // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。。 private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。 // maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。 LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000); // setQuery可以设置消费过程中的SLS SPL语句 config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000"); ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。 thread.start(); Thread.sleep(60 * 60 * 1000); // 调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。 worker.shutdown(); // ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep为30秒。 Thread.sleep(30 * 1000); } }
运行Main.java。
以模拟消费Nginx日志为例,打印日志如下:
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
步骤三:查看消费组状态
登录日志服务控制台。
在Project列表区域,单击目标Project。
在
页签中,单击目标Logstore左侧的图标,然后单击数据消费左侧的
图标。
在消费组列表中,单击目标消费组。
在Consumer Group状态页面,查看每个Shard消费数据的进度。
此处以Java SDK为例。运行ConsumerGroupTest.java,查看每个Shard消费数据的进度。
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "";
static String project = "";
static String logstore = "";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// 获取Logstore下的所有消费组。如果消费组不存在,则长度为0。
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// 打印消费组的属性,包括名称、心跳超时时间、是否按序消费。
System.out.println("名称: " + c.getConsumerGroupName());
System.out.println("心跳超时时间: " + c.getTimeout());
System.out.println("按序消费: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// 该时间精确到微秒,类型为长整型。
System.out.println("最后一次更新消费进度的时间: " + cp.getUpdateTime());
System.out.println("消费者名称: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "尚未开始消费";
else{
// Unix时间戳,单位是秒,输出时请注意格式化。
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "非法,前一次消费时刻已经超出了Logstore中数据的生命周期";
else{
// internal server error
throw e;
}
}
}
System.out.println("消费进度: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
// do nothing
}
//Unix时间戳,单位:秒。输出时,请注意格式化。
System.out.println("最后一条数据到达时刻: " + endPrg);
}
}
}
}
返回以下结果:
名称: etl-6cac01c571d5a4b933649c04a7ba215b
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555453575211
消费者名称: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消费进度: 1639555453
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555392071328
消费者名称: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消费进度: 1639555391
最后一条数据到达时刻: 1639555391
名称: etl-2bd3fdfdd63595d56b1ac24393bf5991
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555453256773
消费者名称: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消费进度: 1639555453
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555392066234
消费者名称: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消费进度: 1639555391
最后一条数据到达时刻: 1639555391
名称: consumerGroupX
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555434142879
消费者名称: consumer_1
消费进度: 1635615029
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555437976929
消费者名称: consumer_1
消费进度: 1635616802
最后一条数据到达时刻: 1639555391
RAM用户授权
使用RAM用户操作时,需授予RAM用户操作消费组的相关权限。具体操作,请参见创建RAM用户及授权。
授权的Action如下表所示。
动作(Action) | 说明 | 授权策略中的资源描述方式(Resource) |
动作(Action) | 说明 | 授权策略中的资源描述方式(Resource) |
log:GetCursorOrData(GetCursor - 通过时间查询Cursor) | 根据时间获取游标(cursor)。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup(CreateConsumerGroup - 创建消费组) | 在指定的Logstore上创建一个消费组。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ListConsumerGroup(ListConsumerGroup - 查询消费组) | 查询指定Logstore的所有消费组。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint(ConsumerGroupUpdateCheckPoint - 更新消费进度) | 更新指定消费组的某个Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat - 消费者发送心跳到服务端) | 为指定消费者发送心跳到服务端。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup(UpdateConsumerGroup - 更新消费者组) | 修改指定消费组属性。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint(GetCheckPoint - 获取指定消费组的消费点) | 获取指定消费组消费的某个或者所有Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
例如,消费组的相关资源信息如下所示,您要通过RAM用户操作该消费组,则需为RAM用户授予以下权限。
Project所属的阿里云账号:174649****602745。
Project所在地域ID:cn-hangzhou。
Project名称:project-test。
Logstore名称:logstore-test。
消费组名称:consumergroup-test。
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}
相关操作
异常诊断
建议您为消费者程序配置Log4j,将消费组内部遇到的异常信息打印出来,便于定位。log4j.properties典型配置:
log4j.rootLogger = info,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
配置Log4j后,执行消费者程序可以看到类似如下异常信息:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159) com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
通过消费组消费从某个时间开始的数据
// consumerStartTimeInSeconds表示消费这个时间点之后的数据。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, int consumerStartTimeInSeconds); // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, ConsumePosition position);
按照消费需求,请您使用不同的构造方法。
当服务端已保存Checkpoint,则开始消费位置以服务端保存的Checkpoint为准。
日志服务消费数据时,默认优先使用Checkpoint作为消费点。当您指定从固定时间点开始消费数据时,必须保证consumerStartTimeInSeconds时间点落到TTL周期内,否则会造成消费不生效。
重置Checkpoint
public static void updateCheckpoint() throws Exception { Client client = new Client(host, accessId, accessKey); // 这里 timestamp 需要是以秒为单位的 unix timestamp,如果您的时间戳以毫秒为单位,需要如下所示除以1000 long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000; ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore)); for (Shard shard : response.GetShards()) { int shardId = shard.GetShardId(); String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor(); client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor); } }
相关文档
API
操作
API接口
操作
API接口
创建消费组
查询消费组
删除消费组
更新消费组
发送消费者心跳
查询消费组的Checkpoint
更新消费组的Checkpoint
SDK
语言
文档链接
语言
文档链接
Java
Python
CLI
操作
命令行接口
操作
命令行接口
创建消费组
查询消费组
更新消费组
删除消费组
查询消费组的Checkpoint
更新消费组的Checkpoint
- 本页导读 (1)
- 工作流程
- 基本概念
- 步骤一:创建消费组
- 步骤二:消费数据
- 步骤三:查看消费组状态
- RAM用户授权
- 相关操作
- 相关文档