费用说明
在新版告警公测期间,仅短信通知和语音通知会产生费用。详细价格,请参见产品定价。
操作 | 说明 |
短信通知 | 告警短信通知费用,按通知次数收费。
说明 某些运营商可能将内容过长(例如超过70字符)的短信拆分成2条发送,所以当您编辑的短信内容过长时,您可能收到2条短信,但日志服务只按照1条短信收费。 |
语音通知 | 告警语音通知费用,按通知次数收费。
说明 - 告警语音未拨通时,不会重复拨打,将以短信方式发送一次通知。
- 无论告警语音是否拨通均按一次计费。未拨通的提示短信,不会额外产生短信费用。
|
管理告警监控规则
代码示例如下。具体的参数说明,请参见告警监控规则数据结构。
import os
from aliyun.log import LogClient
# 日志服务的服务接入点。
endpoint = 'cn-huhehaote.log.aliyuncs.com'
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accesskey_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accesskey_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 创建日志服务Client。
client = LogClient(endpoint, accesskey_id, accesskey_secret)
project = 'demo-alert'
alert_id = 'nginx-status-error'
def create_alert():
alert = {
'name': alert_id,
'displayName': 'Nginx Status Error',
'type': 'Alert',
'state': 'Enabled',
'schedule': {
'type': 'FixedRate',
'interval': '1m'
},
'configuration': {
'version': '2.0',
'type': 'default',
'dashboard': 'internal-alert-analysis',
'queryList': [{
# 告警类型 log:日志库 metric:时序库 meta:资源数据
'storeType': 'log',
# 开服地域
'region': 'cn-huhehaote',
# 项目名称
'project': 'demo-alert',
# 日志库或者时序库名称
'store': 'nginx-access-log',
# 查询语句
'query': 'status >= 400 | select count(*) as cnt',
# 时间片类型
'timeSpanType': 'Truncated',
# 开始时间
'start': '-1m',
# 结束时间
'end': 'absolute',
# 是否开启独享 SQL
'powerSqlMode': 'auto'
}],
'groupConfiguration': {
'type': 'no_group',
'fields': []
},
'joinConfigurations': [],
'severityConfigurations': [{
'severity': 6,
'evalCondition': {
'condition': 'cnt > 0',
'countCondition': ''
}
}],
'labels': [{
'key': 'service',
'value': 'nginx'
}],
'annotations': [{
'key': 'title',
'value': 'Nginx Status Error'
}, {
'key': 'desc',
'value': 'Nginx Status Error, count: ${cnt}'
}],
'autoAnnotation': True,
'sendResolved': False,
'threshold': 1,
'noDataFire': False,
'noDataSeverity': 6,
'policyConfiguration': {
'alertPolicyId': 'sls.builtin.dynamic',
'actionPolicyId': 'test-action-policy',
'repeatInterval': '1m',
'useDefault': False
}
}
}
res = client.create_alert(project, alert)
res.log_print()
def get_and_update_alert():
res = client.get_alert(project, alert_id)
res.log_print()
alert = res.get_body()
alert['configuration']['queryList'][0]['query'] = 'status >= 400 | select count(*) as cnt'
res = client.update_alert(project, alert)
res.log_print()
def enable_and_disable_alert():
res = client.disable_alert(project, alert_id)
res.log_print()
res = client.enable_alert(project, alert_id)
res.log_print()
def list_alerts():
res = client.list_alert(project, offset=0, size=100)
res.log_print()
def delete_alert():
res = client.delete_alert(project, alert_id)
res.log_print()
if __name__ == '__main__':
create_alert()
get_and_update_alert()
enable_and_disable_alert()
list_alerts()
delete_alert()
管理告警资源数据
代码示例如下。具体的参数说明,请参见告警资源数据结构。
管理用户
import os
from aliyun.log import LogClient
from aliyun.log.resource_params import ResourceRecord
# 日志服务的服务入口。资源数据的写操作,必须使用河源地域,读操作可以使用其它地域。
endpoint = 'cn-heyuan.log.aliyuncs.com'
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
accesskey_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accesskey_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 创建日志服务Client。
client = LogClient(endpoint, accesskey_id, accesskey_secret)
user_resource_name = 'sls.common.user'
def create_user():
user = {
'user_id': 'alex',
'user_name': 'Alex',
'email': [
'****@example.com'
],
'country_code': '86',
'phone': '133****3333',
'enabled': True,
'sms_enabled': True,
'voice_enabled': True
}
record = ResourceRecord(user['user_id'], user['user_name'], user)
res = client.create_resource_record(user_resource_name, record)
print('[create user]')
res.log_print()
def get_user():
res = client.get_resource_record(user_resource_name, 'alex')
print('[get user]')
print(res.get_record().to_dict())
def update_user():
user = {
'user_id': 'alex',
'user_name': 'Alex',
'email': [
'****@example.com'
],
'country_code': '86',
'phone': '133****3333',
'enabled': False,
'sms_enabled': True,
'voice_enabled': True
}
record = ResourceRecord(user['user_id'], user['user_name'], user)
res = client.update_resource_record(user_resource_name, record)
print('[update user]')
res.log_print()
def list_users():
res = client.list_resource_records(user_resource_name, offset=0, size=100)
print('[list users]')
print([r.to_dict() for r in res.get_records()])
def delete_user():
res = client.delete_resource_record(user_resource_name, ['alex'])
print('[delete user]')
res.log_print()
if __name__ == '__main__':
create_user()
get_user()
update_user()
list_users()
delete_user()
管理用户组
def create_user_group():
user_group = {
'user_group_id': 'devops',
'user_group_name': 'DevOps Team',
'enabled': True,
'members': ['alex']
}
record = ResourceRecord(user_group['user_group_id'], user_group['user_group_name'], user_group)
res = client.create_resource_record('sls.common.user_group', record)
print('[create user group]')
res.log_print()
管理Webhook集成
def create_webhook_integration():
webhooks = [{
'id': 'dingtalk',
'name': 'Dingtalk Webhook',
'type': 'dingtalk',
'url': 'https://oapi.dingtalk.com/robot/send?access_token=**********',
'method': 'POST',
# 钉钉加签密钥。如果您在钉钉侧设置安全验证方式为签名校验,则需配置该字段。您可以在钉钉机器人管理界面获取加签密钥。
# 'secret': 'SEC**********',
'headers': []
}, {
'id': 'wechat',
'name': 'Wechat Webhook',
'type': 'wechat',
'url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=**********',
'method': 'POST',
'headers': []
}, {
'id': 'feishu',
'name': 'Feishu Webhook',
'type': 'lark',
'url': 'https://open.feishu.cn/open-apis/bot/v2/hook/**********',
'method': 'POST',
# 飞书加签密钥。如果您在飞书侧设置安全验证方式为签名校验,则需配置该字段。您可以在飞书机器人管理界面获取加签密钥。
# 'secret': '**********',
'headers': []
}, {
'id': 'slack',
'name': 'Slack Webhook',
'type': 'slack',
'url': 'https://hooks.slack.com/services/**********',
'method': 'POST',
'headers': []
}, {
'id': 'webhook',
'name': 'Common Webhook',
'type': 'custom',
'url': 'https://example.com/***********',
'method': 'POST',
'headers': [{
'key': 'Authorization',
'value': 'Basic YWRtaW46Zm9vYmFy'
}]
}]
for webhook in webhooks:
record = ResourceRecord(webhook['id'], webhook['name'], webhook)
res = client.create_resource_record('sls.alert.action_webhook', record)
print('[create webhook integration] ' + webhook['id'])
res.log_print()
管理行动策略
def create_action_policy():
action_policy = {
'action_policy_id': 'test-action-policy',
'action_policy_name': 'Test Action Policy',
'primary_policy_script': 'fire(type="sms", users=["alex"], groups=[], oncall_groups=[], receiver_type="static", external_url="", external_headers={}, template_id="sls.builtin.cn", period="any")',
'secondary_policy_script': 'fire(type="voice", users=["alex"], groups=[], oncall_groups=[], receiver_type="static", external_url="", external_headers={}, template_id="sls.builtin.cn", period="any")',
'escalation_start_enabled': False,
'escalation_start_timeout': '10m',
'escalation_inprogress_enabled': False,
'escalation_inprogress_timeout': '30m',
'escalation_enabled': True,
'escalation_timeout': '1h'
}
record = ResourceRecord(
action_policy['action_policy_id'], action_policy['action_policy_name'], action_policy)
res = client.create_resource_record('sls.alert.action_policy', record)
print('[create action policy]')
res.log_print()
管理告警策略
def create_alert_policy():
alert_policy = {
'policy_id': 'test-alert-policy',
'policy_name': 'Test Alert Policy',
'parent_id': '',
'group_script': 'fire(action_policy="test-action-policy", group={"alert.alert_id": alert.alert_id}, group_by_all_labels=true, group_wait="15s", group_interval="5m", repeat_interval="1h")',
'inhibit_script': '',
'silence_script': ''
}
record = ResourceRecord(alert_policy['policy_id'], alert_policy['policy_name'], alert_policy)
res = client.create_resource_record('sls.alert.alert_policy', record)
print('[create alert policy]')
res.log_print()
管理内容模板
def create_content_template():
template = {
'template_id': 'test-template',
'template_name': 'Test Template',
'templates': {
'sms': {
'locale': 'zh-CN',
'content': ''
},
'voice': {
'locale': 'zh-CN',
'content': ''
},
'email': {
'locale': 'zh-CN',
'subject': 'SLS Alert',
'content': ''
},
'message_center': {
'locale': 'zh-CN',
'content': ''
},
'dingtalk': {
'locale': 'zh-CN',
'title': 'SLS Alert',
'content': ''
},
'wechat': {
'locale': 'zh-CN',
'title': 'SLS Alert',
'content': ''
},
'lark': {
'locale': 'zh-CN',
'title': 'SLS Alert',
'content': ''
},
'slack': {
'locale': 'zh-CN',
'title': 'SLS Alert',
'content': ''
},
'webhook': {
'locale': 'zh-CN',
'send_type': 'batch',
'limit': 0,
'content': ''
},
'fc': {
'locale': 'zh-CN',
'limit': 0,
'send_type': 'batch',
'content': ''
},
'event_bridge': {
'locale': 'zh-CN',
'subject': 'SLS Alert',
'content': ''
},
}
}
record = ResourceRecord(template['template_id'], template['template_name'], template)
res = client.create_resource_record('sls.alert.content_template', record)
print('[create content template]')
res.log_print()