下载地址:单击下载。
该 SDK GitHub 地址如下:
https://github.com/aliyun/aliyun-log-python-sdk
为快速开始使用日志服务Python SDK,请按照如下步骤进行:
具体方法请参考 阿里云账号注册流程。
为了更好地使用阿里云服务,建议尽快完成实名认证,否则部分阿里云服务将无法使用。具体实名认证流程,请参考 这里。
为了使用SDK,您必须申请阿里云的访问密钥:
该密钥对会在下面的步骤使用,且需要保管好,不能对外泄露。另外,您可以参考SDK配置了解更多SDK如何使用访问密钥的信息。
在进行SDK操作之前,请先在控制台上创建Project和Logstore。
登录 日志服务管理控制台。
单击右上角的 创建Project。
填写 Project名称 和 所属地域,单击 确认。
在 Project列表 页面,单击项目的名称,然后单击 创建 创建日志库。
或者在创建完项目后,根据系统提示创建日志库,单击 创建。
填写日志库的配置信息并单击 确认。
您需要在此处填写Logstore名称、数据保存时间、Shard数目等。请按照您的需求合理设置Shard数目,例如本文实例中,您需要设置4个Shard。
注意:
- 请确保使用同一阿里云账号获取阿里云访问密钥和创建项目及日志库
- 关于日志服务的项目、日志库等概念请参考核心概念。
- Project名称为日志服务全局唯一,而Logstore名称在一个Project下面唯一。
- Project一旦创建则无法更改它的所属区域。目前也不支持在不同阿里云Region间迁移Project。
Python SDK是一个纯Python的库,支持所有运行Python的操作系统。目前主要为Linux、Mac OS X和Windows。请如下安装Python:
下载并安装最新的Python 安装包。
注意:
- 目前,Python SDK支持Python 2.6/2.7和Python 3.3/3.4/3.5/3.6环境。您可以运行
python -V
查询当前的Python运行版本。- Python官方不再支持Python 2.6和Python 3.3,推荐您使用Python 2.7、Python 3.4及以上版本。
下载并安装Python的包管理工具 pip。
完成pip安装后,您可以运行
pip -V
确认pip是否安装成功,并查看当前pip版本。
Shell下以管理员权限执以下命令,安装Python SDK。
pip install -U aliyun-log-python-sdk
现在,您可以开始使用SDK Python SDK。使用任何文本编辑器或者Python IDE,运行如下示例代码即可与服务端交互并得到相应输出。
更多信息请参考Github/reamdthedocs。
# encoding: utf-8
import time
from aliyun.log.logitem import LogItem
from aliyun.log.logclient import LogClient
from aliyun.log.getlogsrequest import GetLogsRequest
from aliyun.log.putlogsrequest import PutLogsRequest
from aliyun.log.listlogstoresrequest import ListLogstoresRequest
from aliyun.log.gethistogramsrequest import GetHistogramsRequest
def main():
endpoint = '' # 选择与上面步骤创建Project所属区域匹配的Endpoint
accessKeyId = '' # 使用您的阿里云访问密钥AccessKeyId
accessKey = '' # 使用您的阿里云访问密钥AccessKeySecret
project = '' # 上面步骤创建的项目名称
logstore = '' # 上面步骤创建的日志库名称
# 重要提示:创建的logstore请配置为4个shard以便于后面测试通过
# 构建一个client
client = LogClient(endpoint, accessKeyId, accessKey)
# list 所有的logstore
req1 = ListLogstoresRequest(project)
res1 = client.list_logstores(req1)
res1.log_print()
topic = ""
source = ""
# 发送10个数据包,每个数据包有10条log
for i in range(10):
logitemList = [] # LogItem list
for j in range(10):
contents = [('index', str(i * 10 + j))]
logItem = LogItem()
logItem.set_time(int(time.time()))
logItem.set_contents(contents)
logitemList.append(logItem)
req2 = PutLogsRequest(project, logstore, topic, source, logitemList)
res2 = client.put_logs(req2)
res2.log_print()
# list所有的shard,读取上1分钟写入的数据全部读取出来
listShardRes = client.list_shards(project, logstore)
for shard in listShardRes.get_shards_info():
shard_id = shard["shardID"]
start_time = int(time.time() - 60)
end_time = start_time + 60
res = client.get_cursor(project, logstore, shard_id, start_time)
res.log_print()
start_cursor = res.get_cursor()
res = client.get_cursor(project, logstore, shard_id, end_time)
end_cursor = res.get_cursor()
while True:
loggroup_count = 100 # 每次读取100个包
res = client.pull_logs(project, logstore, shard_id, start_cursor, loggroup_count, end_cursor)
res.log_print()
next_cursor = res.get_next_cursor()
if next_cursor == start_cursor:
break
start_cursor = next_cursor
# 重要提示: 只有打开索引功能,才可以使用以下接口来查询数据
time.sleep(60)
topic = ""
query = "index"
From = int(time.time()) - 600
To = int(time.time())
res3 = None
# 查询最近10分钟内,满足query条件的日志条数,如果执行结果不是完全正确,则进行重试
while (res3 is None) or (not res3.is_completed()):
req3 = GetHistogramsRequest(project, logstore, From, To, topic, query)
res3 = client.get_histograms(req3)
res3.log_print()
# 获取满足query的日志条数
total_log_count = res3.get_total_count()
log_line = 10
# 每次读取10条日志,将日志数据查询完,对于每一次查询,如果查询结果不是完全准确,则重试3次
for offset in range(0, total_log_count, log_line):
res4 = None
for retry_time in range(0, 3):
req4 = GetLogsRequest(project, logstore, From, To, topic, query, log_line, offset, False)
res4 = client.get_logs(req4)
if res4 is not None and res4.is_completed():
break
time.sleep(1)
if res4 is not None:
res4.log_print()
listShardRes = client.list_shards(project, logstore)
shard = listShardRes.get_shards_info()[0]
# 分裂shard
if shard["status"] == "readwrite":
shard_id = shard["shardID"]
inclusiveBeginKey = shard["inclusiveBeginKey"]
midKey = inclusiveBeginKey[:-1] + str((int(inclusiveBeginKey[-1:])) + 1)
client.split_shard(project, logstore, shard_id, midKey)
# 合并shard
shard = listShardRes.get_shards_info()[1]
if shard["status"] == "readwrite":
shard_id = shard["shardID"]
client.merge_shard(project, logstore, shard_id)
# 删除shard
shard = listShardRes.get_shards_info()[-1]
if shard["status"] == "readonly":
shard_id = shard["shardID"]
client.delete_shard(project, logstore, shard_id)
# 创建外部数据源
def sample_external_store(client,project):
res = client.create_external_store(project,ExternalStoreConfig("rds_store","cn-qingdao","rds-vpc","vpc-m5eq4irc1pucpk85frr5j","i-m5eeo2whsnfg4kzq54ah","47.104.78.128","3306","root","sfdsfldsfksflsdfs","meta","join_meta"));
res.log_print()
res = client.update_external_store(project,ExternalStoreConfig("rds_store","cn-qingdao","rds-vpc","vpc-m5eq4irc1pucpk85frr5j","i-m5eeo2whsnfg4kzq54ah","47.104.78.128","3306","root","sfdsfldsfksflsdfs","meta","join_meta"));
res.log_print()
res = client.get_external_store(project,"rds_store");
res.log_print()
res = client.list_external_store(project,"");
res.log_print();
res = client.delete_external_store(project,"rds_store")
res.log_print();
# 使用python sdk进行查询分析
req4 = GetLogsRequest(project, logstore, From, To, topic, "* | select count(1)", 10,0, False)
res4 = client.get_logs(req4)
# 使用python sdk进行join rds查询
req4 = GetLogsRequest(project, logstore, From, To, topic, "* | select count(1) from "+logstore +" l join rds_store r on l.ikey =r.ekey", 10,0, False)
res4 = client.get_logs(req4)
# 使用python sdk把查询结果写入rds
req4 = GetLogsRequest(project, logstore, From, To, topic, "* | insert into rds_store select count(1) ", 10,0, False)
res4 = client.get_logs(req4)
if __name__ == '__main__':
main()
在文档使用中是否遇到以下问题
更多建议
匿名提交