读写数据

本文为您展示DataHub的 Java SDK的读写数据操作。

前提条件

  • 状态为CLOSEDACTIVEshard可以读取数据。

  • 状态为ACTIVEshard可以写数据。

重要

如无特殊需求,建议使用High-Level SDK 进行读写操作,High-Level SDK 是对 Low-Level SDK 读写相关 API 的上层封装,更合适做数据的读写。

具体信息请参考:Producer

读数据

操作步骤

  1. 获取cursor

    • 读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式有以下四种:

      • OLDEST : 表示获取的cursor指向当前有效数据中时间最久远的record

      • LATEST : 表示获取的cursor指向当前最新的record

      • SEQUENCE : 表示获取的cursor指向该序列的record

      • SYSTEM_TIME : 表示获取的cursor指向该大于等于该时间戳的第一条record

    • 获取方式选择。

      • 若需从头开始读取数据,选择OLDEST来进行获取,将会从第一条数据开始读取。

      • 若需对数据抽样使用,查看某时间点之后的数据是否正常,选择SYSTEM_TIME 模式,将会从获取到的sequence+1位置开始读取。

      • 若需查看最新数据情况,选择LATEST来进行获取,读取最新写入的数据。

      • 若需要查看新写入的数据前N,选择SEQUENCE,通过SEQUENCE-N来获取新写入前N条数据。

      说明

      读取的数据一定要在有效期内,也就是生命周期内,否则读取数据会报错。

  2. 将获取到的cursor值传入到getRecords方法。

    说明

    DataHub已经提供了订阅功能,用户可直接关联订阅对数据进行消费,服务端自动保存点位,读数据的主要用途在于抽样查看数据的质量。

代码示例

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

shardId

String

shard ID

CursorType

CursorType

cursor类型,有OLDEST、LATEST、SYSTEM_TIME、SEQUENCE 四种

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

SeekOutOfRangeException

SeekOutOfRange

getCursor时,给定的sequence不在有效范围内(通常数据已过期),或给定的timestamp大于当前时间。

示例代码

  • 对数据抽样使用

    需先将Date转换为timestamp,然后获取cursor

    public static void getcursor(String projectName,String topicName) {
        String shardId = "5";
        try {
            //将时间转为时间戳形式
            String time = "2019-07-01 10:00:00";
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long timestamp = 0L;
            try {
                Date date = simpleDateFormat.parse(time);
                timestamp = date.getTime();//获取时间的时间戳
                //System.out.println(timestamp);
            } 
            //获取时间time之后的数据读取位置
            String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
            System.out.println("get cursor successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());  
        } catch (ParseException e) {
                System.out.println(e.getErrorOffset());
            }
    }
  • 从头开始读取数据

    public static void getcursor(String projectName,String topicName) {
        String shardId = "5";
        try {
            /* OLDEST用法示例 */
            String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
            System.out.println("get cursor successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
    
        }
    }
  • 查看最新数据情况

    • 最新写入的最后一条数据。

    • 最新写入的前N条数据。

      需要先获取最新写入数据的sequence,然后再获取cursor。

      public static void getcursor(String projectName,String topicName) {
          String shardId = "5";
          try {
              /* LATEST用法示例 */
              String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor();
              /* SEQUENCE用法示例 */
              //获取最新数据的sequence
              long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence();
              //获取最新的十条数据的读取位置
              String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
              }
               catch (DatahubClientException e) {
              System.out.println(e.getErrorMessage());
      
          }
      
      }

读取收据接口

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

shardId

String

shard ID

schema

RecordSchema

表结构,Tuple类型Topic需要该参数,Blob类型不涉及

cursor

String

cursor

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

  • 读取Tuple Topic数据。

     public static void example(String projectName,String topicName) {
        //每次最多读取数据量
         int recordLimit = 1000;
         String shardId = "7";
         // 获取cursor, 这里获取有效数据中时间最久远的record游标
         // 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecordsnextCursor进行下一次读取
         String cursor = "";
         try {
             cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
         }  catch (DatahubClientException e) {
                System.out.println(e.getErrorMessage());         
         }
         while (true) {
             try {
                 GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
                 if (result.getRecordCount() <= 0) {
                    // 无数据,sleep后读取
                     Thread.sleep(10000);
                     continue;
                 }
                 for (RecordEntry entry : result.getRecords()) {
                     TupleRecordData data = (TupleRecordData) entry.getRecordData();
                     System.out.println("field1:" + data.getField("field1") + "\t"
                             + "field2:" + data.getField("field2"));
                 }
                 // 拿到下一个游标
                 cursor = result.getNextCursor();
             } catch (InvalidCursorException ex) {
                 // 非法游标或游标已过期,建议重新定位后开始消费
                 cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
             }  catch (DatahubClientException e) {
                System.out.println(e.getErrorMessage());;
    
             } 
         }
     }
  • 读取Blob topic数据。

    public static void example(String projectName,String topicName) {
        //每次最多读取数据量
        int recordLimit = 1000;
        String shardId = "7";
        // 获取cursor, 这里获取有效数据中时间最久远的record游标
        // 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecordsnextCursor进行下一次读取
        String cursor = "";
        try {
            cursor = datahubClient.getCursor(projectName, blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());     
        }
        while (true) {
            try {
                GetRecordsResult result = datahubClient.getRecords(projectName, blobTopicName, shardId, recordSchema, cursor, recordLimit);
                if (result.getRecordCount() <= 0) {
                    // 无数据,sleep后读取
                    Thread.sleep(10000);
                    continue;
                }
                /* 消费数据 */
                for (RecordEntry record: result.getRecords()){
                     BlobRecordData data = (BlobRecordData) record.getRecordData();
                     System.out.println(new String(data.getData()));
                }
                // 拿到下一个游标
                cursor = result.getNextCursor();
            } catch (InvalidCursorException ex) {
                // 非法游标或游标已过期,建议重新定位后开始消费
                cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
            } catch (DatahubClientException e) {
                System.out.println(e.getErrorMessage()); 
            } 
        }
    }

写数据

服务器2.12之后版本开始支持PutRecordsByShardResult接口,之前版本putRecords接口,使用putRecordsByShard接口时需指定写入的shard,否则会默认写入第一个处于ACTIVE状态的shard。两个方法中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型。DataHub目前支持按照Shard写入 (服务端 >= 2.12版本) 以及混合写入,分别对应putRecordsByShardputRecords两个接口。针对第二个接口,用户需要判断PutRecordsResult结果以确认数据是否写入成功;而putRecordsByShard接口则直接通过异常告知用户是否成功。如果服务端支持,建议用户使用putRecordsByShard接口。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

shardId

String

shard ID

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

  • 写入Tuple topic

        // 写入Tuple型数据
        public static void tupleExample(String project,String topic,int retryTimes) {
            // 获取schema
            RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
            // 生成十条数据
            List<RecordEntry> recordEntries = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                RecordEntry recordEntry = new RecordEntry();
                // 对每条数据设置额外属性,例如ip 机器名等。可以不设置额外属性,不影响数据写入
                recordEntry.addAttribute("key1", "value1");
                TupleRecordData data = new TupleRecordData(recordSchema);
                data.setField("field1", "HelloWorld");
                data.setField("field2", 1234567);
                recordEntry.setRecordData(data);
                recordEntries.add(recordEntry);
            }
            try {
                PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
                int i = result.getFailedRecordCount();
                if (i > 0) {
                    retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
                }
            }  catch (DatahubClientException e) {
                System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
            }
        }
        //重试机制
        public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
            boolean suc = false;
            while (retryTimes != 0) {
                retryTimes = retryTimes - 1;
                PutRecordsResult recordsResult = client.putRecords(project, topic, records);
                if (recordsResult.getFailedRecordCount() > 0) {
                    retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
                }
                suc = true;
                break;
            }
            if (!suc) {
                System.out.println("retryFailure");
            }
        }
    ​```java
    <br />
    <br />**2). 写入Blob topic**<br />
    
    ​```java
    // 写入blob型数据
    public static void blobExample() {
        // 生成十条数据
        List<RecordEntry> recordEntries = new ArrayList<>();
        String shardId = "4";
        for (int i = 0; i < 10; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // 对每条数据设置额外属性
            recordEntry.addAttribute("key1", "value1");
            BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
            recordEntry.setRecordData(data);
            recordEntry.setShardId(shardId);
            recordEntries.add(recordEntry);
            recordEntry.setShardId("0");
        }
        while (true) {
            try {
                // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
                //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
                datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
                System.out.println("write data  successful");
                break;
            } catch (DatahubClientException e) {
                System.out.println(e.getErrorMessage());            
            }
        }
    }
  • 多方式写入

    DataHub 2.12之前版本,DataHub仅支持putRecords接口,在RecordEntry类中包含shardIdpartitionKeyhashKey三个属性,用户通过指定对应属性的值决定数据写入到哪个Shard中。

    说明

    2.12及之后版本,建议用户使用putRecordsByShard接口,避免服务端partition造成的性能损耗。

    • 按照ShardID写入推荐方式,使用示例如下:

      RecordEntry entry = new RecordEntry();
      entry.setShardId("0");
    • HashKey写入指定一个128 bitMD5值。 按照HashKey写入,根据ShardShard操作决定数据写入的Shard使用示例:

      RecordEntry entry = new RecordEntry();
      entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");
    • PartitionKey写入指定一个String类型参数作为PartitionKey,系统根据该StringMD5值以及ShardShard操作决定写入的Shard使用示例:

      RecordEntry entry = new RecordEntry();
      entry.setPartitionKey("TestPartitionKey");