Python High-Level SDK

更新时间:
复制为 MD 格式

介绍 Python 的 High-Level SDK中 Producer 和 Consumer 的相关参数和常见用法。


前提条件

  • Python 3.6+

  • 已创建 DataHub Project 和 Topic,并获取订阅 ID(sub_id)

  • 已获取 RAM 用户的 AccessKey ID 和 AccessKey Secret


安装 SDK

pip install pydatahub

如需使用零信任凭证(推荐),还需安装:

pip install alibabacloud-credentials

注意:PyPI 包名为 pydatahub,而非 aliyun-datahub-sdk


身份认证

方式一:环境变量(推荐)

阿里云 SDK 支持通过环境变量自动获取凭证,无需在代码中硬编码 AK。

export ALIBABA_CLOUD_ACCESS_KEY_ID=<your_access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your_access_key_secret>

代码中无需硬编码 AK:

import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.client import ProducerConfig, ConsumerConfig

config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)

# Producer
producer_config = ProducerConfig.from_credential(credential, endpoint)

# Consumer
consumer_config = ConsumerConfig.from_credential(credential, endpoint)

方式二:直接使用 AK 构建

from datahub.client import ProducerConfig

# 方式 A:直接传参
producer_config = ProducerConfig(access_id, access_key, endpoint)

# 方式 B:使用 from_access 工厂方法(推荐)
producer_config = ProducerConfig.from_access(access_id, access_key, endpoint)

重要:使用配置文件方案时,请确保系统中不存在环境变量 ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,否则配置文件将不生效。


Producer 写入数据

限制说明

  • 线程安全:Producer 是线程安全的,同一进程内一个 Topic 只需一个 Producer 实例。

ProducerConfig 参数说明

参数

类型

默认值

说明

endpoint

str

必填

DataHub 服务地址,如 https://dh-cn-hangzhou.aliyuncs.com

retry_times

int

3

写入失败时的重试次数,-1 表示无限重试

async_thread_limit

int

16

异步线程池大小(范围:2~100)

thread_queue_limit

int

1024

异步队列长度限制

max_async_buffer_size

int

4000000

异步缓冲区最大字节数

max_async_buffer_records

int

10000

异步缓冲区最大记录条数

max_async_buffer_time

int

1

异步缓冲区最大等待时间(秒)

max_record_pack_queue_limit

int

1024

单次打包队列长度限制

protocol_type

DatahubProtocolType

PB

协议类型:PB(Protocol Buffers)或 JSON

compress_format

CompressFormat

LZ4

压缩格式:NONELZ4DEFLATEZLIB

logging_level

logging.Level

INFO

日志级别

logging_filename

str

./DatahubClient.log

日志文件路径

注意:Python SDK 当前支持的压缩格式为 NONELZ4DEFLATEZLIB不支持 ZSTD

同步写入

import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import TupleRecord, BlobRecord, FieldType, RecordSchema, CompressFormat
from datahub.client import DatahubProducer, ProducerConfig
from datahub.exceptions import DatahubException

# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"

# ====== 认证 ======
config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)

producer_config = ProducerConfig.from_credential(credential, endpoint)
producer_config.protocol_type = DatahubProtocolType.PB
producer_config.compress_format = CompressFormat.LZ4
producer_config.retry_times = 3

# ====== 创建 Producer ======
producer = DatahubProducer(project_name, topic_name, producer_config)

try:
    # 获取 Topic 元信息(包含 field_list)
    topic_meta = producer.topic_meta

    # 构建 TUPLE 类型记录
    records = []
    for i in range(100):
        record = TupleRecord(schema=topic_meta.record_schema)
        record.set_value(0, f"user_{i}")          # 按索引设置字段值
        record.set_value(1, "login")
        record.set_value(2, 1700000000 + i)
        record.put_attribute("source", "python-sdk")
        records.append(record)

    # 同步写入(阻塞直到写入完成)
    shard_id = producer.write(records)
    print(f"同步写入成功,写入 Shard: {shard_id}")

finally:
    producer.close()

注意TupleRecord 没有 .schema 属性。访问字段元信息请用 topic_meta.record_schema.field_list(Producer 端),或 record.field_list(Consumer 端)。

异步写入

producer = DatahubProducer(project_name, topic_name, producer_config)

try:
    topic_meta = producer.topic_meta

    records = []
    for i in range(100):
        record = TupleRecord(schema=topic_meta.record_schema)
        record.set_value(0, f"user_{i}")
        record.set_value(1, "click")
        record.set_value(2, 1700000000 + i)
        records.append(record)

    # 异步写入(非阻塞,立即返回)
    shard_id = producer.write_async(records)
    print(f"异步写入提交成功,写入 Shard: {shard_id}")

    # 程序退出前调用 flush 确保所有数据发送完毕
    producer.flush()

finally:
    producer.close()

注意:异步写入时,数据会先写入本地缓冲区,由后台线程池异步发送。程序退出前必须调用 producer.flush() 确保数据发送完毕。


Consumer 消费数据

限制说明

  • 订阅 ID:消费数据需要先创建订阅(SubId),一个订阅对应一组独立的消费位点。

  • 线程安全:Consumer 不是线程安全的,每个线程应使用独立的 Consumer 实例。

  • 消费模式:支持 协同消费(自动分配 Shard)和 指定 Shard 消费(手动分配 Shard)两种模式。

ConsumerConfig 参数说明

参数

类型

默认值

说明

endpoint

str

必填

DataHub 服务地址

retry_times

int

3

消费失败时的重试次数,-1 表示无限重试

async_thread_limit

int

16

异步线程池大小(范围:2~100)

thread_queue_limit

int

1024

异步队列长度限制

auto_ack_offset

bool

True

是否自动提交消费位点(ACK)。关闭后需手动 record.record_key.ack()

session_timeout

int

6000

会话超时时间(毫秒)。超时未心跳的 Consumer 会被认为下线

max_record_buffer_size

int

100

单次读取最大记录数

fetch_limit

int

1000

单次请求 DataHub 拉取的最大记录数

protocol_type

DatahubProtocolType

PB

协议类型

compress_format

CompressFormat

LZ4

压缩格式

logging_level

logging.Level

INFO

日志级别

logging_filename

str

./DatahubClient.log

日志文件路径

自动 ACK 消费(推荐)

最简单的方式,读取成功后自动提交位点。适用于对数据丢失不敏感的场景。

import os
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from datahub.core import DatahubProtocolType
from datahub.models import CompressFormat, TupleRecord, BlobRecord, FieldType
from datahub.client import DatahubConsumer, ConsumerConfig
from datahub.exceptions import DatahubException

# ====== 配置 ======
endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
project_name = "test_project"
topic_name = "test_topic"
sub_id = "YOUR_SUBSCRIPTION_ID"  # 在 DataHub 控制台创建

# ====== 认证 ======
config = CredConfig(
    type='access_key',
    access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
credential = CredClient(config)

consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = True    # 默认值,可省略

# ====== 创建 Consumer(协同消费模式) ======
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

record_cnt = 0
try:
    while True:
        record = datahub_consumer.read(timeout=60)
        if record is None:
            # timeout 内无数据,返回 None
            break

        # 区分 TupleRecord 和 BlobRecord
        if isinstance(record, TupleRecord):
            fields = record.values            # 字段值元组
            field_list = record.field_list    # 字段元信息列表
            print(f"Tuple 记录: {fields}")
            # 按字段名访问
            # for idx, field in enumerate(field_list):
            #     print(f"  {field.name} = {fields[idx]}")
        elif isinstance(record, BlobRecord):
            print(f"Blob 记录: {record.blob_data}")

        record_cnt += 1
        # auto_ack_offset=True 时,read() 返回即自动 ACK

except DatahubException as e:
    print(f"消费异常: {e}")
finally:
    datahub_consumer.close()

print(f"共消费 {record_cnt} 条记录")

手动 ACK 消费

适用于要求 每条数据处理成功后才能提交位点 的场景

consumer_config = ConsumerConfig.from_credential(credential, endpoint)
consumer_config.auto_ack_offset = False   # 关闭自动 ACK

datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

try:
    while True:
        record = None
        try:
            record = datahub_consumer.read(timeout=60)
            if record is not None:
                # TODO: 处理数据
                process_record(record)
        except DatahubException as e:
            print(f"读取异常: {e}")
        finally:
            # 无论成功还是失败,处理完后必须手动 ACK
            if record is not None:
                record.record_key.ack()
                print("手动 ACK 成功")
finally:
    datahub_consumer.close()

注意:关闭自动 ACK 后,必须对每条读取到的记录调用 record.record_key.ack(),否则消费位点无法推进。

协同消费

不指定 shard_ids 时,Consumer 会自动加入消费组,DataHub 服务端会自动分配 Shard 给各个 Consumer 实例。

# 不传 shard_ids,自动协同消费
datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

# 启动多个 Consumer 实例(不同进程或不同线程),服务端会自动分配 Shard
# 实例 1 -> 分配 Shard 0, 1
# 实例 2 -> 分配 Shard 2, 3

特性

  • 同一订阅 ID 的多个 Consumer 实例组成一个消费组

  • 服务端自动分配 Shard,确保每个 Shard 只被一个 Consumer 消费

  • Consumer 下线后,其 Shard 会自动重新分配给其他 Consumer


示例代码下载

aliyun_python_sdk_demo

常见问题

Q: 同步写入和异步写入怎么选?

  • 同步写入 (write):阻塞直到数据写入成功或重试耗尽。适合对数据可靠性要求高的场景。

  • 异步写入 (write_async):非阻塞,数据写入后台缓冲区。适合高吞吐场景,程序退出前必须调用 flush()

Q: 自动 ACK 和手动 ACK 怎么选?

  • 自动 ACK:简单高效,但存在极少量数据丢失风险(处理中途进程崩溃)。

  • 手动 ACK:确保每条数据处理完才提交位点,适合金融、订单等不能丢数据的场景。

Q: 如何查看 Producer/Consumer 的日志?

日志默认输出到 ./DatahubClient.log,可以通过配置修改:

import logging

producer_config.logging_level = logging.DEBUG
producer_config.logging_filename = "/var/log/datahub/producer.log"

Q: record.record_key.ack() 如何使用?

record.record_key.ack() 仅在手动 ACK 模式下使用,且记录必须是通过 consumer.read() 返回的(服务端会自动注入 record_key)。

consumer_config.auto_ack_offset = False

datahub_consumer = DatahubConsumer(
    project_name, topic_name, sub_id, consumer_config
)

try:
    while True:
        record = datahub_consumer.read(timeout=60)
        if record is not None:
            # TODO: 处理数据
            process_record(record)
            # 处理完成后手动 ACK
            record.record_key.ack()
finally:
    datahub_consumer.close()

注意:手动创建的记录(如 Producer 写入时的 TupleRecord)没有 record_key,调用 record.record_key.ack() 会报 AttributeError: 'NoneType' object has no attribute 'ack'