使用SSL加密Kafka链接

SSL(Secure Sockets Layer)是一种网络协议,提供了一种在客户端和服务器之间建立安全连接的方法。启用SSL后,DataFlow集群中的所有数据传输,包括生产者、消费者与Broker之间的消息交互都会被加密,确保敏感信息在网络传输过程中不被窃听或篡改。本文为您介绍如何开启SSL,并使用SSL连接Kafka。

前提条件

已在E-MapReduce控制台创建选择了Kafka服务的DataFlow集群(即Kafka集群),详情请参见创建DataFlow Kafka集群

配置SSL功能

E-MapReduce Kafka集群提供以下两种配置SSL的方式:

  • 使用默认证书配置SSL:使用E-MapReduce默认创建的证书和默认配置方式快速启用SSL功能。

  • 自定义配置SSL:使用自定义证书和配置值启用SSL功能。

E-MapReduce通过server.properties配置文件的kafka.ssl.config.type配置项来管理配置SSL的策略。

使用默认证书配置SSL

说明

Kafka集群的SSL功能默认关闭,您可以执行以下步骤快速开启SSL功能。

  1. 进入服务的配置页面。

    1. 登录E-MapReduce控制台

    2. 在顶部菜单栏处,根据实际情况选择地域和资源组

    3. 单击目标集群操作列的集群服务

    4. 集群服务页面,单击Kafka服务区域的配置

  2. 修改配置项。

    1. 配置页面,单击server.properties页签。

    2. 修改kafka.ssl.config.type的参数值为DEFAULT

      修改参数

  3. 保存配置。

    1. 单击保存

    2. 在弹出的对话框中,输入执行原因,单击保存

  4. 重启Kafka服务。

    1. 在Kafka服务的配置页面,选择更多操作 > 重启

    2. 在弹出的对话框中,输入执行原因,单击确定

    3. 确认对话框中,单击确定

自定义配置SSL

说明

Kafka集群的SSL功能默认关闭,您可以通过自定义配置开启SSL功能。

  1. 进入服务的配置页面。

    1. 登录E-MapReduce控制台

    2. 在顶部菜单栏处,根据实际情况选择地域和资源组

    3. 单击目标集群操作列的集群服务

    4. 集群服务页面,单击Kafka服务区域的配置

  2. 修改配置项。

    1. 配置页面,单击server.properties页签。

    2. 修改kafka.ssl.config.type的参数值为CUSTOM

  3. 保存配置。

    1. 单击保存

    2. 在弹出的对话框中,输入执行原因,单击保存

  4. 修改SSL其他配置。

    您需要根据业务需求,自行配置除listeners之外的SSL相关配置。例如,ssl.keystore.locationssl.keystore.passwordssl.truststore.locationssl.truststore.passwordssl.key.password,ssl.keystore.type和ssl.truststore.type等。

  5. 重启Kafka服务。

    1. 在Kafka服务的配置页面,选择更多操作 > 重启

    2. 在弹出的对话框中,输入执行原因,单击确定

    3. 确认对话框中,单击确定

使用SSL连接Kafka

使用SSL连接Kafka时,需要客户端配置参数security.protocolssl.truststore.passwordssl.truststore.location

例如,在已开启SSL的Kafka集群中,使用Kafka自带的Producer和Consumer执行作业,操作步骤如下:

  1. 使用SSH方式连接集群的Master节点,详情请参见登录集群

  2. 创建配置文件。

    1. 执行以下命令,创建配置文件ssl.properties。

      vim ssl.properties
    2. 添加以下内容至配置文件ssl.properties中。

      security.protocol=SSL
      ssl.truststore.location=/var/taihao-security/ssl/ssl/truststore
      ssl.truststore.password=${password}
      ssl.keystore.location=/var/taihao-security/ssl/ssl/keystore
      ssl.keystore.password=${password}
      ssl.endpoint.identification.algorithm=

      上面参数的值,您可以在EMR控制台Kafka服务的配置页面查看。如果是在Kafka集群以外的环境执行作业,您可以将Kafka集群中任意节点相应目录下的truststorekeystore文件,拷贝至运行环境进行相应配置。

  3. 执行以下命令,创建Topic。

    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create --command-config ssl.properties
  4. 执行以下命令,使用SSL配置文件产生数据。

    export IP=<your_InnerIP>
    kafka-producer-perf-test.sh --topic test --num-records 123456 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=${IP}:9092 --producer.config ssl.properties
    说明

    本文代码示例中的your_InnerIP为master-1-1节点的内网IP地址。

  5. 执行以下命令,使用SSL配置文件消费数据。

    export IP=<your_InnerIP>
    kafka-consumer-perf-test.sh --broker-list ${IP}:9092 --messages 100000000 --topic test --consumer.config ssl.properties

相关文档

如需对连接至Kafka服务的用户进行身份验证,详情请参见使用SASL登录认证Kafka服务