更新时间:2020-12-10 20:12
$ sudo pip install pydatahub
$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install
1. 如果安装过程中出现错误信息 'Python.h: No such file or directory',说明系统缺少 Python 开发头文件,常用操作系统的安装方式如下:
$ sudo apt-get install python-dev # for python2.x installs
$ sudo apt-get install python3-dev # for python3.x installs
$ sudo yum install python-devel # for python2.x installs
$ sudo yum install python3-devel # for python3 installs
2.如果使用windows操作系统,根据提示信息可到 此处 下载安装对应版本的 Visual C++ SDK。
Windows 10 安装cprotobuf依赖时如果报类似如下错误,也表示需要安装Visual C++ 生成工具:
building 'cprotobuf.internal' extension
error: [WinError 2] The system cannot find the file specified
推荐使用python3.6或以上,会明确提示所需版本及链接信息。
3.Windows 下如果安装依赖时报类似如下错误,是环境问题所致,请搜索相关错误,根据具体情况,拷贝所需文件,或是直接使用 developer command prompt 工具进行安装:
LINK : fatal error LNK1158: cannot run 'rc.exe'
4.Windows 7 如果提示如下错误,可安装此 build tools:
error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/
$ python -c "from datahub import DataHub"
如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!
详见: 名词解释
import sys
import time
import traceback

from datahub import DataHub
from datahub.exceptions import DatahubException, ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
# Credentials and endpoint of your DataHub service.
# Replace the placeholder strings with real values. They are quoted so that the
# example is valid Python. In real code, avoid hard-coding secrets in source.
access_id = '***your access id***'
access_key = '***your access key***'
endpoint = '***your datahub server endpoint***'

# Client object used by every DataHub call in the rest of this example.
dh = DataHub(access_id, access_key, endpoint)
# Create the project used by the rest of this example.
# A project that already exists is reported and tolerated. Any other error
# prints the traceback and aborts the script.
project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
except ResourceExistException:
    print("project already exist!")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
else:
    print("create project success!")
# Reached on success and on "already exists"; the generic failure path exits above.
print("=======================================\n\n")
类型 | 含义 | 值域 |
---|---|---|
Bigint | 8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。 | -9223372036854775807 ~ 9223372036854775807 |
String | 字符串,只支持UTF-8编码。 | 单个String列最长允许1MB。 |
Boolean | 布尔型。 | 可以表示为True/False,true/false, 0/1 |
Double | 8字节双精度浮点数。 | $-1.0 \times 10^{308} \sim 1.0 \times 10^{308}$ |
TimeStamp | 时间戳类型 | 表示到微秒的时间戳类型 |
# Create a TUPLE topic, whose records follow a fixed schema.
topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7  # presumably the data retention period in days -- confirm in the DataHub docs
# Field names and field types are paired by position. The put example below
# relies on this order when it sets values by index (0..4).
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
except ResourceExistException:
    print("topic already exist!")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
else:
    print("create tuple topic success!")
# Reached on success and on "already exists"; the generic failure path exits above.
print("=======================================\n\n")
# Create a BLOB topic.
# Its records carry a raw payload (BlobRecord(blob_data=...)) rather than
# typed fields, so no RecordSchema is passed.
topic_name = "blob_topic"
shard_count = 3
life_cycle = 7  # presumably the data retention period in days -- confirm in the DataHub docs
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
except ResourceExistException:
    print("topic already exist!")
except Exception:
    print(traceback.format_exc())
    sys.exit(-1)
else:
    print("create blob topic success!")
# Reached on success and on "already exists"; the generic failure path exits above.
print("=======================================\n\n")
# List the shards of the topic currently named by `topic_name`.
# The result is a ListShardResult; `.shards` holds one Shard object per shard.
shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
# Print how many shards the topic has.
print(len(shards))
返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息
# Signature demo for put_records.
# `records` is not defined in this snippet. It is assumed to be a list of
# records of a single type (all TupleRecord or all BlobRecord), built elsewhere.
put_result = dh.put_records(project_name, topic_name, records)
# Number of records the server rejected (0 means the whole batch succeeded).
print(put_result.failed_record_count)
# FailedRecord entries (index, error_code, error_message) for the rejected records.
print(put_result.failed_records)
其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为PutRecordsResult对象,包含failed_record_count和failed_records成员,failed_records是一个FailedRecord对象的list,FailedRecord对象包含成员index,error_code和error_message
# Write three TUPLE records, showing the different ways to choose a target shard
# and to set field values.
# Set the topic explicitly: the previous snippet left topic_name pointing at the
# blob topic, which would fail the TUPLE check below.
topic_name = "tuple_topic"
try:
    # Block until every shard of the topic is ready.
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")

    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)  # SystemExit is not a DatahubException, so it is not caught below
    print("=======================================\n\n")

    # Use the schema stored for the topic so the record values match it.
    record_schema = topic_result.record_schema
    records0 = []

    # record0: all values given positionally; written to an explicit shard id.
    # The timestamp value is in microseconds (TIMESTAMP has microsecond precision).
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)

    # record1: values set by field name. A hash_key is given instead of a shard id
    # (presumably matched against the shards' begin/end hash key ranges).
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)  # presumably stored as null -- confirm
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)

    # record2: values set by field index. Attributes are assigned as a dict and
    # a partition_key is given instead of a shard id.
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2, 1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)

    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" % (len(records0), put_result.failed_record_count))
    # If failed_record_count > 0, retry the entries listed in put_result.failed_records.
    # Each FailedRecord.index points back into records0.
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
# Write one BLOB record (a raw payload rather than schema fields).
# Target the blob topic explicitly: a batch must contain one record type, and
# the previous snippet worked on the tuple topic.
topic_name = "blob_topic"
try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)
# Request cursors for shard '0' of the current topic at three starting
# positions: OLDEST, LATEST, and a SYSTEM_TIME timestamp.
shard_id = '0'
# Value passed along with CursorType.SYSTEM_TIME (time unit not shown here -- confirm in the SDK docs).
time_stamp = 0
cursor_result0, cursor_result1, cursor_result2 = (
    dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST),
    dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST),
    dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp),
)
# Later reads start from the OLDEST position.
cursor = cursor_result0.cursor
通过get_cursor接口获取用于读取指定位置之后数据的cursor
# Read up to `limit` records from shard "0" of each topic, starting at the oldest record.
# A cursor is requested per topic/shard via get_cursor, so each topic gets its
# own cursor instead of reusing one obtained for a different topic.
project_name = 'project'
shard_id = "0"
limit = 10

# Read records from the blob topic.
topic_name = 'blob_topic'
cursor = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST).cursor
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)

# Read records from the tuple topic.
# record_schema must match this topic's schema (it was created with it above).
topic_name = 'tuple_topic'
cursor = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST).cursor
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
# Continuously consume shard '0' of the tuple topic, starting from the oldest record.
# Set the topic explicitly so this snippet does not depend on earlier ones.
topic_name = 'tuple_topic'
try:
    # Block until every shard of the topic is ready.
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")

    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)  # SystemExit is not a DatahubException, so it is not caught below
    print("=======================================\n\n")

    # Decode with the schema stored for this topic, not a leftover global.
    record_schema = topic_result.record_schema
    shard_id = '0'
    limit = 10
    cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
        for record in get_result.records:
            print(record)
        if get_result.record_count == 0:
            # Nothing new yet: back off briefly instead of polling in a tight loop.
            time.sleep(1)
        # Advance to the position after the records just read.
        cursor = get_result.next_cursor
except DatahubException as e:
    print(e)
    sys.exit(-1)
在文档使用中是否遇到以下问题
更多建议
匿名提交