创建投递任务
通过内置的投递任务模板,快速创建投递任务。本文介绍创建投递任务的操作方法。
前提条件
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。
重要阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
步骤一:初始化日志服务Client
LogClient是日志服务的Python客户端,用于管理Project、Logstore等日志服务资源。使用Python SDK发起日志服务请求,您需要初始化一个Client实例。示例代码如下所示:
# Setup basic client
# !pip install -U matplotlib
import time
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import aliyun.log as sls
# 日志服务的服务入口。
endpoint = "cn-beijing.log.aliyuncs.com"
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accessId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# Project名称。
project = "YOUR_SLS_PROJECT"
# MetricStore名称。
metricstore = "YOUR_SLS_METRICSTORE"
# 保存巡检结果的Logstore。
sink_logstore = 'YOUR_SLS_LOGSTORE_FOR_RESULTS_WRITE'
# 设置任务名称。
task_name = "YOUR_TASK_NAME"
# 创建LogClient。
client = sls.LogClient(endpoint, accessId, accessKey)
步骤二:查询当前Logstore的投递任务及配置
在创建投递任务前,查询已存在的投递任务及配置。示例代码如下所示:
# 查询投递任务配置。
project = "YOUR_SLS_PROJECT"
logstore = "YOUR_SLS_LOGSTORE"
ret = client.list_shipper(project, logstore)
for shipper_name in ret.get_body()['shipper']:
ret = client.get_shipper(project, logstore, shipper_name)
print("------------------------------------------")
print(shipper_name)
print(ret.get_body())
步骤三:创建投递任务
创建MaxCompute投递任务
示例代码如下所示:
# 创建投递任务。 from aliyun.log.logexception import LogException project = "YOUR_SLS_PROJECT" logstore = "YOUR_SLS_LOGSTORE" # 投递参数的具体配置。 shipper_config = { 'shipperName': 'test_ship22', 'targetType': 'odps', 'targetConfiguration': { 'bufferInterval': 1800, 'enable': True, 'fields': ['__time__', '__source__', '__topic__', 'content'], 'odpsEndpoint': 'http://odps-ext.aliyun-inc.com/api', 'odpsProject': 'TS_DL', 'odpsTable': 'test_odps', 'partitionColumn': ['__time__'], 'partitionTimeFormat': 'yyyy_MM_dd_HH_mm' } } try: client.create_shipper(project, logstore, shipper_config) except LogException as ex: if 'shipperName already exists' in ex.get_error_message(): # create index if index not exists ret = client.update_shipper(project, logstore, shipper_config) else: raise ex
创建OSS投递任务
示例代码如下:
# 创建投递任务。 # Create Logstore Shipper from aliyun.log.logexception import LogException project = "YOUR_SLS_PROJECT" logstore = "YOUR_SLS_LOGSTORE" # 投递参数的具体配置。 shipper_config = { 'shipperName': 'to-oss', 'targetConfiguration': { 'bufferInterval': 300, 'bufferSize': 256, 'compressType': 'snappy', 'enable': True, 'ossBucket': 'YOUR_oss-bucket', 'ossPrefix': 'prefix', 'pathFormat': '%Y/%m/%d/%H/%M', 'roleArn': 'acs:ram::YOUR_ALIYUN_UID:role/aliyunlogdefaultrole', 'storage': {'detail': {'enableTag': False}, 'format': 'json'} }, 'targetType': 'oss'} # 用户角色标识(ARN)。 try: client.create_shipper(project, logstore, shipper_config) except LogException as ex: if 'shipperName already exists' in ex.get_error_message(): # create index if index not exists ret = client.update_shipper(project, logstore, shipper_config) else: raise ex