文档

Spark对接Kafka快速入门

更新时间:

简介

消息队列 Kafka 是一个分布式的、高吞吐量、高可扩展性消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。这里主要介绍通过“数据工作台”使用SparkStreaming对接Kafka 0.10的方法。

前置条件

  1. Spark集群和Kafka集群在同一个VPC下。
    进入Spark分析集群页面,选择“数据库连接”>“连接信息”,查看Spark集群的VPC ID信息。如下图:

    然后部署Kafka时选择和Spark相同的VPC ID。

  2. Kafka的购买和部署流程
    进入 Kafka 售卖页购买 Kafka 实例,进入Kafka购买链接。具体 Kafka 的部署请参照Kafka部署参考

  3. Kafka集群已配置Spark集群的白名单。
    首先在阿里云控制台进入“专有网络VPC”,找到Spark集群的VPC ID对应的IPv4网段。如下图:

    然后进入Kafka集群配置白名单,添加上述IPv4网段。如下图:
  4. Kafka集群已经创建Topic、Consumer Group。
    本例使用的Topic名称为:topic01,Consumer Group名称为:testgroup和spark-executor-testgroup。
    注意:因为SparkStreaming在消费Kafka时,Spark的executor会在Group前面加上“spark-executor-”,所以需要在Kafka集群创建两个Consumer Group。
  5. 发送数据到Topic01。
    下载Jar包到ECS机器上(和Kafka集群在相同的VPC下;Kafka集群开通了ECS机器白名单):
    1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/common_test/common-test-0.0.1-SNAPSHOT.jar
    2. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/kafka010/kafka-clients-0.10.2.2.jar
    3. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/kafka010/slf4j-api-1.7.16.jar
    下载的jar包分别放入到如下目录:
    1. /opt/jars/kafka/kafka-clients-0.10.2.2.jar
    2. /opt/jars/kafka/slf4j-api-1.7.16.jar
    3. /opt/jars/test/common-test-0.0.1-SNAPSHOT.jar
    然后运行如下命令,向Topic01发送数据。
    1. java -cp /opt/jars/kafka/*:/opt/jars/test/common-test-0.0.1-SNAPSHOT.jar com.aliyun.kafka.KafkaProducerDemo_java xxx1:9092,xxx2:9092,xxx3:9092 topic01
    其中:xxx1:9092,xxx2:9092,xxx3:9092是Kafka集群的“默认接入点”。

使用“数据工作台”>“作业管理”运行样例

步骤 1:通过“资源管理”上传样例代码Jar包

下载样例代码jar包“sparkstreaming-0.0.1-SNAPSHOT.jar”以及依赖的jar包到本地目录。

  1. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_example/sparkstreaming-0.0.1-SNAPSHOT.jar
  2. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/kafka010/kafka-clients-0.10.2.2.jar
  3. wget https://spark-home.oss-cn-shanghai.aliyuncs.com/kafka010/spark-streaming-kafka-0-10_2.11-2.3.2.jar

在“数据工作台”>“资源管理”中添加文件夹“sparkstreaming”。
上传下载的jar包到此文件夹。如下图:

步骤 2:通过“作业管理”创建并编辑作业内容

在“数据工作台”>“作业管理”中创建Spark作业,作业内容如下:

  1. --class com.aliyun.spark.SparkStreamingOnKafka010
  2. --jars /sparkstreaming/spark-streaming-kafka-0-10_2.11-2.3.2.jar,/sparkstreaming/kafka-clients-0.10.2.2.jar
  3. --driver-memory 1G
  4. --driver-cores 1
  5. --executor-cores 2
  6. --executor-memory 2G
  7. --num-executors 1
  8. --name sparkstreaming
  9. /sparkstreaming/sparkstreaming-0.0.1-SNAPSHOT.jar
  10. xxx1:9092,xxx2:9092,xxx3:9092 topic01 testgroup

作业内容参数说明:

参数 说明
xxx1:9092,xxx2:9092,xxx3:9092 Kafka集群的“默认接入点”。
topic01 testgroup 分别是Kafka集群创建的Topic和Consumer Group。

如下图:

步骤 3:通过“作业管理”运行作业并查看结果

作业编辑完成后点击“运行”,选择Spark集群。运行状态会在下侧显示,如图:


运行成功后点击“YarnUI”,翻到最后看到如下结果表明Spark读取Kafka成功。如下:

  1. -------------------------------------------
  2. Time: 1555384700000 ms
  3. -------------------------------------------
  4. (this,1)
  5. (is,1)
  6. (value,1)
  7. (message's,1)
  8. (the,1)

小结