文档

使用Java实现Kafka消费

更新时间:

本文介绍如何使用Java实现Kafka消费。

参数说明

参数

描述

示例

bootstrap.servers

初始连接的集群地址,格式为${project}.${endpoint}:${port},请根据实际的Project名称及其所在的Endpoint进行配置。更多信息,请参见服务入口

  • 阿里云VPC内网:端口号为10011。

  • 公网:端口号为10012。

etl-dev.cn-hangzhou.log.aliyuncs.com:10012

其中,etl-dev为Project名称。

security.protocol

为了保证数据传输的安全性,请使用sasl_ssl。

sasl_ssl

security.mecham

必须使用PLAIN。

PLAIN

sasl_ssl.username

日志服务Project名称。

etl-dev

sasl_ssl.password

阿里云账号AccessKey。格式为{access-key-id}#{access-key-secret}。建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见授权。如何获取AccessKey的具体操作,请参见访问密钥

topic

日志服务Logstore名称。

test

enable.auto.commit

是否自动提交消费点位,建议设置为true。

true

auto.commit.interval.ms

自动提交消费点位的间隔时间,建议设置为30000,单位为ms。

30000

max.poll.interval.ms

消费组在消费者发起加入组请求后,等待所有消费者加入的时间间隔。

在这个时间间隔内加入组的消费者为消费组的成员,进行分区分配,各个消费者按分配的分区开发消费数据,如果在这个时间内还有消费者没有加入消费组,则会触发消费组再平衡操作,再平衡期间不会消费数据,会导致消费延迟。建议设置max.poll.interval.ms为120000,单位为ms,保证所有消费者都能加入消费组。

120000

session.timeout.ms

心跳最大超时时间。

在该时间如果消费者没有发送心跳请求,则视为该消费者发生异常,触发消费组再平衡操作。

重要

使用Java实现Kafka消费时需要设置session.timeout.ms值大于max.poll.interval.ms。

130000

auto.offset.reset

消费起始点位,常用的值为latest和earliest,默认为latest。

  • earliest:表示使用最早的偏移量,从最早的消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,从头开始消费。

  • latest:表示使用最新的偏移量,即从最新消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,消费新产生的数据。

latest

依赖说明

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.1.0</version>
</dependency>

代码示例

/** 使用标准Kafka SDK消费SLS的日志数据 */
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerGroupTest {

    public static void consume() {
        Properties props = new Properties();
        String project = "etl-dev";
        String logstore = "test";
        // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
        // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
        String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
        String groupId = "kafka-test";
        String endpoint = "cn-hangzhou.log.aliyuncs.com";
        String port = "10012";
       
        String hosts = project + "." + endpoint + ":" + port;
        props.put("bootstrap.servers", hosts);
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                        project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");

        // Kafka消费者配置
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "30000");
        props.put("session.timeout.ms", "130000");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.interval.ms", "120000");
        props.put("heartbeat.interval.ms", "5000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      	// 创建Kafka消费者实例
        KafkaConsumer<String,String> consumer =  new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(logstore));
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
        }
    }
    public static void main(String[] args){
       consume();
    }
}

  • 本页导读 (1)
文档反馈