本文介绍如何使用阿里云E-MapReduce创建的Hadoop和Kafka集群,运行Spark Streaming作业以消费Kafka数据。
前提条件
- 已注册阿里云账号,详情请参见阿里云账号注册流程。
- 已开通E-MapReduce服务。
- 已完成云账号的授权,详情请参见角色授权。
- 本地安装了PuTTY和文件传输工具(SSH Secure File Transfer Client)。
步骤三:在Kafka集群上创建Topic
本示例将创建一个分区数为10、副本数为2、名称为test的Topic。
- 登录Kafka集群的Master节点,详情请参见使用SSH连接主节点。
- 通过如下命令创建Topic。
/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic test --create
说明 创建Topic后,请保留该登录窗口,后续步骤仍将使用。
步骤四:运行Spark Streaming作业
本示例将运行一个流式单词统计(WordCount)的作业。
- 登录Hadoop集群的Master节点,详情请参见使用SSH连接主节点。
- 执行如下作业命令,进行流式单词统计(WordCount)。
spark-submit --class com.aliyun.emr.example.spark.streaming.KafkaSample /home/hadoop/examples-1.2.0-shaded-2.jar 192.168.xxx.xxx:9092 test 5
关键参数说明如下:
参数 |
描述 |
192.168.xxx.xxx |
Kafka集群中任一Kafka Broker组件的内网IP地址。IP地址如图 1所示。
|
test |
Topic名称。 |
5 |
时间间隔。 |
图 1. Kafka集群组件
步骤五:使用Kafka发布消息
- 在Kafka集群的命令行窗口,执行如下命令运行Kafka的生产者。
/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic test --broker-list emr-worker-1:9092
- 在Kafka集群的登录窗口中输入文本,在Hadoop集群的登录窗口中,会实时显示文本的统计信息。
例如,在Kafka集群的登录窗口输入如下信息。

Hadoop集群的登录窗口会输出如下信息。

步骤六:查看Spark Streaming作业状态
- 在E-MapReduce控制台的集群管理页面。
- 单击创建的Hadoop集群所在行的详情。
- 在左侧导航栏,单击访问链接与端口。
- 单击Spark History Server UI所在行的链接。
- 在History Server页面,单击待查看的App ID。
您可以查看Spark Streaming作业的状态。

在文档使用中是否遇到以下问题
更多建议
匿名提交