文档

接入方式

更新时间:

本文介绍Linkedmall分销消息在多种SDK下的接入方式

JAVA SDK

环境准备

  • 安装1.8或以上版本JDK。具体操作。请参见安装JDK

  • 安装2.5或以上版本Maven。具体操作,请参见安装Maven

  • 安装编译工具。

安装依赖库

在pom.xml中添加如下依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.0</version>
</dependency>

配置准备

  1. 下载SSL根证书

  1. 在项目src/resource文件下新建kafka.properties文件,并填写以下内容。

## SSL接入点,请联系Linkedmall工作人员获取。
bootstrap.servers=xxxx
## Group,请联系Linkedmall工作人员获取
group.id=xxxx
## SASL用户名,请联系Linkedmall工作人员获取
sasl.username=12345
## SASL密码,请联系Linkedmall工作人员获取。
sasl.password=12345
##SSL根证书,请指向步骤1中下载的根证书文件绝对路径。
ssl.truststore.location=/xxxx/kafka.client.truststore.jks
  1. 创建配置文件加载程序

import java.util.Properties;

public class KafkaConfigurer {
    private static volatile Properties properties;

    public static Properties getKafkaProperties() {
        if (properties == null) {
            synchronized (KafkaConfigurer.class) {
                if (properties == null) {
                    //获取配置文件kafka.properties的内容。
                    Properties kafkaProperties = new Properties();
                    try {
                        kafkaProperties.load(KafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties"));
                    } catch (Exception e) {
                        //没加载到文件,程序要考虑退出。
                        e.printStackTrace();
                    }
                    properties = kafkaProperties;
                }
            }
        }
        return properties;
    }
}

消息消费

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class LinkedmallConsumerExample {

    private static final String JAAS_CONFIG_TEMPLATE = "org.apache.kafka.common.security.plain.PlainLoginModule required" +
            " username=\"%s\"" +
            " password=\"%s\";";

    public static void main(String[] args) throws IOException {

        //加载kafka.properties
        Properties kafkaProperties = KafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();

        //设置接入点及鉴权相关配置。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        String jaasConfig = String.format(JAAS_CONFIG_TEMPLATE, kafkaProperties.get("sasl.username"), kafkaProperties.get("sasl.password"));
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        //设置group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        //两次Poll之间的最大允许间隔。
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        //每次poll的消息最大数量。
        //不宜设置过大,需要考虑消费端消费速率,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        //设置单次拉取的字节数,推荐设置为每次poll的消息最大数量*1024(Linkedmall下发消息约1K),过大可能导致限流。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);

        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //建议关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


        //构造消息对象,也即生成一个消费实例。
        KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);

        //设置订阅的topic
        List<String> topics = Arrays.stream(kafkaProperties.getProperty("topics").split(",")).collect(Collectors.toList());
        consumer.subscribe(topics);

        //用于异步处理消息的线程池。
        ExecutorService executorService = new ThreadPoolExecutor(0,
                Math.max(1, Runtime.getRuntime().availableProcessors() - 1),
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), Executors.defaultThreadFactory());
        //循环消费消息。
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));
                System.out.printf("Record pulled: %d\n", records.count());
              
                // 按partition分别串行处理,保证顺序性;
                Collection<Callable<Void>> tasks = records.partitions().stream()
                        // 获取单个分区内消息
                        .map(records::records)
                        // 为每个分区创建分区内消息串行消费的逻辑
                        .map(partitionRecords -> (Callable<Void>) () -> {
                            for (ConsumerRecord<String, String> record : partitionRecords) {
                                System.out.printf("Consume topic:%s partition:%d offset:%d%n", record.topic(), record.partition(), record.offset());
                                consume(record);
                            }
                            return null;
                        })
                        .collect(Collectors.toList());
              
                // 等待所有分区消息消费完成,超时时间25s;等待时间应小于SESSION_TIMEOUT_MS_CONFIG,否则会触发rebalance
                List<Future<Void>> result = executorService.invokeAll(tasks, 25, TimeUnit.SECONDS);
                for (Future<Void> future : result) {
                    future.get();
                }

                // 全部消费成功,手动提交位点
                consumer.commitSync();
            } catch (Exception e) {
                // 消费失败,待下次重新拉取消费,注意幂等处理
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }

    private static void consume(ConsumerRecord<String, String> record) {
        // 消息处理逻辑
    }
}

注意事项

1. 消费重试

  • 手动提交位点(推荐)

    在关闭自动提交的情况下,只有在客户端显式地调用consumer.commitSync()才会将本地位点提交至服务端。因此在本地消费失败的情况下,可以选择不手动提交位点,这样下次调用consumer.poll()时仍会拉取与上次相同的消息,从而实现消息的消费重试。

    • 注意:注意区分消息失败原因是否是可重试的,对不可重试的消息(如因非预期异常导致的失败)不做阻塞,通过日志等其他途径记录;对可重试的消息做最大重试次数等限制,否则无限制地重试可能导致消息进度被阻塞,导致消息堆积等问题。

    • 注意:未提交位点时,每次重试都会拉取上次提交位点开始的消息,因此可能存在已消费成功的消息,需要做好幂等处理。

  • 开启自动提交(不推荐)

    在开启自动提交的情况下,客户端会定时向kafka服务端提交已拉取的位点,与本地消费逻辑是否成功无关。因此在由于网络、业务信息延迟等问题导致消息消费失败的情况下,该条消息将被跳过。

2. 顺序消费

Linkedmall消息在发送时保证按分区有序,如您需要按序消费消息,请务必参考示例代码中的消费部分,对每个分区内的消息串行处理

// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.of(1, ChronoUnit.SECONDS));

// 获取分区信息
Set<TopicPartition> partitions = records.partitions();

for (TopicPartition partition: partitions) {
  // 获取单个分区内的消息
  List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
  // 可以使用异步线程串行处理分区内的消息
  consumePartition(partitionRecords);
}
  1. 消息类型

    Topic中会包含多种Event,如商品Topic中包含ProductCreated,SkuEdited等消息,您可以根据需要接受,如果业务上不需要,消息处理时可以直接跳过。

其他语言

其他语言请根据接入点、topic、group等,参考阿里云kafka官方接入demo自助接入:https://github.com/AliwareMQ/aliware-kafka-demos/tree/master

  • 本页导读 (0)
文档反馈