Topic操作

本文为您展示DataHub的 Java SDKTopic操作。

Topic说明

Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据,目前支持TupleBlob两种类型:

  • Blob类型Topic支持写入一块二进制数据作为一个Record。

  • Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。

    仅支持以下数据类型:

    类型

    含义

    值域

    BIGINT

    8字节有符号整型

    -9223372036854775807 ~ 9223372036854775807

    DOUBLE

    8字节双精度浮点数

    -1.0 _10^308 ~ 1.0 _10^308

    BOOLEAN

    布尔类型

    可取以下任意一组:

    • True/False

    • true/false

    • 0/1

    TIMESTAMP

    时间戳类型

    表示到微秒的时间戳。

    STRING

    字符串,只支持UTF-8编码

    单个STRING列最长允许2MB。

    TINYINT

    单字节整型

    -128 ~127

    SMALLINT

    双字节整型

    -32768 ~ 32767

    INTEGER

    4字节整型

    -2147483648 ~ 2147483647

    FLOAT

    4字节单精度浮点数

    -3.40292347_10^38 ~ 3.40292347_10^38

    说明

    DataHub 中的 TINYINTSMALLINTINTEGERFLOAT类型从java sdk 2.16.1-public开始支持。

创建Topic

创建Tuple Topic

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

shardCount

int

初始shard数量。

lifeCycle

int

数据生命周期(单位:天)。

recordType

RecordType

写入的Record类型,现仅支持 TUPLEBLOB。

recordSchema

RecordSchema

Topicrecords schema。

comment

String

Topic的描述。

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

 public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
   RecordSchema schema = new RecordSchema();
   schema.addField(new Field("bigint_field", FieldType.BIGINT));
   schema.addField(new Field("double_field", FieldType.DOUBLE));
   schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
   schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
   schema.addField(new Field("tinyint_field", FieldType.TINYINT));
   schema.addField(new Field("smallint_field", FieldType.SMALLINT));
   schema.addField(new Field("integer_field", FieldType.INTEGER));
   schema.addField(new Field("floar_field", FieldType.FLOAT));
   schema.addField(new Field("decimal_field", FieldType.DECIMAL));
   schema.addField(new Field("string_field", FieldType.STRING));
   try {
       datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
       System.out.println("create topic successful");
   } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
   }
 }

创建Blob Topic

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称

shardCount

int

初始shard数量

lifeCycle

int

数据生命周期(单位:天)。

recordType

RecordType

写入的Record类型,现仅支持 TUPLEBLOB

comment

String

Topic的说明。

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

ResourceAlreadyExistException

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

资源已存在。(创建时如果资源已存在,就会抛出这个异常)。

代码示例

public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
  try {
      datahubClient.createTopic(projectName, blobTopicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
      System.out.println("create topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
  }

删除Topic

删除Topic之前需保证Topic没有subscriptionconnector,否则会异常:NoPermissionException

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

异常描述

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

NoPermissionException

NoPermission

OperationDenied

没有权限,通常是RAM配置不正确,或没有正确授权子账号。

代码示例

public static void deleteTopic(String projectName, String topicName) {
  try {
      datahubClient.deleteTopic(projectName, topicName);
      System.out.println("delete topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

列出Topic

以列表的形式列出配置项目下的所有Topic。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

代码示例

   public static void listTopic(String projectName ) {
      try {
          ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
          if (listTopicResult.getTopicNames().size() > 0) {
              for (String tName : listTopicResult.getTopicNames()) {
                  System.out.println(tName);
              }
          }
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());          
      }
  }

更新Topic

更新Topic信息,可更新Topic的描述以及Topic的生命周期

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

comment

int

Topic描述

lifeCycle

String

Topic生命周期

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

   public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
        try {
            comment = "new topic comment";
             lifeCycle = 1;
            datahubClient.updateTopic(projectName, Constant.topicName,lifeCycle, comment);
            System.out.println("update topic successful");
            //查看更新后结果
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

查询Topic

根据项目名称和Topic名称来查询Topic的相关属性。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

   public static void getTopic(String projectName, String topicName) {
        try {
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getShardCount() + "\t"
                    + getTopicResult.getLifeCycle() + "\t"
                    + getTopicResult.getRecordType() + "\t"
                    + getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }

更多操作

Tuple Topic 新增 Field

在对Tuple Topic新增Field时,可新增一列,也可一次性插入多列。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

fields

Field

新增列,不允许为空

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

代码示例

public static void appendNewField(String projectName,String topicName) {
    try {
        Field newField = new Field("newField", FieldType.STRING, true,"comment");
        datahubClient.appendField(projectName, topicName, newField);
        System.out.println("append field successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}