文档

读写数据

更新时间:

读写数据

  • 用户可以调用SDK方法进行读写数据状态为CLOSED和ACTIVE的shard都可以读取数据,不过只有状态为ACTIVE的shard可以写数据。

  • 同时用户可以引入datahub-client-library依赖,datahub-client-library是在Java-SDK读写功能的封装,用户可以使用Producer实现均匀写入shard,也可以使用Consumer实现协同消费,(建议使用)

读数据

读取数据有两种方式,

  1. 使用SDK

  2. 使用协同消费

    使用SDK

    步骤一:获取cursor

    读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式有四种,分别是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。

  • OLDEST : 表示获取的cursor指向当前有效数据中时间最久远的record。

  • LATEST : 表示获取的cursor指向当前最新的record。

  • SEQUENCE : 表示获取的cursor指向该序列的record。

  • SYSTEM_TIME : 表示获取的cursor指向该大于等于该时间戳的第一条record。

    说明

    GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • CursorType Which type used to get cursor.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • SeekOutOfRangeException

  • 示例代码

    public static void getcursor() {
      String shardId = "5";
      try {
          /* OLDEST用法示例 */
          String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
          /* LATEST用法示例 */
          String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
          /* SEQUENCE用法示例 */
          //获取最新数据的sequence
          long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
          //获取最新的十条数据的读取位置
          String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
          /* SYSTEM_TIME用法示例 */
          //将时间转为时间戳形式
          String time = "2019-07-01 10:00:00";
          SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          long timestamp = 0L;
          try {
              Date date = simpleDateFormat.parse(time);
              timestamp = date.getTime();//获取时间的时间戳
              //System.out.println(timestamp);
          } catch (ParseException e) {
              System.exit(-1);
          }
          //获取时间time之后的数据读取位置
          String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
          System.out.println("get cursor successful");
      } catch (InvalidParameterException e) {
          System.out.println("invalid parameter, please check your parameter");
          System.exit(1);
      } catch (AuthorizationFailureException e) {
          System.out.println("AK error, please check your accessId and accessKey");
          System.exit(1);
      } catch (ResourceNotFoundException e) {
          System.out.println("project or topic or shard not found");
          System.exit(1);
      } catch (SeekOutOfRangeException e) {
          System.out.println("offset invalid or has expired");
          System.exit(1);
      } catch (DatahubClientException e) {
          System.out.println("other error");
          System.out.println(e);
          System.exit(1);
      }
    }

步骤二:读取数据接口:

说明

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • schema If you read TUPLE records, you need this parameter.

    • cursor The start cursor used to read data.

    • limit Max record size to read.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException

  • 示例

1). 读取Tuple topic数据

public static void example() {
     //每次最多读取数据量
     int recordLimit = 1000;
     String shardId = "7";
     // 获取cursor, 这里获取有效数据中时间最久远的record游标
     // 注: 正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
     String cursor = "";
     try {
         cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
     } catch (InvalidParameterException e) {
         System.out.println("invalid parameter, please check your parameter");
         System.exit(1);
     } catch (AuthorizationFailureException e) {
         System.out.println("AK error, please check your accessId and accessKey");
         System.exit(1);
     } catch (ResourceNotFoundException e) {
         System.out.println("project or topic or shard not found");
         System.exit(1);
     } catch (SeekOutOfRangeException e) {
         System.out.println("offset invalid or has expired");
         System.exit(1);
     } catch (DatahubClientException e) {
         System.out.println("other error");
         System.out.println(e);
         System.exit(1);
     }
     while (true) {
         try {
             GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
             if (result.getRecordCount() <= 0) {
                 // 无数据,sleep后读取
                 Thread.sleep(10000);
                 continue;
             }
             for (RecordEntry entry : result.getRecords()) {
                 TupleRecordData data = (TupleRecordData) entry.getRecordData();
                 System.out.println("field1:" + data.getField("field1") + "\t"
                         + "field2:" + data.getField("field2"));
             }
             // 拿到下一个游标
             cursor = result.getNextCursor();
         } catch (InvalidCursorException ex) {
             // 非法游标或游标已过期,建议重新定位后开始消费
             cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
         } catch (SeekOutOfRangeException e) {
             System.out.println("offset invalid");
             System.exit(1);
         } catch (ResourceNotFoundException e) {
             System.out.println("project or topic or shard not found");
             System.exit(1);
         } catch (ShardSealedException e) {
             System.out.println("shard is closed, all data has been read");
             System.exit(1);
         } catch (LimitExceededException e) {
             System.out.println("maybe exceed limit, retry");
         } catch (DatahubClientException e) {
             System.out.println("other error");
             System.out.println(e);
             System.exit(1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 }

2). 读取Blob topic数据

public static void example() {
    //每次最多读取数据量
    int recordLimit = 1000;
    String shardId = "7";
    // 获取cursor, 这里获取有效数据中时间最久远的record游标
    // 注: 正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (SeekOutOfRangeException e) {
        System.out.println("offset invalid or has expired");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
    while (true) {
        try {
            GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, recordSchema, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // 无数据,sleep后读取
                Thread.sleep(10000);
                continue;
            }
            /* 消费数据 */
            for (RecordEntry record: result.getRecords()){
                 BlobRecordData data = (BlobRecordData) record.getRecordData();
                 System.out.println(new String(data.getData()));
            }
            // 拿到下一个游标
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // 非法游标或游标已过期,建议重新定位后开始消费
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (SeekOutOfRangeException e) {
            System.out.println("offset invalid");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard is closed, all data has been read");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用协同消费

步骤一:初始化Consumer

配置

名称

描述

autoCommit

是否自动提交点位,默认为true。点位的提交会在后台线程按配置的时间间隔执行,自动提交的逻辑是当read接口被调用时,认为之前读的数据已经处理完毕。如果设置为false,那么每条record处理完必须ack,后台提交点位会保证该点位之前的record全部被ack。

offsetCommitTimeoutMs

点位的提交间隔,单位毫秒,默认30000ms,范围[3000, 300000]

sessionTimeoutMs

会话超时时间,心跳间隔会设为改置的2/3,超过时间没有心跳,认为客户端已停止,服务端会重新分配被占有shard,单位毫秒,默认60000ms,范围[60000, 180000]

fetchSize

单个shard异步读取记录的大小,会缓存2倍于该值的记录,少于2倍会触发异步任务去读取,默认1000,必须大于0

您还需要在工程中配置相应的Access Key和Secret Key,推荐使用环境变量的形式在配置文件中配置。

datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
重要

阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。

强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

{
				@Value("${datahub.endpoint}")
				String endpoint ;
				@Value("${datahub.accessId}")
				String accessId;
				@Value("${datahub.accessKey}")
				String accessKey;
        // Endpoint以Region: 华东1为例,其他Region请按实际情况填写
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";
        /**
         * 1.使用协同消费,使用点位服务
         *使用协同消费:
             如果有两台机器使用同一个subid来消费topic(有5个shard)中的数据,不需要手动指定哪台机器消费哪几个shard, 
            服务器端会自动分配,当有第三台机器加入了后也会做balance
         *
         * 使用点位服务:
             消费的时候,会根据服务端subid的点位来读(如果是新建的订阅,还没有点位就从头读),如果要指定从哪个时间点开始读,可以在页面上重置subid的点位
         * 
         * */
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, config);



        /**
         * 2. 不使用协同消费,使用点位服务,提供subId和Consumer读取的shard列表
         *不使用协同消费:
             如果有两台机器使用同一个subid来消费topic(有5个shard)中的数据,那需要手动指定哪台机器消费哪几个shard,
            (客户端机器A,消费0,1,2号shard;客户端机器B,消费3,4号shard)当有第三台机器加入了后需要自己重新指定消费策略
         * 
         * */

        //客户端A消费shardid为0,1,2的示例
        List<String> shardlists = Arrays.asList("0", "1", "2");
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);


        //客户端B消费shardid为3,4的示例
//        List<String> shardlists = Arrays.asList("3", "4");
//        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
//        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);



        /**
         *3.不使用协同消费,不使用点位服务
         *不使用点位服务:
             就是自己需要找个存储(db/redis等)来记录自己哪个shard消费到什么时间/Sequence,每次读的时候都要根据自己记录的点位来初始化
         * 
         * */
        Map<String, Offset> offsetMap = new HashMap<>();
// 提供sequence和timestamp,若sequence超出范围则使用timestamp获取Cursor
        offsetMap.put("0", new Offset(100, 1548573440756L));
// 只提供sequence,按照sequence获取Cursor
        offsetMap.put("1", new Offset().setSequence(1));
// 只提供timestamp,按照timestamp获取Cursor
        offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);
    }

步骤二:协同代码示例

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class DatahubReader {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO:自行处理异常
        }
    }

    public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
    {
        return new Consumer(project, topic, subId, config);
    }

    public static void main(String[] args) {
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";

        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = createConsumer(config, projectName, topicName, subId);

        int maxRetry = 3;
        boolean stop = false;
        try {
            while (!stop) {
                try {
                    while (true) {
                        // 协同消费刚初始化,需要等待服务端分配shard,约40秒,期间只能返回null
                        // 自动提交模式,每次调用read,认为之前读的数据都已处理完成,自动ack
                        RecordEntry record = consumer.read(maxRetry);

                        // 处理数据
                        if (record != null) {
                            TupleRecordData data = (TupleRecordData) record.getRecordData();
                            // 根据自己的schema来处理数据,此处打印第一列的内容
                            LOG.info("field1: {}", data.getField(0));

                            // 根据列名取数据
                            // LOG.info("field2: {}", data.getField("field2"));

                            // 非自动提交模式,每条record处理完后都需要ack
                            // 自动提交模式,ack不会做任何操作
                            // 1.1.7版本及以上
                            record.getKey().ack();
                        } else {
                            LOG.info("read null");
                        }
                    }
                } catch (SubscriptionOffsetResetException e) {
                    // 点位被重置,重新初始化consumer
                    try {
                        consumer.close();
                        consumer = createConsumer(config, projectName, topicName, subId);
                    } catch (DatahubClientException e1) {
                        // 初始化失败,重试或直接抛异常
                        LOG.error("create consumer failed", e);
                        throw e;
                    }
                } catch (InvalidParameterException |
                        SubscriptionOfflineException |
                        SubscriptionSessionInvalidException |
                        AuthorizationFailureException |
                        NoPermissionException e) {
                    // 请求参数非法
                    // 订阅被下线
                    // 订阅下相同shard被其他客户端占用
                    // 签名不正确
                    // 没有权限
                    LOG.error("read failed", e);
                    throw e;
                } catch (DatahubClientException e) {
                    // 基类异常,包含网络问题等,可以选择重试
                    LOG.error("read failed, retry", e);
                    sleep(1000);
                }
            }
        } catch (Throwable e) {
            LOG.error("read failed", e);
        } finally {
            // 确保资源正确释放
            // 会提交已ack的点位
            consumer.close();
        }
    }
}

写数据

使用SDK

服务器2.12之后版本开始支持PutRecordsByShardResult接口,之前版本putRecords接口,使用putRecordsByShard接口时需指定写入的shard,否则会默认写入第一个处于ACTIVE状态的shard。两个方法中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型。DataHub目前支持按照Shard写入 (服务端 >= 2.12版本) 以及混合写入,分别对应putRecordsByShardputRecords两个接口。针对第二个接口,用户需要判断PutRecordsResult结果以确认数据是否写入成功;而putRecordsByShard接口则直接通过异常告知用户是否成功。如果服务端支持,建议用户使用putRecordsByShard接口。

说明

PutRecordsResult putRecords(String projectName, String topicName, List records);PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • records Records list to written.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException

1). 写入Tuple topic

// 写入Tuple型数据
public static void tupleExample() {
    String shardId = "9";
    // 获取schema
    recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
    // 生成十条数据
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // 对每条数据设置额外属性,例如ip 机器名等。可以不设置额外属性,不影响数据写入
        recordEntry.addAttribute("key1", "value1");
        TupleRecordData data = new TupleRecordData(recordSchema);
        data.setField("field1", "HelloWorld");
        data.setField("field2", 1234567);
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    try {
        // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
        //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
        System.out.println("write data successful");
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (ShardSealedException e) {
        System.out.println("shard status is CLOSED, can not write");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
}

2). 写入Blob topic

// 写入blob型数据
public static void blobExample() {
    // 生成十条数据
    List<RecordEntry> recordEntries = new ArrayList<>();
    String shardId = "4";
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // 对每条数据设置额外属性
        recordEntry.addAttribute("key1", "value1");
        BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
        recordEntry.setShardId("0");
    }
    while (true) {
        try {
            // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            System.out.println("write data  successful");
            break;
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard status is CLOSED, can not write");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe qps exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
}

使用Producer

步骤一:引入依赖

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.19.0-public</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.1.12-public</version>
</dependency>

步骤二:代码示例

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DatahubWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO:自行处理异常
        }
    }

    private static List<RecordEntry> genRecords(RecordSchema schema) {
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int cnt = 0; cnt < 10; ++cnt) {
            RecordEntry entry = new RecordEntry();

            entry.addAttribute("key1", "value1");
            entry.addAttribute("key2", "value2");

            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "testValue");
            data.setField("field2", 1);

            entry.setRecordData(data);
            recordEntries.add(entry);
        }
        return recordEntries;
    }

    private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
        int maxRetry = 3;
        while (true) {
            try {
                // 自动选择shard写入
                producer.send(recordEntries, maxRetry);

                // 指定写入shard "0"
                // producer.send(recordEntries, "0", maxRetry);
                LOG.error("send records: {}", recordEntries.size());
                break;
            } catch (MalformedRecordException e) {
                // record 格式非法,根据业务场景选择忽略或直接抛异常
                LOG.error("write fail", e);
                throw e;
            } catch (InvalidParameterException |
                    AuthorizationFailureException |
                    NoPermissionException e) {
                // 请求参数非法
                // 签名不正确
                // 没有权限
                LOG.error("write fail", e);
                throw e;
            } catch (ShardNotFoundException e) {
                // shard 不存在, 如果不是写入自己指定的shard,可以不用处理
                LOG.error("write fail", e);
                sleep(1000);
            } catch (ResourceNotFoundException e) {
                // project, topic 或 shard 不存在
                LOG.error("write fail", e);
                throw e;
            } catch (DatahubClientException e) {
                // 基类异常,包含网络问题等,可以选择重试
                LOG.error("write fail", e);
                sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";

        RecordSchema schema =  datahubClient.getTopic(projectName, topicName).getRecordSchema();



        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // 根据场景控制循环
        boolean stop = false;
        try {
            while (!stop) {
                List<RecordEntry> recordEntries = genRecords(schema);
                sendRecords(producer, recordEntries);
            }
        } finally {
            // 确保资源正确释放
            producer.close();
        }
    }
}

多方式写入

在DataHub 2.12之前版本,DataHub仅支持putRecords接口,在RecordEntry类中包含shardIdpartitionKeyhashKey三个属性,用户通过指定对应属性的值决定数据写入到哪个Shard中注意:开启Shard extend模式无法按Hashkey和PartitionKey方式写入

说明

2.12及之后版本,建议用户使用putRecordsByShard接口,避免服务端partition造成的性能损耗

1). 按照ShardID写入推荐方式,使用示例如下:

RecordEntry entry = new RecordEntry();
entry.setShardId("0");

2). 按HashKey写入指定一个128 bit的MD5值。 按照HashKey写入,根据Shard的Shard操作决定数据写入的Shard使用示例:

RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

2). 按PartitionKey写入指定一个String类型参数作为PartitionKey,系统根据该String的MD5值以及Shard的Shard操作决定写入的Shard使用示例:

RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");

注意事项

Consumer和Producer都不支持多线程访问,如果需要使用多线程,则在每个线程都使用不同的Consumer或Producer对象。

  • 本页导读 (0)
文档反馈