全部产品

MQ消息回执通知

本文以异步查询OSS数据为例,介绍如何在DLA中使用MQ做消息回执通知。

前提条件

  • 开通OSS服务

    说明

    在DLA中异步执行SQL时,必须开通OSS服务,存储SQL执行结果。

操作步骤

  1. 新建RAM角色。

    1. 使用DLA所属账号登录RAM控制台

    2. 单击RAM角色管理 > 新建RAM角色,在新建RAM角色页面进行以下配置。

      • 可信实体类型选择阿里云账号

      • 受信云账号选择当前云账号

      • RAM角色名称AliyunOpenAnalyticsAccessingMQRole

        创建RAM角色
  2. 修改角色授信策略。

    在RAM角色列表中单击刚刚创建的角色,切换至RAM角色内容页签,单击修改信任策略,用以下策略替换原始策略。

    {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "openanalytics.aliyuncs.com"
                    ]
                }
            }
        ],
        "Version": "1"
    }
  3. 修改角色权限。

    在RAM角色列表中单击刚刚创建的角色,单击角色右侧的添加权限为角色添加系统权限策略AliyunMQFullAccess

    添加授权策略
  4. 异步执行SQL并使用MQ做消息回执通知。

    语法

    /*+ run_async=true, mq-notify-by=ons, mq-topic=${您的mq的topic}, 
    mq-producer-id=${您的group Id}, mq-endpoint=${您的某个endpoint,与DLA所属region相同} */ 
    select * from xxxx ....;

    示例

    /*+ run_async=true, 
    mq-notify-by=ons, 
    mq-topic=dla_hangzhou_topic, 
    mq-producer-id=GID_dla_hangzhou, 
    mq-endpoint=http://MQ_INST_*****_BagJ6yLU.cn-hangzhou.mq-internal.aliyuncs.com:8080 */  select * from oss_schema.oss_json;

    执行上述SQL,得到SQL任务IDASYNC_TASK_ID,例如q201903271454hze921092f0064543。通过show query_task得到mq_message_id

    mysql> show query_task where id = 'q201903271454hze921092f0064543'\G
    *************************** 1. row ***************************
                      id: q201903271454hze921092f0064543
            mpp_query_id: 20190322_120525_12951_rdxtt
                  status: SUCCESS
               task_name: SELECT
            table_schema: oss_schema
                 command: /*+ run_async=true, mq-notify-by=mns, mq-topic=dla_hangzhou_topic, mq-producer-id=GID_dla_hangzhou, mq-endpoint=http://MQ_INST_*****_BagJ6yLU.cn-hangzhou.mqinternal.aliyuncs.com:8080 */  select * from oss_schema.oss_json
              creator_id: ${您的dla账号}
             create_time: 2019-03-27 14:55:09
             update_time: 2019-03-27 14:55:09
           connection_id: 49409305627697
                 message: 
               row_count: 4
             elapse_time: 490
       scanned_row_count: 4
      scanned_data_bytes: 230
    result_file_oss_file: oss://aliyun-oa-query-results-***-oss-cn-hangzhou/DLA_Result/2019/03/27/q201903271454hze921092f0064543/result.csv
        cancellable_task: 0
              mq_product: NULL
                mq_model: queue
                mq_topic: dla_hangzhou_topic
                mq_queue: NULL
          mq_producer_id: GID_dla_hangzhou
             mq_endpoint: http://MQ_INST_*****_BagJ6yLU.cn-hangzhou.mqinternal.aliyuncs.com:8080
               mq_status: SUCCESS
            mq_error_msg: NULL
           mq_message_id: AC13140925E468C4039C6FFCFD50001B
           mq_total_time: 307

    mq_message_id就是MQ返回的idAC13140925E468C4039C6FFCFD50001B,通过mq_message_id,可以在MQ控制台上接收DLA消息。

    MQ成功接收DLA消息后,您就可以通过MQ实时了解DLA异步执行SQL的任务详情。关于MQ,请参见什么是消息队列RocketMQ版?