创建订阅

订阅功能使用介绍

目前用户在DataHub上消费Topic数据,为了做到“断点续消费”的功能,即消费者failover重启后可以继续从failover时间点继续获取数据,需要用户自己保存当前消费的点位信息,同时用户还需要关心自己的点位存储服务的高可用,这无疑增加了用户开发应用程序的复杂度。基于此,DataHub新上线的订阅服务提供了服务端保存用户消费点位的功能,用户只需要通过简单的几步配置,然后在自己的应用逻辑里添加几行简单的处理逻辑,就可以拥有一个对自己“透明”的高可用的点位存储服务。另外,订阅服务还提供了灵活的点位重置功能,从而保证用了“At Least Once”的消费语义,比如用户发现自己应用程序有个时间段消费的数据处理上存在问题,想重新消费,此时只需要将点位重置到对应的时间点,并且无须重启自己的应用程序,可以做到应用程序自动感知。

创建订阅

需要确保自己账号有权限对特定project下的topic有订阅权限,具体授权参见权限控制。创建步骤如下:

  1. 打开Topic页面,点击右上角。

    3-1

  2. 填写订阅详情,点击创建。

    • 订阅应用:用来说明当前订阅的应用名称。

    • 描述:当前订阅的详情描述。

    3-2

  3. 单击消费点位下方搜索按钮,即可查看所有shard消费情况。

    3-3

    3-4

示例代码

订阅功能为用户提供了点位存储的能力,与DataHub读写功能(参见Java SDK)并无必然联系,不过二者可以配合使用,即数据读取后用户需要将消费点位进行存储的场景。

前置条件

您需要在工程中配置相应的Access KeySecret Key,推荐使用环境变量的形式在配置文件中配置,详情可参见:管理访问凭据

datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
重要

阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。

强烈建议不要把AccessKey IDAccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

参考代码

//点位消费示例,并在消费过程中进行点位的提交
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public void offset_consumption(int maxRetry) {
    String projectName = "<YourProjectName>";
    String topicName = "<YourTopicName>";
    String subId = "<YourSubId>";
    String shardId = "0";
    List<String> shardIds = Arrays.asList(shardId);
    // 创建DataHubClient实例
    DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
            .setDatahubConfig(
                    new DatahubConfig(endpoint,
                            // 是否开启二进制传输,服务端2.12版本开始支持
                            new AliyunAccount(accessId, accessKey), true))
            .build();
    RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
    OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
    SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
    // 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
    String cursor = "";
    //sequence < 0说明未消费
    if (subscriptionOffset.getSequence() < 0) {
        // 获取生命周期内第一条record的cursor
        cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
    } else {
        // 获取下一条记录的Cursor
        long nextSequence = subscriptionOffset.getSequence() + 1;
        try {
            //按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
            cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (SeekOutOfRangeException e) {
            // 获取生命周期内第一条record的cursor
            cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
        }
    }
    // 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
    long recordCount = 0L;
    // 每次读取1000条record
    int fetchNum = 1000;
    int retryNum = 0;
    int commitNum = 1000;
    while (retryNum < maxRetry) {
        try {
            GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
            if (getRecordsResult.getRecordCount() <= 0) {
                // 无数据,sleep后读取
                System.out.println("no data, sleep 1 second");
                Thread.sleep(1000);
                continue;
            }
            for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                //消费数据
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
                // 处理数据完成后,设置点位
                recordCount++;
                subscriptionOffset.setSequence(recordEntry.getSequence());
                subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                // commit offset every 1000 records
                if (recordCount % commitNum == 0) {
                    //提交点位的点位
                    Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                    offsetMap.put(shardId, subscriptionOffset);
                    datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
                    System.out.println("commit offset successful");
                }
            }
            cursor = getRecordsResult.getNextCursor();
        } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
            // 退出. Offline: 订阅下线; SessionChange: 表示订阅被其他客户端同时消费
            e.printStackTrace();
            throw e;
        } catch (SubscriptionOffsetResetException e) {
            // 点位被重置,需要重新获取SubscriptionOffset版本信息
            SubscriptionOffset offset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
            subscriptionOffset.setVersionId(offset.getVersionId());
            // 点位被重置之后,需要重新获取点位,获取点位的方法应该与重置点位时一致,
            // 如果重置点位时,同时设置了sequence和timestamp,那么既可以用SEQUENCE获取,也可以用SYSTEM_TIME获取
            // 如果重置点位时,只设置了sequence,那么只能用sequence获取,
            // 如果重置点位时,只设置了timestamp,那么只能用SYSTEM_TIME获取点位
            // 一般情况下,优先使用SEQUENCE,其次是SYSTEM_TIME,如果都失败,则采用OLDEST获取
            cursor = null;
            if (cursor == null) {
                try {
                    long nextSequence = offset.getSequence() + 1;
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
                    System.out.println("get cursor successful");
                } catch (DatahubClientException exception) {
                    System.out.println("get cursor by SEQUENCE failed, try to get cursor by SYSTEM_TIME");
                }
            }
            if (cursor == null) {
                try {
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
                    System.out.println("get cursor successful");
                } catch (DatahubClientException exception) {
                    System.out.println("get cursor by SYSTEM_TIME failed, try to get cursor by OLDEST");
                }
            }
            if (cursor == null) {
                try {
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
                    System.out.println("get cursor successful");
                } catch (DatahubClientException exception) {
                    System.out.println("get cursor by OLDEST failed");
                    System.out.println("get cursor failed!!");
                    throw e;
                }
            }
        } catch (LimitExceededException e) {
            // limit exceed, retry
            e.printStackTrace();
            retryNum++;
        } catch (DatahubClientException e) {
            // other error, retry
            e.printStackTrace();
            retryNum++;
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}

查看运行结果

  • 第一次启动时会从最早的数据开始消费,运行过程中可以刷新webconsole上的订阅页面,shard 的消费点位都在往前移动。

  • 如果在消费过程中,通过webconsole上的重置点位功能来手动调整点位的话,我们的消费程序会自动感知到点位变化从新调整后的点位开始消费,这时客户端通过捕获OffsetResetedException异常后调用getSubscriptionOffset接口从服务端获取到最新的SubscriptionOffset对象,然后继续从新点位开始消费。

说明

同一个订阅下的相同shard不要同时被多个消费线程/进程消费,否则点位会被交替覆盖,也就是最终服务端存储的点位是未定义的,这种情况下服务端会抛OffsetSessionChangedException异常,建议客户端对这类异常进行捕获后做exit处理,检查是否存在重复消费的设计。