文档

通过向OSS上传文件触发工作流

更新时间:

本文介绍如何集成阿里云对象存储OSS与阿里云消息服务MNS,通过将数据上传文件至对象存储OSS中,自动触发工作流运行文件,并生成结果。

前提条件

步骤一:配置OSS Bucket事件通知

  1. 登录OSS管理控制台

  2. 单击Bucket 列表,创建OSS Bucket或者选择已有Bucket。如需创建,请参见创建存储空间

  3. 文件管理页面,选择数据处理 > 事件通知,单击创建规则,完成相关参数配置后,单击确定。如需自定义规则,请参见通过事件通知实时处理OSS文件变动

    配置项

    示例

    规则名称

    upload-complete

    事件类型

    PutObject, PostObject

    资源描述

    选择前后缀,并设置后缀为complete,即上传以.complete结尾的文件,触发事件后运行工作流。

    接收终端

    选择队列,名称设置为oss-event-queue

    规则创建后,在消息服务MNS中会自动创建一个主题与之对应。

  4. 登录消息服务MNS控制台

  5. 单击队列列表,创建队列oss-event-queue,创建方法,请参见创建队列。创建完成后,在队列详情页面的接入点区域获取Endpoint

    重要

    创建队列的名称需要与步骤3中配置的接收终端的队列名称保持一致。

步骤二:创建Event Bus

Event Bus可以被命名空间中的事件驱动工作流共享。如果已经创建,请执行步骤三:创建Event Source

方式一:使用NATS

  1. 创建event-bus.yaml文件。Event Bus示例代码如下所示:

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native:
          replicas: 3
          auth: token
  2. 执行以下命令,创建EventBus。

    kubectl apply -f event-bus.yaml
    说明

    命令执行成功后,会在default命名空间下创建Event Bus Pod。后续操作需在同一命名空间下。

  3. 执行以下命令,查看Event Bus Pod是否正常启动。

    kubectl get pod

方式二:使用MNS消息队列

  1. 登录消息服务MNS控制台

  2. 主题列表页面创建主题argoeventbus,并在主题详情页面的接入点区域获取Endpoint。

  3. 使用阿里云账号(主账号)或具有管理权限的RAM用户登录RAM控制台

  4. 创建RAM用户,授权AliyunMNSFullAccess,并获取RAM用户的AK和SK。

  5. 执行以下命令,创建Secret用于存储AK和SK。

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. 创建event-bus-mns.yaml文件,EventBus示例代码如下所示:

    • topic:需替换为2中创建的MNS中的主题名称。

    • endpoint:需替换为2中获取的Endpoint。

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      alimns:
        accessKey:
          key: accesskey
          name: mns-secret
        secretKey:
          key: secretkey
          name: mns-secret
        topic: argoeventbus  # 对应MNS中的主题名称。
        endpoint: http://165***368.mns.<region>.aliyuncs.com
  7. 执行以下命令,创建event-bus-mns.yaml

    kubectl apply -f event-bus.yaml
说明
  • 使用MNS消息队列方式创建Event Bus时,不会创建Pod。

  • 如需使用Trigger功能,请使用NATS方式创建EventBus。目前MNS方式不支持开源Argo Event的Sensor Trigger。

步骤三:创建Event Source

  1. 使用阿里云账号(主账号)或具有管理权限的RAM用户登录RAM控制台

  2. 创建RAM用户,为其授予AliyunMNSFullAccess权限,并获取RAM用户的AK和SK。具体操作,请参见创建RAM用户为RAM用户授权创建AccessKey查看RAM用户的AccessKey信息

  3. 执行以下命令,创建Secret用于存储AK和SK。

    kubectl create secret generic mns-secret\
     --from-literal=accesskey=*** \
     --from-literal=secretkey=***
  4. 创建event-source.yaml文件,Event Source示例代码如下所示:

    • queue:需替换为步骤5中创建的MNS队列名称。

    • endpoint:需替换为步骤5中获取的Endpoint。

    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: ali-mns
    spec:
      mns:
        example:
          jsonBody: true
          accessKey:
            key: accesskey
            name: mns-secret
          secretKey:
            key: secretkey
            name: mns-secret
          queue: oss-event-queue # 步骤一中创建的MNS队列名称
          waitTimeSeconds: 20
          endpoint: http://165***368.mns.<region>.aliyuncs.com # 步骤一中创建的MNS队列接入点
  5. 执行以下命令,创建Event Source。

    kubectl apply -f event-source.yaml
  6. 执行以下命令,查看Event Source Pod是否正常启动。

    kubectl get pod

步骤四:创建Event Sensor

  1. 创建event-sensor.yaml文件,在Event Sensor中嵌入待执行的工作流定义。Event Sensor示例代码如下所示:

    展开查看示例代码

    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: process-oss-file
    spec:
      template:
        serviceAccountName: default
      dependencies:
        - name: dep1
          eventSourceName: ali-mns
          eventName: example
      triggers:
        - template:
            name: process-oss-file-workflow
            k8s:
              operation: create
              source:
                resource:
                  apiVersion: argoproj.io/v1alpha1
                  kind: Workflow
                  metadata:
                    generateName: process-oss-file-
                    namespaces: default
                  spec:
                    entrypoint: process-oss-file
                    volumes:
                    - name: workdir
                      persistentVolumeClaim:
                        claimName: pvc-oss
                    arguments:
                      parameters:
                      - name: message
                        # this is the value that should be overridden
                        value: event message
                    templates:
                    - name: process-oss-file
                      steps:
                      - - name: parse-event-body
                          template: parse-event-body
                      - - name: process-file
                          template: process-file
                          arguments:
                            parameters:
                            - name: file-name
                              value: "{{steps.parse-event-body.outputs.parameters.file-name}}"
                    - name: parse-event-body
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        command: [sh,-c]
                        args:
                        - echo "Event body:";
                          echo {{workflow.parameters.message}} | base64 -d;
                          TriggerFileName=$(echo {{workflow.parameters.message}} | base64 -d | jq .events[0].oss.object.key | cut -c2- | rev | cut -c2- |rev);
                          echo "" && echo "TriggerFileName from event is $TriggerFileName";
                          Tmp=${TriggerFileName%%.complete} && DataFileName=${Tmp##*/};
                          echo "DataFileName after cutting .complete is $DataFileName, and pass file name to next step";
                          echo $DataFileName > /tmp/file-name.txt
                      outputs:
                        parameters:
                        - name: file-name
                          valueFrom:
                            path: /tmp/file-name.txt
                    - name: process-file
                      inputs:
                        parameters:
                          - name: file-name
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        imagePullPolicy: Always
                        command: [sh,-c]
                        args:
                        - echo "Show data-file:" && echo "";
                          ls -l /mnt/vol/{{inputs.parameters.file-name}};
                          echo "Content of data file:" && echo "";
                          cat /mnt/vol/{{inputs.parameters.file-name}} ;
                          echo "" && echo "Finished" ;
                        volumeMounts:
                        - name: workdir
                          mountPath: /mnt/vol
              parameters:
                - src:
                    dependencyName: dep1
                    dataKey: body
                  dest: spec.arguments.parameters.0.value
  2. 执行以下命令,创建Event Sensor。

    kubectl apply -f event-sensor.yaml
  3. 执行以下命令,查看Event Sensor Pod是否正常启动。

    kubectl get pod
说明

使用MNS消息队列方式创建Eventbus时,在Event Sensor创建完成后,会自动创建一个MNS队列与之对应,队列命名格式为:ackone-argowf-<namespace>-<sensor-name>-<sensor-uid>

步骤五:验证向OSS上传文件触发工作流

  1. 登录OSS管理控制台

  2. 步骤一:配置OSS Bucket事件通知中的OSS Bucket中上传以下2个文件(该文件需自备),触发工作流运行。

    • datafile:数据文件,文本格式,内容自定义。

    • datafile.complete:trigger文件,可以是空文件。

  3. 执行以下命令,在工作流集群中查看工作流运行情况。

    argo list

    预期输出如下:

    NAME STATUS AGE DURATION PRIORITY
    process-oss-file-kmb4k Running 13s 13s 0
  4. 执行以下命令,获取工作流日志,查看消息内容。

    argo logs process-oss-file-kmb4k
    重要
    • 该命令中的工作流名称必须和上一步骤中返回的工作流名称一致,ali-mns-workflow-5prz7仅为示例值,请您修改为实际环境中的返回值。

    • 消息内容使用Base64编码。

    预期输出如下:

    image.png

步骤六:清除Event相关资源

  1. 依次执行以下命令,清除Event相关资源。

    kubectl delete sensor process-oss-file
    kubectl delete eventsource ali-mns
    kubectl delete eventbus default
  2. 执行以下命令查看Pod,确认所有资源已清除。

    kubectl get pod