本文介绍在阿里云E-MapReduce创建的包含kafka服务的DataFlow集群中,如何使用Spark Streaming作业从Kafka中实时消费数据。
前提条件
已注册阿里云账号。
已开通E-MapReduce服务。
已完成云账号的授权,详情请参见角色授权。
步骤一:创建DataLake和DataFlow集群
创建同一个安全组下的DataLake和DataFlow集群(包含Kafka服务)。创建详情请参见创建集群。
创建DataLake集群。
说明本文以Spark 3为例。
创建DataFlow。
说明务必确认已选择Kafka服务。系统会自动选择其依赖的Kafka-Manager和Zookeeper服务。
步骤二:获取JAR包并上传到DataLake集群
获取JAR包(spark-streaming-demo-1.0.jar)。
上传JAR包至DataLake集群Master节点的
/home/emr-user
路径下。
步骤三:在DataFlow集群上创建Topic
本示例将创建一个名称为test的Topic。
登录DataFlow集群的Master节点,详情请参见登录集群。
执行以下命令,创建Topic。
kafka-topics.sh --create --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --replication-factor 2 --partitions 2 --topic demo
说明创建Topic后,请保留该登录窗口,后续步骤仍将使用。
步骤四:运行Spark Streaming作业
本示例将运行一个流式单词统计(WordCount)的作业。
登录DataLake集群的Master节点,详情请参见登录集群。
执行以下命令,进入emr-user目录。
cd /home/emr-user
执行以下命令,进行流式单词统计(WordCount)。
spark-submit --class com.aliyun.emr.KafkaApp1 ./spark-streaming-demo-1.0.jar <Kafka broker的内网IP地址>:9092 demogroup1 demo
关键参数如下表所示。
参数
描述
<Kafka broker的内网IP地址>:9092
DataFlow集群中Broker节点的内网IP地址和端口号,端口号默认为9092。例如,
172.16.**.**:9092,172.16.**.**:9092,172.16.**.**:9092
。您可以在DataFlow集群Kafka服务的状态页签,单击KafkaBroker组件前的图标,查看所有节点的内网IP地址。
demogroup1
指定Kafka消费组的名称。您也可以根据实际情况修改。
demo
Topic名称。
步骤五:使用Kafka发布消息
在DataFlow集群的命令行窗口,执行如下命令运行Kafka的生产者。
kafka-topics.sh --create --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --replication-factor 2 --partitions 2 --topic demokafka-console-producer.sh --topic demo --broker-list core-1-1:9092
在DataFlow集群的登录窗口中输入文本,在DataLake集群的登录窗口中,会实时显示文本的统计信息。
例如,在DataFlow集群的登录窗口输入如下信息。
DataLake集群的登录窗口会输出如下信息。
步骤六:查看Spark Streaming作业状态
在EMR on ECS页面,单击DataLake集群的名称。
单击访问链接与端口页签。
访问SPARK UI,详情请参见访问链接与端口。
在History Server页面,单击待查看的App ID。
您可以查看Spark Streaming作业的状态。