Connector 操作

本文为您展示DataHub的 Java SDKConnector操作。

创建Connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

ConnectorType

ConnectorType

The type of connector which you want create.

columnFields

List

Which fields you want synchronize.

sinkStartTime

long

Start time to sink from datahub. Unit: Ms

config

SinkConfig

Detail config of specified connector type.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

ODPS示例

public static void createConnector(String projectName,String topicName) {
    List<String> columnFields = Arrays.asList("field1", "field2");
  	@Value("${datahub.accessId}")
		String accessId;
		@Value("${datahub.accessKey}")
  	String accessKey;
    SinkOdpsConfig config = new SinkOdpsConfig() {{
        setEndpoint(Constant.odps_endpoint);
        setProject(Constant.odps_project);
        setTable(Constant.odps_table);
        setAccessId(Constant.odps_accessId);
        setAccessKey(Constant.odps_accessKey);
        setPartitionMode(PartitionMode.SYSTEM_TIME);
        setTimeRange(60);
    }};
    //设置分区格式
    SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
        addConfig("ds", "%Y%m%d");
        addConfig("hh", "%H");
        addConfig("mm", "%M");
    }};
    config.setPartitionConfig(partitionConfig);
    try {
        //创建Connector
        datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
        System.out.println("create  connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());        
    }
}

OSS示例

    public static void createOssConnector(String projectName,String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        @Value("${datahub.accessId}")
				String accessId;
				@Value("${datahub.accessKey}")
  			String accessKey;
        SinkOssConfig config = new SinkOssConfig() {{
            setAccessId(Constant.oss_accessId);
            setAccessKey(Constant.oss_accessKey);
            setAuthMode(AuthMode.STS);
            setBucket(Constant.oss_bucket);
            setEndpoint(Constant.oss_endpoint);
            setPrefix(Constant.oss_prefix);
            setTimeFormat(Constant.oss_timeFormat);
            setTimeRange(60);

        }};

        try {
            //创建Connector
            datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

TableStore示例

       public static void createOtsConnector(String projectName,String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
  			@Value("${datahub.accessId}")
				String accessId;
				@Value("${datahub.accessKey}")
  			String accessKey;         
        final SinkOtsConfig config = new SinkOtsConfig() {{
            setAccessId(Constant.ots_accessId);
            setAccessKey(Constant.ots_accessKey);
            setEndpoint(Constant.ots_endpoint);
            setInstance(Constant.ots_instance);
            setTable(Constant.ots_table);
            setAuthMode(AuthMode.AK);
        }};

        try {
            //创建Connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
            System.out.println("create  connector successful");
        }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());    
        }
    }

Hologres示例

       public static void createHoloConnector(String projectName,String topicName) {
       @Value("${datahub.accessId}")
			 String accessId;
	     @Value("${datahub.accessKey}")
  	   String accessKey;
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkHologresConfig config = new SinkHologresConfig() {{
            setAccessId(Constant.accessId);
            setAccessKey(Constant.accessKey);
            setProjectName(Constant.projectName);
            setTopicName(Constant.topicName);
            setAuthMode(AuthMode.AK);
            setInstanceId(Constant.instanceId);
            //设置时间格式
            setTimestampUnit(TimestampUnit.MILLISECOND);
        }};

        try {
            //创建Connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
            System.out.println("create  connector successful");
        }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());    
        }
    }

ElasticSearch 示例

    public static void createEsConnector(String projectName,String topicName){
   	@Value("${datahub.accessId}")
		String accessId;
		@Value("${datahub.accessKey}")
		String accessKey;
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkEsConfig config = new SinkEsConfig() {{
            setEndpoint(Constant.es_endpoint);
            setIdFields(Constant.es_fields);
            setIndex(Constant.es_index);
            setPassword(Constant.es_password);
            setProxyMode(Constant.es_proxyMode);
            setTypeFields(Constant.es_typeFields);
            setUser(Constant.es_user);

        }};

        try {
            //创建Connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

函数计算示例

   public static void createFcConnector(String projectName,String topicName){
    @Value("${datahub.accessId}")
		String accessId;
		@Value("${datahub.accessKey}")
		String accessKey;   
     List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkFcConfig config = new SinkFcConfig() {{
            setEndpoint(Constant.fc_endpoint);
            setAccessId(Constant.fc_accessId);
            setAccessKey(Constant.fc_accessKey);
            setAuthMode(AuthMode.AK);
            setFunction(Constant.fc_function);
            setService(Constant.fc_service);

        }};

        try {
            //创建Connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }

MySQL示例

    public static void createMysqlConnector(String projectName,String topicName){
        List<String> columnFields = Arrays.asList("field1", "field2");

        final SinkMysqlConfig config = new SinkMysqlConfig() {{
         setDatabase( Constant.mysql_database);
         setHost(Constant.mysql_host);
         setInsertMode(InsertMode.OVERWRITE);
         setPassword(Constant.mysql_password);
         setPort(Constant.mysql_port);
         setTable(Constant.mysql_table);
         setUser(Constant.mysql_user);
        }};

        try {
            //创建Connector
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());  
        }
    }

删除Connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

ConnectorType

ConnectorType

The type of connector which you want create.

columnFields

List

Which fields you want synchronize.

sinkStartTime

long

Start time to sink from datahub. Unit: Ms

config

SinkConfig

Detail config of specified connector type.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码说明

public static void deleteConnector(String projectName,String topicName) {
  try {
      datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
      System.out.println("delete  connector successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());    
  }
}

查询Connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

ConnectorType

ConnectorType

The type of connector which you want create.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void getConnector(String projectName,String topicName) {
  try {
      GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
      System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
      for (String fieldName : getConnectorResult.getColumnFields()) {
          System.out.println(fieldName);
      }
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

更新Connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

ConnectorType

ConnectorType

The type of connector which you want create.

config

SinkConfig

Detail config of specified connector type.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void updateConnector(String projectName,String topicName) {
  @Value("${datahub.accessId}")
	String accessId;
	@Value("${datahub.accessKey}")
	String accessKey;
    SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
    //修改ak
    config.setTimeRange(100);
    config.setAccessId(accessId);
    config.setAccessKey(accessKey);
    //修改timestamp类型
    config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
    try {
        datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
        System.out.println("update  connector successful");
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

更多操作

更新Connector Field

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

connectorId

String

The id of connector which you want update.

columnFields

List

New import fields.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例
    public static void updateConnector(String projectName,String topicName) {
        String connectorId = "";
        //columnField代表的是同步到下游的所有字段,并不只是新增的字段
        List<String> columnField = new ArrayList<>();
        columnField.add("f1");
        try {
            batchClient.updateConnector(projectName, topicName,connectorId,columnField);
            System.out.println("update  connector successful");
        }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

更新Connector state

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

ConnectorType

ConnectorType

The type of connector.

connectorState

The state of the connector. Support: ConnectorState.STOPPED, ConnectorState.RUNNING.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例
public static void updateConnectorState(String projectName,String topicName) {
      try {
          datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
          datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
          System.out.println("update  connector state successful");
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
      }
  }

更新 Connector offset

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

ConnectorType

ConnectorType

The type of connector.

shardId

String

The id of the shard. If shardId is null, then update all shards offset.

offset

ConnectorOffset

The connector offset.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例
public static void updateConnectorOffset(String projectName,String topicName) {
    ConnectorOffset offset = new ConnectorOffset() {{
        setSequence(10);
        setTimestamp(1000);
    }};
    try {
        //更新Connector点位需要先停止Connector
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
        datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, shardId, offset);
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
        System.out.println("update  connector offset successful");
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

列出Connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

 public static void listConnector(String projectName,String topicName) {
  try {
       ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
      for (String cName : listConnectorResult.getConnectorNames()) {
          System.out.println(cName);
      }
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
   }
 }

查询 Connector Shard 状态

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

connectorType

ConnectorType

The type of connector.

shardId

String

The id of the shard.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
    try {
        ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
        System.out.println(connectorShardStatusEntry.getState() + "\t"
                + connectorShardStatusEntry.getCurrSequence() + "\t"
                + connectorShardStatusEntry.getDiscardCount() + "\t"
                + connectorShardStatusEntry.getUpdateTime());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
public static void getConnectorShardStatus(String projectName,String topicName) {
    try {
        GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
        for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
                    + entry.getValue().getCurrSequence() + "\t"
                    + entry.getValue().getDiscardCount() + "\t"
                    + entry.getValue().getUpdateTime());
        }
    } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
    }
}

重启 Connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

connectorType

ConnectorType

The type of connector.

shardId

String

The id of the shard.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void reloadConnector(String projectName,String topicName ) {
    try {
        datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
        System.out.println("reload connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
    }
}
public static void reloadConnectorByShard(String projectName,String topicName,String shardId) {
    try {
        datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
        System.out.println("reload connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

查询 Connector 完成时间

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

connectorType

ConnectorType

The type of connector.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void getDoneTime(String projectName,String topicName ) {
    try {
        GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
        System.out.println(getConnectorDoneTimeResult.getDoneTime());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

更新VPC白名单

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

vpcids

String

The vpcIds to modify.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void updateProjectVpcWhitelist(String projectName) {
    String vpcid = "12345";
    try {
        datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Connector添加新的Field

可以给Connector添加新的field,但是需要MaxCompute表中存在和datahub中对应的列。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

String

Topic名称。

connectorType

ConnectorType

The type of connector.

fieldName

String

The field to append. Field value must allow null.

异常说明

异常类名

错误码

异常说明

DatahubClientException

-

并且是所有异常的基类

InvalidParameterException

InvalidParameter

InvalidCursor

非法参数。

AuthorizationFailureException

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在。

说明

进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常

代码示例

public static void appendConnectorField(String projectName,String topicName) {
    String newField = "newfield";
    try {
        //要求topic的schema和MaxCompute的table中都存在列newfield,并且表结构完全一致
        datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

/