全部产品
云市场

Python SDK介绍

更新时间:2019-07-30 10:01:29

Python SDK介绍

安装

快速安装

  1. $ sudo pip install pydatahub

源码安装

  1. $ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
  2. $ cd aliyun-datahub-sdk-python
  3. $ sudo python setup.py install

常见问题

1.如果安装过程中出现错误信息’Python.h: No such file or directory’,常用的操作系统安装方式如下:

  1. $ sudo apt-get install python-dev # for python2.x installs
  2. $ sudo apt-get install python3-dev # for python3.x installs
  3. $ sudo yum install python-devel # for python2.x installs
  4. $ sudo yum install python34-devel # for python3.4 installs

2.如果使用windows操作系统,根据提示信息可到 此处 下载安装对应版本的 Visual C++ SDK。

Windows 10 安装cprotobuf依赖时如果报类似如下错误,也表示需要安装Visual C++ 生成工具:

  1. bulding 'cprotobuf.internal' extention
  2. error: [WinError2] The system cannot find the file specified

推荐使用python3.6或以上,会明确提示所需版本及链接信息。

3.Windows 下如果安装依赖时报类似如下错误,是环境问题所致,请搜索相关错误,根据具体情况,拷贝所需文件,或是直接使用 developer command prompt 工具进行安装:

  1. LINK : fatal error LNK1158: cannot run 'rc.exe'

安装验证

  1. $ python -c "from datahub import DataHub"

如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!

基本概念

详见: https://help.aliyun.com/document_detail/47440.html?spm=5176.product27797.3.2.VGxgya

准备工作

  • 访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。
  • 创建Project
  • 初始化Datahub
  1. import sys
  2. import traceback
  3. from datahub import DataHub
  4. from datahub.exceptions import ResourceExistException
  5. from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
  6. access_id = ***your access id***
  7. access_key = ***your access key***
  8. endpoint = ***your datahub server endpoint***
  9. dh = DataHub(access_id, access_key, endpoint)

Project操作

  • 创建示例
  1. project_name = 'project'
  2. comment = 'comment'
  3. try:
  4. dh.create_project(project_name, comment)
  5. print("create project success!")
  6. print("=======================================\n\n")
  7. except ResourceExistException:
  8. print("project already exist!")
  9. print("=======================================\n\n")
  10. except Exception as e:
  11. print(traceback.format_exc())
  12. sys.exit(-1)

Topic操作

Tuple Topic

  • Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:
类型 含义 值域
Bigint 8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。 -9223372036854775807 ~ 9223372036854775807
String 字符串,只支持UTF-8编码。 单个String列最长允许1MB。
Boolean 布尔型。 可以表示为True/False,true/false, 0/1
Double 8字节双精度浮点数。 -1.0 10308 ~ 1.0 10308
TimeStamp 时间戳类型 表示到微秒的时间戳类型
  • 创建示例
  1. topic_name = "tuple_topic"
  2. shard_count = 3
  3. life_cycle = 7
  4. record_schema = RecordSchema.from_lists(
  5. ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
  6. [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
  7. try:
  8. dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
  9. print("create tuple topic success!")
  10. print("=======================================\n\n")
  11. except ResourceExistException:
  12. print("topic already exist!")
  13. print("=======================================\n\n")
  14. except Exception as e:
  15. print(traceback.format_exc())
  16. sys.exit(-1)

Blob Topic

  • Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。
  1. topic_name = "blob_topic"
  2. shard_count = 3
  3. life_cycle = 7
  4. try:
  5. dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
  6. print("create blob topic success!")
  7. print("=======================================\n\n")
  8. except ResourceExistException:
  9. print("topic already exist!")
  10. print("=======================================\n\n")
  11. except Exception as e:
  12. print(traceback.format_exc())
  13. sys.exit(-1)

数据发布/订阅

获取Shard列表

  • list_shards接口获取topic下的所有shard
  1. shard_result = dh.list_shard(project_name, topic_name)
  2. shards = shard_result.shards
  3. print(len(shards))

返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

发布数据

  • put_records接口向一个topic发布数据
  1. put_result = dh.put_records(project_name, topic_name, records)
  2. print(put_result.failed_record_count)
  3. print(put_result.failed_records)

其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为PutRecordsResult对象,包含failed_record_count和failed_records成员,failed_records是一个FailedRecord对象的list,FailedRecord对象包含成员index,error_code和error_message

  • 写入Tuple类型Record示例
  1. try:
  2. # block等待所有shard状态ready
  3. dh.wait_shards_ready(project_name, topic_name)
  4. print("shards all ready!!!")
  5. print("=======================================\n\n")
  6. topic_result = dh.get_topic(project_name, topic_name)
  7. print(topic_result)
  8. if topic_result.record_type != RecordType.TUPLE:
  9. print("topic type illegal!")
  10. sys.exit(-1)
  11. print("=======================================\n\n")
  12. record_schema = topic_result.record_schema
  13. records0 = []
  14. record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
  15. record0.shard_id = '0'
  16. record0.put_attribute('AK', '47')
  17. records0.append(record0)
  18. record1 = TupleRecord(schema=record_schema)
  19. record1.set_value('bigint_field', 2)
  20. record1.set_value('string_field', 'yc2')
  21. record1.set_value('double_field', None)
  22. record1.set_value('bool_field', False)
  23. record1.set_value('time_field', 1455869335000011)
  24. record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
  25. records0.append(record1)
  26. record2 = TupleRecord(schema=record_schema)
  27. record2.set_value(0, 3)
  28. record2.set_value(1, 'yc3')
  29. record2.set_value(2, 1.1)
  30. record2.set_value(3, False)
  31. record2.set_value(4, 1455869335000011)
  32. record2.attributes = {'key': 'value'}
  33. record2.partition_key = 'TestPartitionKey'
  34. records0.append(record2)
  35. put_result = dh.put_records(project_name, topic_name, records0)
  36. print(put_result)
  37. print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
  38. # failed_record_count如果大于0最好对failed record再进行重试
  39. print("=======================================\n\n")
  40. except DatahubException as e:
  41. print(e)
  42. sys.exit(-1)
  • 写入BLOB类型Record示例
  1. try:
  2. records1 = []
  3. record3 = BlobRecord(blob_data='data')
  4. record3.shard_id = '0'
  5. record3.put_attribute('a', 'b')
  6. records1.append(record3)
  7. put_result = dh.put_records(project_name, topic_name, records1)
  8. print(put_result)
  9. except DatahubException as e:
  10. print(e)
  11. sys.exit(-1)

获取cursor

  • 获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME

    • OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record

    • LATEST: 表示获取的cursor指向当前最新的record

    • SYSTEM_TIME: 表示获取的cursor指向大于等于该时间(单位毫秒)的第一条record

  1. shard_id = '0'
  2. time_stamp = 0
  3. cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
  4. cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
  5. cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
  6. cursor = cursor_result0.cursor

通过get_cursor接口获取用于读取指定位置之后数据的cursor

订阅数据

  • 从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。
  1. project_name = 'project'
  2. shard_id = "0"
  3. limit = 10
  4. # 读取blob topic的record
  5. topic_name = 'blob_topic'
  6. get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
  7. # 读取tuple topic的record
  8. topic_name = 'tuple_topic'
  9. get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
  • 消费Tuple类型Record示例
  1. try:
  2. # block等待所有shard状态ready
  3. dh.wait_shards_ready(project_name, topic_name)
  4. print("shards all ready!!!")
  5. print("=======================================\n\n")
  6. topic_result = dh.get_topic(project_name, topic_name)
  7. print(topic_result)
  8. if topic_result.record_type != RecordType.TUPLE:
  9. print("topic type illegal!")
  10. sys.exit(-1)
  11. print("=======================================\n\n")
  12. shard_id = '0'
  13. limit = 10
  14. cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
  15. cursor = cursor_result.cursor
  16. while True:
  17. get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
  18. for record in get_result.records:
  19. print(record)
  20. if 0 == get_result.record_count:
  21. time.sleep(1)
  22. cursor = get_result.next_cursor
  23. except DatahubException as e:
  24. print(e)
  25. sys.exit(-1)

结尾