通过轻量消息队列感知并响应抢占式实例中断事件

更新时间:2025-03-10 05:47:22

抢占式实例具有被中断的风险,如果您的业务对实例中断敏感,则需要注意及时感知抢占式实例的中断事件,并对中断做出合理的响应处理以免造成业务损失。本文以轻量消息队列(原MNS)为例向您介绍如何借助轻量消息队列感知抢占式实例中断事件并进行响应处理。

整体流程

image

准备工作

  1. 创建AccessKey

    由于阿里云账号(主账号)拥有资源的所有权限,其AccessKey一旦泄露风险巨大,所以建议您使用满足最小化权限需求的RAM用户的AccessKey。获取方法请参见创建AccessKey

  2. RAM用户授权

    RAM用户授予操作轻量消息队列SMQ(原MNS)相关资源的权限。本文提供的示例代码需要从轻量消息队列消费消息,建议授予以下权限:

    云产品

    授予权限

    云产品

    授予权限

    轻量消息队列SMQ(原MNS)

    AliyunMNSFullAccess

  3. 配置访问凭证和访问域名

    本文示例代码会从系统环境变量中读取访问凭证和访问域名:

  4. 安装SMQ SDK

    获取SMQ SDK,本文通过添加Maven依赖的方式来安装SMQ Java SDK。更多安装方式,请参见安装 SMQ SDK

    添加Maven依赖的示例如下:

    <dependencies>
        <!-- 阿里云轻量消息队列的SDK -->
        <dependency>
          <groupId>com.aliyun.mns</groupId>
          <artifactId>aliyun-sdk-mns</artifactId>
          <version>1.2.0</version>
        </dependency>
    </dependencies>

操作步骤

  1. 创建轻量消息队列

    创建轻量消息队列,用来接收云监控发送的抢占式实例中断通知消息。

    1. 登录轻量消息队列(原 MNS)控制台,在左侧导航栏,选择队列模型 > 队列列表

    2. 在顶部菜单栏,选择地域,在队列列表页面,单击创建队列

    3. 创建队列面板根据提示配置参数,单击确定

      image

  2. 创建订阅策略

    云监控实时监控抢占式实例的中断事件,并在发生事件报警时通过订阅策略中指定的推送渠道推送抢占式实例中断通知。

    1. 登录云监控控制台,在左侧导航栏,选择事件中心 > 事件订阅

    2. 订阅策略页签,单击创建订阅策略,下一步,设置订阅策略的相关参数。

      本示例仅说明订阅抢占式实例中断事件所涉及的主要参数,其他参数可根据需要和提示进行填写。更多参数说明,请参见管理事件订阅(推荐)

      • 订阅类型:选择系统事件

        image

      • 订阅范围:按照下图进行填写。

        image

      • 推送与集成:单击添加渠道,弹出窗口中单击增加渠道,选择第1步创建的轻量消息队列,其他参数根据提示完成填写即可。更多推送渠道说明,请参见管理推送渠道

  3. 模拟中断事件

    抢占式实例的中断事件为被动触发事件,当您在开发抢占式实例中断事件处理程序过程中,无法有效地进行代码调试。您可以借助调试事件订阅模拟抢占式实例中断事件。

    1. 订阅策略页签,单击调试事件订阅

    2. 创建事件调试面板,产品选择云服务器ECS名称选择抢占式实例中断通知

      系统自动生成JSON格式的调试内容,您需要将JSON文件中资源相关的信息替换为待模拟中断事件的抢占式实例的信息。

      • 阿里云账号UID变量需要替换为当前登录的阿里云账号UID。

      • <resource-id>以及<instanceId>需要替换为抢占式实例的实例ID。

      • <地域ID>需要替换为抢占式实例所属的地域ID。

        {
            "product": "ECS",
            "resourceId": "acs:ecs:cn-shanghai:阿里云账号UID:instance/<resource-id>",
            "level": "WARN",
            "instanceName": "instanceName",
            "regionId": "<地域ID>",
            "groupId": "0",
            "name": "Instance:PreemptibleInstanceInterruption",
            "content": {
                "instanceId": "<instanceId>",
                "instanceName": "wor***b73",
                "action": "delete"
            },
            "status": "Normal"
        }
    3. 单击确定,系统提示操作成功,云监控自动向轻量消息队列发送一条报警测试通知消息。

  4. 拉取并响应消息

    模拟中断事件处理程序,从轻量消息队列中拉取消费抢占式实例中断通知消息,同时可以根据您的需求添加对应的业务处理逻辑。下面示例代码以一个图片灰度转换处理程序为例演示如何响应和处理中断事件:

    1. 使用线程任务模拟图片转换处理程序。

      import com.aliyun.mns.client.CloudAccount;
      import com.aliyun.mns.client.CloudQueue;
      import com.aliyun.mns.client.MNSClient;
      import com.aliyun.mns.common.utils.ServiceSettings;
      import com.aliyun.mns.model.Message;
      import org.json.JSONObject;
      
      import javax.imageio.ImageIO;
      import java.awt.image.BufferedImage;
      import java.io.File;
      import java.util.Base64;
      import java.util.concurrent.atomic.AtomicBoolean;
      
      /**
       * 可中断的图片处理器实现类,支持灰度转换
       * 中断检测机制(原子变量+线程中断标志)
       * 实现特性:
       * 1. 分块处理自动保存进度
       * 2. 抢占式中断立即响应
       * 3. 中断后生成部分结果文件
       */
      public class InterruptibleImageProcessor implements Runnable {
          /**
           * 使用原子布尔值实现线程安全的状态控制
           */
          private final AtomicBoolean running = new AtomicBoolean(true);
          /**
           * 存储处理中的图像数据
           */
          private BufferedImage processedImage;
          /**
           * 处理进度百分比(0-100)
           */
          private int progress;
          /**
           * 线程执行入口
           * **中断处理逻辑**:
           * 1. 捕获中断异常后保存当前进度
           * 2. 恢复线程中断状态(保持中断语义)
           */
          @Override
          public void run() {
              try {
                  convertToGrayScale(new File("input.jpg"), new File("output.jpg"));
                  // 模拟运行后中断
                  Thread.sleep(5000); 
                  System.out.println("图片处理完成");
              } catch (InterruptedException e) {
                  System.out.println("处理中断,已保存进度至" + progress + "%");
                  saveProgress(new File("partial_output.jpg"));
                  Thread.currentThread().interrupt(); // 恢复中断状态
              } catch (Exception e) {
                  System.err.println("处理错误: " + e.getMessage());
              }
          }
      
          /**
           * 外部中断触发方法
           * **协作机制**:
           * 与线程中断标志配合实现双重中断检测
           */
          public void stop() {
              running.set(false);
          }
      }
      
    2. 图片灰度转换方法。

      /**
       * 将输入图片转换为灰度图并保存
       * @param inputFile 原始图片文件对象
       * @param outputFile 输出文件对象
       * @throws Exception 包含IO异常和中断异常
       *
       * **算法说明**:
       * 使用加权平均法进行灰度转换,系数符合人眼亮度感知公式:
       * Gray = 0.30*R + 0.59*G + 0.11*B
       * 参考:ITU-R BT.601标准
       */
      public void convertToGrayScale(File inputFile, File outputFile) throws Exception {
          // 读取原始图像数据
          BufferedImage original = ImageIO.read(inputFile);
          int width = original.getWidth();
          int height = original.getHeight();
          // 创建灰度模式图像缓冲区
          processedImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY);
      
          // 分块处理以支持进度保存
          for (int y = 0; y < height && running.get(); y++) {
              // 逐像素处理
              for (int x = 0; x < width; x++) {
                  // 第一重中断检测:检查线程中断标志
                  if (Thread.interrupted()) {
                      throw new InterruptedException("图像处理被中断");
                  }
                  /* 灰度转换核心算法 */
                  int rgb = original.getRGB(x, y);
                  // 分解RGB通道(ARGB格式)
                  // 红色通道
                  int r = (rgb >> 16) & 0xFF;
                  // 绿色通道
                  int g = (rgb >> 8) & 0xFF;
                  // 蓝色通道
                  int b = rgb & 0xFF;
                  // 计算灰度值(加权平均法)
                  int gray = (int)(0.3 * r + 0.59 * g + 0.11 * b);
                  // 重构RGB值(灰度值复制到三个通道)
                  processedImage.setRGB(x, y, (gray << 16) | (gray << 8) | gray);
                  // 更新进度百分比(注意整数除法问题)
                  progress = (y * width + x) * 100 / (width * height);
              }
              // 每处理50行自动保存进度(检查点机制)
              if (y % 50 == 0) {
                  saveProgress(outputFile);
              }
          }
          // 最终保存完整结果
          ImageIO.write(processedImage, "jpg", outputFile);
      }
    3. 图片处理进度保存。

      /**
       * 保存处理进度到指定文件
       * @param outputFile 输出文件对象
       *
       * **注意**:
       * 1. 使用静默失败策略避免中断保存过程
       * 2. 生成临时文件名为partial_output.jpg
       */
      private void saveProgress(File outputFile) {
          try {
              // 使用临时文件名避免覆盖最终文件
              ImageIO.write(processedImage, "jpg", new File("partial_output.jpg"));
          } catch (Exception e) {
              System.err.println("自动保存失败: " + e.getMessage());
          }
      }
    4. 响应处理测试。

      测试图片处理程序运行时,感知到抢占式实例即将中断回收并进行响应处理。

      /**
       * 主方法(测试用)
       * **测试场景**:
       * 1. 启动处理线程
       * 2. 拉取中断事件通知消息
       * 3. 等待线程终止
       */
      public static void main(String[] args) throws InterruptedException {
          // 初始化账号MNSClient
          CloudAccount account = new CloudAccount(
                  ServiceSettings.getMNSAccessKeyId(),
                  ServiceSettings.getMNSAccessKeySecret(),
                  ServiceSettings.getMNSAccountEndpoint());
          MNSClient client = account.getMNSClient();
          //判断是否匹配抢占式实例中断事件
          boolean isMatch = false;
          //启动图片处理程序
          InterruptibleImageProcessor processor = new InterruptibleImageProcessor();
          Thread processThread = new Thread(processor);
          processThread.start();
          try{
              //从消息队列获取消息
              CloudQueue queue = client.getQueueRef("spot-interruption");
              Message popMsg = queue.popMessage();
              if (popMsg != null){
                  //消息体默认Base64加密
                  System.out.println("message body: " + popMsg.getMessageBodyAsRawString());
                  //Base64解码
                  byte[] decodedBytes = Base64.getDecoder().decode(popMsg.getMessageBodyAsRawString());
                  String decodedString = new String(decodedBytes);
                  System.out.println("message content: " + decodedString);
                  //json字符串解析
                  JSONObject json = new JSONObject(decodedString);
                  // 获取事件名称 "name"字段的值
                  String name = json.getString("name");
                  isMatch = "Instance:PreemptibleInstanceInterruption".equals(name);
                  //响应抢占式实例中断事件处理
                  if(isMatch){
                      System.out.println("抢占式实例即将被中断回收");
                      //终止图片处理程序
                      processor.stop();
                      processThread.interrupt();
                      System.out.println("程序终止");
                      processThread.join();
                      //删除消息
                      queue.deleteMessage(popMsg.getReceiptHandle());
                  }
              }
          }catch (Exception e){
              System.out.println("Unknown exception happened!");
              e.printStackTrace();
          }
          client.close();
      }

    完整示例代码如下:

    import com.aliyun.mns.client.CloudAccount;
    import com.aliyun.mns.client.CloudQueue;
    import com.aliyun.mns.client.MNSClient;
    import com.aliyun.mns.common.utils.ServiceSettings;
    import com.aliyun.mns.model.Message;
    import org.json.JSONObject;
    import javax.imageio.ImageIO;
    import java.awt.image.BufferedImage;
    import java.io.File;
    import java.util.Base64;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     * 可中断的图片处理器实现类,支持灰度转换
     * 中断检测机制(原子变量+线程中断标志)
     * 实现特性:
     * 1. 分块处理自动保存进度
     * 2. 抢占式中断立即响应
     * 3. 中断后生成部分结果文件
     */
    public class InterruptibleImageProcessor implements Runnable {
        /**
         * 使用原子布尔值实现线程安全的状态控制
         */
        private final AtomicBoolean running = new AtomicBoolean(true);
        /**
         * 存储处理中的图像数据
         */
        private BufferedImage processedImage;
        /**
         * 处理进度百分比(0-100)
         */
        private int progress;
    
        /**
         * 将输入图片转换为灰度图并保存
         * @param inputFile 原始图片文件对象
         * @param outputFile 输出文件对象
         * @throws Exception 包含IO异常和中断异常
         *
         * **算法说明**:
         * 使用加权平均法进行灰度转换,系数符合人眼亮度感知公式:
         * Gray = 0.30*R + 0.59*G + 0.11*B
         * 参考:ITU-R BT.601标准
         */
        public void convertToGrayScale(File inputFile, File outputFile) throws Exception {
            // 读取原始图像数据
            BufferedImage original = ImageIO.read(inputFile);
            int width = original.getWidth();
            int height = original.getHeight();
            // 创建灰度模式图像缓冲区
            processedImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY);
    
            // 分块处理以支持进度保存
            for (int y = 0; y < height && running.get(); y++) {
                // 逐像素处理
                for (int x = 0; x < width; x++) {
                    // 第一重中断检测:检查线程中断标志
                    if (Thread.interrupted()) {
                        throw new InterruptedException("图像处理被中断");
                    }
                    /* 灰度转换核心算法 */
                    int rgb = original.getRGB(x, y);
                    // 分解RGB通道(ARGB格式)
                    // 红色通道
                    int r = (rgb >> 16) & 0xFF;
                    // 绿色通道
                    int g = (rgb >> 8) & 0xFF;
                    // 蓝色通道
                    int b = rgb & 0xFF;
                    // 计算灰度值(加权平均法)
                    int gray = (int)(0.3 * r + 0.59 * g + 0.11 * b);
                    // 重构RGB值(灰度值复制到三个通道)
                    processedImage.setRGB(x, y, (gray << 16) | (gray << 8) | gray);
                    // 更新进度百分比(注意整数除法问题)
                    progress = (y * width + x) * 100 / (width * height);
                }
                // 每处理50行自动保存进度(检查点机制)
                if (y % 50 == 0) {
                    saveProgress(outputFile);
                }
            }
            // 最终保存完整结果
            ImageIO.write(processedImage, "jpg", outputFile);
        }
    
        /**
         * 保存处理进度到指定文件
         * @param outputFile 输出文件对象
         *
         * **注意**:
         * 1. 使用静默失败策略避免中断保存过程
         * 2. 生成临时文件名为partial_output.jpg
         */
        private void saveProgress(File outputFile) {
            try {
                // 使用临时文件名避免覆盖最终文件
                ImageIO.write(processedImage, "jpg", new File("partial_output.jpg"));
            } catch (Exception e) {
                System.err.println("自动保存失败: " + e.getMessage());
            }
        }
    
        /**
         * 线程执行入口
         * **中断处理逻辑**:
         * 1. 捕获中断异常后保存当前进度
         * 2. 恢复线程中断状态(保持中断语义)
         */
        @Override
        public void run() {
            try {
                convertToGrayScale(new File("/Users/shaoberlin/Desktop/idea_workspace/aliyun/src/main/resources/input.jpg"), new File("output.jpg"));
                // 模拟运行后中断
                Thread.sleep(5000); 
                System.out.println("图片处理完成");
            } catch (InterruptedException e) {
                System.out.println("处理中断,已保存进度至" + progress + "%");
                saveProgress(new File("partial_output.jpg"));
                Thread.currentThread().interrupt(); // 恢复中断状态
            } catch (Exception e) {
                System.err.println("处理错误:" + e.getMessage());
            }
        }
    
        /**
         * 外部中断触发方法
         * **协作机制**:
         * 与线程中断标志配合实现双重中断检测
         */
        public void stop() {
            running.set(false);
        }
    
        /**
         * 主方法(测试用)
         * **测试场景**:
         * 1. 启动处理线程
         * 2. 5000ms后触发中断
         * 3. 等待线程终止
         */
        public static void main(String[] args) throws InterruptedException {
            // 初始化账号MNSClient
            CloudAccount account = new CloudAccount(
                    ServiceSettings.getMNSAccessKeyId(),
                    ServiceSettings.getMNSAccessKeySecret(),
                    ServiceSettings.getMNSAccountEndpoint());
            MNSClient client = account.getMNSClient();
            //判断是否匹配抢占式实例中断事件
            boolean isMatch = false;
            //启动图片处理程序
            InterruptibleImageProcessor processor = new InterruptibleImageProcessor();
            Thread processThread = new Thread(processor);
            processThread.start();
            try{
                //从消息队列获取消息
                CloudQueue queue = client.getQueueRef("spot-interruption");
                Message popMsg = queue.popMessage();
                if (popMsg != null){
                    //消息体默认Base64加密
                    System.out.println("message body: " + popMsg.getMessageBodyAsRawString());
                    //Base64解码
                    byte[] decodedBytes = Base64.getDecoder().decode(popMsg.getMessageBodyAsRawString());
                    String decodedString = new String(decodedBytes);
                    System.out.println("message content: " + decodedString);
                    //json字符串解析
                    JSONObject json = new JSONObject(decodedString);
                    // 获取事件名称 "name"字段的值
                    String name = json.getString("name");
                    isMatch = "Instance:PreemptibleInstanceInterruption".equals(name);
                    //响应抢占式实例中断事件处理
                    if(isMatch){
                        System.out.println("抢占式实例即将被中断回收");
                        //终止图片处理程序
                        processor.stop();
                        processThread.interrupt();
                        System.out.println("程序终止");
                        processThread.join();
                        //删除消息
                        queue.deleteMessage(popMsg.getReceiptHandle());
                    }
                }
            }catch (Exception e){
                System.out.println("Unknown exception happened!");
                e.printStackTrace();
            }
            client.close();
        }
    }
    
说明
  • 业务逻辑如果涉及创建快照,请参考CreateSnapshot

  • 业务逻辑如果涉及创建自定义镜像,请参考CreateImage

相关文档

如果您的抢占式实例上保存了重要数据或配置,建议您了解抢占式实例数据保留和恢复的方法,并提前做好相关配置,避免数据丢失。更多信息,请参见抢占式实例数据保留和恢复

  • 本页导读 (1)
  • 整体流程
  • 准备工作
  • 操作步骤
  • 相关文档
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等