介绍 Python 的 High-Level SDK中 Producer 和 Consumer 的相关参数和常见用法。
前提条件
Python 3.6+
已创建 DataHub Project 和 Topic,并获取订阅 ID(sub_id)
已获取 RAM 用户的 AccessKey ID 和 AccessKey Secret
安装 SDK
pip install pydatahub如需使用零信任凭证(推荐),还需安装:
pip install alibabacloud-credentials注意:PyPI 包名为 pydatahub,而非 aliyun-datahub-sdk。
身份认证
方式一:环境变量(推荐)
阿里云 SDK 支持通过环境变量自动获取凭证,无需在代码中硬编码 AK。
export ALIBABA_CLOUD_ACCESS_KEY_ID=<your_access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your_access_key_secret>代码中无需硬编码 AK:
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.client import ProducerConfig, ConsumerConfig
config = CredConfig(
type='access_key',
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
# Producer
producer_config = ProducerConfig.from_credential(credential, endpoint)
# Consumer
consumer_config = ConsumerConfig.from_credential(credential, endpoint)方式二:直接使用 AK 构建
from datahub.client import ProducerConfig
# 方式 A:直接传参
producer_config = ProducerConfig(access_id, access_key, endpoint)
# 方式 B:使用 from_access 工厂方法(推荐)
producer_config = ProducerConfig.from_access(access_id, access_key, endpoint)重要:使用配置文件方案时,请确保系统中不存在环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET,否则配置文件将不生效。
Producer 写入数据
限制说明
线程安全:Producer 是线程安全的,同一进程内一个 Topic 只需一个 Producer 实例。
ProducerConfig 参数说明
参数 | 类型 | 默认值 | 说明 |
| str | 必填 | DataHub 服务地址,如 |
| int |
| 写入失败时的重试次数, |
| int |
| 异步线程池大小(范围:2~100) |
| int |
| 异步队列长度限制 |
| int |
| 异步缓冲区最大字节数 |
| int |
| 异步缓冲区最大记录条数 |
| int |
| 异步缓冲区最大等待时间(秒) |
| int |
| 单次打包队列长度限制 |
| DatahubProtocolType |
| 协议类型: |
| CompressFormat |
| 压缩格式: |
| logging.Level |
| 日志级别 |
| str |
| 日志文件路径 |
注意:Python SDK 当前支持的压缩格式为 NONE、LZ4、DEFLATE、ZLIB,不支持 ZSTD。
同步写入
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import TupleRecord, BlobRecord, FieldType, RecordSchema, CompressFormat
from datahub.client import DatahubProducer, ProducerConfig
from datahub.exceptions import DatahubException
# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
# ====== 认证 ======
config = CredConfig(
type='access_key',
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
producer_config = ProducerConfig.from_credential(credential, endpoint)
producer_config.protocol_type = DatahubProtocolType.PB
producer_config.compress_format = CompressFormat.LZ4
producer_config.retry_times = 3
# ====== 创建 Producer ======
producer = DatahubProducer(project_name, topic_name, producer_config)
try:
# 获取 Topic 元信息(包含 field_list)
topic_meta = producer.topic_meta
# 构建 TUPLE 类型记录
records = []
for i in range(100):
record = TupleRecord(schema=topic_meta.record_schema)
record.set_value(0, f"user_{i}") # 按索引设置字段值
record.set_value(1, "login")
record.set_value(2, 1700000000 + i)
record.put_attribute("source", "python-sdk")
records.append(record)
# 同步写入(阻塞直到写入完成)
shard_id = producer.write(records)
print(f"同步写入成功,写入 Shard: {shard_id}")
finally:
producer.close()注意:TupleRecord 没有 .schema 属性。访问字段元信息请用 topic_meta.record_schema.field_list(Producer 端),或 record.field_list(Consumer 端)。
异步写入
producer = DatahubProducer(project_name, topic_name, producer_config)
try:
topic_meta = producer.topic_meta
records = []
for i in range(100):
record = TupleRecord(schema=topic_meta.record_schema)
record.set_value(0, f"user_{i}")
record.set_value(1, "click")
record.set_value(2, 1700000000 + i)
records.append(record)
# 异步写入(非阻塞,立即返回)
shard_id = producer.write_async(records)
print(f"异步写入提交成功,写入 Shard: {shard_id}")
# 程序退出前调用 flush 确保所有数据发送完毕
producer.flush()
finally:
producer.close()注意:异步写入时,数据会先写入本地缓冲区,由后台线程池异步发送。程序退出前必须调用 producer.flush() 确保数据发送完毕。
Consumer 消费数据
限制说明
订阅 ID:消费数据需要先创建订阅(SubId),一个订阅对应一组独立的消费位点。
线程安全:Consumer 不是线程安全的,每个线程应使用独立的 Consumer 实例。
消费模式:支持 协同消费(自动分配 Shard)和 指定 Shard 消费(手动分配 Shard)两种模式。
ConsumerConfig 参数说明
参数 | 类型 | 默认值 | 说明 |
| str | 必填 | DataHub 服务地址 |
| int |
| 消费失败时的重试次数, |
| int |
| 异步线程池大小(范围:2~100) |
| int |
| 异步队列长度限制 |
| bool |
| 是否自动提交消费位点(ACK)。关闭后需手动 |
| int |
| 会话超时时间(毫秒)。超时未心跳的 Consumer 会被认为下线 |
| int |
| 单次读取最大记录数 |
| int |
| 单次请求 DataHub 拉取的最大记录数 |
| DatahubProtocolType |
| 协议类型 |
| CompressFormat |
| 压缩格式 |
| logging.Level |
| 日志级别 |
| str |
| 日志文件路径 |
自动 ACK 消费(推荐)
最简单的方式,读取成功后自动提交位点。适用于对数据丢失不敏感的场景。
import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import CompressFormat, TupleRecord, BlobRecord, FieldType
from datahub.client import DatahubConsumer, ConsumerConfig
from datahub.exceptions import DatahubException
# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
sub_id = "YOUR_SUBSCRIPTION_ID" # 在 DataHub 控制台创建
# ====== 认证 ======
config = CredConfig(
type='access_key',
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)
consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = True # 默认值,可省略
# ====== 创建 Consumer(协同消费模式) ======
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
record_cnt = 0
try:
while True:
record = datahub_consumer.read(timeout=60)
if record is None:
# timeout 内无数据,返回 None
break
# 区分 TupleRecord 和 BlobRecord
if isinstance(record, TupleRecord):
fields = record.values # 字段值元组
field_list = record.field_list # 字段元信息列表
print(f"Tuple 记录: {fields}")
# 按字段名访问
# for idx, field in enumerate(field_list):
# print(f" {field.name} = {fields[idx]}")
elif isinstance(record, BlobRecord):
print(f"Blob 记录: {record.blob_data}")
record_cnt += 1
# auto_ack_offset=True 时,read() 返回即自动 ACK
except DatahubException as e:
print(f"消费异常: {e}")
finally:
datahub_consumer.close()
print(f"共消费 {record_cnt} 条记录")手动 ACK 消费
适用于要求 每条数据处理成功后才能提交位点 的场景
consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = False # 关闭自动 ACK
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
try:
while True:
record = None
try:
record = datahub_consumer.read(timeout=60)
if record is not None:
# TODO: 处理数据
process_record(record)
except DatahubException as e:
print(f"读取异常: {e}")
finally:
# 无论成功还是失败,处理完后必须手动 ACK
if record is not None:
record.record_key.ack()
print("手动 ACK 成功")
finally:
datahub_consumer.close()注意:关闭自动 ACK 后,必须对每条读取到的记录调用 record.record_key.ack(),否则消费位点无法推进。
协同消费
不指定 shard_ids 时,Consumer 会自动加入消费组,DataHub 服务端会自动分配 Shard 给各个 Consumer 实例。
# 不传 shard_ids,自动协同消费
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
# 启动多个 Consumer 实例(不同进程或不同线程),服务端会自动分配 Shard
# 实例 1 -> 分配 Shard 0, 1
# 实例 2 -> 分配 Shard 2, 3特性:
同一订阅 ID 的多个 Consumer 实例组成一个消费组
服务端自动分配 Shard,确保每个 Shard 只被一个 Consumer 消费
Consumer 下线后,其 Shard 会自动重新分配给其他 Consumer
示例代码下载
常见问题
Q: 同步写入和异步写入怎么选?
同步写入 (
write):阻塞直到数据写入成功或重试耗尽。适合对数据可靠性要求高的场景。异步写入 (
write_async):非阻塞,数据写入后台缓冲区。适合高吞吐场景,程序退出前必须调用flush()。
Q: 自动 ACK 和手动 ACK 怎么选?
自动 ACK:简单高效,但存在极少量数据丢失风险(处理中途进程崩溃)。
手动 ACK:确保每条数据处理完才提交位点,适合金融、订单等不能丢数据的场景。
Q: 如何查看 Producer/Consumer 的日志?
日志默认输出到 ./DatahubClient.log,可以通过配置修改:
import logging
producer_config.logging_level = logging.DEBUG
producer_config.logging_filename = "/var/log/datahub/producer.log"Q: record.record_key.ack() 如何使用?
record.record_key.ack() 仅在手动 ACK 模式下使用,且记录必须是通过 consumer.read() 返回的(服务端会自动注入 record_key)。
consumer_config.auto_ack_offset = False
datahub_consumer = DatahubConsumer(
project_name, topic_name, sub_id, consumer_config
)
try:
while True:
record = datahub_consumer.read(timeout=60)
if record is not None:
# TODO: 处理数据
process_record(record)
# 处理完成后手动 ACK
record.record_key.ack()
finally:
datahub_consumer.close()注意:手动创建的记录(如 Producer 写入时的 TupleRecord)没有 record_key,调用 record.record_key.ack() 会报 AttributeError: 'NoneType' object has no attribute 'ack'。