本文为您展示DataHub的 Java SDK的读写数据操作。
前提条件
状态为
CLOSED
和ACTIVE
的shard
可以读取数据。状态为
ACTIVE
的shard
可以写数据。
如无特殊需求,建议使用High-Level SDK 进行读写操作,High-Level SDK 是对 Low-Level SDK 读写相关 API 的上层封装,更合适做数据的读写。
具体信息请参考:Producer
读数据
操作步骤
获取
cursor
。读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式有以下四种:
OLDEST : 表示获取的
cursor
指向当前有效数据中时间最久远的record。LATEST : 表示获取的
cursor
指向当前最新的record。SEQUENCE : 表示获取的
cursor
指向该序列的record。SYSTEM_TIME : 表示获取的
cursor
指向该大于等于该时间戳的第一条record。
获取方式选择。
若需从头开始读取数据,选择OLDEST来进行获取,将会从第一条数据开始读取。
若需对数据抽样使用,查看某时间点之后的数据是否正常,选择SYSTEM_TIME 模式,将会从获取到的
sequence+1
位置开始读取。若需查看最新数据情况,选择LATEST来进行获取,读取最新写入的数据。
若需要查看新写入的数据前N条,选择SEQUENCE,通过
SEQUENCE-N
来获取新写入前N条数据。
说明读取的数据一定要在有效期内,也就是生命周期内,否则读取数据会报错。
将获取到的cursor值传入到getRecords方法。
说明DataHub已经提供了订阅功能,用户可直接关联订阅对数据进行消费,服务端自动保存点位,读数据的主要用途在于抽样查看数据的质量。
代码示例
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
shardId | String | shard ID |
CursorType | CursorType | cursor类型,有OLDEST、LATEST、SYSTEM_TIME、SEQUENCE 四种 |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
SeekOutOfRangeException |
| getCursor时,给定的sequence不在有效范围内(通常数据已过期),或给定的timestamp大于当前时间。 |
示例代码
对数据抽样使用
需先将Date转换为
timestamp
,然后获取cursor
。public static void getcursor(String projectName,String topicName) { String shardId = "5"; try { //将时间转为时间戳形式 String time = "2019-07-01 10:00:00"; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long timestamp = 0L; try { Date date = simpleDateFormat.parse(time); timestamp = date.getTime();//获取时间的时间戳 //System.out.println(timestamp); } //获取时间time之后的数据读取位置 String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor(); System.out.println("get cursor successful"); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } catch (ParseException e) { System.out.println(e.getErrorOffset()); } }
从头开始读取数据
public static void getcursor(String projectName,String topicName) { String shardId = "5"; try { /* OLDEST用法示例 */ String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor(); System.out.println("get cursor successful"); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } }
查看最新数据情况
最新写入的最后一条数据。
最新写入的前N条数据。
需要先获取最新写入数据的sequence,然后再获取cursor。
public static void getcursor(String projectName,String topicName) { String shardId = "5"; try { /* LATEST用法示例 */ String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor(); /* SEQUENCE用法示例 */ //获取最新数据的sequence long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence(); //获取最新的十条数据的读取位置 String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor(); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } }
读取收据接口
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
shardId | String | shard ID |
schema | RecordSchema | 表结构,Tuple类型Topic需要该参数,Blob类型不涉及 |
cursor | String | cursor |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
读取Tuple Topic数据。
public static void example(String projectName,String topicName) { //每次最多读取数据量 int recordLimit = 1000; String shardId = "7"; // 获取cursor, 这里获取有效数据中时间最久远的record游标 // 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取 String cursor = ""; try { cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor(); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } while (true) { try { GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit); if (result.getRecordCount() <= 0) { // 无数据,sleep后读取 Thread.sleep(10000); continue; } for (RecordEntry entry : result.getRecords()) { TupleRecordData data = (TupleRecordData) entry.getRecordData(); System.out.println("field1:" + data.getField("field1") + "\t" + "field2:" + data.getField("field2")); } // 拿到下一个游标 cursor = result.getNextCursor(); } catch (InvalidCursorException ex) { // 非法游标或游标已过期,建议重新定位后开始消费 cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor(); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage());; } } }
读取Blob topic数据。
public static void example(String projectName,String topicName) { //每次最多读取数据量 int recordLimit = 1000; String shardId = "7"; // 获取cursor, 这里获取有效数据中时间最久远的record游标 // 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取 String cursor = ""; try { cursor = datahubClient.getCursor(projectName, blobTopicName, shardId, CursorType.OLDEST).getCursor(); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } while (true) { try { GetRecordsResult result = datahubClient.getRecords(projectName, blobTopicName, shardId, recordSchema, cursor, recordLimit); if (result.getRecordCount() <= 0) { // 无数据,sleep后读取 Thread.sleep(10000); continue; } /* 消费数据 */ for (RecordEntry record: result.getRecords()){ BlobRecordData data = (BlobRecordData) record.getRecordData(); System.out.println(new String(data.getData())); } // 拿到下一个游标 cursor = result.getNextCursor(); } catch (InvalidCursorException ex) { // 非法游标或游标已过期,建议重新定位后开始消费 cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor(); } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } } }
写数据
服务器2.12之后版本开始支持PutRecordsByShardResult接口,之前版本putRecords接口,使用putRecordsByShard接口时需指定写入的shard,否则会默认写入第一个处于ACTIVE状态的shard。两个方法中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型。DataHub目前支持按照Shard写入 (服务端 >= 2.12版本) 以及混合写入,分别对应putRecordsByShard
和putRecords
两个接口。针对第二个接口,用户需要判断PutRecordsResult
结果以确认数据是否写入成功;而putRecordsByShard
接口则直接通过异常告知用户是否成功。如果服务端支持,建议用户使用putRecordsByShard
接口。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
shardId | String | shard ID |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
代码示例
写入Tuple topic
// 写入Tuple型数据 public static void tupleExample(String project,String topic,int retryTimes) { // 获取schema RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema(); // 生成十条数据 List<RecordEntry> recordEntries = new ArrayList<>(); for (int i = 0; i < 10; ++i) { RecordEntry recordEntry = new RecordEntry(); // 对每条数据设置额外属性,例如ip 机器名等。可以不设置额外属性,不影响数据写入 recordEntry.addAttribute("key1", "value1"); TupleRecordData data = new TupleRecordData(recordSchema); data.setField("field1", "HelloWorld"); data.setField("field2", 1234567); recordEntry.setRecordData(data); recordEntries.add(recordEntry); } try { PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries); int i = result.getFailedRecordCount(); if (i > 0) { retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic); } } catch (DatahubClientException e) { System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage()); } } //重试机制 public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) { boolean suc = false; while (retryTimes != 0) { retryTimes = retryTimes - 1; PutRecordsResult recordsResult = client.putRecords(project, topic, records); if (recordsResult.getFailedRecordCount() > 0) { retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic); } suc = true; break; } if (!suc) { System.out.println("retryFailure"); } } ```java <br /> <br />**2). 写入Blob topic**<br /> ```java // 写入blob型数据 public static void blobExample() { // 生成十条数据 List<RecordEntry> recordEntries = new ArrayList<>(); String shardId = "4"; for (int i = 0; i < 10; ++i) { RecordEntry recordEntry = new RecordEntry(); // 对每条数据设置额外属性 recordEntry.addAttribute("key1", "value1"); BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8)); recordEntry.setRecordData(data); recordEntry.setShardId(shardId); recordEntries.add(recordEntry); recordEntry.setShardId("0"); } while (true) { try { // 服务端从2.12版本开始支持,之前版本请使用putRecords接口 //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries); datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries); System.out.println("write data successful"); break; } catch (DatahubClientException e) { System.out.println(e.getErrorMessage()); } } }
多方式写入
在DataHub 2.12之前版本,DataHub仅支持
putRecords
接口,在RecordEntry
类中包含shardId
、partitionKey
和hashKey
三个属性,用户通过指定对应属性的值决定数据写入到哪个Shard中。说明2.12及之后版本,建议用户使用putRecordsByShard接口,避免服务端partition造成的性能损耗。
按照ShardID写入推荐方式,使用示例如下:
RecordEntry entry = new RecordEntry(); entry.setShardId("0");
按HashKey写入指定一个128 bit的MD5值。 按照HashKey写入,根据Shard的Shard操作决定数据写入的Shard使用示例:
RecordEntry entry = new RecordEntry(); entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");
按PartitionKey写入指定一个String类型参数作为PartitionKey,系统根据该String的MD5值以及Shard的Shard操作决定写入的Shard使用示例:
RecordEntry entry = new RecordEntry(); entry.setPartitionKey("TestPartitionKey");