本文通过实践案例为您介绍云监控如何通过轻量消息队列(原 MNS)的队列实现自动化处理ECS主机状态变化事件。
前提条件
请您确保已在轻量消息队列(原 MNS)控制台,创建队列,例如:ecs-cms-event。
关于如何创建队列,请参见创建队列。
请您确保已在云监控控制台,创建系统事件报警规则,有关具体操作,请参见管理系统事件报警规则(旧版)。
请您确保已安装Python依赖。
本文所有代码均以Python 3.6为例,您也可以使用其他编程语言,例如:Java和PHP。
关于如何安装Python SDK,请参见Python SDK安装。
背景信息
ECS在已有的系统事件基础上,通过云监控新发布了状态变化类事件和抢占型实例的中断通知事件。每当ECS主机的状态发生变化时,都会触发一条ECS状态变化事件。这种变化包括您通过控制台、OpenAPI或SDK操作导致的变化,也包括弹性伸缩或欠费等原因而自动触发的变化,还包括因为系统异常而触发的变化。
云监控提供四种事件报警处理方式,包括:轻量消息队列(原 MNS)、函数计算、URL回调和日志服务。本文以轻量消息队列(原 MNS)为例,为您介绍云监控自动化处理ECS主机状态变更事件的三种最佳实践。
操作步骤
云监控将ECS主机所有的状态变化事件投递到轻量消息队列(原 MNS),轻量消息队列(原 MNS)获取消息并进行消息处理。
实践一:对所有ECS主机的创建和释放事件进行记录。
目前ECS控制台无法查询已经释放的实例。如果您有查询需求,可以通过ECS主机状态变化事件将所有ECS主机的生命周期记录在数据库或日志服务中。每当您创建ECS主机时,会发送一个Pending事件,每当您释放ECS主机时,会发送一个Deleted事件。
编辑一个Conf文件。
Conf文件中需要包含轻量消息队列(原 MNS)的
endpoint
、阿里云的access_key
和access_key_secret
、region_id
(例如:cn-beijing)和queue_name
。说明endpoint
可以在轻量消息队列(原 MNS)控制台的队列页面,单击获取Endpoint。import os # 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS 方式 class Conf: endpoint = 'http://<id>.mns.<region>.aliyuncs.com/' access_key = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'] access_key_secret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'] region_id = 'cn-beijing' queue_name = 'test' vsever_group_id = '<your_vserver_group_id>'
使用轻量消息队列(原 MNS)的SDK编写一个MNS Client用来获取MNS消息。
# -*- coding: utf-8 -*- import json from mns.mns_exception import MNSExceptionBase import logging from mns.account import Account from . import Conf class MNSClient(object): def __init__(self): self.account = Account(Conf.endpoint, Conf.access_key, Conf.access_key_secret) self.queue_name = Conf.queue_name self.listeners = dict() def regist_listener(self, listener, eventname='Instance:StateChange'): if eventname in self.listeners.keys(): self.listeners.get(eventname).append(listener) else: self.listeners[eventname] = [listener] def run(self): queue = self.account.get_queue(self.queue_name) while True: try: message = queue.receive_message(wait_seconds=5) event = json.loads(message.message_body) if event['name'] in self.listeners: for listener in self.listeners.get(event['name']): listener.process(event) queue.delete_message(receipt_handle=message.receipt_handle) except MNSExceptionBase as e: if e.type == 'QueueNotExist': logging.error('Queue %s not exist, please create queue before receive message.', self.queue_name) else: logging.error('No Message, continue waiting') class BasicListener(object): def process(self, event): pass
上述代码只对轻量消息队列(原 MNS)获取的数据,调用Listener消费消息之后删除消息。
注册一个指定Listener消费事件。这个简单的Listener判断收到Pending和Deleted事件时,打印一行日志。
# -*- coding: utf-8 -*- import logging from .mns_client import BasicListener class ListenerLog(BasicListener): def process(self, event): state = event['content']['state'] resource_id = event['content']['resourceId'] if state == 'Panding': logging.info(f'The instance {resource_id} state is {state}') elif state == 'Deleted': logging.info(f'The instance {resource_id} state is {state}')
Main函数写法如下:
mns_client = MNSClient() mns_client.regist_listener(ListenerLog()) mns_client.run()
实际生产环境下,可能需要将事件存储在数据库或日志服务SLS中,方便后期的搜索和审计。
实践二:ECS主机关机自动重启。
在某些场景下,ECS主机会非预期地关机,您可能需要自动重启已经关机的ECS主机。
为了实现ECS主机关机后自动重启,您可以复用实践一中的MNS Client,添加一个新的Listener。当您收到Stopped事件时,对该ECS主机执行Start命令。
# -*- coding: utf-8 -*- import logging from aliyunsdkecs.request.v20140526 import StartInstanceRequest from aliyunsdkcore.client import AcsClient from .mns_client import BasicListener from .config import Conf class ECSClient(object): def __init__(self, acs_client): self.client = acs_client # 启动ECS主机 def start_instance(self, instance_id): logging.info(f'Start instance {instance_id} ...') request = StartInstanceRequest.StartInstanceRequest() request.set_accept_format('json') request.set_InstanceId(instance_id) self.client.do_action_with_exception(request) class ListenerStart(BasicListener): def __init__(self): acs_client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id) self.ecs_client = ECSClient(acs_client) def process(self, event): detail = event['content'] instance_id = detail['resourceId'] if detail['state'] == 'Stopped': self.ecs_client.start_instance(instance_id)
在实际生产环境下,执行完Start命令后,可能还需要继续接收后续的Starting、Running或Stopped等事件,再配合计时器和计数器,进行成功或失败之后的处理。
实践三:抢占型实例释放前,自动从负载均衡SLB移除。
抢占型实例在释放之前五分钟左右,会发出释放告警事件,您可以在这短暂的时间运行业务不中断逻辑,例如:主动从负载均衡SLB的后端服务器中去掉这台即将被释放的抢占型实例,而非被动等待实例释放后负载均衡SLB的自动处理。
您复用实践一的MNS Client,添加一个新的Listener,当收到抢占型实例的释放告警时,调用负载均衡SLB的SDK。
# -*- coding: utf-8 -*- from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest from .mns_client import BasicListener from .config import Conf class SLBClient(object): def __init__(self): self.client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id) self.request = CommonRequest() self.request.set_method('POST') self.request.set_accept_format('json') self.request.set_version('2014-05-15') self.request.set_domain('slb.aliyuncs.com') self.request.add_query_param('RegionId', Conf.region_id) def remove_vserver_group_backend_servers(self, vserver_group_id, instance_id): self.request.set_action_name('RemoveVServerGroupBackendServers') self.request.add_query_param('VServerGroupId', vserver_group_id) self.request.add_query_param('BackendServers', "[{'ServerId':'" + instance_id + "','Port':'80','Weight':'100'}]") response = self.client.do_action_with_exception(self.request) return str(response, encoding='utf-8') class ListenerSLB(BasicListener): def __init__(self, vsever_group_id): self.slb_caller = SLBClient() self.vsever_group_id = Conf.vsever_group_id def process(self, event): detail = event['content'] instance_id = detail['instanceId'] if detail['action'] == 'delete': self.slb_caller.remove_vserver_group_backend_servers(self.vsever_group_id, instance_id)
重要抢占型实例释放告警的event name与前面不同,应该是
mns_client.regist_listener(ListenerSLB(Conf.vsever_group_id), 'Instance:PreemptibleInstanceInterruption')
。在实际生产环境下,您需要再申请一台新的抢占型实例,挂载到负载均衡SLB,来保证服务能力。