全部产品
云市场

MQ消息回执通知

更新时间:2019-07-17 17:04:29

本文以异步查询OSS数据为例,介绍如何在DLA中使用MQ做消息回执通知。

前提条件

  1. 开通OSS服务

    在DLA中异步执行SQL时,必须开通OSS服务,存储SQL执行结果。

  2. 创建存储空间

  3. 新建OSS Schema

  4. 创建OSS表

  5. 创建MQ实例、创建Topic、创建Group,详情请参见MQ文档主账号 - 快速入门子账号 - 快速入门

    MQ和DLA所属Region相同。

操作步骤

步骤一: 新建RAM角色

  1. 使用DLA所属账号登录RAM控制台

  2. 单击RAM角色管理 > 新建RAM角色,在新建RAM角色页面进行以下配置。

    • 可信实体类型选择阿里云账号

    • 受信云账号选择当前云账号

    • RAM角色名称AliyunOpenAnalyticsAccessingMQRole

      创建RAM角色

步骤二: 修改角色授信策略

在RAM角色列表中单击刚刚创建的角色,切换至RAM角色内容页签,单击修改信任策略,用以下策略替换原始策略。

  1. {
  2. "Statement": [
  3. {
  4. "Action": "sts:AssumeRole",
  5. "Effect": "Allow",
  6. "Principal": {
  7. "Service": [
  8. "openanalytics.aliyuncs.com"
  9. ]
  10. }
  11. }
  12. ],
  13. "Version": "1"
  14. }

修改角色

步骤三: 修改角色权限

在RAM角色列表中单击刚刚创建的角色,单击角色右侧的添加权限为角色添加系统权限策略AliyunMQFullAccess

添加授权策略

步骤四:异步执行SQL并使用MQ做消息回执通知

语法

  1. /*+ run_async=true, mq-notify-by=ons, mq-topic=${您的mq的topic},
  2. mq-producer-id=${您的group Id}, mq-endpoint=${您的某个endpoint,与DLA所属region相同} */
  3. select * from xxxx ....;

示例

  1. /*+ run_async=true,
  2. mq-notify-by=ons,
  3. mq-topic=dla_hangzhou_topic,
  4. mq-producer-id=GID_dla_hangzhou,
  5. mq-endpoint=http://MQ_INST_*****_BagJ6yLU.cn-hangzhou.mq-internal.aliyuncs.com:8080 */ select * from oss_schema.oss_json;

执行上述SQL,得到SQL任务IDASYNC_TASK_ID,例如q201903271454hze921092f0064543。通过show query_task得到mq_message_id

  1. mysql> show query_task where id = 'q201903271454hze921092f0064543'\G
  2. *************************** 1. row ***************************
  3. id: q201903271454hze921092f0064543
  4. mpp_query_id: 20190322_120525_12951_rdxtt
  5. status: SUCCESS
  6. task_name: SELECT
  7. table_schema: oss_schema
  8. command: /*+ run_async=true, mq-notify-by=mns, mq-topic=dla_hangzhou_topic, mq-producer-id=GID_dla_hangzhou, mq-endpoint=http://MQ_INST_*****_BagJ6yLU.cn-hangzhou.mqinternal.aliyuncs.com:8080 */ select * from oss_schema.oss_json
  9. creator_id: ${您的dla账号}
  10. create_time: 2019-03-27 14:55:09
  11. update_time: 2019-03-27 14:55:09
  12. connection_id: 49409305627697
  13. message:
  14. row_count: 4
  15. elapse_time: 490
  16. scanned_row_count: 4
  17. scanned_data_bytes: 230
  18. result_file_oss_file: oss://aliyun-oa-query-results-***-oss-cn-hangzhou/DLA_Result/2019/03/27/q201903271454hze921092f0064543/result.csv
  19. cancellable_task: 0
  20. mq_product: NULL
  21. mq_model: queue
  22. mq_topic: dla_hangzhou_topic
  23. mq_queue: NULL
  24. mq_producer_id: GID_dla_hangzhou
  25. mq_endpoint: http://MQ_INST_*****_BagJ6yLU.cn-hangzhou.mqinternal.aliyuncs.com:8080
  26. mq_status: SUCCESS
  27. mq_error_msg: NULL
  28. mq_message_id: AC13140925E468C4039C6FFCFD50001B
  29. mq_total_time: 307

mq_message_id就是MQ返回的idAC13140925E468C4039C6FFCFD50001B,通过mq_message_id,可以在MQ控制台上接收DLA消息。

MQ成功接收DLA消息后,您就可以通过MQ实时了解DLA异步执行SQL的任务详情。关于MQ,请参见消息队列MQ