本文介绍如何集成阿里云对象存储OSS与阿里云轻量消息队列(原 MNS),通过将数据上传文件至对象存储OSS中,自动触发工作流运行文件,并生成结果。
前提条件
已开通以下服务与功能。
开启事件驱动功能。具体操作,请参见开启事件驱动功能。
开通轻量消息队列(原 MNS)。该功能会涉及轻量消息队列(原 MNS)相关计费,具体收费情况,请参见计费说明。
已创建工作流集群并下载阿里云Argo CLI。
创建工作流集群,请参见创建工作流集群。
下载并安装阿里云Argo CLI,请参见阿里云Argo CLI。
创建资源入口:连接工作流集群后,在本地终端操作。具体操作,请参见获取集群KubeConfig并通过kubectl工具连接集群。
步骤一:配置OSS Bucket事件通知
登录OSS管理控制台。
单击Bucket 列表,创建OSS Bucket或者选择已有Bucket。如需创建,请参见创建存储空间。
在文件管理页面,选择数据处理 > 事件通知,单击创建规则,完成相关参数配置后,单击确定。如需自定义规则,请参见通过事件通知实时处理OSS文件变动。
配置项
示例
规则名称
upload-complete
事件类型
PutObject, PostObject
资源描述
选择前后缀,并设置后缀为
complete
,即上传以.complete
结尾的文件,触发事件后运行工作流。接收终端
选择队列,名称设置为
oss-event-queue
。规则创建后,在轻量消息队列(原 MNS)中会自动创建一个主题与之对应。
单击队列列表,创建队列oss-event-queue,创建方法,请参见创建队列。创建完成后,在队列详情页面的接入点区域获取Endpoint。
重要创建队列的名称需要与步骤3中配置的接收终端的队列名称保持一致。
步骤二:创建Event Bus
Event Bus可以被命名空间中的事件驱动工作流共享。如果已经创建,请执行步骤三:创建Event Source。
方式一:使用NATS
创建
event-bus.yaml
文件。Event Bus示例代码如下所示:apiVersion: argoproj.io/v1alpha1 kind: EventBus metadata: name: default spec: nats: native: replicas: 3 auth: token
执行以下命令,创建EventBus。
kubectl apply -f event-bus.yaml
说明命令执行成功后,会在default命名空间下创建Event Bus Pod。后续操作需在同一命名空间下。
执行以下命令,查看Event Bus Pod是否正常启动。
kubectl get pod
方式二:使用轻量消息队列(原 MNS)
在主题列表页面创建主题argoeventbus,并在主题详情页面的接入点区域获取Endpoint。
使用RAM管理员登录RAM控制台。
创建RAM用户,授权
AliyunMNSFullAccess
,并获取RAM用户的AK和SK。执行以下命令,创建Secret用于存储AK和SK。
kubectl create secret generic mns-secret\ --from-literal=accesskey=*** \ --from-literal=secretkey=***
创建
event-bus-mns.yaml
文件,EventBus示例代码如下所示:apiVersion: argoproj.io/v1alpha1 kind: EventBus metadata: name: default spec: alimns: accessKey: key: accesskey name: mns-secret secretKey: key: secretkey name: mns-secret topic: argoeventbus # 对应轻量消息队列(原 MNS)中的主题名称。 endpoint: http://165***368.mns.<region>.aliyuncs.com
执行以下命令,创建
event-bus-mns.yaml
。kubectl apply -f event-bus.yaml
使用轻量消息队列(原 MNS)方式创建Event Bus时,不会创建Pod。
如需使用Trigger功能,请使用NATS方式创建EventBus。目前轻量消息队列(原 MNS)方式不支持开源Argo Event的Sensor Trigger。
步骤三:创建Event Source
使用RAM管理员登录RAM控制台。
创建RAM用户,为其授予
AliyunMNSFullAccess
权限,并获取RAM用户的AK和SK。具体操作,请参见创建RAM用户、为RAM用户授权、创建AccessKey和查看RAM用户的AccessKey信息。执行以下命令,创建Secret用于存储AK和SK。
kubectl create secret generic mns-secret\ --from-literal=accesskey=*** \ --from-literal=secretkey=***
创建
event-source.yaml
文件,Event Source示例代码如下所示:apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: name: ali-mns spec: mns: example: jsonBody: true accessKey: key: accesskey name: mns-secret secretKey: key: secretkey name: mns-secret queue: oss-event-queue # 步骤一中创建的轻量消息队列(原 MNS)名称。 waitTimeSeconds: 20 endpoint: http://165***368.mns.<region>.aliyuncs.com # 步骤一中创建的轻量消息队列(原 MNS)接入点。
执行以下命令,创建Event Source。
kubectl apply -f event-source.yaml
执行以下命令,查看Event Source Pod是否正常启动。
kubectl get pod
步骤四:创建Event Sensor
创建
event-sensor.yaml
文件,在Event Sensor中嵌入待执行的工作流定义。Event Sensor示例代码如下所示:执行以下命令,创建Event Sensor。
kubectl apply -f event-sensor.yaml
执行以下命令,查看Event Sensor Pod是否正常启动。
kubectl get pod
使用轻量消息队列(原 MNS)方式创建Eventbus时,在Event Sensor创建完成后,会自动创建一个轻量消息队列(原 MNS)与之对应,队列命名格式为:ackone-argowf-<namespace>-<sensor-name>-<sensor-uid>。
步骤五:验证向OSS上传文件触发工作流
登录OSS管理控制台。
向步骤一:配置OSS Bucket事件通知中的OSS Bucket中上传以下2个文件(该文件需自备),触发工作流运行。
datafile:数据文件,文本格式,内容自定义。
datafile.complete:trigger文件,可以是空文件。
执行以下命令,在工作流集群中查看工作流运行情况。
argo list
预期输出如下:
NAME STATUS AGE DURATION PRIORITY process-oss-file-kmb4k Running 13s 13s 0
执行以下命令,获取工作流日志,查看消息内容。
argo logs process-oss-file-kmb4k
重要该命令中的工作流名称必须和上一步骤中返回的工作流名称一致,
ali-mns-workflow-5prz7
仅为示例值,请您修改为实际环境中的返回值。消息内容使用Base64编码。
预期输出如下:
步骤六:清除Event相关资源
依次执行以下命令,清除Event相关资源。
kubectl delete sensor process-oss-file kubectl delete eventsource ali-mns kubectl delete eventbus default
执行以下命令查看Pod,确认所有资源已清除。
kubectl get pod