文档

通过轻量消息队列(原 MNS)触发工作流

更新时间:

工作流集群支持集成阿里云轻量消息队列(原 MNS),利用轻量消息队列(原 MNS)作为中介接入丰富的事件源,利用事件驱动触发工作流运行。当有新的事件触发(如OSS事件触发、EventBridge事件触发)时,轻量消息队列(原 MNS)会收到相应的消息,Argo会根据事件触发的条件自动触发工作流的运行,实现自动化的工作流处理。

前提条件

步骤一:创建Event Bus

Event Bus可以被命名空间中的事件驱动工作流共享,您可以通过NATS和轻量消息队列(原 MNS)两种方式创建,如果已经创建,请直接执行步骤二:创建Event Source

通过NATS创建

  1. 创建event-bus.yaml文件。Event Bus示例代码如下所示:

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native:
          replicas: 3
          auth: token
  2. 执行以下命令,创建EventBus。

    kubectl apply -f event-bus.yaml
    说明

    命令执行成功后,会在default命名空间下创建Event Bus Pod。后续操作需在同一命名空间下。

  3. 执行以下命令,查看Event Bus Pod是否正常启动。

    kubectl get pod

通过轻量消息队列(原 MNS)创建

  1. 登录轻量消息队列(原 MNS)控制台

  2. 主题列表页面创建主题argoeventbus,并在主题详情页面的接入点区域获取Endpoint。

  3. 使用RAM管理员登录RAM控制台

  4. 创建RAM用户,为RAM用户授予AliyunMNSFullAccess权限,并获取RAM用户的AK和SK。

    具体操作,请参见创建RAM用户为RAM用户授权创建AccessKey查看RAM用户的AccessKey信息

  5. 执行以下命令,创建Secret用于存储AK和SK。

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. 创建event-bus-mns.yaml文件,将示例中的参数修改为实际使用的参数值。

    • topic:需替换为步骤2中创建的轻量消息队列(原 MNS)中的主题名称。

    • endpoint:需替换为步骤2中获取的Endpoint。

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      alimns:
        accessKey:
          key: accesskey
          name: mns-secret
        secretKey:
          key: secretkey
          name: mns-secret
        topic: argoeventbus  # 对应轻量消息队列(原 MNS)中的主题名称。
        endpoint: http://165***368.mns.<region>.aliyuncs.com  # 对应轻量消息队列(原 MNS)的Endpoint。
  7. 执行以下命令,应用event-bus-mns.yaml文件创建Event Bus资源。

    kubectl apply -f event-bus-mns.yaml

步骤二:创建Event Source

  1. 登录轻量消息队列(原 MNS)控制台

  2. 队列列表页面创建队列test-event-queue,并在队列详情页面的接入点区域获取Endpoint。

    说明

    若您是通过轻量消息队列(原 MNS)创建的Event Bus,则步骤3~步骤5可直接跳过,直接执行步骤6

  3. 使用RAM管理员登录RAM控制台

  4. 创建RAM用户,为RAM用户授予AliyunMNSFullAccess权限,并获取RAM用户的AK和SK。

    具体操作,请参见创建RAM用户为RAM用户授权创建AccessKey查看RAM用户的AccessKey信息

  5. 执行以下命令,创建Secret用于存储AK和SK。

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. 创建event-source.yaml文件,将示例中的参数修改为实际使用的参数值。

    • topic:需替换为步骤2中创建的轻量消息队列(原 MNS)中的主题名称。

    • endpoint:需替换为步骤2中获取的Endpoint。

    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: ali-mns
    spec:
      mns:
        example:
          jsonBody: true
          accessKey:
            key: accesskey
            name: mns-secret
          secretKey:
            key: secretkey
            name: mns-secret
          queue: test-event-queue # 对应轻量消息队列(原 MNS)中的队列名称。
          waitTimeSeconds: 20
          endpoint: http://165***368.mns.<region>.aliyuncs.com # 对应轻量消息队列(原 MNS)的Endpoint。
  7. 执行以下命令,应用event-source.yaml文件创建Event Source。

    kubectl apply -f event-source.yaml
  8. 执行以下命令,查看Event Source Pod是否正常启动。

    kubectl get pod

步骤三:创建Event Sensor

  1. 创建event-sensor.yaml文件,在Event Sensor中嵌入待执行的工作流定义。Event Sensor示例代码如下所示:

    展开查看示例代码

    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: ali-mns
    spec:
      template:
        serviceAccountName: default
      dependencies:
        - name: test-dep
          eventSourceName: ali-mns    # 匹配eventsource名称。
          eventName: example          # 匹配eventsource中的事件名称定义。
      triggers:
        - template:
            name: mns-workflow
            k8s:
              operation: create
              source:
                resource:
                  apiVersion: argoproj.io/v1alpha1    # 嵌入工作流定义。
                  kind: Workflow
                  metadata:
                    generateName: ali-mns-workflow-
                  spec:
                    entrypoint: whalesay
                    arguments:
                      parameters:        # 参数传递事件内容。
                        - name: message
                          # this is the value that should be overridden
                          value: hello world
                    templates:
                      - name: whalesay
                        inputs:
                          parameters:
                            - name: message
                        container:
                          image: docker/whalesay:latest
                          command: [cowsay]
                          args: ["{{inputs.parameters.message}}"]
              parameters:
                - src:        # 解析事件内容,传递给工作流。
                    dependencyName: test-dep
                    dataKey: body
                  dest: spec.arguments.parameters.0.value
  2. 执行以下命令,应用event-sensor.yaml文件创建Event Sensor。

    kubectl apply -f event-sensor.yaml
  3. 执行以下命令,查看Event Sensor Pod是否正常启动。

    kubectl get pod
说明

通过轻量消息队列(原 MNS)创建Eventbus时,在Event Sensor创建完成后,会自动创建一个轻量消息队列(原 MNS)与之对应,队列命名格式为:ackone-argowf-<namespace>-<sensor-name>-<sensor-uid>

步骤四:验证通过向轻量消息队列(原 MNS)发送消息触发工作流

  1. 登录轻量消息队列(原 MNS)控制台

  2. 队列列表页面中,找到队列test-event-queue,在其操作列选择更多 > 收发消息

  3. 在收发消息快速体验页面中,输入消息内容test trigger argo workflow,然后单击发送消息

  4. 执行以下命令,在工作流集群中查看工作流的运行情况。

    argo list

    预期输出如下:

    NAME                     STATUS    AGE   DURATION   PRIORITY
    ali-mns-workflow-5prz7   Running   6s    6s         0
  5. 获取工作流日志,查看消息内容。

    argo logs ali-mns-workflow-5prz7
    重要
    • 该命令中的工作流名称必须和上一步骤中返回的工作流名称一致,ali-mns-workflow-5prz7仅为示例值,请您修改为实际环境中的返回值。

    • 消息内容使用Base64编码。

    预期输出如下:image.png

步骤五:清除Event相关资源

  1. 执行以下命令清除相关资源。

    • 清除Event Sensor

      kubectl delete sensor ali-mns
    • 清除Event Source

      kubectl delete eventsource ali-mns
    • 清除Event Bus

      kubectl delete eventbus default
  2. 执行以下命令查看Pod状态,确认所有资源已清除。

    kubectl get pod