本文介绍如何通过函数计算控制台创建EventBridge类别的OSS触发器。

注意事项

针对原生OSS触发器,一个Bucket最多支持关联10个触发器。如您需要在一个Bucket内,关联更多的OSS触发器,可以选择创建EventBridge类别的OSS触发器。

说明 通常情况下,不推荐为一个Bucket关联10个以上的触发器。如需创建,建议您创建新的Bucket,并基于新的Bucket创建触发器。

EventBridge类别的OSS触发器与原生OSS触发器不同,您需要关注以下注意事项。

  • 事件总线EventBridge的资源限制数一致,目前最多支持创建250个EventBridge类别的OSS触发器。更多信息,请参见使用限制
  • 无需保证语义唯一性,即事件类型文件前缀文件后缀组成的语义无需唯一。
  • 支持同时配置多个文件前缀文件后缀
  • 触发机制创建成功后,不会立即生效。需等待约30s后,对象存储OSS相关操作事件才能触发函数。

示例场景

您可以配置一个OSS触发器,在触发规则中设置前缀为sourcetest,后缀为.rar.zip。当有压缩文件存入指定的OSS Bucket中的source或者test目录下,且文件后缀为.rar或者.zip时,会自动触发函数执行。函数执行完成后,将解压缩后的文件存放到同一个Bucket的其他目录下。

前提条件

步骤一:创建EventBridge类别的OSS触发器

  1. 登录函数计算控制台,在左侧导航栏,单击服务及函数
  2. 在顶部菜单栏,选择地域,然后在服务列表页面,单击目标服务。
  3. 函数管理页面,单击目标函数名称。
  4. 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器
  5. 在创建触发器面板,触发器类型选择EventBridge触发器类型的对象存储OSS,设置其余配置项,然后单击确定
    配置项操作本文示例
    名称填写自定义的触发器名称。oss-trigger
    版本或别名默认值为LATEST。如果您需要创建其他版本或别名的触发器,需先在函数详情页的版本或别名下拉列表选择该版本。关于版本和别名的简介,请参见管理版本管理别名LATEST
    Bucket 名称选择已创建的OSS Bucket。bucket-zh****
    文件前缀输入要匹配的文件名称的前缀。函数计算建议您配置前缀和后缀,避免触发事件嵌套循环触发引起额外费用。

    您可以在控制台单击+添加文件前缀同时添加多个文件前缀

    重要 文件前缀不能以/开头,否则会导致OSS触发器无法被触发。
    • source
    • test
    文件后缀输入要匹配的文件名称的后缀。函数计算建议您配置前缀和后缀,避免触发事件嵌套循环触发引起额外费用。

    您可以在控制台单击+添加文件后缀同时添加多个文件后缀

    • .zip
    • .rar
    事件类型选择一个或多个触发事件。关于对象存储OSS的事件类型,请参见OSS事件定义oss:ObjectCreated:PutObject
    事件模式内容您创建文件前缀文件后缀触发事件后,该配置项自动填充。
    重要 请谨慎修改事件模式内容,否则可能导致触发器触发失败。请了解事件模式的详细规则后,再决定是否修改。更多信息,请参见事件模式
    {
        "source":[
            "acs.oss"
        ]
    }
    调用方式选择函数调用方式,默认为同步调用。
    • 同步调用:事件触发函数执行,等待函数调用完成后,函数计算返回执行结果。适用场景较广泛,更多信息,请参见同步调用
    • 异步调用:事件触发函数执行后,函数计算立即返回响应结果并且确保函数至少被成功执行一次,但不会返回具体执行结果。适用于调度延时较长的函数。更多信息,请参见功能概览
    同步调用
    启用触发器创建触发器后是否立即启用。
    函数入参模板指定函数入口参数的数据格式,该数据通过函数的Event参数进行传递。

    Event参数支持CloudEvents模板,表示以通用格式描述事件数据的规范,简化不同服务、平台间的事件声明和传输。

    说明 两种类型的OSS触发器的函数入参模板不同。原生OSS触发器与EventBridge类别的OSS触发器不同,其Event参数支持OSSEvents模板,即阿里云对象存储OSS生成的事件格式。
    CloudEvents

    创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤二:配置函数入口参数

EventBridge类别的OSS触发器事件源会以CloudEvents模板作为输入参数传递给函数,您可以手动将Event传给函数模拟触发事件。

  1. 在函数详情页面,单击函数代码页签,然后单击xialatubiao图标,从下拉列表中,选择配置测试参数
  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定
    Event是函数计算的入口参数,当指定的OSS Bucket发生指定事件时,会将事件数据以JSON格式发送给绑定的函数。具体格式如下所示。
    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "143199913****",
        "data": {
            "eventVersion": "1.0",
            "responseElements": {
                "requestId": "6364D216511B143733C5A67B"
            },
            "eventSource": "acs:oss",
            "eventTime": "2022-11-04T08:49:26.000Z",
            "requestParameters": {
                "sourceIPAddress": "140.205.XX.XX"
            },
            "eventName": "ObjectCreated:PostObject",
            "userIdentity": {
                "principalId": "143199913****"
            },
            "region": "cn-shenzhen",
            "oss": {
                "bucket": {
                    "name": "bucket-zh***",
                    "arn": "acs:oss:cn-shenzhen:143199913****:bucket-zh***",
                    "virtualBucket": "",
                    "ownerIdentity": "143199913****"
                },
                "ossSchemaVersion": "1.0",
                "object": {
                    "size": 13,
                    "objectMeta": {
                        "mimeType": "text/plain"
                    },
                    "deltaSize": 13,
                    "eTag": "59CA0EFA9F5633CB0371BBC0355478D8",
                    "key": "myPrefix.mySuffix.txt"
                }
            }
        },
        "subject": "acs:oss:cn-shenzhen:143199913****:bucket-zh***/myPrefix.mySuffix.txt",
        "aliyunoriginalaccountid": "143199913****",
        "source": "acs.oss",
        "type": "oss:ObjectCreated:PostObject",
        "aliyunpublishtime": "2022-11-04T08:49:26.745Z",
        "specversion": "1.0",
        "aliyuneventbusname": "default",
        "id": "6364D216511B143733C5A67B",
        "time": "2022-11-04T08:49:26Z",
        "aliyunregionid": "cn-shenzhen"
    }
    Event参数中不同属性字段的解释如下表所示。
    参数类型示例值描述
    datacontenttypeStringapplication/json;charset=utf-8参数data的内容形式。datacontenttype只支持application/json;charset=utf-8格式。
    aliyunaccountidString143199913****阿里云账号ID。
    dataStruct{}OSS事件内容,为JSON对象。CloudEvents包含事件发生时由OSS定义的上下文,data中封装了这些信息。
    subjectStringacs:oss:cn-shenzhen:143199913****:bucket-zh****/myPrefix.mySuffix.txt事件主题。
    sourceStringacs.oss事件源。OSS触发器固定为acs.oss
    typeStringoss:ObjectCreated:PostObject事件类型。
    aliyunpublishtimeTimestamp2022-11-04T08:49:26.745Z接收事件的时间。
    specversionString1.0CloudEvents协议版本。
    aliyuneventbusnameStringdefault接收事件的事件总线名称。
    idString6364D216511B143733C5A67B事件ID。
    timeTimestamp2022-11-04T08:49:26Z事件产生的时间。
    aliyunregionidStringcn-shenzhen接收事件的地域。
    aliyunpublishaddrString140.205.XX.XX接收事件的服务器地址。

步骤三:编写函数代码并测试

OSS触发器创建完成后,您可以开始编写函数代码并测试,以验证代码的正确性。在实际操作过程中发生OSS事件时,会自动触发函数执行。

警告 代码中一定要避免循环触发,否则会产生不必要的费用。一个典型的循环触发场景是OSS的某个Bucket上传文件事件触发函数执行,此函数执行完成后又生成了一个或多个文件再写回到OSS的Bucket里,这个写入动作又触发了函数执行,形成了链状循环。更多信息,请参见原生OSS触发器触发规则
  1. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码

    执行代码前的准备工作和代码示例如下。

    说明 如果您要在您的函数中读写OSS资源,建议使用OSS内网服务地址进行访问,避免使用公网访问,产生公网费用。关于OSS内网服务地址的格式,请参见访问域名和数据中心
    /*准备工作:
    1.请先在终端中执行以下代码安装package.json文件和jimp图片处理模块。
      a.在终端执行npm init安装package.json文件
      b.在终端执行npm install jimp安装jimp图片处理模块
    2.请确保函数的函数入口(handler)为index.handler。
    */
    
    'use strict';
     console.log('Loading function ...');
     var oss = require('ali-oss');
     var fs = require('fs');
     var jimp = require("jimp");
     module.exports.handler = function (eventBuf, ctx, callback) {
         console.log('Received event:', eventBuf.toString());
         var event = JSON.parse(eventBuf);
    
         var ossEvent = event.data;
    
         // OSS地域以“oss-”为前缀,例如“oss-cn-shanghai”
         var ossRegion = "oss-" + ossEvent.region;
         // 创建OSS客户端
         var client = new oss({
             region: ossRegion,
             // 从上下文中获取凭证
             accessKeyId: ctx.credentials.accessKeyId,
             accessKeySecret: ctx.credentials.accessKeySecret,
             stsToken: ctx.credentials.securityToken
         });
         // 从事件中获取Bucket名称
         client.useBucket(ossEvent.oss.bucket.name);
         // 处理后的图片被存储至processed/目录
         var newKey = ossEvent.oss.object.key.replace("source/", "processed/");
         var tmpFile = "/tmp/processed.png";
         // 获取OSS文件对象
         console.log('Getting object: ', ossEvent.oss.object.key)
         client.get(ossEvent.oss.object.key).then(function (val) {
             // 从缓存中读取OSS文件对象内容
             jimp.read(val.content, function (err, image) {
                 if (err) {
                     console.error("Failed to read image");
                     callback(err);
                     return;
                 }
                 // 调整图片大小,并将其保存至tmp文件中
                 image.resize(128, 128).write(tmpFile, function (err) {
                     if (err) {
                         console.error("Failed to write image locally");
                         callback(err);
                         return;
                     }
                     // 将读取到的文件对象上传到OSS存储空间中并重新命名
                     console.log('Putting object: ', newKey);
                     client.put(newKey, tmpFile).then(function (val) {
                         console.log('Put object:', val);
                         callback(null, val);
                         return;
                     }).catch(function (err) {
                         console.error('Failed to put object: %j', err);
                         callback(err);
                         return
                     });
                 });
             });
         }).catch(function (err) {
             console.error('Failed to get object: %j', err);
             callback(err);
             return
         });
     };
    #准备工作:
    #1.请确保函数所在服务配置的角色具有访问对象存储OSS的权限。您可以登录RAM控制台,为该角色添加访问对象存储OSS的权限。
    #2.请确保函数的函数入口(handler)为index.handler。
    
    # -*- coding: utf-8 -*-
    import oss2, json
    from wand.image import Image
    def handler(event, context):
        evt = json.loads(event)
        creds = context.credentials
        # Required by OSS sdk
        auth=oss2.StsAuth(
            creds.access_key_id,
            creds.access_key_secret,
            creds.security_token)
        evt = evt['data']
        bucket_name = evt['oss']['bucket']['name']
        endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
        bucket = oss2.Bucket(auth, endpoint, bucket_name)
        objectName = evt['oss']['object']['key']
        # Processed images will be saved to processed/
        newKey = objectName.replace("source/", "processed/")
        remote_stream = bucket.get_object(objectName)
        if not remote_stream:
            return
        remote_stream = remote_stream.read()
        with Image(blob=remote_stream)  as img:
            with img.clone() as i:
                i.resize(128, 128)
                new_blob = i.make_blob()
                bucket.put_object(newKey, new_blob)
    /*准备工作:
    1.请确保函数所在服务配置的角色具有访问对象存储OSS的权限。您可以登录RAM控制台,为该角色添加访问对象存储OSS的权限。
    2.请确保函数的函数入口(handler)为index.handler。
    */
    
    <?php
      use OSS\OssClient;
    function handler($event, $context) {
      $event           = json_decode($event, $assoc = true);
      $accessKeyId     = $context["credentials"]["accessKeyId"];
      $accessKeySecret = $context["credentials"]["accessKeySecret"];
      $securityToken   = $context["credentials"]["securityToken"];
      $evt        = $event['data'];
      $bucketName = $evt['oss']['bucket']['name'];
      $endpoint   = 'oss-' . $evt['region'] . '-internal.aliyuncs.com';
      $objectName = $evt['oss']['object']['key'];
      $newKey = str_replace("source/", "processed/", $objectName);
      try {
        $ossClient = new OssClient($accessKeyId, $accessKeySecret, $endpoint, false, $securityToken);
        $content = $ossClient->getObject($bucketName , $objectName);
        if ($content == null || $content == "") {
          return;
        }
        $imagick = new Imagick();
        $imagick->readImageBlob($content);
        $imagick->resizeImage(128, 128, Imagick::FILTER_LANCZOS, 1);
        $ossClient->putObject($bucketName, $newKey, $imagick->getImageBlob());
      } catch (OssException $e) {
        print($e->getMessage());
      }
    }
    /*准备工作:
    1.请先增加如下依赖到pom.xml。
    <dependencies>
        <dependency>
          <groupId>com.aliyun.fc.runtime</groupId>
          <artifactId>fc-java-core</artifactId>
          <version>1.4.1</version>
        </dependency>
        <dependency>
          <groupId>com.aliyun.fc.runtime</groupId>
          <artifactId>fc-java-event</artifactId>
          <version>1.2.0</version>
        </dependency>
    </dependencies>
    2.请确保函数的函数入口(handler)为example.App::handleRequest。
    */
    
    package example;
    
    import java.io.*;
    import java.util.Map;
    
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.StreamRequestHandler;
    import com.aliyun.fc.runtime.event.OSSEvent.Event;
    
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    
    
    public class App implements StreamRequestHandler {
    
        private static final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        public static final class CloudEvent {
            public final String id;
    
            public final String source;
            public final String specversion;
    
            public final String type;
            public final String datacontenttype;
    
            public final String dataschema;
    
            public final String subject;
    
            public final String time;
    
            public final Map<String, ?> extensions;
    
            public final Event data;
    
            @JsonCreator
            public CloudEvent(@JsonProperty("id") String id, @JsonProperty("source") String source, @JsonProperty("specversion") String specversion, @JsonProperty("type") String type, @JsonProperty("datacontenttype") String datacontenttype, @JsonProperty("dataschema") String dataschema, @JsonProperty("subject") String subject, @JsonProperty("time") String time, @JsonProperty("extensions") Map<String, ?> extensions, @JsonProperty("data") Event data) {
                this.id = id;
                this.source = source;
                this.specversion = specversion;
                this.type = type;
                this.datacontenttype = datacontenttype;
                this.dataschema = dataschema;
                this.subject = subject;
                this.time = time;
                this.extensions = extensions;
                this.data = data;
            }
    
            public String getId() {
                return this.id;
            }
    
            public String getSource() {
                return this.source;
            }
    
            public String getSpecversion() {
                return this.specversion;
            }
    
            public String getType() {
                return this.type;
            }
    
            public String getDatacontenttype() {
                return this.datacontenttype;
            }
    
            public String getDataschema() {
                return this.dataschema;
            }
    
            public String getSubject() {
                return this.subject;
            }
    
            public String getTime() {
                return this.time;
            }
    
            public Map<String, ?> getExtensions() {
                return this.extensions;
            }
    
            public Event getData() {
                return this.data;
            }
        }
        @Override
        public void handleRequest(
                InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
            CloudEvent cloudEvents = mapper.readValue(inputStream, new TypeReference<CloudEvent>() {});
            Event ossEvent = cloudEvents.getData();
            context.getLogger().info(String.format("received %s from %s @ %s", ossEvent.eventName, ossEvent.eventSource, ossEvent.region));
            outputStream.write(String.format("received %s from %s @ %s", ossEvent.eventName, ossEvent.eventSource, ossEvent.region).getBytes());
            outputStream.write(String.format("received bucket %s", ossEvent.oss.bucket.arn).getBytes());
            outputStream.write(String.format("received object %s and it's size is %s", ossEvent.oss.object.key, ossEvent.oss.object.size).getBytes());
        }
    }
  2. 单击函数代码页签的测试函数
    执行完成后,您可以在函数代码页签的上方查看执行结果。