本文介绍Linkedmall分销消息在多种SDK下的接入方式
JAVA SDK
环境准备
安装依赖库
在pom.xml中添加如下依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
配置准备
下载SSL根证书
在项目src/resource文件下新建kafka.properties文件,并填写以下内容。
## SSL接入点,请联系Linkedmall工作人员获取。
bootstrap.servers=xxxx
## Group,请联系Linkedmall工作人员获取
group.id=xxxx
## SASL用户名,请联系Linkedmall工作人员获取
sasl.username=12345
## SASL密码,请联系Linkedmall工作人员获取。
sasl.password=12345
##SSL根证书,请指向步骤1中下载的根证书文件绝对路径。
ssl.truststore.location=/xxxx/kafka.client.truststore.jks
创建配置文件加载程序
import java.util.Properties;
public class KafkaConfigurer {
private static volatile Properties properties;
public static Properties getKafkaProperties() {
if (properties == null) {
synchronized (KafkaConfigurer.class) {
if (properties == null) {
//获取配置文件kafka.properties的内容。
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(KafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
} catch (Exception e) {
//没加载到文件,程序要考虑退出。
e.printStackTrace();
}
properties = kafkaProperties;
}
}
}
return properties;
}
}
消息消费
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class LinkedmallConsumerExample {
private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
" username=\"%s\"" +
" password=\"%s\";";
public static void main(String[] args) throws IOException {
//加载kafka.properties
Properties kafkaProperties = KafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
//设置接入点及鉴权相关配置。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, kafkaProperties.get("sasl.username"), kafkaProperties.get("sasl.password"));
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
//设置group
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
//两次Poll之间的最大允许间隔。
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次poll的消息最大数量。
//不宜设置过大,需要考虑消费端消费速率,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//设置单次拉取的字节数,推荐设置为每次poll的消息最大数量*1024(Linkedmall下发消息约1K),过大可能导致限流。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//建议关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//构造消息对象,也即生成一个消费实例。
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
//设置订阅的topic
List<String> topics = Arrays.stream(kafkaProperties.getProperty("topics").split(",")).collect(Collectors.toList());
consumer.subscribe(topics);
//用于异步处理消息的线程池。
ExecutorService executorService = new ThreadPoolExecutor(0,
Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), Executors.defaultThreadFactory());
//循环消费消息。
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
System.out.printf("Record pulled: %d\n", records.count());
// 按partition分别串行处理,保证顺序性;
Collection<Callable<Void>> tasks = records.partitions().stream()
// 获取单个分区内消息
.map(records::records)
// 为每个分区创建分区内消息串行消费的逻辑
.map(partitionRecords -> (Callable<Void>) () -> {
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.printf("Consume topic:%s partition:%d offset:%d%n", record.topic(), record.partition(), record.offset());
consume(record);
}
return null;
})
.collect(Collectors.toList());
// 等待所有分区消息消费完成,超时时间25s;等待时间应小于SESSION_TIMEOUT_MS_CONFIG,否则会触发rebalance
List<Future<Void>> result = executorService.invokeAll(tasks, 25, TimeUnit.SECONDS);
for (Future<Void> future : result) {
future.get();
}
// 全部消费成功,手动提交位点
consumer.commitSync();
} catch (Exception e) {
// 消费失败,待下次重新拉取消费,注意幂等处理
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
private static void consume(ConsumerRecord<String, String> record) {
// 消息处理逻辑
}
}
注意事项
1. 消费重试
手动提交位点(推荐)
在关闭自动提交的情况下,只有在客户端显式地调用
consumer.commitSync()
才会将本地位点提交至服务端。因此在本地消费失败的情况下,可以选择不手动提交位点,这样下次调用consumer.poll()
时仍会拉取与上次相同的消息,从而实现消息的消费重试。注意:注意区分消息失败原因是否是可重试的,对不可重试的消息(如因非预期异常导致的失败)不做阻塞,通过日志等其他途径记录;对可重试的消息做最大重试次数等限制,否则无限制地重试可能导致消息进度被阻塞,导致消息堆积等问题。
注意:未提交位点时,每次重试都会拉取上次提交位点开始的消息,因此可能存在已消费成功的消息,需要做好幂等处理。
开启自动提交(不推荐)
在开启自动提交的情况下,客户端会定时向kafka服务端提交已拉取的位点,与本地消费逻辑是否成功无关。因此在由于网络、业务信息延迟等问题导致消息消费失败的情况下,该条消息将被跳过。
2. 顺序消费
Linkedmall消息在发送时保证按分区有序,如您需要按序消费消息,请务必参考示例代码中的消费部分,对每个分区内的消息串行处理
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
// 获取分区信息
Set<TopicPartition> partitions = records.partitions();
for (TopicPartition partition: partitions) {
// 获取单个分区内的消息
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 可以使用异步线程串行处理分区内的消息
consumePartition(partitionRecords);
}
消息类型
Topic中会包含多种Event,如商品Topic中包含ProductCreated,SkuEdited等消息,您可以根据需要接受,如果业务上不需要,消息处理时可以直接跳过。
其他语言
其他语言请根据接入点、topic、group等,参考阿里云kafka官方接入demo自助接入:https://github.com/AliwareMQ/aliware-kafka-demos/tree/master
- 本页导读 (0)