接入方式

本文介绍Linkedmall分销消息在多种SDK下的接入方式

JAVA SDK

环境准备

  • 安装1.8或以上版本JDK。具体操作。请参见安装JDK

  • 安装2.5或以上版本Maven。具体操作,请参见安装Maven

  • 安装编译工具。

安装依赖库

在pom.xml中添加如下依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.0</version>
</dependency>

配置准备

  1. 下载SSL根证书

  2. 在项目src/resource文件下新建kafka.properties文件,并填写以下内容。

## SSL接入点,请联系Linkedmall工作人员获取。
bootstrap.servers=xxxx
## Group,请联系Linkedmall工作人员获取
group.id=xxxx
## SASL用户名,请联系Linkedmall工作人员获取
sasl.username=12345
## SASL密码,请联系Linkedmall工作人员获取。
sasl.password=12345
##SSL根证书,请指向步骤1中下载的根证书文件绝对路径。
ssl.truststore.location=/xxxx/only.4096.client.truststore.jks
  1. 创建配置文件加载程序

import java.util.Properties;

public class KafkaConfigurer {
    private static volatile Properties properties;

    public static Properties getKafkaProperties() {
        if (properties == null) {
            synchronized (KafkaConfigurer.class) {
                if (properties == null) {
                    //获取配置文件kafka.properties的内容。
                    Properties kafkaProperties = new Properties();
                    try {
                        kafkaProperties.load(KafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
                    } catch (Exception e) {
                        //没加载到文件,程序要考虑退出。
                        e.printStackTrace();
                    }
                    properties = kafkaProperties;
                }
            }
        }
        return properties;
    }
}

消息消费

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.stream.Collectors;


public class LinkedmallConsumerExample {

    private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
            " username=\"%s\"" +
            " password=\"%s\";";

    public static void main(String[] args) throws IOException {

        //加载kafka.properties
        Properties kafkaProperties = KafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();

        //设置接入点及鉴权相关配置。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, kafkaProperties.get("sasl.username"), kafkaProperties.get("sasl.password"));
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        //设置group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        //两次Poll之间的最大允许间隔。
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        //每次poll的消息最大数量。
        //不宜设置过大,需要考虑消费端消费速率,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        //设置单次拉取的字节数,推荐设置为每次poll的消息最大数量*1024(Linkedmall下发消息约1K),过大可能导致限流。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);

        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //建议关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


        //构造消息对象,也即生成一个消费实例。
        KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);

        //设置订阅的topic
        List<String> topics = Arrays.stream(kafkaProperties.getProperty("topics").split(",")).collect(Collectors.toList());
        consumer.subscribe(topics);

        //用于异步处理消息的线程池。
        ExecutorService executorService = new ThreadPoolExecutor(0,
                Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), Executors.defaultThreadFactory());
        //循环消费消息。
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
                System.out.printf("Record pulled: %d\n", records.count());

                // 按partition分别串行处理,保证顺序性;
                Collection<Callable<Void>> tasks = records.partitions().stream()
                        // 获取单个分区内消息
                        .map(records::records)
                        // 为每个分区创建分区内消息串行消费的逻辑
                        .map(partitionRecords -> (Callable<Void>) () -> {
                            for (ConsumerRecord<String, String> record : partitionRecords) {
                                System.out.printf("Consume topic:%s partition:%d offset:%d%n", record.topic(), record.partition(), record.offset());

                                consume(record);
                            }
                            return null;
                        })
                        .collect(Collectors.toList());

                // 等待所有分区消息消费完成,超时时间25s;等待时间应小于SESSION_TIMEOUT_MS_CONFIG,否则会触发rebalance
                List<Future<Void>> result = executorService.invokeAll(tasks, 25, TimeUnit.SECONDS);
                for (Future<Void> future : result) {
                    future.get();
                }

                // 全部消费成功,手动提交位点
                consumer.commitSync();
            } catch (Exception e) {
                // 消费失败,记录失败原因
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }

    private static void consume(ConsumerRecord<String, String> record) {
        int retryTime = 10;
        Exception finalException = null;
        // 手动捕获异常重试
        for (int i = 0; i < retryTime; i++) {
            try {
                doConsume(record);
                return;
            } catch (Exception e) {
                finalException = e;
            }
        }
        // 记录异常原因
        finalException.printStackTrace();
    }

    private static void doConsume(ConsumerRecord<String, String> record) {
        // 消息业务处理逻辑
    }
}

注意事项

1. 消费重试

kafka客户端自身并不提供消费重试机制,建议捕获异常后自行进行重试,或通过spring-kafka等组件实现重试机制。

  • 注意:注意区分消息失败原因是否是可重试的,对不可重试的消息(如因非预期异常导致的失败)不做阻塞,通过日志等其他途径记录,事后分析订正;对可重试的消息做最大重试次数等限制,否则无限制地重试可能导致消息进度被阻塞,导致消息堆积等问题。

  • 注意:未提交位点时,客户端重启后会重新拉取上次提交位点开始的消息,因此可能重复消费部分已成功消费的消息,需要进行幂等处理。

2. 顺序消费

Linkedmall消息在发送时保证按分区有序,如您需要按序消费消息,请务必参考示例代码中的消费部分,对每个分区内的消息串行处理

// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));

// 获取分区信息
Set<TopicPartition> partitions = records.partitions();

for (TopicPartition partition: partitions) {
  // 获取单个分区内的消息
  List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
  // 可以使用异步线程串行处理分区内的消息
  consumePartition(partitionRecords);
}
  1. 消息类型

    Topic中会包含多种Event,如商品Topic中包含ProductCreated,SkuEdited等消息,您可以根据需要接受,如果业务上不需要,消息处理时可以直接跳过。

重置消息消费位点

package com.aliyun.neuron.demo.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.time.Duration;
import java.util.*;

/**
 * @author zj-407802
 * @date 2024/7/10 17:58
 * @desc 设置偏移量
 **/
public class LinkedMallConsumerOffsetTest {
    private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
            " username=\"%s\"" +
            " password=\"%s\";";

    private static final String BOOTSTRAP_SERVERS="alikafka-post-cn-uqm3e7bsu003-1.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3e7bsu003-2.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3e7bsu003-3.alikafka.aliyuncs.com:9093";
    private static final String SSL_TRUSTSTORE_LOCATION="/Users/*******/ideaProjets/linkedmall/distributor-kafka-example/src/main/resources/only.4096.client.truststore.jks";
    private static final String USER_NAME="u12000*****";
    private static final String PASS_WORD="ygBmPv4******";
    private static final String GROUP_ID="1200*****-prod";
    private static String topic = "1200******-product";


    private static Consumer<String, String> consumer;

    public static void main(String[] args) throws InterruptedException {
        //1.初始化消费者
        initConsumer();
        //2.添加订阅关系
        addSubscribe();
        //3.查询分区和位点
        Set<TopicPartition> assignmentPartitions = consumer.assignment();
        for (TopicPartition assignmentPartition : assignmentPartitions) {
            int partition = assignmentPartition.partition();
            long position = consumer.position(new TopicPartition(topic, partition));
            System.out.println("position:" + partition + ",position:" + position);
        }

        //4.重置到最早的消费位点
       // resetOffsetToBeging(0);
        //4.2根据时间戳重置
        resetOffsetByTimestamp(1,1718709716000L);

        //5.添加订阅关系
        addSubscribe();
        //6.查询重置位点后 分区和位点
        Set<TopicPartition> assignmentPartitions2 = consumer.assignment();
        for (TopicPartition assignmentPartition : assignmentPartitions2) {
            int partition = assignmentPartition.partition();
            long position = consumer.position(new TopicPartition(topic, partition));
            System.out.println("position2:" + partition + ",position2:" + position);
        }
    }

    private static void addSubscribe() {

        //设置订阅的topic
        List<String> topics = Arrays.asList(topic);
        consumer.subscribe(topics);

        // 等待分区分配
        while (consumer.assignment().isEmpty ()) {
            consumer.poll(Duration.ofMillis(100));
        }
    }

    private static void initConsumer() {
        //加载kafka.properties
        Properties props = new Properties();

        //设置接入点及鉴权相关配置。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, USER_NAME, PASS_WORD);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        //设置group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

        //两次Poll之间的最大允许间隔。
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        //每次poll的消息最大数量。
        //不宜设置过大,需要考虑消费端消费速率,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        //设置单次拉取的字节数,推荐设置为每次poll的消息最大数量*1024(Linkedmall下发消息约1K),过大可能导致限流。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);

        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //建议关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        //构造消息对象,也即生成一个消费实例。
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
    }

    /**
     * 重置到最早的消费位点
     * partition:分区
     * @throws InterruptedException
     */
    private static void resetOffsetToBeging(int partition) throws InterruptedException {
        //取消订阅关系
        consumer.unsubscribe();
        HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        ArrayList<TopicPartition> list = new ArrayList<>();
        for (PartitionInfo part : partitionInfos) {
            if (part.partition() == partition) {
                TopicPartition topicPartition = new TopicPartition(part.topic(), part.partition());
                list.add(topicPartition);
                offset.put(new TopicPartition(part.topic(), part.partition()), new OffsetAndMetadata(0));
            }

        }
        consumer.assign(list);
        //移动到最早offset
        consumer.seekToBeginning(list);
        //需要停掉已运行的服务再执行commit
        consumer.commitSync(offset);
        Thread.sleep(5000);
        //取消订阅关系
        consumer.unsubscribe();
    }

    /**
     * 根据时间戳重置消费位点
     */
    private static void resetOffsetByTimestamp(int partition,Long timestampMs) throws InterruptedException {
        //取消订阅关系
        consumer.unsubscribe();
        HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        ArrayList<TopicPartition> list = new ArrayList<>();
        HashMap<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo part : partitionInfos) {
            if (part.partition() == partition) {
                TopicPartition topicPartition = new TopicPartition(part.topic(), part.partition());
                list.add(topicPartition);
                map.put(new TopicPartition(part.topic(), part.partition()), timestampMs);
            }
        }

        final Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetAndTimestampMap.entrySet()) {
            final TopicPartition key = entry.getKey();
            final OffsetAndTimestamp value = entry.getValue();

            long position = 0;
            if (value != null) {
                position = value.offset();
            } else {
                list.add(key);
                position = consumer.position(key);
            }
            offset.put(key, new OffsetAndMetadata(position));
        }
        consumer.assign(list);
        //移动到最早offset
        consumer.seekToBeginning(list);
        //需要停掉已运行的服务再执行commit
        consumer.commitSync(offset);
        Thread.sleep(5000);
        //取消订阅关系
        consumer.unsubscribe();
    }

}

其他语言

其他语言请根据接入点、topic、group等,参考阿里云kafka官方接入demo自助接入:https://github.com/AliwareMQ/aliware-kafka-demos/tree/master