使用Spring Cloud框架收发消息

Spring Cloud是用于构建消息驱动的微服务应用程序的框架,提供服务发现、配置管理、消息传递、负载均衡等微服务相关的解决方案,可以更容易地构建分布式系统和进行服务间通信。本文介绍如何使用Spring Cloud框架接入云消息队列 Kafka 版并收发消息。

前提条件

公网环境(消息传输需鉴权与加密)

公网环境,消息采用SASL_SSL协议进行鉴权并加密。客户端通过SSL接入点访问云消息队列 Kafka 版。接入点的详细信息,请参见接入点对比

本示例将Demo包上传在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路径。

  1. 登录Linux系统,执行以下命令,进入Demo包所在路径/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo

    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. 执行以下命令,进入配置文件路径。

    cd sasl-ssl/src/main/resources/
  3. 执行以下命令,编辑application.properties文件,并根据参数列表配置实例信息。

    vi application.properties
    ##以下参数,您需配置为实际使用的实例信息。
    kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
    kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks
    
    ### 配置Binding参数可以把消息队列Kafka版和Spring Cloud Stream的Binder绑定在一起,以下参数保持默认即可。
    spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}
    spring.cloud.stream.bindings.MyOutput.contentType=text/plain
    spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group}
    spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name}
    spring.cloud.stream.bindings.MyInput.contentType=text/plain
    
    ### Binder是Spring Cloud对消息中间件的封装模块,以下参数保持默认即可。
    spring.cloud.stream.kafka.binder.autoCreateTopics=false
    spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers}
    spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
    spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient
    ### 如果Demo中没有以下参数,请手动增加。该参数表示是否需要进行服务器主机名验证。因消息传输使用SASL身份校验,可设置为空字符串关闭服务器主机名验证。
    ### 服务器主机名验证是验证SSL证书的主机名与服务器的主机名是否匹配,默认为HTTPS。
    spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
    表 1. 参数列表

    参数

    描述

    kafka.bootstrap-servers

    云消息队列 Kafka 版实例接入点。您可在云消息队列 Kafka 版控制台实例详情页面的接入点信息区域获取。

    kafka.consumer.group

    订阅消息的Group。您可以在云消息队列 Kafka 版控制台Group 管理页面创建。具体操作,请参见步骤三:创建资源

    kafka.output.topic.name

    消息的Topic。控制台程序通过此Topic每隔一段时间发送消息,内容是固定的。您可以在云消息队列 Kafka 版控制台Topic 管理页面创建。具体操作,请参见步骤三:创建资源

    kafka.input.topic.name

    消息的Topic。您可以通过此Topic在控制台发送消息,Demo程序会消费消息,并将消息打印在日志中。

    kafka.ssl.truststore.location

    SSL根证书kafka.client.truststore.jks的存放路径。

  4. 执行以下命令,打开kafka_client_jaas.conf文件,配置实例的用户名与密码。

    vi kafka_client_jaas.conf
    说明
    • 如果实例未开启ACL,您可以在云消息队列 Kafka 版控制台的实例详情页面获取默认用户的用户名和密码。

    • 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。具体信息,请参见SASL用户授权

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="XXX"
      password="XXX";
    };
  5. 进入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl路径,执行以下命令,运行Demo。

    sh run_demo.sh

    程序打印如下信息,说明接收到控制台程序通过kafka.output.topic.name配置的Topic所发送的消息。

    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  6. 登录云消息队列 Kafka 版控制台验证消息收发是否成功。

    • 查询kafka.output.topic.name配置的Topic是否接收到控制台程序发送的消息。具体操作,请参见消息查询

    • kafka.input.topic.name配置的Topic发送消息,查看Demo程序日志中是否会打印消息。具体操作,请参见发送消息

VPC环境(消息传输不鉴权不加密)

VPC环境,消息可以采用PLAINTEXT协议不鉴权不加密传输。客户端通过默认接入点访问云消息队列 Kafka 版。接入点的详细信息,请参见接入点对比

本示例将Demo包上传在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路径。

  1. 登录Linux系统,执行以下命令,进入Demo包所在路径/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo

    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. 执行以下命令,进入配置文件路径。

    cd vpc/src/main/resources/
  3. 执行以下命令,编辑application.properties文件,并根据参数列表配置实例信息。

    vi application.properties
    ###以下参数请修改为实际使用的实例的信息。
    kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
  4. 进入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc路径,执行以下命令,运行Demo。

    sh run_demo.sh

    程序打印如下信息。

    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  5. 登录云消息队列 Kafka 版控制台验证消息收发是否成功。

    • 查询kafka.output.topic.name配置的Topic是否接收到控制台程序发送的消息。具体操作,请参见消息查询

    • kafka.input.topic.name配置的Topic发送消息,查看Demo程序日志中是否会打印消息。具体操作,请参见发送消息

相关文档

关于Spring Cloud框架的更多信息,请参见Spring Cloud Stream