使用kafka接收服务端订阅消息
服务端订阅功能支持AMQP和Kafka两种方式接收消息,本文示例展示如何使用Kafka进行服务端订阅。
一、前提条件
购买Tuya物联网平台阿里云版,或是使用免费体验版
准备好开发环境接收数据
mqtt客户端工具模拟设备上报数据
二、配置服务端订阅
首先在控制台页面创建kafka类型的服务端订阅
登录控制台页面
创建产品和设备
在左侧导航栏选择服务端订阅消息转发 > 服务端订阅点击创建消费组。然后,点击创建订阅,在弹出的页面选择Kafka类型。推送类型选择设备上报消息和设备状态变化通知。产品选择上文创建的产品(产品输入框支持输入产品名字或产品ID搜索)。
如上,这个产品的设备的上下线消息和设备上报消息都会通过kafka转发到对应的消费组
三、编写接收端代码
默认情况下,Kafka仅提供VPC地址,需要先配置VPC对等连接,服务端才能连接。在VPC对等连接情况下,流量免费,且相当于内网一样稳定且安全,正式环境建议使用此方式。
但是在开发环境,接收端可能是您的电脑,无法配置VPC对等连接,同时,开发环境流量费用可以忽略不计,也无需特别关注公网的安全问题,我们提供公网Kafka地址,需要联系对应BD开通公网Kafka地址,会给您提供公网Kafka的接入地址等信息。
推荐的实践是开发环境使用公网,生产环境使用VPC对等连接。
下文以公网和VPC对等连接,两种情况分别介绍如何编写接收端代码。
3.1. 公网
本示例演示如何使用python3接收服务端订阅消息
安装python3。macOS使用 :
brew install python3
安装kafka依赖的python库
pip3 install kafka-python
新建python代码
vi kafka-consumer.py
输入以下python代码
from kafka import KafkaConsumer from kafka import errors as kafka_errors # 阿里云Kafka服务器地址列表 bootstrap_servers = ['alikafka-serverless-cn-********.alikafka.aliyuncs.com:9093', 'alikafka-serverless-cn-********.alikafka.aliyuncs.com:9093', 'alikafka-serverless-cn-********.alikafka.aliyuncs.com:9093'] # 要消费的主题名称 topic_name = 'DEFAULT_GROUP' # 消费者组ID group_id = 'DEFAULT_GROUP' # SASL配置信息,替换为实际的用户名和密码 username = "6ENAGJpRpLVhp8TkI69TkshI" password = "37sl3jH4RdwVYHoe9pymeGudSpEhnVbefrzy2LXceV7ozDAM" ssl_cafile = "./only-4096-ca-cert" # 创建Kafka消费者实例并添加SSL和SASL相关安全配置 consumer = KafkaConsumer( topic_name, bootstrap_servers=bootstrap_servers, group_id=group_id, security_protocol="SASL_SSL", # 使用SASL_SSL开启SSL连接 sasl_mechanism="PLAIN", sasl_plain_username=username, sasl_plain_password=password, ssl_cafile=ssl_cafile ) # 循环读取并打印消息以及消息头信息 for message in consumer: # 消息值处理 try: decoded_value = message.value.decode('utf-8') # 检查是否有不可见字符 if any(0 <= ord(char) < 32 or ord(char) == 127 for char in decoded_value): raise ValueError("包含不可见字符") print(f"Received message: {decoded_value} (字符串显示)") except (UnicodeDecodeError, ValueError): hex_value = message.value.hex() print(f"Received message: {hex_value} (字节数组以十六进制显示)") # 消息头处理 print("Message Headers:") for header_key, header_value in message.headers: # 处理消息头键 if isinstance(header_key, bytes): header_key = header_key.decode('utf-8', errors='replace') # 处理消息头值 if isinstance(header_value, bytes): try: header_value_str = header_value.decode('utf-8') print(f" {header_key}: {header_value_str} (字符串显示)") except UnicodeDecodeError: if header_key == "generateTime1" and len(header_value) == 8: try: long_value = struct.unpack('>q', header_value)[0] print(f" {header_key}: {long_value} (时间戳,单位:毫秒)") except struct.error: print(f" {header_key}: {header_value} (解析时间戳失败)") else: print(f" {header_key}: {header_value.hex()} (二进制显示)") else: print(f" {header_key}: {header_value}")
修改代码。以下参数需要修改为自己实际的数据
参数
描述
bootstrap_servers
kafka公网接入地址,需要找自己BD获取
topic_name
上文创建的消费组id
group_id
上文创建的消费组id
username
左侧导航栏 密钥管理 > Access Key 。 如果为空,创建一个即可。
password
左侧导航栏 密钥管理 > Secret Key。
ssl_cafile
在Python代码相同文件夹的终端里执行以下代码,下载证书。
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20220826/ytsw/only-4096-ca-cert
在终端运行python代码
python3 kafka-consumer.py
四、模拟设备上报消息
使用任何熟悉mqtt客户端工具,或者自己使用Paho等mqtt客户端编写代码,模拟上报数据。
本文用MQTTX
在终端运行上一步编写的Python脚本。如果接收到数据,会在终端打印出来。
python3 kafka-consumer.py
左侧导航栏选择 设备管理 > 设备管理 找到上文创建的设备。
点击详情 > 连接参数 查看mqtt连接的clientId,username,password,port, host数据
使用MQTTX,新建连接,使用上一步获取的参数进行连接。如下图:
切换到第1步运行Python脚本的终端,此时应该能接收到设备上线消息。
五、文档参考
[阿里云kafka开发指导]:
- 本页导读 (0)
- 一、前提条件
- 二、配置服务端订阅
- 三、编写接收端代码
- 3.1. 公网
- 四、模拟设备上报消息
- 五、文档参考