全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
E-MapReduce

Kafka 使用说明

更新时间:2018-04-03 12:50:28

从EMR-3.4.0版本开始将支持Kafka服务。

创建Kafka集群

在E-MapReduce控制台创建集群时,选择集群类型为Kafka,则会创建一个默认只包含Kafka组件的集群,除了基础组件外包括Zookeeper,Kafka和KafkaManager三个组件。每个节点将只部署一个Kafka broker。我们建议您的Kafka集群是一个专用集群,不要和Hadoop相关服务混部在一起。

跨集群访问Kakfa

通常,我们会单独部署一个Kafka集群来提供服务,所以经常需要跨集群访问Kafka服务。这时,我们需要在机器上配置Kafka集群节点的host信息。注意,这里我们需要在client端机器配置Kafka集群节点的长域名,否则会出现访问不到Kafka服务的问题。示例如下:

  1. etc/hosts
  2. # kafka cluster
  3. 10.0.1.23 emr-header-1.cluster-48742
  4. 10.0.1.24 emr-worker-1.cluster-48742
  5. 10.0.1.25 emr-worker-2.cluster-48742
  6. 10.0.1.26 emr-worker-3.cluster-48742

线下访问Kafka集群

Kafka集群的core节点默认无法访问公网,所以如果您需要线下环境访问Kafka集群,可以参考以下步骤完成:

  • 打通Kafka集群和线下机器网络:
    • Kafka集群部署在VPC网络环境,两种方式:
      • 1.部署高速通道打通线下线上网络,参考高速通道文档。
      • 2.集群Core节点挂载弹性公网IP(EIP文档)。以下步骤使用挂载EIP方式。
    • Kafka集群部署在经典网络,两种方式:
      • 1.如果您创建的是按量集群,目前只能通过ECS API来实现,请参考API文档
      • 2.如果您创建的是包年包月集群,您可以直接在ECS控制台对相应机器变配开启分配公网IP。
  • ECS控制台创建EIP,根据您Kakfa集群Core节点个数购买相应地EIP,创建地址
  • 配置kafka集群安全组规则,限制公网可访问Kafka集群的IP等。目的提高Kafka集群暴露在公网中的安全性。您可以在EMR控制台查看到集群所属的安全组,根据安全组ID去查找并配置安全组规则。文档地址
  • 修改Kafka集群的软件配置”listeners.address.principal”为”HOST”,并重启Kafka集群。
  • 参考上一节“跨集群访问Kakfa”,配置本地客户端机器的hosts文件。

本地盘Kafka集群

为了更好地降低单位成本以及应对更大的存储需求,E-MapReduce将在EMR-3.5.1版本开始支持基于本地盘(D1类簇机型,详情参考ECS机型介绍文档)的Kafka集群。相比较云盘,本地盘Kafka集群有如下特点:

  • 实例配备大容量、高吞吐SATA HDD本地盘,单磁盘 190 MB/s 顺序读写性能,单实例最大 5 GB/s 存储吞吐能力。
  • 本地存储价格比 SSD 云盘降低 97%。
  • 更高网络性能,最大17 Gbit/s实例间网络带宽,满足业务高峰期实例间数据交互需求。

但本地盘机型也有特殊之处:

操作 本地磁盘数据状态 说明
操作系统重启/控制台重启/强制重启 保留 本地磁盘存储卷保留,数据保留
操作系统关机/控制台停止/强制停止 保留 本地磁盘存储卷保留,数据保留
控制台上释放(实例) 擦除 本地磁盘存储卷擦除,数据不保留

注意:

  • 当宿主机宕机或者磁盘损坏时,磁盘中的数据将会丢失
  • 请勿在本地磁盘上存储需要长期保存的业务数据,并及时做好数据备份和采用高可用架构。如需长期保存,建议将数据存储在云盘上。

为了能够在本地盘上部署Kafka服务,E-MapReduce默认要求:

  1. “default.replication.factor”= 3,即要求topic的分区副本数至少为3。如果设置更小副本数,则会增加数据丢失风险
  2. “min.insync.replicas” = 2,即要求当producer设定acks为all(-1)时,每次至少写入两个副本才算写入成功。

当出现本地盘损坏时,E-MapReduce会进行:

  1. 将坏盘从Broker的配置中剔除,重启Broker,在其他正常可用的本地盘上恢复坏盘丢失的数据。根据坏盘上已经写入的数据量不等,恢复的总时间也不等。
  2. 当机器磁盘损坏数目过多(超过20%)是,E-MapReduce将主动进行机器迁移,恢复异常的磁盘。
  3. 如果当前机器上可用剩余磁盘空间不足以恢复坏盘丢失数据时,Broker将异常Down掉。这种情况,您可以选择清理一些数据,腾出磁盘空间并重启Broker服务;也可以联系E-MapReduce进行机器迁移,恢复异常的磁盘。

参数说明

您可以在E-MapReduce的集群配置管理中查看Kafka的软件配置,当前主要有:

配置项 说明
zookeeper.connect Kafka集群的Zookeeper连接地址
kafka.heap.opts Kafka broker的堆内存大小
num.io.threads Kafka broker的IO线程数,默认为机器CPU核数目的2倍
num.network.threads Kafka broker的网络线程数,默认为机器的CPU核数目

常见问题

  1. “Error while executing topic command : Replication factor: 1 larger than available brokers: 0.”

常见原因:

  • Kafka服务异常,集群Broker进程都退出了,需要结合日志排查问题。
  • Kafka服务的Zookeeper地址使用错误,请注意查看并使用集群配置管理中Kafka组件的Zookeeper连接地址
  1. “java.net.BindException: Address already in use (Bind failed)”

当您使用kafka命令行工具时,有时会碰到”java.net.BindException: Address already in use (Bind failed)”异常,这一般由JMX端口被占用导致,您可以在命令行前说动指定一个JMX端口即可:例如

  1. JMX_PORT=10101 kafka-topics --zookeeper emr-header-1:2181/kafka-1.0.0 --list
本文导读目录