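Solution overview
A Logstore contains multiple Shards. Consuming data through a consumer group means assigning those Shards to the consumers in the group. Shards are assigned according to the following principles:
Each Shard is assigned to only one consumer in the group at a time.
A consumer can own multiple Shards at the same time.
When a new consumer joins the consumer group, Shard ownership within the group is adjusted to balance the consumption load. The assignment still follows the principles above.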
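The assignment rules above can be illustrated with a small model. The following Java sketch is hypothetical: ShardAssignmentSketch and its round-robin strategy are assumptions made for illustration. They do not reproduce the algorithm Simple Log Service actually uses. The sketch only shows three things: each Shard has exactly one owner, a consumer can own several Shards, and ownership changes when a consumer joins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: round-robin assignment of Shards to consumers.
// This is NOT the algorithm used by Simple Log Service; it only shows that
// each Shard gets exactly one owner while one consumer may own several Shards,
// and that adding a consumer spreads the Shards across more owners.
class ShardAssignmentSketch {

    // Returns consumer name -> list of Shard IDs that consumer owns.
    static Map<String, List<Integer>> assign(List<Integer> shards, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a consumer group needs at least one consumer");
        }
        Map<String, List<Integer>> owners = new TreeMap<>();
        for (String consumer : consumers) {
            owners.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < shards.size(); i++) {
            // Shard i goes to exactly one consumer.
            String owner = consumers.get(i % consumers.size());
            owners.get(owner).add(shards.get(i));
        }
        return owners;
    }

    public static void main(String[] args) {
        List<Integer> shards = List.of(0, 1, 2, 3);
        // One consumer owns all four Shards.
        System.out.println(assign(shards, List.of("consumer_1"))); // {consumer_1=[0, 1, 2, 3]}
        // A second consumer joins and the Shards are rebalanced.
        System.out.println(assign(shards, List.of("consumer_1", "consumer_2"))); // {consumer_1=[0, 2], consumer_2=[1, 3]}
    }
}
```

Round robin ignores which consumer previously owned a Shard. In this model, a rebalance may therefore move more Shards than strictly necessary.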
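Basic concepts
Term | Description |
Consumer group | Simple Log Service supports consuming data through consumer groups. A consumer group consists of multiple consumers. The consumers in a group jointly consume the data in one Logstore, and no two consumers in the group consume the same data. |
Consumer | The unit that makes up a consumer group and actually performs the consumption. |
Logstore | The unit for data collection, storage, and query. For more information, see Logstore. |
Shard | Controls the read and write capacity of a Logstore. Every piece of data is stored in exactly one Shard. For more information, see Shard. |
Checkpoint | The consumption position, that is, the latest position the program has consumed up to. After a restart, the program can resume consumption from the Checkpoint. Note: When you consume data through a consumer group, Checkpoints are saved automatically. If the program fails, it resumes from the last saved Checkpoint after it recovers instead of starting over, so data covered by that Checkpoint is not consumed again. Data processed after the last saved Checkpoint may be delivered again. |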
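The Checkpoint behavior described in the table can be sketched in plain Java. CheckpointResumeSketch is hypothetical. It models a Shard as a list of records and a Checkpoint as a list index, whereas Simple Log Service stores cursors on the server. The sketch shows two effects: a restart resumes from the last saved Checkpoint, and records processed after that Checkpoint are processed again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of Checkpoint-based recovery. A Shard is modeled as a
// list of records, and a Checkpoint as the index of the next record to read.
// Real Checkpoints are cursors stored by Simple Log Service; this only
// illustrates resume-from-Checkpoint behavior.
class CheckpointResumeSketch {

    // Shard ID -> index of the next record to consume (the saved Checkpoint).
    final Map<Integer, Integer> checkpoints = new HashMap<>();

    // Consumes one Shard starting at its saved Checkpoint. A Checkpoint is
    // saved after every `commitEvery` records. The run stops without a final
    // save after `crashAfter` records, which simulates a crash.
    List<String> run(int shardId, List<String> shard, int commitEvery, int crashAfter) {
        List<String> processed = new ArrayList<>();
        int pos = checkpoints.getOrDefault(shardId, 0);
        while (pos < shard.size() && processed.size() < crashAfter) {
            processed.add(shard.get(pos));
            pos++;
            if (processed.size() % commitEvery == 0) {
                checkpoints.put(shardId, pos); // save progress
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        CheckpointResumeSketch consumer = new CheckpointResumeSketch();
        List<String> shard = List.of("r0", "r1", "r2", "r3", "r4", "r5");
        // First run: Checkpoint saved after r1, then a crash after r2.
        System.out.println(consumer.run(0, shard, 2, 3)); // [r0, r1, r2]
        // Restart: resumes at r2, so r2 is processed a second time.
        System.out.println(consumer.run(0, shard, 2, Integer.MAX_VALUE)); // [r2, r3, r4, r5]
    }
}
```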
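Step 1: Create a consumer group
You can create a consumer group by using an SDK, the API, or the CLI. The following example uses the Java SDK.
Create a consumer group by using the SDK
The following code creates a consumer group: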
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
        // Read the AccessKey pair from environment variables instead of hard-coding it.
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        String projectName = "ali-test-project";
        String logstoreName = "ali-test-logstore";
        // Endpoint of the region where the Project resides.
        String host = "https://cn-hangzhou.log.aliyuncs.com";
        Client client = new Client(host, accessId, accessKey);
        try {
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");
            // 300: heartbeat timeout in seconds; true: consume data in order.
            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
            System.out.println(String.format("create consumergroup %s success", consumerGroupName));
        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}
For code examples that manage consumer groups, see Use Java SDK to manage consumer groups and Use Python SDK to manage consumer groups.
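In the creation code, new ConsumerGroup(consumerGroupName, 300, true) sets a heartbeat timeout of 300 seconds and enables in-order consumption. The following hypothetical sketch models what the timeout means. A consumer whose last heartbeat is older than the timeout is treated as offline, and its Shards can then be reassigned. HeartbeatTimeoutSketch is an illustrative class, not part of the SDK. The real check is performed by the Simple Log Service server.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the heartbeat timeout of a consumer group. A consumer
// whose last heartbeat is older than the timeout is treated as offline, so its
// Shards become eligible for reassignment. Illustration only: the real check
// runs on the Simple Log Service server.
class HeartbeatTimeoutSketch {
    private final long timeoutSeconds;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatTimeoutSketch(long timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    // Records a heartbeat from a consumer at the given Unix time, in seconds.
    void heartbeat(String consumer, long nowSeconds) {
        lastHeartbeat.put(consumer, nowSeconds);
    }

    // Returns the consumers still considered online at nowSeconds, sorted by name.
    Set<String> onlineConsumers(long nowSeconds) {
        Set<String> online = new TreeSet<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowSeconds - e.getValue() <= timeoutSeconds) {
                online.add(e.getKey());
            }
        }
        return online;
    }
}
```

For example, with a 300-second timeout, a consumer that last sent a heartbeat at time 1000 is still online at 1300 and offline at 1301.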
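Step 2: Consume logs
How consumption works
When a consumer built with the consumer group SDK starts for the first time and the consumer group does not exist, the consumer creates the consumer group. The starting consumption position is the position from which data is consumed when the consumer group is created, and it takes effect only at that first creation. When a consumer restarts later, it continues from the Checkpoint last saved on the server. In this example, the starting position is LogHubConfig.ConsumePosition.BEGIN_CURSOR. The first run therefore consumes from the earliest data in the Logstore, and later runs resume from the saved Checkpoint.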
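The rule that the starting position applies only once can be expressed as a small decision function. This is a hypothetical sketch. StartPositionSketch, its BEGIN and END values, and the integer offsets are assumptions for illustration that loosely mirror LogHubConfig.ConsumePosition. The sketch treats "no saved Checkpoint" as "first start", which simplifies the real behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the starting position is chosen. The names here
// (StartPositionSketch, StartPosition, startOffset) are illustrative, not SDK APIs.
class StartPositionSketch {
    enum StartPosition { BEGIN, END }

    // Saved Checkpoints per Shard (Shard ID -> offset of the next record).
    final Map<Integer, Long> savedCheckpoints = new HashMap<>();

    // Returns the offset a consumer starts reading from. The configured position
    // is used only when no Checkpoint exists yet; otherwise the consumer always
    // resumes from the saved Checkpoint, whatever position is configured.
    long startOffset(int shardId, StartPosition configured, long shardEndOffset) {
        Long saved = savedCheckpoints.get(shardId);
        if (saved != null) {
            return saved;
        }
        return configured == StartPosition.BEGIN ? 0L : shardEndOffset;
    }
}
```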
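Consumption example
You can consume data through a consumer group by using the Java, C++, Python, or Go SDK. This example uses the Java SDK.
Add the Maven dependencies.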
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.50</version>
</dependency>
Create the consumption logic as follows:
SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // Time of the last forced Checkpoint write to the server, in milliseconds.
    private long mLastSaveTime = 0;
    // Called once when the processor is bound to a Shard.
    public void initialize(int shardId) {
        this.shardId = shardId;
    }
    // Consumption logic: called with each batch of log groups fetched from the Shard.
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
            // Print the tags of the log group.
            System.out.println("Tags");
            for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                FastLogTag logTag = fastLogGroup.getLogTags(i);
                System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
            }
            // Print each log and its key/value contents.
            for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                FastLog log = fastLogGroup.getLogs(i);
                System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int j = 0; j < log.getContentsCount(); ++j) {
                    FastLogContent content = log.getContents(j);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        try {
            // Write the Checkpoint to the server at most once every 30 seconds;
            // otherwise cache it locally and let the library flush it later.
            if (curTime - mLastSaveTime > 30 * 1000) {
                checkPointTracker.saveCheckPoint(true);
                mLastSaveTime = curTime;
            } else {
                checkPointTracker.saveCheckPoint(false);
            }
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
        // Returning null means that consumption continues normally.
        return null;
    }
    // Called when this processor stops (for example, when the worker shuts down):
    // force the current Checkpoint to the server.
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}
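The Checkpoint logic at the end of process() throttles server writes. saveCheckPoint(true) runs at most once every 30 seconds, and saveCheckPoint(false) is called otherwise. As we understand the library, false caches the Checkpoint locally and leaves the upload to a background flush. The following sketch isolates that decision so it can run without the SDK. CheckpointSink and ThrottledCheckpointer are hypothetical names, and CheckpointSink.save(boolean) stands in for ILogHubCheckPointTracker.saveCheckPoint(boolean).

```java
// Hypothetical interface mirroring the shape of
// ILogHubCheckPointTracker.saveCheckPoint(boolean), so the logic runs without the SDK.
interface CheckpointSink {
    void save(boolean persistToServer);
}

// Stand-in for the persistence decision made in SampleLogHubProcessor.process().
class ThrottledCheckpointer {
    private final long intervalMillis;
    private long lastPersistMillis = 0;

    ThrottledCheckpointer(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Called once per processed batch. It forces a server write only if more
    // than intervalMillis have passed since the last forced write; otherwise
    // it asks only for a local (non-persistent) save.
    void onBatchProcessed(long nowMillis, CheckpointSink sink) {
        if (nowMillis - lastPersistMillis > intervalMillis) {
            sink.save(true);
            lastPersistMillis = nowMillis;
        } else {
            sink.save(false);
        }
    }
}
```

With a 30-second interval, batches at 100 s, 110 s, and 130.001 s produce the calls true, false, true. At most one server write happens per interval, however many batches arrive.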
For more sample code, see aliyun-log-consumer-java and Aliyun LOG Go Consumer.
Create the consumer entity (the processor factory) as follows:
SampleLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    // Called by the worker to create a processor for a Shard it consumes.
    public ILogHubProcessor generatorProcessor() {
        return new SampleLogHubProcessor();
    }
}
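The worker takes a factory rather than a single processor. As a simplified model, the sketch below assumes that the worker requests a fresh processor for each Shard it is assigned. Under that assumption, per-Shard fields such as shardId and mLastSaveTime are never shared between Shards. FactorySketch, Processor, and Worker are hypothetical stand-ins, not SDK classes. java.util.function.Supplier plays the role of ILogHubProcessorFactory.generatorProcessor().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of factory-based processor creation. Processor and Worker
// are illustrative stand-ins for ILogHubProcessor and ClientWorker.
class FactorySketch {
    static class Processor {
        int shardId;

        void initialize(int shardId) {
            this.shardId = shardId;
        }
    }

    static class Worker {
        private final Supplier<Processor> factory;
        final Map<Integer, Processor> processors = new HashMap<>();

        Worker(Supplier<Processor> factory) {
            this.factory = factory;
        }

        // Creates and initializes a new processor the first time a Shard is
        // assigned; later calls for the same Shard return the same processor.
        Processor onShardAssigned(int shardId) {
            return processors.computeIfAbsent(shardId, id -> {
                Processor p = factory.get();
                p.initialize(id);
                return p;
            });
        }
    }
}
```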
Create a consumer and start a consumer thread. The consumer consumes data from the specified Logstore:
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
    // Endpoint of the region where the Project resides.
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    private static String Project = "ali-test-project";
    private static String Logstore = "ali-test-logstore";
    private static String ConsumerGroup = "ali-test-consumergroup2";
    // Read the AccessKey pair from environment variables.
    private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // "consumer_1": consumer name, which must be unique within the consumer group.
        // BEGIN_CURSOR: on first creation, start from the earliest data in the Logstore.
        // 1000: maximum number of log groups fetched per request.
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        // Run the worker in its own thread.
        Thread thread = new Thread(worker);
        thread.start();
        // Consume for one hour.
        Thread.sleep(60 * 60 * 1000);
        // Stop the worker; each processor's shutdown() saves its Checkpoint.
        worker.shutdown();
        // Give the worker time to finish shutting down.
        Thread.sleep(30 * 1000);
    }
}
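Main.java stops the worker and then sleeps a fixed 30 seconds so that it can finish. An alternative is to call join with a timeout on the worker thread, which returns as soon as the thread exits. The sketch below shows that start, shutdown, and join pattern with StoppableWorker, a hypothetical stand-in for ClientWorker that consumes nothing. With the real SDK, the equivalent would presumably be thread.join(30 * 1000) after worker.shutdown(), assuming the worker's run() returns once shutdown completes.

```java
// Hypothetical stand-in for ClientWorker: a Runnable that loops until
// shutdown() is called. It consumes no data; it only demonstrates the
// start, shutdown, and join pattern.
class StoppableWorker implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(10); // placeholder for one fetch-and-process round
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        running = false;
    }

    // Starts the worker, lets it run for runMillis, stops it, and waits at most
    // waitMillis for the thread to exit. Returns true if the thread has exited.
    static boolean runFor(StoppableWorker worker, long runMillis, long waitMillis) {
        Thread thread = new Thread(worker);
        thread.start();
        try {
            Thread.sleep(runMillis);
            worker.shutdown();
            thread.join(waitMillis); // returns early once the thread ends
        } catch (InterruptedException e) {
            worker.shutdown();
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```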
Run Main.java. Sample output:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
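Consume logs with SPL filtering
The consumer can also filter data on the server side with an SPL statement. The steps are the same as in the previous example. The difference is that Main.java passes an SPL statement to config.setQuery, so only logs that match the condition are delivered to the processor.
Add the Maven dependencies.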
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.47</version>
</dependency>
Create the consumption logic as follows:
SPLLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
// Same processing logic as SampleLogHubProcessor; the SPL filter is configured in Main.java.
public class SPLLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    private long mLastSaveTime = 0;
    public void initialize(int shardId) {
        this.shardId = shardId;
    }
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
            System.out.println("Tags");
            for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                FastLogTag logTag = fastLogGroup.getLogTags(i);
                System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
            }
            for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                FastLog log = fastLogGroup.getLogs(i);
                System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int j = 0; j < log.getContentsCount(); ++j) {
                    FastLogContent content = log.getContents(j);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        try {
            // Write the Checkpoint to the server at most once every 30 seconds.
            if (curTime - mLastSaveTime > 30 * 1000) {
                checkPointTracker.saveCheckPoint(true);
                mLastSaveTime = curTime;
            } else {
                checkPointTracker.saveCheckPoint(false);
            }
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
        return null;
    }
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}
Create the consumer entity (the processor factory) as follows:
SPLLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
    // Called by the worker to create a processor for a Shard it consumes.
    public ILogHubProcessor generatorProcessor() {
        return new SPLLogHubProcessor();
    }
}
Create a consumer and start a consumer thread. The consumer consumes data from the specified Logstore:
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    private static String Project = "ali-test-project";
    private static String Logstore = "ali-test-logstore";
    private static String ConsumerGroup = "ali-test-consumergroup2";
    private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        // SPL statement: only logs whose body_bytes_sent is greater than 14000 are delivered.
        config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
        ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        thread.start();
        // Consume for one hour, then stop the worker.
        Thread.sleep(60 * 60 * 1000);
        worker.shutdown();
        Thread.sleep(30 * 1000);
    }
}
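The SPL statement * | where cast(body_bytes_sent as bigint) > 14000 is evaluated by Simple Log Service before data reaches the processor. To make the condition concrete, the following sketch applies the same predicate locally to logs modeled as key/value maps. SplFilterSketch is hypothetical. Dropping logs whose field is missing or not an integer is an assumption, because the sketch does not model how SPL handles a failed cast.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Client-side illustration of the predicate
// "cast(body_bytes_sent as bigint) > 14000". With config.setQuery the filter
// runs in Simple Log Service; this sketch only mimics the condition.
class SplFilterSketch {

    // Returns true if body_bytes_sent parses as an integer greater than 14000.
    // Missing or non-integer values are dropped (an assumption of this sketch).
    static boolean matches(Map<String, String> log) {
        String value = log.get("body_bytes_sent");
        if (value == null) {
            return false;
        }
        try {
            return Long.parseLong(value.trim()) > 14000L;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Keeps only the logs that satisfy the predicate, preserving their order.
    static List<Map<String, String>> filter(List<Map<String, String>> logs) {
        return logs.stream().filter(SplFilterSketch::matches).collect(Collectors.toList());
    }
}
```

Under this predicate, the log with body_bytes_sent : 3820 from the earlier sample output would be filtered out.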
Run Main.java. The output has the same format as in the previous example, but only logs whose body_bytes_sent value is greater than 14000 are returned.
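Step 3: View the status of the consumer group
You can view the status of a consumer group in either of the following ways:
By using the Java SDK
In the Simple Log Service console
View the consumer group status by using the Java SDK
The following code shows the consumption progress of each Shard: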
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "cn-hangzhou.log.aliyuncs.com";
    static String project = "ali-test-project";
    static String logstore = "ali-test-logstore";
    static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        // List all consumer groups of the Logstore.
        List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for (ConsumerGroup c : consumerGroups) {
            System.out.println("Name: " + c.getConsumerGroupName());
            System.out.println("Heartbeat timeout: " + c.getTimeout());
            System.out.println("In-order consumption: " + c.isInOrder());
            // Print the Checkpoint of each Shard in the consumer group.
            for (ConsumerGroupShardCheckPoint cp : client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()) {
                System.out.println("shard: " + cp.getShard());
                System.out.println("Last checkpoint update time: " + cp.getUpdateTime());
                System.out.println("Consumer: " + cp.getConsumer());
                String consumerPrg = "";
                if (cp.getCheckPoint().isEmpty()) {
                    consumerPrg = "not started";
                } else {
                    try {
                        // Convert the Checkpoint cursor to the time of the data it points to.
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    } catch (LogException e) {
                        if ("InvalidCursor".equals(e.GetErrorCode())) {
                            consumerPrg = "invalid: the last consumed position is outside the data retention period of the Logstore";
                        } else {
                            throw e;
                        }
                    }
                }
                System.out.println("Consumption progress: " + consumerPrg);
                // Time of the most recent data in the Shard.
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try {
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                } catch (LogException e) {
                    // Ignore the error and report 0 if the arrival time cannot be determined.
                }
                System.out.println("Last data arrival time: " + endPrg);
            }
        }
    }
}
Sample output:
Name: ali-test-consumergroup2
Heartbeat timeout: 60
In-order consumption: false
shard: 0
Last checkpoint update time: 0
Consumer: consumer_1
Consumption progress: not started
Last data arrival time: 1729583617
shard: 1
Last checkpoint update time: 0
Consumer: consumer_1
Consumption progress: not started
Last data arrival time: 1729583738
Process finished with exit code 0
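The status output reports two times per Shard: the time of the data at the Checkpoint and the time the last data arrived. Their difference is a rough measure of consumer lag. The hypothetical helper below computes it in seconds. It returns -1 for Shards that have not started consuming (an empty Checkpoint). ConsumerLagSketch is illustrative only and is not part of the SDK.

```java
// Hypothetical helper: estimates consumer lag for one Shard from the values
// printed by ConsumerGroupTest. Times are Unix timestamps in seconds.
class ConsumerLagSketch {

    // checkpoint: the Shard's Checkpoint string (empty if consumption has not started).
    // consumedTime: time of the data at the Checkpoint.
    // lastArrivalTime: time the last data arrived in the Shard.
    // Returns the lag in seconds, or -1 if the Shard has no Checkpoint yet.
    static long lagSeconds(String checkpoint, long consumedTime, long lastArrivalTime) {
        if (checkpoint == null || checkpoint.isEmpty()) {
            return -1;
        }
        return Math.max(0, lastArrivalTime - consumedTime);
    }
}
```

For shard 0 in the sample output, the Checkpoint is empty, so the helper returns -1.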
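View the consumer group status in the console
Log on to the Simple Log Service console.
In the Projects section, click the target Project.
In the Logstore list, click the expand icon to the left of the target Logstore, and then click the expand icon to the left of Data Consumption.
In the consumer group list, click the target consumer group.
On the Consumer Group Status page, view the consumption progress of each Shard.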