Python SDK
Installation
Quick installation
$ sudo pip install pydatahub
Installing from source
$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install
FAQ
1. If installation fails with the error 'Python.h: No such file or directory', install the Python development headers. On common operating systems:
$ sudo apt-get install python-dev # for python2.x installs
$ sudo apt-get install python3-dev # for python3.x installs
$ sudo yum install python-devel # for python2.x installs
$ sudo yum install python3-devel # for python3.x installs
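2. On Windows, follow the hint in the error message and download the matching version of the Visual C++ SDK from here. On Windows 10, an error like the following while installing the cprotobuf dependency also means that the Visual C++ build tools need to be installed: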
building 'cprotobuf.internal' extension
error: [WinError 2] The system cannot find the file specified
Python 3.6 or later is recommended: it states the required version and the download link explicitly.
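3. On Windows, an error like the following while installing dependencies is caused by the environment. Search for the error and, depending on your situation, either copy the missing files or run the installation from the Developer Command Prompt: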
LINK : fatal error LNK1158: cannot run 'rc.exe'
4. On Windows 7, if you see the following error, install the build tools it points to:
error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/
Verifying the installation
$ python -c "from datahub import DataHub"
If this command runs without errors, the DataHub Python SDK has been installed successfully.
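A slightly more informative check is sketched below, assuming Python 3.8 or later (for `importlib.metadata`). It reports the installed version of the `pydatahub` distribution, which is the package name used with pip above; the helper name `installed_sdk_version` is hypothetical.

```python
from importlib import metadata
from typing import Optional


def installed_sdk_version(dist_name: str = "pydatahub") -> Optional[str]:
    """Return the installed version of dist_name, or None if it is not installed."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_sdk_version()
    if version is None:
        print("pydatahub is not installed")
    else:
        print("pydatahub", version)
```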
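Basic concepts
For details, see: Terminology
Preparation
Accessing DataHub requires an Alibaba Cloud account: you need to provide your Alibaba Cloud accessId and accessKey, as well as the endpoint of the DataHub service you want to access.
Creating a Project
Log in to the DataHub WebConsole and create a Project there,
or create one through the SDK.
Initializing DataHub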
import sys
import traceback
from datahub import DataHub
from datahub.exceptions import DatahubException, ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
# Replace the placeholders with your own credentials and endpoint
access_id = '<your access id>'
access_key = '<your access key>'
endpoint = '<your datahub server endpoint>'
dh = DataHub(access_id, access_key, endpoint)
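Hard-coding credentials as above is convenient for a quick test. A common alternative, sketched here, is to read them from environment variables. The variable names DATAHUB_ACCESS_ID, DATAHUB_ACCESS_KEY and DATAHUB_ENDPOINT and the helper load_datahub_config are hypothetical; the SDK is not assumed to read them on its own.

```python
import os
from typing import Mapping, Optional, Tuple

# Hypothetical variable names; the SDK is not assumed to read these automatically.
ENV_VARS = ("DATAHUB_ACCESS_ID", "DATAHUB_ACCESS_KEY", "DATAHUB_ENDPOINT")


def load_datahub_config(env: Optional[Mapping[str, str]] = None) -> Tuple[str, str, str]:
    """Return (access_id, access_key, endpoint), raising ValueError if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_VARS if not env.get(name)]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))
    access_id, access_key, endpoint = (env[name] for name in ENV_VARS)
    return access_id, access_key, endpoint


# Usage with the SDK (not executed here):
# access_id, access_key, endpoint = load_datahub_config()
# dh = DataHub(access_id, access_key, endpoint)
```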
Project operations
Creation example
project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
    print("create project success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exists!")
    print("=======================================\n\n")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
Topic operations
Tuple Topic
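Data written to a Tuple topic is structured and requires a Record Schema. The following data types are currently supported:
Type | Meaning | Value range |
Bigint | 8-byte signed integer. Do not use the minimum integer value (-9223372036854775808); it is reserved by the system. | -9223372036854775807 ~ 9223372036854775807 |
String | String; only UTF-8 encoding is supported. | A single String column may be at most 1 MB. |
Boolean | Boolean. | Can be written as True/False, true/false, or 0/1. |
Double | 8-byte double-precision floating-point number. | -1.0×10^308 ~ 1.0×10^308 |
TimeStamp | Timestamp. | Timestamp with microsecond precision. |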
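To catch out-of-range values before calling put_records, a client-side check based on the table above can help. The helper check_value below is hypothetical and only mirrors the listed ranges; the server remains the authority and the SDK's own validation may differ. It assumes "1 MB" means 1024 × 1024 UTF-8 bytes, treats None as null (the write example later in this section sets a field to None), and assumes TIMESTAMP values are integer microseconds, as in the examples.

```python
# Limits copied from the type table above; this is a client-side sketch only.
BIGINT_MIN = -9223372036854775807  # -(2**63 - 1); -2**63 is reserved by the system
BIGINT_MAX = 9223372036854775807   # 2**63 - 1
STRING_MAX_BYTES = 1024 * 1024     # assumed: 1 MB per String column, in UTF-8 bytes
DOUBLE_ABS_MAX = 1.0e308
BOOLEAN_STRINGS = ("True", "False", "true", "false", "0", "1")


def check_value(field_type: str, value) -> bool:
    """Return True if value fits the range listed for field_type; None (null) is accepted."""
    if value is None:
        return True
    is_int = isinstance(value, int) and not isinstance(value, bool)
    if field_type == "BIGINT":
        return is_int and BIGINT_MIN <= value <= BIGINT_MAX
    if field_type == "STRING":
        return isinstance(value, str) and len(value.encode("utf-8")) <= STRING_MAX_BYTES
    if field_type == "BOOLEAN":
        if isinstance(value, bool):
            return True
        if is_int:
            return value in (0, 1)
        return isinstance(value, str) and value in BOOLEAN_STRINGS
    if field_type == "DOUBLE":
        return (is_int or isinstance(value, float)) and -DOUBLE_ABS_MAX <= value <= DOUBLE_ABS_MAX
    if field_type == "TIMESTAMP":
        # Assumed: integer microseconds, e.g. 1455869335000000 in the write example.
        return is_int
    raise ValueError("unknown field type: %r" % field_type)
```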
Creation example
topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exists!")
    print("=======================================\n\n")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
Blob Topic
A Blob topic accepts a block of binary data as a single Record; the data is transmitted BASE64-encoded.
topic_name = "blob_topic"
shard_count = 3
life_cycle = 7
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exists!")
    print("=======================================\n\n")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
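Since blob data is transmitted BASE64-encoded, it can help to see what that form looks like for a given payload and to confirm it decodes back to the same bytes. The standard base64 module is enough for this; to_wire and from_wire are hypothetical names, and this is an illustration of the encoding, not the SDK's internal code.

```python
import base64


def to_wire(payload: bytes) -> str:
    """BASE64-encode a binary payload into ASCII text."""
    return base64.b64encode(payload).decode("ascii")


def from_wire(text: str) -> bytes:
    """Decode BASE64 text back into the original bytes."""
    return base64.b64decode(text)


payload = b"\x00\x01binary\xff"
encoded = to_wire(payload)
print(encoded)
assert from_wire(encoded) == payload
# BASE64 output is ceil(n / 3) * 4 characters for n input bytes.
print(len(payload), "bytes ->", len(encoded), "characters")
```

The size line is a reminder that BASE64 inflates payloads by roughly a third on the wire.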
Publishing and subscribing to data
Listing shards
The list_shard interface returns all shards of a topic.
shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))
The result is a ListShardResult object containing a list of Shard objects. Each element is one shard, from which you can read information such as shard_id, state, begin_hash_key and end_hash_key.
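Each shard covers a range of hash keys delimited by begin_hash_key and end_hash_key, written as 32 hex digits (128 bits), like the hash_key set on record1 in the Tuple write example. The sketch below shows how a hash key maps onto those ranges. It uses a hypothetical stand-in type ShardRange instead of the SDK's Shard objects, the three evenly split ranges are made up for illustration, and it assumes the begin key is inclusive and the end key exclusive, which you should confirm against the service.

```python
from typing import NamedTuple, Optional, Sequence


class ShardRange(NamedTuple):
    """Stand-in for the shard_id / begin_hash_key / end_hash_key fields of a shard."""
    shard_id: str
    begin_hash_key: str  # 32 hex digits, assumed inclusive
    end_hash_key: str    # 32 hex digits, assumed exclusive


def find_shard(shards: Sequence[ShardRange], hash_key: str) -> Optional[str]:
    """Return the id of the shard whose hash-key range contains hash_key, or None."""
    key = int(hash_key, 16)
    for shard in shards:
        if int(shard.begin_hash_key, 16) <= key < int(shard.end_hash_key, 16):
            return shard.shard_id
    return None


# Made-up ranges: 2**128 split roughly evenly across three shards.
shards = [
    ShardRange("0", "0" * 32, "5" * 32),
    ShardRange("1", "5" * 32, "A" * 32),
    ShardRange("2", "A" * 32, "F" * 32),
]
print(find_shard(shards, "4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD"))  # -> 0
```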
Publishing data
The put_records interface publishes data to a topic.
put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)
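The records argument is a list in which each element is a record; all records must be of the same type, either Tuple or Blob. The result is a PutRecordsResult object with the members failed_record_count and failed_records. failed_records is a list of FailedRecord objects, each with the members index, error_code and error_message.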
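Because one put_records call must contain records of a single type, a list that mixes Tuple and Blob records has to be split first, with each batch then sent to a topic of the matching type. The hypothetical helper split_by_record_type groups records by their Python class while preserving order; the Fake* classes are stand-ins for the SDK's TupleRecord and BlobRecord so the sketch runs offline.

```python
from typing import Dict, List


def split_by_record_type(records: List[object]) -> Dict[type, List[object]]:
    """Group records by class, keeping the original order within each group."""
    groups: Dict[type, List[object]] = {}
    for record in records:
        groups.setdefault(type(record), []).append(record)
    return groups


# Stand-ins for the SDK's TupleRecord and BlobRecord, for illustration only.
class FakeTupleRecord:
    pass


class FakeBlobRecord:
    pass


mixed = [FakeTupleRecord(), FakeBlobRecord(), FakeTupleRecord()]
for record_type, batch in split_by_record_type(mixed).items():
    # With the SDK, each batch would go to a topic of the matching type:
    # dh.put_records(project_name, topic_name, batch)
    print(record_type.__name__, len(batch))
```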
Example: writing Tuple records
topic_name = 'tuple_topic'
try:
    # Block until all shards are in the ready state
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    # Values given positionally; written to a specific shard
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    # Values set by field name; routed by hash key
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    # Values set by field index; routed by partition key
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2, 1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" % (len(records0), put_result.failed_record_count))
    # If failed_record_count is greater than 0, it is best to retry the failed records
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
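The example recommends retrying when failed_record_count is greater than 0. A minimal sketch, relying only on the result structure described for put_records (failed_records holds objects whose index points into the submitted list): resend just the failed records, up to a fixed number of attempts with exponential backoff. put_with_retry is hypothetical; put_fn stands in for something like lambda batch: dh.put_records(project_name, topic_name, batch), and the Fake* result types exist only so the sketch runs offline.

```python
import time
from typing import Callable, List, NamedTuple, Sequence


class FakeFailedRecord(NamedTuple):
    index: int          # position in the list that was submitted
    error_code: str
    error_message: str


class FakePutResult(NamedTuple):
    failed_record_count: int
    failed_records: List[FakeFailedRecord]


def put_with_retry(put_fn: Callable[[list], object], records: Sequence,
                   max_attempts: int = 3, backoff_seconds: float = 0.5) -> list:
    """Send records, resending only the failed ones; return the records still failing."""
    pending = list(records)
    for attempt in range(max_attempts):
        result = put_fn(pending)
        if result.failed_record_count == 0:
            return []
        # Each failed.index refers to a position in the batch that was just sent.
        pending = [pending[failed.index] for failed in result.failed_records]
        if attempt < max_attempts - 1:
            time.sleep(backoff_seconds * (2 ** attempt))
    return pending
```

This retries every failure the same way; in practice you may prefer to inspect error_code and retry only errors that look transient.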
Example: writing Blob records
topic_name = 'blob_topic'
try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)
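Getting a cursor
A cursor can be obtained in one of three ways: OLDEST, LATEST, or SYSTEM_TIME.
OLDEST: the cursor points to the oldest record among the currently valid data.
LATEST: the cursor points to the latest record.
SYSTEM_TIME: the cursor points to the first record whose time is greater than or equal to the given time (in milliseconds).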
shard_id = '0'
time_stamp = 0
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor
The get_cursor interface returns a cursor used to read data from the specified position onward.
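Note the different units: SYSTEM_TIME takes a time in milliseconds, while the TimeStamp values written in the examples above (such as 1455869335000000) are in microseconds. A small stdlib sketch for converting a timezone-aware datetime into both; the helper names to_micros and to_millis are hypothetical.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_micros(dt: datetime) -> int:
    """Microseconds since the Unix epoch, as in the TimeStamp values of the examples."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    delta = dt - EPOCH
    return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds


def to_millis(dt: datetime) -> int:
    """Milliseconds since the Unix epoch, e.g. for CursorType.SYSTEM_TIME."""
    return to_micros(dt) // 1000


dt = datetime(2016, 2, 19, 8, 8, 55, tzinfo=timezone.utc)
print(to_micros(dt))  # 1455869335000000
print(to_millis(dt))  # 1455869335000
# With the SDK (not executed here):
# dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, to_millis(dt))
```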
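Subscribing to data
To read data from a shard, specify the cursor to start reading from and the maximum number of records to read (limit). If there are fewer than limit records between the cursor and the end of the shard, only the records that exist are returned.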
project_name = 'project'
shard_id = "0"
limit = 10
# cursor must come from get_cursor on the same topic and shard being read
# Read records from the blob topic
topic_name = 'blob_topic'
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# Read records from the tuple topic (the record schema is also required)
topic_name = 'tuple_topic'
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
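The cursor/limit contract described above can be modelled in memory to make it concrete: each read returns at most limit records starting at the cursor, plus a next cursor pointing just past the last returned record. In this sketch the hypothetical InMemoryShard uses plain list offsets as cursors, purely for illustration; real DataHub cursors are opaque strings returned by the service and should be passed back unchanged.

```python
from typing import List, NamedTuple


class ReadResult(NamedTuple):
    records: List[str]
    record_count: int
    next_cursor: int


class InMemoryShard:
    """Toy shard: cursors are list offsets (real DataHub cursors are opaque strings)."""

    def __init__(self, records: List[str]):
        self._records = list(records)

    def read(self, cursor: int, limit: int) -> ReadResult:
        # Return at most `limit` records; fewer if the shard ends first.
        batch = self._records[cursor:cursor + limit]
        return ReadResult(batch, len(batch), cursor + len(batch))


shard = InMemoryShard(["r%d" % i for i in range(25)])
cursor, limit = 0, 10
while True:
    result = shard.read(cursor, limit)
    print(result.record_count, "records")  # 10, 10, 5, then 0
    if result.record_count == 0:
        break
    cursor = result.next_cursor
```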
Example: consuming Tuple records
import time
topic_name = 'tuple_topic'
try:
    # Block until all shards are in the ready state
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    shard_id = '0'
    limit = 10
    cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
        for record in get_result.records:
            print(record)
        if 0 == get_result.record_count:
            # No new data yet; wait before polling again
            time.sleep(1)
        cursor = get_result.next_cursor
except DatahubException as e:
    print(e)
    sys.exit(-1)
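The loop above polls forever and sleeps a fixed second whenever a read comes back empty. If you need a consumer that stops on its own (for a batch job or a test), or one that backs off further while a shard is idle, the polling can be factored out as sketched below. consume_shard is hypothetical: fetch stands in for a call like lambda c: dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, c, limit), and only the records, record_count and next_cursor fields used in the example are relied on.

```python
import time
from typing import Callable, Iterator


def consume_shard(fetch: Callable[[str], object], cursor: str, max_idle_polls: int = 3,
                  base_sleep: float = 1.0, max_sleep: float = 30.0) -> Iterator[object]:
    """Yield records from successive reads; stop after max_idle_polls empty reads in a row."""
    idle = 0
    while idle < max_idle_polls:
        result = fetch(cursor)
        if result.record_count == 0:
            # Back off exponentially (capped) while the shard has no new data.
            time.sleep(min(max_sleep, base_sleep * (2 ** idle)))
            idle += 1
        else:
            idle = 0
            for record in result.records:
                yield record
        # Advance exactly as the example above does, after every read.
        cursor = result.next_cursor
```

Setting max_idle_polls to a very large number gives back the run-forever behaviour of the original loop, now with a growing pause between empty reads.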