如果需要实时消费表中数据,您需要调用CreateTunnel接口为数据表创建一个通道,一张数据表上可以创建多个通道。在创建通道时需要指定数据表名称、通道名称和通道类型。
前提条件
已初始化TunnelClient。
已创建数据表。具体操作,请参见创建数据表。
参数
请求参数
参数 | 说明 |
TableName | 数据表名称。 |
TunnelName | 通道的名称。 |
TunnelType | 通道的类型。取值范围如下:
创建增量或者全量加增量类型的通道时,系统默认创建通道后写入的数据为增量数据。如果要消费指定时间点后的增量数据,请配置增量数据的起始时间戳(startTime)。
|
响应参数
参数 | 说明 |
TunnelId | 通道的ID。 |
ResponseInfo | 返回的一些其它字段。 |
RequestId | 当次请求的Request ID。 |
示例
创建全量类型的通道
以下示例用于为数据表创建一个全量类型的通道。
//支持创建三种类型的通道TunnelType.BaseData(全量)、TunnelType.Stream(增量)和TunnelType.BaseAndStream(全量加增量)。
//本示例为创建全量类型的通道,如果需要创建其它类型的通道,则将CreateTunnelRequest中的TunnelType设置为相应的类型。
private static void createTunnel(TunnelClient client, String tableName, String tunnelName) {
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseData);
CreateTunnelResponse resp = client.createTunnel(request);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
创建增量或者全量加增量类型的通道
以下示例用于为数据表创建增量或全量加增量类型的通道,并指定读取的增量数据时间范围。
//创建增量或者全量加增量类型的通道,指定起始时间戳或结束时间戳,表示读取的增量数据时间范围。对于全量类型的通道,StreamTunnelConfig的配置不生效。
private static void createStreamTunnelByOffset(TunnelClient client,String tableName,String tunnelName, long startTime, long endTime){
CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.Stream);//创建增量类型通道。
//CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.BaseAndStream);//创建全量加增量类型通道。
StreamTunnelConfig streamTunnelConfig = new StreamTunnelConfig();
/*
指定增量数据的起始时间戳(startTime)和结束时间戳(endTime)。单位为毫秒,取值范围为[CurrentSystemTime - StreamExpiration + 5 minute, CurrentSystemTime)。
其中CurrentSystemTime为当前系统时间的毫秒单位时间戳;StreamExpiration为增量日志过期时间的毫秒单位时间戳,最大值为7天。您可以在为数据表开启Stream功能时设置。
结束时间戳的取值必须大于起始时间戳。
*/
streamTunnelConfig.setStartOffset(startTime);
streamTunnelConfig.setEndOffset(endTime);
createTunnelRequest.setStreamTunnelConfig(streamTunnelConfig);
CreateTunnelResponse resp = client.createTunnel(createTunnelRequest);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
常见问题
使用通道服务消费数据时报错OTSTrimmedDataAccess Requested stream data is already trimmed or does not exist
相关文档
关于API说明的更多信息,请参见CreateTunnel。
如果要快速使用通道服务消费数据,请参见快速使用通道服务文档进行操作。
如果要查看指定表的所有通道信息,您可以通过获取表内的通道信息实现。更多信息,请参见获取表内的通道信息。
如果要查看指定通道的详细信息,您可以通过获取通道的具体信息实现。更多信息,请参见获取通道的具体信息。
如果不再使用某个通道,您可以删除相应通道。更多信息,请参见删除通道。
使用通道服务可以实现数据迁移。更多信息,请参见将表格存储数据表中数据同步到另一个数据表或将表格存储时序表中数据同步到另一个时序表。
实时计算Flink能将通道服务的数据通道作为流式数据的输入,可实现通过Flink计算与分析表格存储数据,更多信息,请参见使用教程(宽表模型)和使用教程(时序模型)。
当前通道服务本身没有额外的费用开销。在消费通道服务数据时,表格存储会根据实际拉取的数据产生读吞吐量计量计费。更多信息,请参见计费概述。