文档

创建通道

更新时间:

如果需要实时消费表中数据,您需要调用CreateTunnel接口为数据表创建一个通道,一张数据表上可以创建多个通道。在创建通道时需要指定数据表名称、通道名称和通道类型。

前提条件

  • 已初始化TunnelClient。

  • 已创建数据表。具体操作,请参见创建数据表

参数

请求参数

参数

说明

TableName

数据表名称。

TunnelName

通道的名称。

TunnelType

通道的类型。取值范围如下:

  • BaseData:全量类型。只能消费处理全量数据。

  • Stream:增量类型。只能消费处理增量数据。

  • BaseAndStream:全量加增量类型。全量数据消费处理完成后,再消费处理增量数据。

创建增量或者全量加增量类型的通道时,系统默认创建通道后写入的数据为增量数据。如果要消费指定时间点后的增量数据,请配置增量数据的起始时间戳(startTime)。

  • 起始时间戳的取值范围为[当前系统时间-Stream过期时间+5分钟 , 当前系统时间],单位为毫秒

    说明

    Stream过期时间为增量日志过期时长的毫秒单位时间戳,最大值为7天。您可以在为数据表开启Stream功能时设置,过期时长一经设置不能修改。

  • 您也可以配置增量数据的结束时间戳(endTime),结束时间戳的取值必须大于起始时间戳。

响应参数

参数

说明

TunnelId

通道的ID。

ResponseInfo

返回的一些其它字段。

RequestId

当次请求的Request ID。

示例

创建全量类型的通道

以下示例用于为数据表创建一个全量类型的通道。

//支持创建三种类型的通道TunnelType.BaseData(全量)、TunnelType.Stream(增量)和TunnelType.BaseAndStream(全量加增量)。
//本示例为创建全量类型的通道,如果需要创建其它类型的通道,则将CreateTunnelRequest中的TunnelType设置为相应的类型。
private static void createTunnel(TunnelClient client, String tableName, String tunnelName) {
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseData);
    CreateTunnelResponse resp = client.createTunnel(request);
    System.out.println("RequestId: " + resp.getRequestId());
    System.out.println("TunnelId: " + resp.getTunnelId());
}

创建增量或者全量加增量类型的通道

以下示例用于为数据表创建增量或全量加增量类型的通道,并指定读取的增量数据时间范围。

//创建增量或者全量加增量类型的通道,指定起始时间戳或结束时间戳,表示读取的增量数据时间范围。对于全量类型的通道,StreamTunnelConfig的配置不生效。
private static void createStreamTunnelByOffset(TunnelClient client,String tableName,String tunnelName, long startTime, long endTime){
    CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.Stream);//创建增量类型通道。
    //CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.BaseAndStream);//创建全量加增量类型通道。
    StreamTunnelConfig streamTunnelConfig = new StreamTunnelConfig();  
    /*
        指定增量数据的起始时间戳(startTime)和结束时间戳(endTime)。单位为毫秒,取值范围为[CurrentSystemTime - StreamExpiration + 5 minute, CurrentSystemTime)。
        其中CurrentSystemTime为当前系统时间的毫秒单位时间戳;StreamExpiration为增量日志过期时间的毫秒单位时间戳,最大值为7天。您可以在为数据表开启Stream功能时设置。
        结束时间戳的取值必须大于起始时间戳。
     */
    streamTunnelConfig.setStartOffset(startTime);
    streamTunnelConfig.setEndOffset(endTime);
    createTunnelRequest.setStreamTunnelConfig(streamTunnelConfig);
    CreateTunnelResponse resp = client.createTunnel(createTunnelRequest);
    System.out.println("RequestId: " + resp.getRequestId());
    System.out.println("TunnelId: " + resp.getTunnelId());
}

常见问题

使用通道服务消费数据时报错OTSTrimmedDataAccess Requested stream data is already trimmed or does not exist

相关文档