使用OOS将RocketMQ实例标签继承到Topic和Group

通过创建和执行系统运维管理(OOS)自定义模板,您可以将RocketMQ实例的标签继承到实例中的Topic和Group上。

使用限制

默认情况下,某地域的OOS负责管理本地域的资源。例如,华东1(杭州)的OOS默认管理华东1(杭州)的ECS实例。但是,作为例外,用户可以在模板ExecuteAPI里指定RegionId的值,来调用其他地域的OpenAPI(不建议这样做)。因此,此处OOS的地域需要与RocketMQ实例地域保持一致。关于OOS的使用限制,请参见使用限制

操作步骤

  1. 登录OOS控制台

  2. 创建子模板Ons_topicExtendInstanceTags。

    模板示例,请参见子模板:根据实例ID及其他条件更新Topic的标签。创建模板的具体操作,请参见创建模板

  3. 创建子模板Ons_groupExtendInstanceTags。

    模板示例,请参见子模板:根据实例ID及输入条件更新Group的标签。创建模板的具体操作,请参见创建模板

  4. 创建父模板RocketMQInstanceTagInherit,父模板中会嵌套以上两个子模板。

  5. 对父模板RocketMQInstanceTagInherit创建执行。

    您需要配置以下信息:

    • 选择需要继承标签的地域。

    • 选择需要继承的标签。

    • 选择需要继承标签的资源类型。

    • 选择相同标签键不同标签值时是否需要覆盖。

    创建执行的具体操作,请参见执行概述

  6. 登录消息队列RocketMQ版控制台,验证RocketMQ实例中的Topic和Group是否继承了实例的对应标签。

子模板:根据实例ID及其他条件更新Topic的标签

模板名称:Ons_topicExtendInstanceTags

模板内容:

FormatVersion: OOS-2019-06-01
Description: 标签继承
Parameters:
  regionId:
    Type: String
    Description:
      en: The id of region
      zh-cn: 地域ID
    Label:
      en: Region
      zh-cn: 地域
    AssociationProperty: RegionId
    Default: '{{ ACS::RegionId }}'
  instanceId:
    Type: String
    Description:
      en: The id of OnsInstance
      zh-cn: RocketMQ实例ID
    Label:
      en: InstanceId
      zh-cn: RocketMQ实例ID
  tagKeys:
    Type: List
    Description: 标签Key的集合
  isUpdate:
    Type: Boolean
    Description: 是否覆盖
    Default: false
RamRole: ''
Tasks:
  - Name: getOnsInstanceAndTags
    Action: ACS::ExecuteAPI
    Description: ListTagResources
    Properties:
      Service: ONS
      API: ListTagResources
      Parameters:
        RegionId: '{{ regionId }}'
        ResourceId:
          - '{{instanceId}}'
        ResourceType: instance
    Outputs:
      onsTags:
        Type: Json
        ValueSelector: '.TagResources | map(select( .TagKey | test("^(?!acs).*"))) | map(select(.TagKey as $tagKey | {{tagKeys}} | index($tagKey) >=0)) '
  - Name: getTopicAndTags
    Action: ACS::ExecuteAPI
    Description: OnsTopicList
    Properties:
      Service: Ons
      API: OnsTopicList
      Parameters:
        RegionId: '{{ regionId }}'
        InstanceId: '{{instanceId}}'
    Outputs:
      updateTopicAndTags:
        Type: List
        ValueSelector: .Data.PublishInfoDo[] | {"Topic":.Topic, "Tags":.Tags}
  - Name: tagResources
    Action: ACS::ExecuteAPI
    Description: TagTopics
    Properties:
      Service: Ons
      API: TagResources
      Parameters:
        RegionId: '{{ regionId }}'
        ResourceId:
          Fn::Jq:
            - All
            - .Topic
            - '{{ACS::TaskLoopItem}}'
        Tag:
          Fn::If:
            - '{{isUpdate}}'
            - Fn::Jq:
                - All
                - '.[] |{"Key":.TagKey, "Value": .TagValue} '
                - '{{getOnsInstanceAndTags.onsTags}}'
            - Fn::Jq:
                - All
                - '.Tags.Tag as $topicTags | {{getOnsInstanceAndTags.onsTags}} | map(select(.TagKey as $tagKey | [$topicTags[].Key] | index($tagKey) <0)) |.[] |{"Key":.TagKey, "Value": .TagValue}'
                - '{{ACS::TaskLoopItem}}'
        ResourceType: topic
        InstanceId: '{{instanceId}}'
    Outputs:
      reqResult:
        Type: Json
        ValueSelector: .RequestId
    Loop:
      RateControl:
        Mode: Concurrency
        MaxErrors: 100
        Concurrency: 1
      Items: '{{getTopicAndTags.updateTopicAndTags}}'
      Outputs:
        tagResult:
          AggregateType: Fn::ListJoin
          AggregateField: reqResult

子模板:根据实例ID及输入条件更新Group的标签

模板名称:Ons_groupExtendInstanceTags

模板内容:

FormatVersion: OOS-2019-06-01
Description: 标签继承
Parameters:
  regionId:
    Type: String
    Description:
      en: The id of region
      zh-cn: 地域ID
    Label:
      en: Region
      zh-cn: 地域
    AssociationProperty: RegionId
    Default: '{{ ACS::RegionId }}'
  instanceId:
    Type: String
    Description:
      en: The id of OnsInstance
      zh-cn: 实例ID
    Label:
      en: InstanceId
      zh-cn: 实例ID
  tagKeys:
    Type: List
    Description: 标签Key的集合
  isUpdate:
    Type: Boolean
    Description: 是否覆盖
    Default: false
RamRole: ''
Tasks:
  - Name: getOnsInstanceAndTags
    Action: ACS::ExecuteAPI
    Description: ListTagResources
    Properties:
      Service: ONS
      API: ListTagResources
      Parameters:
        RegionId: '{{ regionId }}'
        ResourceId:
          - '{{instanceId}}'
        ResourceType: instance
    Outputs:
      onsTags:
        Type: Json
        ValueSelector: '.TagResources | map(select( .TagKey | test("^(?!acs).*"))) | map(select(.TagKey as $tagKey | {{tagKeys}} | index($tagKey) >=0)) '
  - Name: getGroupAndTags
    Action: ACS::ExecuteAPI
    Description: OnsGroupList
    Properties:
      Service: Ons
      API: OnsGroupList
      Parameters:
        RegionId: '{{ regionId }}'
        InstanceId: '{{instanceId}}'
    Outputs:
      updateGroupAndTags:
        Type: List
        ValueSelector: .Data.SubscribeInfoDo[] | {"Group":.GroupId, "Tags":.Tags}
  - Name: tagResources
    Action: ACS::ExecuteAPI
    Description: TagGroups
    Properties:
      Service: Ons
      API: TagResources
      Parameters:
        RegionId: '{{ regionId }}'
        ResourceId:
          Fn::Jq:
            - All
            - .Group
            - '{{ACS::TaskLoopItem}}'
        Tag:
          Fn::If:
            - '{{isUpdate}}'
            - Fn::Jq:
                - All
                - '.[] |{"Key":.TagKey, "Value": .TagValue} '
                - '{{getOnsInstanceAndTags.onsTags}}'
            - Fn::Jq:
                - All
                - '.Tags.Tag as $groupTags | {{getOnsInstanceAndTags.onsTags}} | map(select(.TagKey as $tagKey | [$groupTags[].Key] | index($tagKey) <0)) |.[] |{"Key":.TagKey, "Value": .TagValue}'
                - '{{ACS::TaskLoopItem}}'
        ResourceType: group
        InstanceId: '{{instanceId}}'
    Outputs:
      reqResult:
        Type: Json
        ValueSelector: .RequestId
    Loop:
      RateControl:
        Mode: Concurrency
        MaxErrors: 100
        Concurrency: 1
      Items: '{{getGroupAndTags.updateGroupAndTags}}'
      Outputs:
        tagResult:
          AggregateType: Fn::ListJoin
          AggregateField: reqResult

父模板:按照输入条件更新某个地域下所有RocketMQ实例中的Topic和Group的标签

模板名称:RocketMQInstanceTagInherit

模板内容:

FormatVersion: OOS-2019-06-01
Description: 标签继承
Parameters:
  regionIds:
    Type: List
    Description:
      en: The id of region
      zh-cn: 地域ID
    Label:
      en: Region
      zh-cn: 地域
    AssociationProperty: RegionId
  tags:
    Type: List
    AssociationProperty: Tags
    AssociationPropertyMetadata:
      ResourceType: ALIYUN::MQ::INSTANCE
    Description: 消息队列RocketMQ实例上的标签,选择需要继承到Topic或Group上的标签键及标签值。
  resourceTypes:
    Type: List
    Description: 所选择继承的资源类型
    AllowedValues:
      - topic
      - group
  isUpdate:
    Type: Boolean
    Description: 如果被继承的资源存在相同Key但是Value不相同时,是否覆盖Value。
    Default: false
RamRole: ''
Tasks:
  - Name: getOnsInstance
    Action: ACS::ExecuteAPI
    Description: OnsInstanceInServiceList
    Properties:
      Service: ONS
      API: OnsInstanceInServiceList
      Parameters:
        RegionId: '{{ACS::TaskLoopItem}}'
    Outputs:
      onsInstance:
        Type: List
        ValueSelector: '.Data.InstanceVO[] |. + {"RegionId": "{{ACS::TaskLoopItem}}" }'
    Loop:
      RateControl:
        Mode: Concurrency
        MaxErrors: 100
        Concurrency: 1
      Items: '{{regionIds}}'
      Outputs:
        allOnsInstances:
          AggregateType: Fn::ListJoin
          AggregateField: onsInstance
  - Name: isTagTopic
    Action: ACS::Choice
    Description:
      en: Choose next task by resource type
      zh-cn: 根据资源类型选择下一个任务
    Properties:
      DefaultTask: isTagGroup
      Choices:
        - When:
            Fn::Equals:
              - true
              - Fn::Jq:
                  - First
                  - contains(["topic"])
                  - '{{resourceTypes}}'
          NextTask: tagTopic
  - Name: tagTopic
    Action: ACS::Template
    Description:
      en: Update topic tags by ons instance tags
      zh-cn: 通过RocketMQ实例标签更新Topic的标签
    Properties:
      TemplateName: Ons_topicExtendInstanceTags
      Parameters:
        regionId:
          Fn::Jq:
            - First
            - .RegionId
            - '{{ACS::TaskLoopItem}}'
        instanceId:
          Fn::Jq:
            - First
            - .InstanceId
            - '{{ACS::TaskLoopItem}}'
        tagKeys:
          Fn::Jq:
            - All
            - .[].Key
            - '{{tags}}'
        isUpdate: '{{isUpdate}}'
    Loop:
      RateControl:
        Mode: Concurrency
        MaxErrors: 100
        Concurrency: 1
      Items:
        Fn::Jq:
          - All
          - .[][]
          - '{{getOnsInstance.allOnsInstances}}'
      Outputs:
        tagResult:
          AggregateType: Fn::ListJoin
          AggregateField: reqResult
  - Name: isTagGroup
    Action: ACS::Choice
    Description:
      en: Choose next task by resource type
      zh-cn: 根据资源类型选择下一个任务
    Properties:
      DefaultTask: ACS::END
      Choices:
        - When:
            Fn::Equals:
              - true
              - Fn::Jq:
                  - First
                  - contains(["group"])
                  - '{{resourceTypes}}'
          NextTask: tagGroup
  - Name: tagGroup
    Action: ACS::Template
    Description:
      en: Update snapshot tags by ecs instance tags
      zh-cn: 通过RocketMQ实例标签更新Group的标签
    Properties:
      TemplateName: Ons_groupExtendInstanceTags
      Parameters:
        regionId:
          Fn::Jq:
            - First
            - .RegionId
            - '{{ACS::TaskLoopItem}}'
        instanceId:
          Fn::Jq:
            - First
            - .InstanceId
            - '{{ACS::TaskLoopItem}}'
        tagKeys:
          Fn::Jq:
            - All
            - .[].Key
            - '{{tags}}'
        isUpdate: '{{isUpdate}}'
    Loop:
      RateControl:
        Mode: Concurrency
        MaxErrors: 100
        Concurrency: 1
      Items:
        Fn::Jq:
          - All
          - .[][]
          - '{{getOnsInstance.allOnsInstances}}'
      Outputs:
        tagResult:
          AggregateType: Fn::ListJoin
          AggregateField: reqResult