使用kafka接收服务端订阅消息

更新时间:2025-02-13 01:54:50

服务端订阅功能支持AMQPKafka两种方式接收消息,本文示例展示如何使用Kafka进行服务端订阅。

一、前提条件

  • 购买Tuya物联网平台阿里云版,或是使用免费体验版

  • 准备好开发环境接收数据

  • mqtt客户端工具模拟设备上报数据

二、配置服务端订阅

首先在控制台页面创建kafka类型的服务端订阅

  1. 登录控制台页面

  2. 创建产品和设备

  3. 在左侧导航栏选择服务端订阅消息转发 > 服务端订阅点击创建消费组。然后,点击创建订阅,在弹出的页面选择Kafka类型。推送类型选择设备上报消息设备状态变化通知。产品选择上文创建的产品(产品输入框支持输入产品名字或产品ID搜索)。

如上,这个产品的设备的上下线消息和设备上报消息都会通过kafka转发到对应的消费组

、编写接收端代码

默认情况下,Kafka仅提供VPC地址,需要先配置VPC对等连接,服务端才能连接。在VPC对等连接情况下,流量免费,且相当于内网一样稳定且安全,正式环境建议使用此方式。

但是在开发环境,接收端可能是您的电脑,无法配置VPC对等连接,同时,开发环境流量费用可以忽略不计,也无需特别关注公网的安全问题,我们提供公网Kafka地址,需要联系对应BD开通公网Kafka地址,会给您提供公网Kafka的接入地址等信息。

推荐的实践是开发环境使用公网,生产环境使用VPC对等连接。

下文以公网VPC对等连接,两种情况分别介绍如何编写接收端代码。

3.1. 公网

本示例演示如何使用python3接收服务端订阅消息

  1. 安装python3。macOS使用 :

    brew install python3 
  2. 安装kafka依赖的python

    pip3 install kafka-python
  1. 新建python代码

    vi kafka-consumer.py
  2. 输入以下python代码

    from kafka import KafkaConsumer
    from kafka import errors as kafka_errors
    
    # 阿里云Kafka服务器地址列表
    bootstrap_servers = ['alikafka-serverless-cn-********.alikafka.aliyuncs.com:9093',
                         'alikafka-serverless-cn-********.alikafka.aliyuncs.com:9093',
                         'alikafka-serverless-cn-********.alikafka.aliyuncs.com:9093']
    
    # 要消费的主题名称
    topic_name = 'DEFAULT_GROUP'
    
    # 消费者组ID
    group_id = 'DEFAULT_GROUP'
    
    # SASL配置信息,替换为实际的用户名和密码
    username = "6ENAGJpRpLVhp8TkI69TkshI"
    password = "37sl3jH4RdwVYHoe9pymeGudSpEhnVbefrzy2LXceV7ozDAM"
    
    ssl_cafile = "./only-4096-ca-cert"
    
    # 创建Kafka消费者实例并添加SSL和SASL相关安全配置
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        security_protocol="SASL_SSL",  # 使用SASL_SSL开启SSL连接
        sasl_mechanism="PLAIN",
        sasl_plain_username=username,
        sasl_plain_password=password,
        ssl_cafile=ssl_cafile
    )
    
    # 循环读取并打印消息以及消息头信息
    for message in consumer:
        # 消息值处理
        try:
            decoded_value = message.value.decode('utf-8')
            # 检查是否有不可见字符
            if any(0 <= ord(char) < 32 or ord(char) == 127 for char in decoded_value):
                raise ValueError("包含不可见字符")
            print(f"Received message: {decoded_value} (字符串显示)")
        except (UnicodeDecodeError, ValueError):
            hex_value = message.value.hex()
            print(f"Received message: {hex_value} (字节数组以十六进制显示)")
    
        # 消息头处理
        print("Message Headers:")
        for header_key, header_value in message.headers:
            # 处理消息头键
            if isinstance(header_key, bytes):
                header_key = header_key.decode('utf-8', errors='replace')
    
            # 处理消息头值
            if isinstance(header_value, bytes):
                try:
                    header_value_str = header_value.decode('utf-8')
                    print(f"  {header_key}: {header_value_str} (字符串显示)")
                except UnicodeDecodeError:
                    if header_key == "generateTime1" and len(header_value) == 8:
                        try:
                            long_value = struct.unpack('>q', header_value)[0]
                            print(f"  {header_key}: {long_value} (时间戳,单位:毫秒)")
                        except struct.error:
                            print(f"  {header_key}: {header_value} (解析时间戳失败)")
                    else:
                        print(f"  {header_key}: {header_value.hex()} (二进制显示)")
            else:
                print(f"  {header_key}: {header_value}")
  3. 修改代码。以下参数需要修改为自己实际的数据

    参数

    描述

    bootstrap_servers

    kafka公网接入地址,需要找自己BD获取

    topic_name

    上文创建的消费组id

    group_id

    上文创建的消费组id

    username

    左侧导航栏 密钥管理 > Access Key 。 如果为空,创建一个即可。

    password

    左侧导航栏 密钥管理 > Secret Key

    ssl_cafile

    Python代码相同文件夹的终端里执行以下代码,下载证书。

    wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20220826/ytsw/only-4096-ca-cert

  4. 在终端运行python代码

    python3 kafka-consumer.py

四、模拟设备上报消息

使用任何熟悉mqtt客户端工具,或者自己使用Pahomqtt客户端编写代码,模拟上报数据。

本文用MQTTX

  1. 在终端运行上一步编写的Python脚本。如果接收到数据,会在终端打印出来。

    python3 kafka-consumer.py
  1. 左侧导航栏选择 设备管理 > 设备管理 找到上文创建的设备。

  2. 点击详情 > 连接参数 查看mqtt连接的clientId,username,password,port, host数据

  3. 使用MQTTX,新建连接,使用上一步获取的参数进行连接。如下图:

    image

  1. 切换到第1步运行Python脚本的终端,此时应该能接收到设备上线消息。

五、文档参考

[阿里云kafka开发指导]:

使用实例接入点收发消息

  • 本页导读 (0)
  • 一、前提条件
  • 二、配置服务端订阅
  • 三、编写接收端代码
  • 3.1. 公网
  • 四、模拟设备上报消息
  • 五、文档参考