Java SDK

安装SDK

在Maven项目中添加依赖:

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.25.1</version>
</dependency>
  1. JDK: use version 1.8 or later.

  2. If the API has key-based authentication enabled, you also need to configure the corresponding AccessKey ID and AccessKey Secret in your project. See the 身份验证 section below.

身份验证

背景信息

AccessKey(简称AK)是阿里云提供给阿里云用户的访问密钥,用于访问阿里云OpenAPI时的身份验证。AccessKey包括AccessKey ID和AccessKey Secret,需妥善保管。AK如果泄露,会威胁该账号下所有资源的安全。访问阿里云OpenAPI时,如果在代码中硬编码明文AK,容易因代码仓库权限管理不当造成AK泄露。

Alibaba Cloud Credentials是阿里云为阿里云开发者用户提供的身份凭证管理工具。配置了Credentials默认凭据链后,访问阿里云OpenAPI时,您无需在代码中硬编码明文AK,可有效保证您账号下云资源的安全。

前提条件

  • 已获取RAM用户账号的AccessKey ID和AccessKey Secret。相关操作,请参见查看RAM用户的AccessKey信息

  • 重要

    阿里云账号(即主账号)的AccessKey泄露会威胁该账号下所有资源的安全。为保证账号安全,强烈建议您为RAM用户创建AccessKey,非必要情况下请勿为阿里云主账号创建AccessKey。

    RAM用户的AccessKey Secret只能在创建AccessKey时显示,创建完成后不支持查看。请在创建好AccessKey后,及时并妥善保存AccessKey Secret。

  • 已安装阿里云SDK Credentials工具。

    Maven安装方式(推荐使用Credentials最新版本):

    <dependency>
     <groupId>com.aliyun</groupId>
     <artifactId>credentials-java</artifactId>
     <version>0.2.11</version>
    </dependency>
  • JDK版本为1.7及以上。

配置方案

This topic uses environment variables as an example; for other methods, see 配置环境变量.

重要

When you use the configuration-file approach, make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET do not exist on your system. Otherwise, the configuration file does not take effect.

The Alibaba Cloud SDK can create default access credentials from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. When an API is called, the program reads these credentials (your AccessKey) and completes authentication automatically.

配置方法

Configure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.

  • Linux and macOS

    Run the following commands:

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

    Replace <access_key_id> with the AccessKey ID you prepared and <access_key_secret> with the AccessKey Secret.

  • Windows

    1. Create the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET, and set them to the prepared AccessKey ID and AccessKey Secret.

    2. Restart the Windows system.

代码示例

    import com.aliyun.credentials.Client;

    Client credentialClient = new Client();
    String accessKeyId = credentialClient.getAccessKeyId();
    String accessKeySecret = credentialClient.getAccessKeySecret();
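
The credentials read above can then be used to build the DataHub client. A minimal sketch, assuming the service address is supplied through a hypothetical DATAHUB_ENDPOINT environment variable and using the 2.12+ client packages:

import com.aliyun.credentials.Client;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;

public class CredentialExample {
    public static DatahubClient buildClient() {
        // Read the AccessKey pair from the default credential chain (environment variables here)
        Client credentialClient = new Client();
        String accessKeyId = credentialClient.getAccessKeyId();
        String accessKeySecret = credentialClient.getAccessKeySecret();
        // DATAHUB_ENDPOINT is a hypothetical variable holding the DataHub service address
        String endpoint = System.getenv("DATAHUB_ENDPOINT");
        return DatahubClientBuilder.newBuilder()
                .setDatahubConfig(new DatahubConfig(endpoint, new AliyunAccount(accessKeyId, accessKeySecret)))
                .build();
    }
}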

注意事项

  1. For users upgrading from SDK version 2.9: note that after the setTimestampInms interface is replaced, the value must be explicitly multiplied by 1000.

  2. Under normal operation, only the putRecords / putRecordsByShard and getRecords interfaces need to be called frequently for reading and writing data; other interfaces such as getTopic, getCursor and listShard usually only need to be called during initialization.

  3. Client initialization: a project can hold one or more DatahubClient instances, and a DatahubClient instance can be used concurrently.

  4. Classes with the same name in different packages: version 2.12 uses the classes in the com.aliyun.datahub.client package; classes in other packages exist only for compatibility with code written against versions earlier than 2.12. For example:

    // 2.12 and later
    com.aliyun.datahub.client.model.RecordSchema
    // Code written against SDK versions earlier than 2.12 can keep using this class after upgrading
    com.aliyun.datahub.common.data.RecordSchema
  5. If the error Parse body failed, Offset: 0 occurs, try setting the enableBinary parameter to false (see the sketch after this list).
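
A minimal sketch of note 5, assuming the DatahubConfig constructor that takes an enableBinary flag; endpoint, accessId and accessKey are placeholders:

// enableBinary=false falls back to the JSON protocol for servers that do not support binary transfer
DatahubClient client = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(new DatahubConfig(endpoint,
                new AliyunAccount(accessId, accessKey),
                false /* enableBinary */))
        .build();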

SDK实践指南

初始化

Users access DataHub with an authenticated Alibaba Cloud account. You need to provide the account's AccessId and AccessKey, as well as the DataHub service endpoint. The following code creates a DatahubClient for a given endpoint:

SDK 2.25.1 and later (recommended)

//使用新的Batch传输协议创建DataHubClient实例
DatahubConfig.Protocol protocol = DatahubConfig.Protocol.BATCH;
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                //Protocol可不设置,不设置默认使用PROTOBUF传输协议
                new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey), protocol))
        .setHttpConfig(new HttpConfig().setCompressType(CompressType.ZSTD))
        .build();


Configuration:

  1. DatahubConfig

| Name | Description |
| --- | --- |
| endpoint | DataHub service address. |
| account | Alibaba Cloud account information. |
| enableBinary | Whether to use binary transfer. Supported by servers from version 2.12; set it to false for earlier versions. If the error Parse body failed, Offset: 0 occurs in an Apsara Stack (专有云) environment, try setting it to false. |

  2. HttpConfig

| Name | Description |
| --- | --- |
| readTimeout | Socket read/write timeout, default 10s. |
| connTimeout | TCP connection timeout, default 10s. |
| maxRetryCount | Number of retries on request failure, default 1. Changing it is not recommended; retries should be handled by the business layer. |
| debugRequest | Whether to log request details, default false. |
| compressType | Compression used for data transfer, default lz4; lz4, deflate and zstd are supported. |
| proxyUri | Proxy server address. |
| proxyUsername | Username for proxy authentication. |
| proxyPassword | Password for proxy authentication. |
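
A minimal sketch combining several HttpConfig options from the table when building the client; the setter names are assumed to mirror the option names, and the timeout and proxy values are placeholders:

HttpConfig httpConfig = new HttpConfig()
        .setReadTimeout(10)                      // default 10s (see table above)
        .setConnTimeout(10)                      // default 10s
        .setCompressType(CompressType.LZ4)       // lz4 / deflate / zstd
        .setDebugRequest(false);
// Optional proxy settings, only if your network requires them
// httpConfig.setProxyUri("http://proxy.example.com:8080");
// httpConfig.setProxyUsername("user");
// httpConfig.setProxyPassword("pass");

DatahubClient client = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey)))
        .setHttpConfig(httpConfig)
        .build();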

  3. SDK metrics

The SDK can collect statistics such as QPS for put/get requests. Enable it with:

ClientMetrics.startMetrics();

Metrics are written to the log file by default, so slf4j must be configured; the metrics logger package is com.aliyun.datahub.client.metrics.

写入数据到DataHub

以Tuple类型Topic为例。

  public void writeTupleTopic(int maxRetry) {
    String shardId = "9";
    // 生成十条数据
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
      RecordEntry recordEntry = new RecordEntry();
      // 对每条数据设置额外属性
      recordEntry.addAttribute("key1", "value11");
      TupleRecordData data = new TupleRecordData(this.recordSchema);
      data.setField("field1", "Hello World");
      data.setField("field2", 1234567);
      recordEntry.setRecordData(data);
      recordEntry.setShardId(shardId);
      recordEntries.add(recordEntry);
    }

    int retryNum = 0;
    while (retryNum < maxRetry) {
      try {
        // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
        //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        PutRecordsResult putRecordsResult = this.datahubClient.putRecords(Constant.projectName,
            Constant.topicName, recordEntries);
        System.out.println("write tuple data successful");
        System.out.println(putRecordsResult.getPutErrorEntries());
        break;
      } catch (InvalidParameterException e) {
        // invalid parameter
        e.printStackTrace();
        throw e;
      } catch (AuthorizationFailureException e) {
        // AK error
        e.printStackTrace();
        throw e;
      } catch (ResourceNotFoundException e) {
        // project or topic not found
        e.printStackTrace();
        throw e;
      } catch (ShardSealedException e) {
        // shard status is CLOSED, read only
        e.printStackTrace();
        throw e;
      } catch (LimitExceededException e) {
        // limit exceed, retry
        e.printStackTrace();
        retryNum++;
      } catch (DatahubClientException e) {
        // other error
        e.printStackTrace();
        retryNum++;
      }
    }
  }

创建订阅消费DataHub数据

//点位消费示例,并在消费过程中进行点位的提交
public static void example() {
  String shardId = "0";
  List<String> shardIds = Arrays.asList("0", "1");
  OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
  SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  // 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
  String cursor = null;
  //sequence < 0说明未消费
  if (subscriptionOffset.getSequence() < 0) {
      // 获取生命周期内第一条record的cursor
      cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
  } else {
      // 获取下一条记录的Cursor
      long nextSequence = subscriptionOffset.getSequence() + 1;
      try {
          //按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (SeekOutOfRangeException e) {
          // 获取生命周期内第一条record的cursor
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
      }
  }
  // 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
  long recordCount = 0L;
  // 每次读取10条record
  int fetchNum = 10;
  while (true) {
      try {
          GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
          if (getRecordsResult.getRecordCount() <= 0) {
              // 无数据,sleep后读取
              Thread.sleep(1000);
              continue;
          }
          for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
              //消费数据
              TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
              System.out.println("field1:" + data.getField("field1") + "\t"
                      + "field2:" + data.getField("field2"));
              // 处理数据完成后,设置点位
              ++recordCount;
              subscriptionOffset.setSequence(recordEntry.getSequence());
              subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
              if (recordCount % 1000 == 0) {
        //提交点位
                  Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                  offsetMap.put(shardId, subscriptionOffset);
                  datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                  System.out.println("commit offset successful");
              }
          }
          cursor = getRecordsResult.getNextCursor();
      } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
          // 退出. Offline: 订阅下线; SubscriptionSessionInvalid: 表示订阅被其他客户端同时消费
          break;
      } catch (SubscriptionOffsetResetException e) {
          // 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
          // 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
          subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
          long nextSequence = subscriptionOffset.getSequence() + 1;
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (DatahubClientException e) {
          // TODO: 针对不同异常决定是否退出
      } catch (Exception e) {
          break;
      }
  }
}

异常类型

The Java SDK (>= 2.12) organizes the DataHub exception types; users can catch them with try/catch and handle each type accordingly.

Among these exception types, all except DatahubClientException and LimitExceededException are non-retryable errors. DatahubClientException also covers some retryable errors, such as "server busy" and "server unavailable". Therefore, when DatahubClientException or LimitExceededException occurs, it is reasonable to add retry logic, but the number of retries should be strictly limited.
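
A minimal sketch of this guidance (the helper class RetryUtil is illustrative, not part of the SDK): a bounded retry wrapper that fails fast on non-retryable errors and retries the rest with a simple backoff.

import java.util.concurrent.Callable;

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.ResourceNotFoundException;

public final class RetryUtil {
    // Hypothetical helper illustrating the retry guidance above; not part of the SDK.
    public static <T> T callWithRetry(Callable<T> action, int maxRetry) throws Exception {
        DatahubClientException last = null;
        for (int attempt = 0; attempt <= maxRetry; ++attempt) {
            try {
                return action.call();
            } catch (InvalidParameterException | AuthorizationFailureException | ResourceNotFoundException e) {
                throw e;                              // non-retryable, fail fast
            } catch (DatahubClientException e) {
                last = e;                             // LimitExceeded / server busy etc.: retry
                Thread.sleep(1000L * (attempt + 1));  // simple linear backoff
            }
        }
        throw last;
    }
}

Usage: RetryUtil.callWithRetry(() -> datahubClient.putRecords(projectName, topicName, recordEntries), 3); the caller must handle or declare the thrown Exception.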

The exception classes used by versions 2.12 and later are listed below; the package is com.aliyun.datahub.client.exception.

| Class | Error code | Description |
| --- | --- | --- |
| InvalidParameterException | InvalidParameter, InvalidCursor | Invalid parameter. |
| ResourceNotFoundException | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | The requested resource does not exist. (Note: it may also be thrown when another request is sent immediately after a Split/Merge operation.) |
| ResourceAlreadyExistException | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | The resource already exists (thrown when creating a resource that already exists). |
| SeekOutOfRangeException | SeekOutOfRange | In getCursor, the given sequence is outside the valid range (usually the data has expired), or the given timestamp is later than the current time. |
| AuthorizationFailureException | Unauthorized | Authorization signature error; check that the AccessKey is correct. |
| NoPermissionException | NoPermission, OperationDenied | No permission, usually caused by an incorrect RAM configuration or a RAM user that has not been granted the required permissions. |
| ShardSealedException | InvalidShardOperation | The shard is in the CLOSED state and is read-only; writing to a CLOSED shard, or continuing to read after the last record, throws this exception. |
| LimitExceededException | LimitExceeded | API usage limit exceeded; see the limits documentation (限制描述). |
| SubscriptionOfflineException | SubscriptionOffline | The subscription is offline and unavailable. |
| SubscriptionSessionInvalidException | OffsetSessionChanged, OffsetSessionClosed | Subscription session error. A session is created when a subscription is used to commit offsets; this exception is raised if another client starts consuming with the same subscription. |
| SubscriptionOffsetResetException | OffsetReseted | The subscription offset has been reset. |
| MalformedRecordException | MalformedRecord, ShardNotReady | Malformed record. Possible causes: incorrect schema, non-UTF-8 characters, the client uses protobuf while the server does not support it, and so on. |
| DatahubClientException | All other error codes; this is the base class of all the exceptions above | Apart from the cases above, retrying usually suffices, but the number of retries should be limited. |

API说明

Project操作

项目(Project)是DataHub数据的基本组织单元,下面包含多个Topic。值得注意的是,DataHub的项目空间与MaxCompute的项目空间是相互独立的。用户在MaxCompute中创建的项目不能复用于DataHub,需要独立创建。

创建 Project

说明

CreateProjectResult createProject(String projectName, String comment);

To create a project you must provide its name and a comment. The project name must be 3 to 32 characters long, start with a letter, and contain only letters, digits and underscores (_); it is case-insensitive.
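
As an illustration only (not part of the SDK), these naming rules can be checked client-side before calling createProject:

// Illustrative pre-check of the project-name rules described above
private static final java.util.regex.Pattern PROJECT_NAME =
        java.util.regex.Pattern.compile("^[a-zA-Z][a-zA-Z0-9_]{2,31}$");

public static void checkProjectName(String name) {
    if (name == null || !PROJECT_NAME.matcher(name).matches()) {
        throw new IllegalArgumentException(
                "Project name must be 3-32 chars, start with a letter, and use only letters, digits and '_': " + name);
    }
}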

  • 参数

    • projectName project name

    • comment project comment

  • Exception

    • DatahubClientException

  • 示例

public static void createProject(String projectName,String projectComment) {
  try {
      datahubClient.createProject(projectName, projectComment);
      System.out.println("create project successful");
  } catch (DatahubClientException e) {
      System.out.println(e.getErrorMessage());
  }
}

删除 Project

说明

DeleteProjectResult deleteProject(String projectName);

Before deleting a project, make sure the project no longer contains any topics.

  • 参数

    • projectName project name

  • Exception

    • DatahubClientException

    • NoPermissionException, project内仍有topic时则会抛出的异常

  • 示例

public static void deleteProject(String projectName) {
  try {
      datahubClient.deleteProject(projectName);
      System.out.println("delete project successful");
  } catch (DatahubClientException e) {
      System.out.println(e.getErrorMessage());
  }
}

更新 Project

说明

UpdateProjectResult updateProject(String projectName, String comment);

Updates project information; currently only the comment can be updated.

  • 参数

    • projectName project name

    • comment project comment

  • Exception

    • DatahubClientException

  • 示例

public static void updateProject(String projectName,String newComment) {
    try {
        datahubClient.updateProject(projectName, newComment);
        System.out.println("update project successful");
    } catch (DatahubClientException e) {
        System.out.println("other error");
    }
}

列出 Project

说明

ListProjectResult listProject();

listProject returns a ListProjectResult object whose member projectNames is a list of project names.

  • 参数

  • Exception

    • DatahubClientException

  • 示例

public static void listProject() {
    try {
        ListProjectResult listProjectResult = datahubClient.listProject();
        if (listProjectResult.getProjectNames().size() > 0) {
            for (String pName : listProjectResult.getProjectNames()) {
                System.out.println(pName);
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

查询 Project

说明

GetProjectResult getProject(String projectName);

Returns attribute information of the specified project.

  • Exception

    • DatahubClientException

  • 示例

public static void getProject(String projectName) {
    try {
        GetProjectResult getProjectResult = datahubClient.getProject(projectName );
        System.out.println(getProjectResult.getCreateTime() + "\t"
                + getProjectResult.getLastModifyTime() + "\t"
                + getProjectResult.getComment());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Topic 操作

Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。

目前支持Tuple与Blob两种类型:

  1. Blob类型Topic支持写入一块二进制数据作为一个Record。

  2. Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。

The following data types are supported:

| Type | Meaning | Value range |
| --- | --- | --- |
| BIGINT | 8-byte signed integer | -9223372036854775807 ~ 9223372036854775807 |
| DOUBLE | 8-byte double-precision floating point | -1.0 * 10^308 ~ 1.0 * 10^308 |
| BOOLEAN | Boolean | True/False, true/false, or 0/1 |
| TIMESTAMP | Timestamp | Timestamp with microsecond precision |
| STRING | String, UTF-8 encoding only | A single STRING column may be up to 2 MB |
| TINYINT | 1-byte integer | -128 ~ 127 |
| SMALLINT | 2-byte integer | -32768 ~ 32767 |
| INTEGER | 4-byte integer | -2147483648 ~ 2147483647 |
| FLOAT | 4-byte single-precision floating point | -3.40292347 * 10^38 ~ 3.40292347 * 10^38 |

The TINYINT, SMALLINT, INTEGER and FLOAT types are supported starting from Java SDK 2.16.1-public.
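
As a quick illustration of how values of these types are set on a record (assuming setField accepts the natural Java type for each column, as in the examples that follow), consider this sketch; the schema and values are examples only:

// Assumed Java value types for each DataHub field type
RecordSchema schema = new RecordSchema();
schema.addField(new Field("f_bigint", FieldType.BIGINT));
schema.addField(new Field("f_double", FieldType.DOUBLE));
schema.addField(new Field("f_boolean", FieldType.BOOLEAN));
schema.addField(new Field("f_timestamp", FieldType.TIMESTAMP));
schema.addField(new Field("f_string", FieldType.STRING));

TupleRecordData data = new TupleRecordData(schema);
data.setField("f_bigint", 1234567890123L);                        // long
data.setField("f_double", 3.1415926d);                            // double
data.setField("f_boolean", true);                                 // boolean
data.setField("f_timestamp", System.currentTimeMillis() * 1000);  // microseconds, per the table above
data.setField("f_string", "hello datahub");                       // UTF-8 string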

创建 Tuple Topic

说明

CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment);

  • 参数

    • projectName The name of the project in which you create.

    • topicName The name of the topic.

    • shardCount The initial shard count of the topic.

    • lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.

    • recordType The type of the record you want to write. Now support TUPLE and BLOB.

    • recordSchema The records schema of this topic.

    • comment The comment of the topic.

  • Exception

    • DatahubClientException

  • 示例

 public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
   RecordSchema schema = new RecordSchema();
   schema.addField(new Field("bigint_field", FieldType.BIGINT));
   schema.addField(new Field("double_field", FieldType.DOUBLE));
   schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
   schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
   schema.addField(new Field("tinyint_field", FieldType.TINYINT));
   schema.addField(new Field("smallint_field", FieldType.SMALLINT));
   schema.addField(new Field("integer_field", FieldType.INTEGER));
   schema.addField(new Field("floar_field", FieldType.FLOAT));
   schema.addField(new Field("decimal_field", FieldType.DECIMAL));
   schema.addField(new Field("string_field", FieldType.STRING));
   try {
       datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
       System.out.println("create topic successful");
   } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
   }
 }

创建 Blob Topic

说明

CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment);

  • 参数

    • projectName The name of the project in which you create.

    • topicName The name of the topic.

    • shardCount The initial shard count of the topic.

    • lifeCycle The expire time of the data (Unit: DAY). The data written before that time is not accessible.

    • recordType The type of the record you want to write. Now support TUPLE and BLOB.

    • comment The comment of the topic.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ResourceAlreadyExistException

  • 示例

public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
  try {
      datahubClient.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
      System.out.println("create topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
  }
}

删除 Topic

删除topic之前要保证topic中没有subscription和connector,否则会报错NoPermission。

说明

DeleteTopicResult deleteTopic(String projectName, String topicName);

  • 参数

    • projectName The name of the project in which you delete.

    • topicName The name of the topic.

  • Exception

    • DatahubClientException

    • NoPermissionException, topic内存在subscription和connector会报此错误

  • 示例

public static void deleteTopic(String projectName, String topicName) {
  try {
      datahubClient.deleteTopic(projectName, topicName);
      System.out.println("delete topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

列出 Topic

说明

ListTopicResult listTopic(String projectName);

  • 参数

    • projectName The name of the project in which you list.

  • 示例

   public static void listTopic(String projectName ) {
      try {
          ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
          if (listTopicResult.getTopicNames().size() > 0) {
              for (String tName : listTopicResult.getTopicNames()) {
                  System.out.println(tName);
              }
          }
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());          
      }
  }

更新 Topic

更新Topic信息,目前支持更新comment和lifeCycle。

说明

UpdateTopicResult updateTopic(String projectName, String topicName, int lifeCycle, String comment);

  • 参数

    • projectName The name of the project in which you list.

    • topicName The name of the topic.

    • comment The comment to modify.

    • lifeCycle the lifeCycle of the topic

  • Exception

    • DatahubClientException

  • 示例

   public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
        try {
            datahubClient.updateTopic(projectName, topicName, lifeCycle, comment);
            System.out.println("update topic successful");
            //查看更新后结果
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

查询 Topic

说明

GetTopicResult getTopic(String projectName, String topicName);可以获取Topic的相关属性。

  • 参数

    • projectName The name of the project in which you get.

    • topicName The name of the topic.

  • Exception

    • DatahubClientException

  • 示例

   public static void getTopic(String projectName, String topicName) {
        try {
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getShardCount() + "\t"
                    + getTopicResult.getLifeCycle() + "\t"
                    + getTopicResult.getRecordType() + "\t"
                    + getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }

Tuple Topic 新增 Field

新增Field既可以新增一列,也可以一次性插入多列

说明

AppendFieldResult appendField(String projectName, String topicName, Field field);

  • 参数

    • projectName The name of the project in which you get.

    • topicName The name of the topic.

    • fields The fields to append. All field value must allow null.

  • Exception

    • DatahubClientException

  • 示例

public static void appendNewField(String projectName,String topicName) {
    try {
        Field newField = new Field("newField", FieldType.STRING, true,"comment");
        datahubClient.appendField(projectName, topicName, newField);
        System.out.println("append field successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}
说明

AppendFieldResult appendField(String projectName, String topicName, List fields);

  • 参数

    • projectName The name of the project in which you get.

    • topicName The name of the topic.

    • fields The fields to append. All field value must allow null.

  • Exception

    • DatahubClientException

  • 示例

    public static void appendNewField(String projectName,String topicName) {
        try {
            List<Field>  list = new ArrayList<>();
            Field newField1 = new Field("newField1", FieldType.STRING, true,"comment");
            list.add(newField1);
            datahubClient.appendField(projectName, topicName, list);
            System.out.println("append field successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }

Shard操作

Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。

列出 Shard

说明

ListShardResult listShard(String projectName, String topicName);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

  • Exception

    • DatahubClientException

  • 示例

public static void listShard(String projectName, String topicName) {
  try {
      ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
      if (listShardResult.getShards().size() > 0) {
          for (ShardEntry entry : listShardResult.getShards()) {
              System.out.println(entry.getShardId() + "\t"
                      + entry.getState() + "\t"
                      + entry.getLeftShardId() + "\t"
                      + entry.getRightShardId());
          }
      }
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

分裂 Shard

指定一个Topic中的一个状态为ACTIVE的Shard进行分裂,生成两个Shard,新Shard状态为ACTIVE,原Shard状态会变为CLOSED。CLOSED状态的shard只可以读,不可以写,可以采用默认splitKey进行分裂,也可以指定splitKey进行分裂。

说明

SplitShardResult splitShard(String projectName, String topicName, String shardId);

SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The shard which to split.

    • splitKey The split key which is used to split shard.

  • Exception

    • DatahubClientException

  • 示例

public static void splitShard(String projectName, String topicName, String shardId) {
    try {
        shardId = "0";
        SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
        for (ShardEntry entry : splitShardResult.getNewShards()) {
            System.out.println(entry.getShardId());
        }
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

合并 Shard

合并一个Topic中两个处于ACTIVE状态的Shard,要求两个Shard的位置必须相邻。每个Shard相邻的两个Shard可以参考listShard的结果。

说明

MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The shard which will be merged.

    • adjacentShardId The adjacent shard of the specified shard.

  • Exception

    • DatahubClientException

  • 示例

public static void mergeShard() {
    try {
        String shardId = "7";
        //adjacentShardId位置必须和shardId相邻,shard相邻信息可在listShard返回结果中查看
        String adjacentShardId = "8";
        MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
        System.out.println("merge successful");
        System.out.println(mergeShardResult.getShardId());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Shard扩展

Extending shards requires that the new shard count is not smaller than the current shard count.

说明

ExtendShardResult extendShard(String projectName, String topicName, int shardCount);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardCount The num of shards to extend to.

  • Exception

    • DatahubClientException

  • 示例

        public static void extendTopic(String projectName, String topicName, int shardCount) {
            try {
                ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount);
    
            } catch (DatahubClientException e) {
                System.out.println(e.getErrorMessage());
            }
    
        }
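
Since the target count must not be smaller than the current number of shards, one can read the current shard count with listShard before extending. A minimal sketch (the helper name extendWithCheck is illustrative, and it simply counts all shards returned by listShard):

public static void extendWithCheck(String projectName, String topicName, int targetShardCount) {
    try {
        // Count the current shards first: the target must not be smaller than this number
        int current = datahubClient.listShard(projectName, topicName).getShards().size();
        if (targetShardCount < current) {
            System.out.println("target shard count " + targetShardCount
                    + " is smaller than current count " + current + ", skip extend");
            return;
        }
        datahubClient.extendShard(projectName, topicName, targetShardCount);
        System.out.println("extend shard successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}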

读写数据

Shards in both the CLOSED and ACTIVE states can be read, but only ACTIVE shards can be written to.
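
A small sketch that selects only writable shards before writing, assuming ShardEntry.getState() returns a ShardState value with an ACTIVE constant, as suggested by the listShard example above:

List<String> writableShardIds = new ArrayList<>();
for (ShardEntry entry : datahubClient.listShard(projectName, topicName).getShards()) {
    // Only ACTIVE shards accept writes; CLOSED shards are read-only
    if (entry.getState() == ShardState.ACTIVE) {
        writableShardIds.add(entry.getShardId());
    }
}
System.out.println("writable shards: " + writableShardIds);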

1. Reading data

Reading data is a two-step process: first obtain a cursor, then pass that cursor to the getRecords method. Note that DataHub already provides a subscription feature: users can consume data through a subscription and the server saves the consumption offset automatically, so reading data directly is mainly useful for sampling and checking data quality.

2. Getting a cursor

To read data from a topic you must specify the shard and the cursor position from which to start reading. A cursor can be obtained in four ways: OLDEST, LATEST, SEQUENCE and SYSTEM_TIME.

  • OLDEST: the cursor points to the oldest record that is still within the retention period.

  • LATEST: the cursor points to the most recent record.

  • SEQUENCE: the cursor points to the record with the given sequence number.

  • SYSTEM_TIME: the cursor points to the first record whose timestamp is greater than or equal to the given timestamp.

Which one should you choose?

First, the data you read must still be within its retention period (lifecycle); otherwise reading it returns an error.

  • If you need to read from the beginning, OLDEST is the right choice; if all data is still within the retention period, reading starts from the first record.

  • For sampling, i.e. checking whether the data written after a given point in time looks correct, use SYSTEM_TIME; reading starts from the first record at or after that timestamp.

  • To inspect the most recently written data, use LATEST. You can keep reading the newest records in LATEST mode, or read the last N records by first obtaining the latest sequence and then getting a SEQUENCE cursor N-1 positions earlier (see the SEQUENCE example below).

说明

GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);

GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • CursorType Which type used to get cursor.

  • Exception

    • DatahubClientException

    • SeekOutOfRangeException

  • 示例代码

    • 抽样场景,先将Date转换为timestamp,然后获取cursor。

public static void getcursor(String projectName, String topicName) {
    String shardId = "5";
    try {
        //将时间转为时间戳形式
        String time = "2019-07-01 10:00:00";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = simpleDateFormat.parse(time);
        long timestamp = date.getTime();//获取时间的时间戳
        //获取时间time之后的数据读取位置
        String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
        System.out.println("get cursor successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    } catch (ParseException e) {
        e.printStackTrace();
    }
}
  • 从头开始读取数据

public static void getcursor(String projectName,String topicName) {
    String shardId = "5";
    try {
        /* OLDEST用法示例 */
        String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
        System.out.println("get cursor successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}
  • 读取最新写入的数据

    • 分为两种情况,第一种是最新写入的最后一条数据。

    • 第二种是最新写入的前N条数据。

      • 需要先获取最新写入数据的sequence,然后再获取cursor。

public static void getcursor(String projectName, String topicName) {
    String shardId = "5";
    try {
        /* LATEST用法示例 */
        String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor();
        /* SEQUENCE用法示例 */
        //获取最新数据的sequence
        long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence();
        //获取最新的十条数据的读取位置
        String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

读取数据接口:

说明

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);

GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • schema If you read TUPLE records, you need this parameter.

    • cursor The start cursor used to read data.

    • limit Max record size to read.

  • Exception

    • DatahubClientException

  • 示例

1). 读取Tuple topic数据

 public static void example(String projectName,String topicName) {
    //每次最多读取数据量
     int recordLimit = 1000;
     String shardId = "7";
     // 获取cursor, 这里获取有效数据中时间最久远的record游标
     // 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
     String cursor = "";
     try {
         cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
     }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());         
     }
     while (true) {
         try {
             GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
             if (result.getRecordCount() <= 0) {
                // 无数据,sleep后读取
                 Thread.sleep(10000);
                 continue;
             }
             for (RecordEntry entry : result.getRecords()) {
                 TupleRecordData data = (TupleRecordData) entry.getRecordData();
                 System.out.println("field1:" + data.getField("field1") + "\t"
                         + "field2:" + data.getField("field2"));
             }
             // 拿到下一个游标
             cursor = result.getNextCursor();
         } catch (InvalidCursorException ex) {
             // 非法游标或游标已过期,建议重新定位后开始消费
             cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
         }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());;

         } 
     }
 }

2). 读取Blob topic数据

public static void example(String projectName,String topicName) {
    //每次最多读取数据量
    int recordLimit = 1000;
    String shardId = "7";
    // 获取cursor, 这里获取有效数据中时间最久远的record游标
    // 注:正常情况下,getCursor只需在初始化时获取一次,然后使用getRecords的nextCursor进行下一次读取
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(projectName, blobTopicName, shardId, CursorType.OLDEST).getCursor();
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());     
    }
    while (true) {
        try {
            GetRecordsResult result = datahubClient.getRecords(projectName, blobTopicName, shardId, recordSchema, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // 无数据,sleep后读取
                Thread.sleep(10000);
                continue;
            }
            /* 消费数据 */
            for (RecordEntry record: result.getRecords()){
                 BlobRecordData data = (BlobRecordData) record.getRecordData();
                 System.out.println(new String(data.getData()));
            }
            // 拿到下一个游标
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // 非法游标或游标已过期,建议重新定位后开始消费
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage()); 
        } 
    }
}

写数据

Starting with server version 2.12, the putRecordsByShard interface is supported; earlier versions only provide the putRecords interface. When using putRecordsByShard you must specify the shard to write to; otherwise data is written to the first ACTIVE shard by default. In both methods the records parameter is a List whose elements are records, and all of them must be of the same type (Tuple or Blob). DataHub therefore supports writing by shard (server >= 2.12) and mixed writing, corresponding to the putRecordsByShard and putRecords interfaces respectively. With putRecords, the caller must inspect the PutRecordsResult to confirm whether the data was written successfully, whereas putRecordsByShard reports failures directly through exceptions. If the server supports it, putRecordsByShard is recommended.

说明

PutRecordsResult putRecords(String projectName, String topicName, List records);

PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

    • records Records list to written.

  • Exception

    • DatahubClientException

1). 写入Tuple topic

    // 写入Tuple型数据
    public static void tupleExample(String project,String topic,int retryTimes) {
        // 获取schema
        RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
        // 生成十条数据
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // 对每条数据设置额外属性,例如ip 机器名等。可以不设置额外属性,不影响数据写入
            recordEntry.addAttribute("key1", "value1");
            TupleRecordData data = new TupleRecordData(recordSchema);
            data.setField("field1", "HelloWorld");
            data.setField("field2", 1234567);
            recordEntry.setRecordData(data);
            recordEntries.add(recordEntry);
        }
        try {
            PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
            int i = result.getFailedRecordCount();
            if (i > 0) {
                retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
            }
        }  catch (DatahubClientException e) {
            System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
        }
    }
    //重试机制
    public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        while (retryTimes != 0) {
            retryTimes = retryTimes - 1;
            PutRecordsResult recordsResult = client.putRecords(project, topic, records);
            if (recordsResult.getFailedRecordCount() > 0) {
                retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
            }
            suc = true;
            break;
        }
        if (!suc) {
            System.out.println("retryFailure");
        }
    }

2). 写入Blob topic

// 写入blob型数据
public static void blobExample() {
    // 生成十条数据
    List<RecordEntry> recordEntries = new ArrayList<>();
    String shardId = "4";
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // 对每条数据设置额外属性
        recordEntry.addAttribute("key1", "value1");
        BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    while (true) {
        try {
            // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            System.out.println("write data  successful");
            break;
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());            
        }
    }
}

多方式写入

In versions earlier than DataHub 2.12, only the putRecords interface is supported. The RecordEntry class contains three attributes, shardId, partitionKey and hashKey, and the user decides which shard the data is written to by setting one of them.

说明

For version 2.12 and later, the putRecordsByShard interface is recommended, which avoids the performance cost of server-side partitioning.

1). Writing by shard ID (recommended):

RecordEntry entry = new RecordEntry();
entry.setShardId("0");

2). Writing by hash key: specify a 128-bit MD5 value; the record is written to the shard whose hash key range covers that value. Example:

RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

3). Writing by partition key: specify a String as the PartitionKey; the system decides the target shard from the MD5 of that string and the shards' hash key ranges. Example:

RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");
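
Putting this together, a minimal sketch of writing a batch to one shard with putRecordsByShard (server >= 2.12); the field names follow the earlier Tuple examples:

public static void writeByShard(String projectName, String topicName, String shardId) {
    RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
    List<RecordEntry> records = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry entry = new RecordEntry();
        TupleRecordData data = new TupleRecordData(schema);
        data.setField("field1", "HelloWorld");
        data.setField("field2", 1234567);
        entry.setRecordData(data);
        entry.setShardId(shardId);     // write this batch to one specific shard
        records.add(entry);
    }
    try {
        // putRecordsByShard reports failures via exceptions, so no per-record result check is needed
        datahubClient.putRecordsByShard(projectName, topicName, shardId, records);
        System.out.println("write by shard successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}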

Meter 操作

获取Meter

说明

GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • shardId The id of the shard.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

public static void getMeter(String projectName,String topicName) {
  String shardId = "5";
  try {
      GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(projectName, topicName, shardId);
      System.out.println("get meter successful");
      System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
  }
}

Subscription 操作

订阅服务提供了服务端保存用户消费点位的功能,只需要通过简单配置和处理,就可以实现高可用的点位存储服务。

创建 Subscription

说明

CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment);

The comment format is: {"application":"application name","description":"description"}
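
A small sketch that assembles such a comment and creates the subscription; appName and desc are placeholders and the JSON string is built by hand:

public static String createSubscriptionWithComment(String projectName, String topicName,
                                                   String appName, String desc) {
    // Build the {"application":"...","description":"..."} comment expected above
    String comment = "{\"application\":\"" + appName + "\",\"description\":\"" + desc + "\"}";
    CreateSubscriptionResult result = datahubClient.createSubscription(projectName, topicName, comment);
    System.out.println("create subscription successful, subId: " + result.getSubId());
    return result.getSubId();
}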

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • comment The comment of the subscription.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

public static void createSubscription(String projectName,String topicName) {
  try {
      CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(projectName, topicName, Constant.subscribtionComment);
      System.out.println("create subscription successful");
      System.out.println(createSubscriptionResult.getSubId());
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
  }
}

删除 Subscription

说明

DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

public static void deleteSubscription(String projectName,String topicName,String subId) {
  try {
      datahubClient.deleteSubscription(projectName, topicName, subId);
      System.out.println("delete subscription successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
  }
}

更新 Subscription

更新已存在的Subscription,目前只支持更新comment。

说明

UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

    • comment The comment you want to update.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

public static void updateSubscription(String projectName, String topicName, String subId, String comment){
        try {
            datahubClient.updateSubscription(projectName, topicName, subId, comment);
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }

    }

列出 Subscription

The pageNum and pageSize parameters of listSubscription select a range of subscriptions. For example, pageNum = 1 and pageSize = 10 returns subscriptions 1-10, while pageNum = 2 and pageSize = 5 returns subscriptions 6-10.

说明

ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • pageNum The page number used to list subscriptions.

    • pageSize The page size used to list subscriptions.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例代码

public static void listSubscription(String projectName, String topicName, int pageNum, int pageSize) {
    try {
        ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(projectName, topicName, pageNum, pageSize);
        if (listSubscriptionResult.getSubscriptions().size() > 0) {
            System.out.println(listSubscriptionResult.getTotalCount());
            System.out.println(listSubscriptionResult.getSubscriptions().size());
            for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
                System.out.println(entry.getSubId() + "\t"
                        + entry.getState() + "\t"
                        + entry.getType() + "\t"
                        + entry.getComment());
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}
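
Building on listSubscription above, a minimal sketch that pages through all subscriptions of a topic; the page size of 10 is arbitrary:

public static void listAllSubscriptions(String projectName, String topicName) {
    int pageNum = 1;
    int pageSize = 10;
    while (true) {
        ListSubscriptionResult result = datahubClient.listSubscription(projectName, topicName, pageNum, pageSize);
        if (result.getSubscriptions() == null || result.getSubscriptions().isEmpty()) {
            break; // no more pages
        }
        for (SubscriptionEntry entry : result.getSubscriptions()) {
            System.out.println(entry.getSubId() + "\t" + entry.getState() + "\t" + entry.getComment());
        }
        if ((long) pageNum * pageSize >= result.getTotalCount()) {
            break; // last page reached
        }
        ++pageNum;
    }
}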

查询 Subscription

说明

GetSubscriptionResult getSubscription(String projectName, String topicName, String subId);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

public static void getSubscription(String projectName, String topicName, String subId) {
    try {
        GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(projectName, topicName, subId);
        System.out.println(getSubscriptionResult.getSubId() + "\t"
                + getSubscriptionResult.getState() + "\t"
                + getSubscriptionResult.getType() + "\t"
                + getSubscriptionResult.getComment());
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

更新 Subscription 状态

Subscription有两种状态,OFFLINE和ONLINE,分别表示离线和在线。

说明

UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

    • state The state you want to change.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

public static void updateSubscriptionState(String projectName, String topicName,String subId) {
    try {
        datahubClient.updateSubscriptionState(projectName, topicName, subId, SubscriptionState.ONLINE);
        System.out.println("update subscription state successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());        
    }
}

offset 操作

一个subscription创建后,初始状态是未消费的,要使用subscription服务提供的点位存储功能,需要进行一些offset操作。

初始化 offset

openSubscriptionSession只需要初始化一次,再次调用会重新生成一个消费sessionId,之前的session会失效,无法commit点位。

说明

OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List shardIds);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

    • shardIds The id list of the shards.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

public static void openSubscriptionSession(String projectName, String topicName, String subId) {
    String shardId = "4";
    List<String> shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getSessionId() + "\t"
                + subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
    }
}

获取 offset

说明

GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List shardIds);

getSubscriptionOffset返回结果是GetSubscriptionOffsetResult对象,与openSubscriptionSession返回结果基本上相同,但是GetSubscriptionOffsetResult中的offset没有sessionId的,是作为只读的方法来使用。

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

    • shardIds The id list of the shards.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

//获取点位
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
    shardId = "4";
    shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

提交 offset

说明

CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map offsets);

提交点位会验证versionId和sessionId,必须与当前的一致;提交的点位信息没有严格限制,建议按照record中的真实sequence和timestamp来填写。

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

    • offsets The offset map of shards.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • SubscriptionOffsetResetException

    • SubscriptionSessionInvalidException

    • SubscriptionOfflineException

  • 示例

//提交点位
public static void commitSubscriptionOffset(String projectName, String topicName, String subId) {
  String shardId = "0";
  List<String> shardIds = Arrays.asList(shardId);
  try {
      OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
      SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
      //这里仅仅测试提交,完整过程请参考点位消费样例
      subscriptionOffset.setSequence(10);
      subscriptionOffset.setTimestamp(100);
      Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
      offsets.put(shardId, subscriptionOffset);
      // 提交点位
      datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsets);
      System.out.println("commit offset successful");
  } catch (DatahubClientException e) {
      System.out.println(e.getErrorMessage());
  }
}

重置 offset

说明

ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String shardId, Map offsets);

重置点位可以将消费点位设置到某个时间点,如果在这个时间点有多个record,那么点位会设置到该时间点的第一条record的位置。重置点位在修改点位信息的同时更新versionId,运行中的任务在使用旧的versionId来提交点位时会收到SubscriptionOffsetResetException,通过getSubscriptionOffset接口可以拿到新的versionId。

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • subId The id of the subscription.

    • offsets The offset map of shards.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • 示例

//重置点位
public static void resetSubscriptionOffset(String projectName, String topicName) throws ParseException {
    List<String> shardIds = Arrays.asList("0");
    //选择想要重置点位到的时间,并转换为时间戳
    String time = "2019-07-09 10:00:00";
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date date = simpleDateFormat.parse(time);

    long timestamp = date.getTime();//获取时间的时间戳
    Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
    for (String shardId : shardIds) {
        // 根据时间戳获取每个shard对应的sequence
        long sequence = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getSequence();
        SubscriptionOffset offset = new SubscriptionOffset();
        offset.setTimestamp(timestamp);
        offset.setSequence(sequence);
        offsets.put(shardId, offset);
    }

    try {
        datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
        System.out.println("reset successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

关联订阅消费DataHub数据

Consuming with a subscription is similar to reading DataHub data directly; the difference is that the subscription stores the consumption offset, and the user is free to choose where to resume consumption.

  • Notes:

    1. First call openSubscriptionSession to initialize the offset and obtain the version and session information. Initialize it only once globally; calling it again invalidates the existing session, which can then no longer commit offsets.

    2. Call getCursor to obtain the cursor for the subscription offset and start consuming; after consuming, use getNextCursor to obtain the cursor of the next data and continue.

    3. To commit the offset, call commitSubscriptionOffset; the commit checks the version and session information and only succeeds if both match exactly.

//点位消费示例,并在消费过程中进行点位的提交
public static void example(String projectName, String topicName,String subId) {
  String shardId = "0";
  List<String> shardIds = Arrays.asList("0", "1");
  OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
  SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  // 1、获取当前点位的cursor,如果当前点位已过期则获取生命周期内第一条record的cursor,未消费同样获取生命周期内第一条record的cursor
  String cursor = null;
  //sequence < 0说明未消费
  if (subscriptionOffset.getSequence() < 0) {
      // 获取生命周期内第一条record的cursor
      cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
  } else {
      // 获取下一条记录的Cursor
      long nextSequence = subscriptionOffset.getSequence() + 1;
      try {
          //按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (SeekOutOfRangeException e) {
          // 获取生命周期内第一条record的cursor
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
      }
  }
  // 2、读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
  long recordCount = 0L;
  // 每次读取10条record
  int fetchNum = 10;
  while (true) {
      try {
          GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
          if (getRecordsResult.getRecordCount() <= 0) {
              // 无数据,sleep后读取
              Thread.sleep(1000);
              continue;
          }
          for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
              //消费数据
              TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
              System.out.println("field1:" + data.getField("field1") + "\t"
                      + "field2:" + data.getField("field2"));
              // 处理数据完成后,设置点位
              ++recordCount;
              subscriptionOffset.setSequence(recordEntry.getSequence());
              subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
              if (recordCount % 1000 == 0) {
                  //提交点位
                  Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                  offsetMap.put(shardId, subscriptionOffset);
                  datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
                  System.out.println("commit offset successful");
              }
          }
          cursor = getRecordsResult.getNextCursor();
      } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
          // 退出. Offline: 订阅下线; SessionInvalid: 表示订阅被其他客户端同时消费
          break;
      } catch (SubscriptionOffsetResetException e) {
          // 表示点位被重置,重新获取SubscriptionOffset信息,这里以Sequence重置为例
          // 如果以Timestamp重置,需要通过CursorType.SYSTEM_TIME获取cursor
          subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
          long nextSequence = subscriptionOffset.getSequence() + 1;
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (DatahubClientException e) {
          // TODO: 针对不同异常决定是否退出
      } catch (Exception e) {
          break;
      }
  }
}

Connector 操作

DataHub Connector is the feature that synchronizes streaming data in DataHub to other cloud services. Currently the data in a topic can be synchronized in real time or near real time to MaxCompute (ODPS), OSS, RDS/MySQL, TableStore, ElasticSearch and Function Compute. You only need to write the data into DataHub once and configure the synchronization in DataHub to use the same data in those services.

创建 Connector

说明

CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List columnFields, SinkConfig config);

CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List columnFields, SinkConfig config);

  • 参数

    • projectName The name of the project.

    • topicName The name of the topic.

    • ConnectorType The type of connector which you want create.

    • columnFields Which fields you want synchronize.

    • sinkStartTime Start time to sink from datahub. Unit: Ms

    • config Detail config of specified connector type.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • ODPS示例

public static void createConnector(String projectName,String topicName) {
    List<String> columnFields = Arrays.asList("field1", "field2");
    SinkOdpsConfig config = new SinkOdpsConfig() {{
        setEndpoint(Constant.odps_endpoint);
        setProject(Constant.odps_project);
        setTable(Constant.odps_table);
        setAccessId(Constant.odps_accessId);
        setAccessKey(Constant.odps_accessKey);
        setPartitionMode(PartitionMode.SYSTEM_TIME);
        setTimeRange(60);
    }};
    //设置分区格式
    SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
        addConfig("ds", "%Y%m%d");
        addConfig("hh", "%H");
        addConfig("mm", "%M");
    }};
    config.setPartitionConfig(partitionConfig);
    try {
        //创建Connector
        datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
        System.out.println("create  connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());        
    }
}
  • OSS示例:

    public static void createOssConnector(String projectName,String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        SinkOssConfig config = new SinkOssConfig() {{
            setAccessId(Constant.oss_accessId);
            setAccessKey(Constant.oss_accessKey);
            setAuthMode(AuthMode.STS);
            setBucket(Constant.oss_bucket);
            setEndpoint(Constant.oss_endpoint);
            setPrefix(Constant.oss_prefix);
            setTimeFormat(Constant.oss_timeFormat);
            setTimeRange(60);

        }};

        try {
            //创建Connector
            datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
  • TableStore example:

    public static void createOtsConnector(String projectName, String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkOtsConfig config = new SinkOtsConfig() {{
            setAccessId(Constant.ots_accessId);
            setAccessKey(Constant.ots_accessKey);
            setEndpoint(Constant.ots_endpoint);
            setInstance(Constant.ots_instance);
            setTable(Constant.ots_table);
            setAuthMode(AuthMode.AK);
        }};

        try {
            // Create the connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
            System.out.println("create connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
  • Hologres example:

    public static void createHoloConnector(String projectName, String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkHologresConfig config = new SinkHologresConfig() {{
            setAccessId(Constant.accessId);
            setAccessKey(Constant.accessKey);
            setProjectName(Constant.projectName);
            setTopicName(Constant.topicName);
            setAuthMode(AuthMode.AK);
            setInstanceId(Constant.instanceId);
            // Set the timestamp unit
            setTimestampUnit(TimestampUnit.MILLISECOND);
        }};

        try {
            // Create the connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
            System.out.println("create connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
  • ElasticSearch example:

    public static void createEsConnector(String projectName, String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkEsConfig config = new SinkEsConfig() {{
            setEndpoint(Constant.es_endpoint);
            setIdFields(Constant.es_fields);
            setIndex(Constant.es_index);
            setPassword(Constant.es_password);
            setProxyMode(Constant.es_proxyMode);
            setTypeFields(Constant.es_typeFields);
            setUser(Constant.es_user);
        }};

        try {
            // Create the connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
            System.out.println("create connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
  • Function Compute example:

    public static void createFcConnector(String projectName, String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkFcConfig config = new SinkFcConfig() {{
            setEndpoint(Constant.fc_endpoint);
            setAccessId(Constant.fc_accessId);
            setAccessKey(Constant.fc_accessKey);
            setAuthMode(AuthMode.AK);
            setFunction(Constant.fc_function);
            setService(Constant.fc_service);
        }};

        try {
            // Create the connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
            System.out.println("create connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
  • MySQL example:

    public static void createMysqlConnector(String projectName, String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkMysqlConfig config = new SinkMysqlConfig() {{
            setDatabase(Constant.mysql_database);
            setHost(Constant.mysql_host);
            setInsertMode(InsertMode.OVERWRITE);
            setPassword(Constant.mysql_password);
            setPort(Constant.mysql_port);
            setTable(Constant.mysql_table);
            setUser(Constant.mysql_user);
        }};

        try {
            // Create the connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
            System.out.println("create connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
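All examples above use the createConnector overload without sinkStartTime. A minimal sketch of the overload that takes a start time, reusing columnFields and config from the ODPS example (the timestamp value is only illustrative):

// Start synchronizing data from the given time (Unit: milliseconds)
long sinkStartTime = System.currentTimeMillis();
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS,
        sinkStartTime, columnFields, config);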

Delete Connector

Description

DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector you want to delete.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void deleteConnector(String projectName, String topicName) {
  try {
      datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
      System.out.println("delete connector successful");
  } catch (DatahubClientException e) {
      System.out.println(e.getErrorMessage());
  }
}

Get Connector

Description

GetConnectorResult getConnector(String projectName, String topicName, ConnectorType connectorType);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector you want to get.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void getConnector(String projectName, String topicName) {
  try {
      GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
      System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
      for (String fieldName : getConnectorResult.getColumnFields()) {
          System.out.println(fieldName);
      }
  } catch (DatahubClientException e) {
      System.out.println(e.getErrorMessage());
  }
}

Update Connector

Updates the configuration of an existing connector.

Description

UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector you want to update.

    • config The detailed config of the specified connector type.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void updateConnector(String projectName, String topicName) {
    // Your DataHub AccessKey pair (for example, loaded from configuration)
    String accessId = "<accessId>";
    String accessKey = "<accessKey>";
    SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
    // Update the AccessKey pair and the time range
    config.setTimeRange(100);
    config.setAccessId(accessId);
    config.setAccessKey(accessKey);
    // Update the timestamp unit
    config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
    try {
        datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
        System.out.println("update connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Update Connector Field

Description

UpdateConnectorResult updateConnector(String projectName, String topicName, String connectorId, List<String> columnFields);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorId The id of the connector you want to update.

    • columnFields The new import fields.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

    public static void updateConnector(String projectName, String topicName) {
        String connectorId = "";
        // columnField contains all fields synchronized downstream, not only the newly added ones
        List<String> columnField = new ArrayList<>();
        columnField.add("f1");
        try {
            datahubClient.updateConnector(projectName, topicName, connectorId, columnField);
            System.out.println("update connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

Update Connector State

Description

UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector.

    • connectorState The state of the connector. Supported values: ConnectorState.STOPPED, ConnectorState.RUNNING.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void updateConnectorState(String projectName, String topicName) {
    try {
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
        System.out.println("update connector state successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Update Connector Offset

Description

UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector.

    • shardId The id of the shard. If shardId is null, the offsets of all shards are updated (see the sketch after the example below).

    • offset The connector offset.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void updateConnectorOffset(String projectName, String topicName, String shardId) {
    ConnectorOffset offset = new ConnectorOffset() {{
        setSequence(10);
        setTimestamp(1000);
    }};
    try {
        // The connector must be stopped before its offset can be updated
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
        datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, shardId, offset);
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
        System.out.println("update connector offset successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
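Based on the shardId description above, passing null updates the offsets of all shards. A minimal sketch using the same stop/update/start pattern and the same datahubClient as the example above:

public static void updateAllConnectorOffsets(String projectName, String topicName) {
    ConnectorOffset offset = new ConnectorOffset() {{
        setSequence(10);
        setTimestamp(1000);
    }};
    try {
        // Stop the connector before changing offsets
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
        // A null shardId applies the offset to every shard
        datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, null, offset);
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
        System.out.println("update all shard offsets successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}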

List Connector

Description

ListConnectorResult listConnector(String projectName, String topicName);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void listConnector(String projectName, String topicName) {
    try {
        ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
        for (String cName : listConnectorResult.getConnectorNames()) {
            System.out.println(cName);
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Get Connector Shard Status

Description

GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType);
ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector.

    • shardId The id of the shard.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
    try {
        ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
        System.out.println(connectorShardStatusEntry.getState() + "\t"
                + connectorShardStatusEntry.getCurrSequence() + "\t"
                + connectorShardStatusEntry.getDiscardCount() + "\t"
                + connectorShardStatusEntry.getUpdateTime());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
public static void getConnectorShardStatus(String projectName,String topicName) {
    try {
        GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
        for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
                    + entry.getValue().getCurrSequence() + "\t"
                    + entry.getValue().getDiscardCount() + "\t"
                    + entry.getValue().getUpdateTime());
        }
    } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
    }
}

Reload Connector

Description

ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType);
ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector.

    • shardId The id of the shard.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void reloadConnector(String projectName, String topicName) {
    try {
        datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
        System.out.println("reload connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
public static void reloadConnectorByShard(String projectName, String topicName, String shardId) {
    try {
        datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
        System.out.println("reload connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Get Connector Done Time

Description

GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType);

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void getDoneTime(String projectName, String topicName) {
    try {
        GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
        System.out.println(getConnectorDoneTimeResult.getDoneTime());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Update VPC Whitelist

Description

UpdateProjectVpcWhitelistResult updateProjectVpcWhitelist(String projectName, String vpcIds);

  • Parameters

    • projectName The name of the project.

    • vpcIds The vpcIds to modify.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void updateProjectVpcWhitelist(String projectName) {
    String vpcid = "12345";
    try {
        datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Append New Field

Description

AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName);

You can append a new field to a connector, provided the MaxCompute table already contains the column that corresponds to the DataHub field.

  • Parameters

    • projectName The name of the project.

    • topicName The name of the topic.

    • connectorType The type of connector.

    • fieldName The field to append. The field must allow null values.

  • Exception

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Example

public static void appendConnectorField(String projectName, String topicName) {
    String newField = "newfield";
    try {
        // Both the topic schema and the MaxCompute table must contain the column "newfield", and the schemas must match exactly
        datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Examples

Sample demo

Batch operations

Please use the console command-line tool.