本文介绍Linkedmall分销消息在多种SDK下的接入方式
JAVA SDK
环境准备
安装依赖库
在pom.xml中添加如下依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
配置准备
下载SSL根证书
在项目src/resource文件下新建kafka.properties文件,并填写以下内容。
说明bootstrap.servers、group.id、sasl.username、sasl.password等参数请登录LinkedMall开放平台自助获取。
## SSL接入点
bootstrap.servers=xxxx
## Group
group.id=xxxx
## SASL用户名
sasl.username=12345
## SASL密码
sasl.password=12345
##SSL根证书,请指向步骤1中下载的根证书文件绝对路径。
ssl.truststore.location=/xxxx/only.4096.client.truststore.jks
创建配置文件加载程序
import java.util.Properties;
public class KafkaConfigurer {
private static volatile Properties properties;
public static Properties getKafkaProperties() {
if (properties == null) {
synchronized (KafkaConfigurer.class) {
if (properties == null) {
//获取配置文件kafka.properties的内容。
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(KafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
} catch (Exception e) {
//没加载到文件,程序要考虑退出。
e.printStackTrace();
}
properties = kafkaProperties;
}
}
}
return properties;
}
}
消息消费
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class LinkedmallConsumerExample {
private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
" username=\"%s\"" +
" password=\"%s\";";
public static void main(String[] args) throws IOException {
//加载kafka.properties
Properties kafkaProperties = KafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
//设置接入点及鉴权相关配置。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, kafkaProperties.get("sasl.username"), kafkaProperties.get("sasl.password"));
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
//设置group
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
//两次Poll之间的最大允许间隔。
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次poll的消息最大数量。
//不宜设置过大,需要考虑消费端消费速率,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//设置单次拉取的字节数,推荐设置为每次poll的消息最大数量*1024(Linkedmall下发消息约1K),过大可能导致限流。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//建议关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//构造消息对象,也即生成一个消费实例。
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
//设置订阅的topic
List<String> topics = Arrays.stream(kafkaProperties.getProperty("topics").split(",")).collect(Collectors.toList());
consumer.subscribe(topics);
//用于异步处理消息的线程池。
ExecutorService executorService = new ThreadPoolExecutor(0,
Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), Executors.defaultThreadFactory());
//循环消费消息。
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
System.out.printf("Record pulled: %d\n", records.count());
// 按partition分别串行处理,保证顺序性;
Collection<Callable<Void>> tasks = records.partitions().stream()
// 获取单个分区内消息
.map(records::records)
// 为每个分区创建分区内消息串行消费的逻辑
.map(partitionRecords -> (Callable<Void>) () -> {
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.printf("Consume topic:%s partition:%d offset:%d%n", record.topic(), record.partition(), record.offset());
consume(record);
}
return null;
})
.collect(Collectors.toList());
// 等待所有分区消息消费完成,超时时间25s;等待时间应小于SESSION_TIMEOUT_MS_CONFIG,否则会触发rebalance
List<Future<Void>> result = executorService.invokeAll(tasks, 25, TimeUnit.SECONDS);
for (Future<Void> future : result) {
future.get();
}
// 全部消费成功,手动提交位点
consumer.commitSync();
} catch (Exception e) {
// 消费失败,记录失败原因
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
private static void consume(ConsumerRecord<String, String> record) {
int retryTime = 10;
Exception finalException = null;
// 手动捕获异常重试
for (int i = 0; i < retryTime; i++) {
try {
doConsume(record);
return;
} catch (Exception e) {
finalException = e;
}
}
// 记录异常原因
finalException.printStackTrace();
}
private static void doConsume(ConsumerRecord<String, String> record) {
// 消息业务处理逻辑
}
}
注意事项
1. 消费重试
kafka客户端自身并不提供消费重试机制,建议捕获异常后自行进行重试,或通过spring-kafka等组件实现重试机制。
注意:注意区分消息失败原因是否是可重试的,对不可重试的消息(如因非预期异常导致的失败)不做阻塞,通过日志等其他途径记录,事后分析订正;对可重试的消息做最大重试次数等限制,否则无限制地重试可能导致消息进度被阻塞,导致消息堆积等问题。
注意:未提交位点时,客户端重启后会重新拉取上次提交位点开始的消息,因此可能重复消费部分已成功消费的消息,需要进行幂等处理。
2. 顺序消费
Linkedmall消息在发送时保证按分区有序,如您需要按序消费消息,请务必参考示例代码中的消费部分,对每个分区内的消息串行处理
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
// 获取分区信息
Set<TopicPartition> partitions = records.partitions();
for (TopicPartition partition: partitions) {
// 获取单个分区内的消息
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 可以使用异步线程串行处理分区内的消息
consumePartition(partitionRecords);
}
消息类型
Topic中会包含多种Event,如商品Topic中包含ProductCreated,SkuEdited等消息,您可以根据需要接受,如果业务上不需要,消息处理时可以直接跳过。
重置消息消费位点
package com.aliyun.neuron.demo.kafka;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.time.Duration;
import java.util.*;
/**
* @author zj-407802
* @date 2024/7/10 17:58
* @desc 设置偏移量
**/
public class LinkedMallConsumerOffsetTest {
private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
" username=\"%s\"" +
" password=\"%s\";";
private static final String BOOTSTRAP_SERVERS="alikafka-post-cn-uqm3e7bsu003-1.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3e7bsu003-2.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3e7bsu003-3.alikafka.aliyuncs.com:9093";
private static final String SSL_TRUSTSTORE_LOCATION="/Users/*******/ideaProjets/linkedmall/distributor-kafka-example/src/main/resources/only.4096.client.truststore.jks";
private static final String USER_NAME="u12000*****";
private static final String PASS_WORD="ygBmPv4******";
private static final String GROUP_ID="1200*****-prod";
private static String topic = "1200******-product";
private static Consumer<String, String> consumer;
public static void main(String[] args) throws InterruptedException {
//1.初始化消费者
initConsumer();
//2.添加订阅关系
addSubscribe();
//3.查询分区和位点
Set<TopicPartition> assignmentPartitions = consumer.assignment();
for (TopicPartition assignmentPartition : assignmentPartitions) {
int partition = assignmentPartition.partition();
long position = consumer.position(new TopicPartition(topic, partition));
System.out.println("position:" + partition + ",position:" + position);
}
//4.重置到最早的消费位点
// resetOffsetToBeging(0);
//4.2根据时间戳重置
resetOffsetByTimestamp(1,1718709716000L);
//5.添加订阅关系
addSubscribe();
//6.查询重置位点后 分区和位点
Set<TopicPartition> assignmentPartitions2 = consumer.assignment();
for (TopicPartition assignmentPartition : assignmentPartitions2) {
int partition = assignmentPartition.partition();
long position = consumer.position(new TopicPartition(topic, partition));
System.out.println("position2:" + partition + ",position2:" + position);
}
}
private static void addSubscribe() {
//设置订阅的topic
List<String> topics = Arrays.asList(topic);
consumer.subscribe(topics);
// 等待分区分配
while (consumer.assignment().isEmpty ()) {
consumer.poll(Duration.ofMillis(100));
}
}
private static void initConsumer() {
//加载kafka.properties
Properties props = new Properties();
//设置接入点及鉴权相关配置。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, USER_NAME, PASS_WORD);
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
//设置group
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
//两次Poll之间的最大允许间隔。
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次poll的消息最大数量。
//不宜设置过大,需要考虑消费端消费速率,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//设置单次拉取的字节数,推荐设置为每次poll的消息最大数量*1024(Linkedmall下发消息约1K),过大可能导致限流。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//建议关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//构造消息对象,也即生成一个消费实例。
consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
}
/**
* 重置到最早的消费位点
* partition:分区
* @throws InterruptedException
*/
private static void resetOffsetToBeging(int partition) throws InterruptedException {
//取消订阅关系
consumer.unsubscribe();
HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
ArrayList<TopicPartition> list = new ArrayList<>();
for (PartitionInfo part : partitionInfos) {
if (part.partition() == partition) {
TopicPartition topicPartition = new TopicPartition(part.topic(), part.partition());
list.add(topicPartition);
offset.put(new TopicPartition(part.topic(), part.partition()), new OffsetAndMetadata(0));
}
}
consumer.assign(list);
//移动到最早offset
consumer.seekToBeginning(list);
//需要停掉已运行的服务再执行commit
consumer.commitSync(offset);
Thread.sleep(5000);
//取消订阅关系
consumer.unsubscribe();
}
/**
* 根据时间戳重置消费位点
*/
private static void resetOffsetByTimestamp(int partition,Long timestampMs) throws InterruptedException {
//取消订阅关系
consumer.unsubscribe();
HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
ArrayList<TopicPartition> list = new ArrayList<>();
HashMap<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo part : partitionInfos) {
if (part.partition() == partition) {
TopicPartition topicPartition = new TopicPartition(part.topic(), part.partition());
list.add(topicPartition);
map.put(new TopicPartition(part.topic(), part.partition()), timestampMs);
}
}
final Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetAndTimestampMap.entrySet()) {
final TopicPartition key = entry.getKey();
final OffsetAndTimestamp value = entry.getValue();
long position = 0;
if (value != null) {
position = value.offset();
} else {
list.add(key);
position = consumer.position(key);
}
offset.put(key, new OffsetAndMetadata(position));
}
consumer.assign(list);
//移动到最早offset
consumer.seekToBeginning(list);
//需要停掉已运行的服务再执行commit
consumer.commitSync(offset);
Thread.sleep(5000);
//取消订阅关系
consumer.unsubscribe();
}
}
其他语言
其他语言请根据接入点、topic、group等,参考阿里云kafka官方接入demo自助接入:https://github.com/AliwareMQ/aliware-kafka-demos/tree/master
文档内容是否对您有帮助?