您可使用开源CloudEvents Java SDK发布事件。

前提条件

  1. 您已下载IntelliJ IDEA。更多信息,请参见“下载IntelliJ IDEA”。

    说明

    您可以使用IntelliJ IDEA或者Eclipse作为IDE,本文以IntelliJ IDEA为例。

  2. 在pom.xml中加入依赖。

    <!-- CloudEvents SDK modules imported by the sample classes in this document.
         ${cloudevents.version} is a Maven property placeholder; it has to be
         defined (for example in the <properties> section of pom.xml) before
         these dependencies can be resolved. -->
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-core</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-http-vertx</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-api</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    <dependency>
        <groupId>io.cloudevents</groupId>
        <artifactId>cloudevents-json-jackson</artifactId>
        <version>${cloudevents.version}</version>
    </dependency>
    说明

    请在pom.xml的<properties>中定义cloudevents.version属性,其值使用最新里程碑版本2.0.0-milestone1。

背景信息

发布事件

二进制内容模式(Binary Content Mode)的示例代码如下:

import java.net.URI;
import java.util.UUID;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;

/**
 * Sample client that publishes a single CloudEvent over HTTP using the
 * CloudEvents binary content mode ({@code writeBinary}).
 *
 * <p>The credentials are read from the {@code ALIBABA_CLOUD_ACCESS_KEY_ID} and
 * {@code ALIBABA_CLOUD_ACCESS_KEY_SECRET} environment variables, and the
 * request is signed with {@code SignatureHelper}.
 */
public class SampleBinaryHTTPClient {

    /** AccessKey ID, taken from the environment instead of being hard-coded. */
    private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");

    /** AccessKey secret, taken from the environment instead of being hard-coded. */
    private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    /** URL the event is posted to. NOTE(review): "endpoint" looks like a placeholder host - confirm. */
    private static final String ENDPOINT = "http://endpoint/openapi/putEvents";

    /**
     * Builds one CloudEvent, signs the HTTP request and sends the event in binary mode.
     *
     * @param args unused
     * @throws IllegalStateException if one of the credential environment variables is not set
     * @throws Exception if the request signature cannot be computed
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a clear message instead of a late NullPointerException
        // inside the signer (or an "acs:null:..." authorization header).
        if (ACCESS_KEY_ID == null || ACCESS_KEY_SECRET == null) {
            throw new IllegalStateException("Environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and "
                + "ALIBABA_CLOUD_ACCESS_KEY_SECRET must both be set");
        }

        final Vertx vertx = Vertx.vertx();
        try {
            final HttpClient httpClient = vertx.createHttpClient();

            // Create an event template that carries the basic CloudEvent attributes.
            CloudEventBuilder eventTemplate = CloudEventBuilder.v1()
                .withSource(URI.create("https://github.com/cloudevents/sdk-java/tree/master/examples/vertx"))
                .withType("vertx.example");

            // Create the HTTP request. The Vert.x instance is closed as soon as the
            // exchange has finished, whether it produced a response or an error.
            final HttpClientRequest request = httpClient.postAbs(ENDPOINT)
                .handler(response -> {
                    System.out.println(response.statusMessage());
                    vertx.close();
                })
                .exceptionHandler(error -> {
                    System.err.println(error);
                    vertx.close();
                });

            String id = UUID.randomUUID().toString();
            String data = "{\"name\":\"Eventbridge\",\"number\":100}";

            // Create the event from the template. The payload is encoded explicitly
            // as UTF-8 so the bytes do not depend on the platform default charset.
            final CloudEvent event = eventTemplate.newBuilder()
                .withId(id)
                .withData("application/json", data.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .withExtension("aliyuneventbusname", "jingluo-bus")
                .withSource(URI.create("https://github.com/cloudevents/sdk-java/tree/master/examples/vertx"))
                .withType("vertx.example")
                .withSubject("acs:oss:cn-hangzhou:1234567:xls-papk/game_apk/123.jpg")
                .build();

            // content-type has to be set before signing: SignatureHelper.getStringToSign
            // reads it from the request headers.
            request.putHeader("content-type", "application/json");
            String signature = SignatureHelper.getSignature(
                SignatureHelper.getStringToSign(request), ACCESS_KEY_SECRET);
            request.putHeader("authorization", "acs" + ":" + ACCESS_KEY_ID + ":" + signature);

            VertxMessageFactory.createWriter(request)
                .writeBinary(event);
        } catch (Exception e) {
            // Nothing was (or will be) sent, so the handlers above will not release Vert.x.
            vertx.close();
            throw e;
        }
    }
}

结构化内容模式(Structured Content Mode)的示例代码如下:

import java.net.URI;
import java.util.UUID;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.cloudevents.jackson.JsonFormat;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;

/**
 * Sample client that publishes a single CloudEvent over HTTP using the
 * CloudEvents structured content mode ({@code writeStructured} with a
 * {@code JsonFormat}).
 *
 * <p>The credentials are read from the {@code ALIBABA_CLOUD_ACCESS_KEY_ID} and
 * {@code ALIBABA_CLOUD_ACCESS_KEY_SECRET} environment variables, and the
 * request is signed with {@code SignatureHelper}.
 */
public class SampleStructuredHTTPClient {

    /** AccessKey ID, taken from the environment instead of being hard-coded. */
    private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");

    /** AccessKey secret, taken from the environment instead of being hard-coded. */
    private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    /** URL the event is posted to. NOTE(review): "endpoint" looks like a placeholder host - confirm. */
    private static final String ENDPOINT = "http://endpoint/openapi/putEvents";

    /**
     * Builds one CloudEvent, signs the HTTP request and sends the event in structured mode.
     *
     * @param args unused
     * @throws IllegalStateException if one of the credential environment variables is not set
     * @throws Exception if the request signature cannot be computed
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a clear message instead of a late NullPointerException
        // inside the signer (or an "acs:null:..." authorization header).
        if (ACCESS_KEY_ID == null || ACCESS_KEY_SECRET == null) {
            throw new IllegalStateException("Environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and "
                + "ALIBABA_CLOUD_ACCESS_KEY_SECRET must both be set");
        }

        final Vertx vertx = Vertx.vertx();
        try {
            final HttpClient httpClient = vertx.createHttpClient();

            // Create an event template that carries the basic CloudEvent attributes.
            CloudEventBuilder eventTemplate = CloudEventBuilder.v1()
                .withSource(URI.create("https://github.com/cloudevents/sdk-java/tree/master/examples/vertx"))
                .withType("vertx.example");

            // Create the HTTP request. The Vert.x instance is closed as soon as the
            // exchange has finished, whether it produced a response or an error.
            final HttpClientRequest request = httpClient.postAbs(ENDPOINT)
                .handler(response -> {
                    System.out.println(response.statusMessage());
                    vertx.close();
                })
                .exceptionHandler(error -> {
                    System.err.println(error);
                    vertx.close();
                });

            String id = UUID.randomUUID().toString();
            String data = "{\"name\":\"Eventbridge\",\"number\":100}";

            // Create the event from the template. The payload is encoded explicitly
            // as UTF-8 so the bytes do not depend on the platform default charset.
            final CloudEvent event = eventTemplate.newBuilder()
                .withId(id)
                .withData("application/json", data.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .withExtension("aliyuneventbusname", "jingluo-bus")
                .withSource(URI.create("https://github.com/cloudevents/sdk-java/tree/master/examples/vertx"))
                .withType("vertx.example")
                .withSubject("acs:oss:cn-hangzhou:1234567:xls-papk/game_apk/123.jpg")
                .build();

            // content-type has to be set before signing: SignatureHelper.getStringToSign
            // reads it from the request headers.
            request.putHeader("content-type", "application/cloudevents+json");
            String signature = SignatureHelper.getSignature(
                SignatureHelper.getStringToSign(request), ACCESS_KEY_SECRET);
            request.putHeader("authorization", "acs" + ":" + ACCESS_KEY_ID + ":" + signature);

            VertxMessageFactory.createWriter(request)
                .writeStructured(event, new JsonFormat());
        } catch (Exception e) {
            // Nothing was (or will be) sent, so the handlers above will not release Vert.x.
            vertx.close();
            throw e;
        }
    }

}

签名生成工具

示例代码如下:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

import io.netty.util.internal.StringUtil;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpClientRequest;

/**
 * Builds the canonical "string to sign" for an outgoing HTTP request and
 * computes its Base64-encoded HMAC-SHA1 signature.
 */
public class SignatureHelper {

    /**
     * Assembles the string to sign from the request method, selected headers,
     * the {@code x-acs} headers and the path plus sorted query parameters.
     *
     * <p>Layout: {@code METHOD\ncontent-md5\ncontent-type\ndate\n}, followed by
     * the canonicalized headers and the canonicalized resource.
     *
     * @param request the request to sign; headers that take part in the
     *                signature must already be set on it
     * @return the string to pass to {@link #getSignature(String, String)}
     */
    public static String getStringToSign(HttpClientRequest request) {
        String method = request.method().name();
        String pathname = request.path();
        MultiMap headers = request.headers();
        Map<String, String> query = buildQueryMap(request.query());

        String contentMD5 = headerOrDefault(headers, "content-md5", "");
        String contentType = headerOrDefault(headers, "content-type", "");
        // A missing date header is represented by the literal text "null", not by
        // an empty string. NOTE(review): presumably this mirrors what the server
        // computes - confirm before changing, since it alters the signature.
        String date = headerOrDefault(headers, "date", "null");

        String header = method + "\n" + contentMD5 + "\n" + contentType + "\n" + date + "\n";
        return header + getCanonicalizedHeaders(headers) + getCanonicalizedResource(pathname, query);
    }

    /** Returns the value of header {@code name}, or {@code fallback} when the header is absent. */
    private static String headerOrDefault(MultiMap headers, String name, String fallback) {
        String value = headers.get(name);
        return value == null ? fallback : value;
    }

    /**
     * Splits a raw query string into a key/value map.
     *
     * <p>Only the first {@code =} of a parameter separates key and value, so a
     * value may itself contain {@code =}. A parameter without a value
     * ({@code "flag"} or {@code "flag="}) maps to the empty string, and empty
     * segments (as in {@code "a=1&&b=2"}) are ignored.
     *
     * @param query the raw query string, may be {@code null} or empty
     * @return a mutable map of the parameters; empty when there is no query
     */
    private static Map<String, String> buildQueryMap(String query) {
        Map<String, String> map = new HashMap<>();
        if (StringUtil.isNullOrEmpty(query)) {
            return map;
        }
        for (String param : query.split("&")) {
            if (param.isEmpty()) {
                continue;
            }
            // Limit 2 keeps a trailing empty value ("a=" -> ["a", ""]) and any
            // further '=' characters inside the value.
            String[] kv = param.split("=", 2);
            map.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return map;
    }

    /**
     * Renders every header whose name starts with {@code x-acs} as
     * {@code name:trimmedValue\n}, ordered by header name.
     *
     * @param headers the request headers
     * @return the concatenated header lines; empty when no header matches
     */
    protected static String getCanonicalizedHeaders(MultiMap headers) {
        String prefix = "x-acs";
        Set<String> keys = headers.names();
        List<String> canonicalizedKeys = new ArrayList<>();
        for (String key : keys) {
            if (key.startsWith(prefix)) {
                canonicalizedKeys.add(key);
            }
        }

        String[] sortedKeys = canonicalizedKeys.toArray(new String[0]);
        Arrays.sort(sortedKeys);

        StringBuilder result = new StringBuilder();
        for (String key : sortedKeys) {
            result.append(key)
                .append(":")
                .append(headers.get(key).trim())
                .append("\n");
        }
        return result.toString();
    }

    /**
     * Renders the path followed by the query parameters sorted by key.
     *
     * <p>A parameter whose value is {@code null}, empty or blank is written as
     * the bare key without {@code =}.
     *
     * @param pathname the request path
     * @param query    the query parameters
     * @return {@code pathname} alone when there are no parameters, otherwise
     *         {@code pathname?k1=v1&k2...}
     */
    protected static String getCanonicalizedResource(String pathname, Map<String, String> query) {
        if (query.isEmpty()) {
            return pathname;
        }

        String[] keys = query.keySet().toArray(new String[0]);
        Arrays.sort(keys);

        StringBuilder result = new StringBuilder(pathname);
        result.append("?");
        for (String key : keys) {
            result.append(key);
            String value = query.get(key);
            if (value != null && !value.trim().isEmpty()) {
                result.append("=");
                result.append(value);
            }
            result.append("&");
        }
        // Drop the separator appended after the last parameter.
        return result.deleteCharAt(result.length() - 1)
            .toString();
    }

    /**
     * Signs {@code stringToSign} with HMAC-SHA1 and Base64-encodes the result.
     *
     * @param stringToSign the text to sign, encoded as UTF-8
     * @param secret       the signing key, encoded as UTF-8
     * @return the Base64-encoded signature
     * @throws Exception if the MAC algorithm is unavailable or the key is rejected
     */
    public static String getSignature(String stringToSign, String secret) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(new SecretKeySpec(secret.getBytes("UTF-8"), "HmacSHA1"));
        byte[] signData = mac.doFinal(stringToSign.getBytes("UTF-8"));
        return Base64.getEncoder()
            .encodeToString(signData);
    }
}