本文为您展示DataHub的 Java SDK的Connector操作。
创建Connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
ConnectorType | ConnectorType | The type of connector which you want create. |
columnFields | List | Which fields you want synchronize. |
sinkStartTime | long | Start time to sink from datahub. Unit: Ms |
config | SinkConfig | Detail config of specified connector type. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
ODPS示例
public static void createConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
SinkOdpsConfig config = new SinkOdpsConfig() {{
setEndpoint(Constant.odps_endpoint);
setProject(Constant.odps_project);
setTable(Constant.odps_table);
setAccessId(Constant.odps_accessId);
setAccessKey(Constant.odps_accessKey);
setPartitionMode(PartitionMode.SYSTEM_TIME);
setTimeRange(60);
}};
//设置分区格式
SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
addConfig("ds", "%Y%m%d");
addConfig("hh", "%H");
addConfig("mm", "%M");
}};
config.setPartitionConfig(partitionConfig);
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
OSS示例
public static void createOssConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
SinkOssConfig config = new SinkOssConfig() {{
setAccessId(Constant.oss_accessId);
setAccessKey(Constant.oss_accessKey);
setAuthMode(AuthMode.STS);
setBucket(Constant.oss_bucket);
setEndpoint(Constant.oss_endpoint);
setPrefix(Constant.oss_prefix);
setTimeFormat(Constant.oss_timeFormat);
setTimeRange(60);
}};
try {
//创建Connector
datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
TableStore示例
public static void createOtsConnector(String projectName,String topicName) {
List<String> columnFields = Arrays.asList("field1", "field2");
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
final SinkOtsConfig config = new SinkOtsConfig() {{
setAccessId(Constant.ots_accessId);
setAccessKey(Constant.ots_accessKey);
setEndpoint(Constant.ots_endpoint);
setInstance(Constant.ots_instance);
setTable(Constant.ots_table);
setAuthMode(AuthMode.AK);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Hologres示例
public static void createHoloConnector(String projectName,String topicName) {
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkHologresConfig config = new SinkHologresConfig() {{
setAccessId(Constant.accessId);
setAccessKey(Constant.accessKey);
setProjectName(Constant.projectName);
setTopicName(Constant.topicName);
setAuthMode(AuthMode.AK);
setInstanceId(Constant.instanceId);
//设置时间格式
setTimestampUnit(TimestampUnit.MILLISECOND);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
ElasticSearch 示例
public static void createEsConnector(String projectName,String topicName){
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkEsConfig config = new SinkEsConfig() {{
setEndpoint(Constant.es_endpoint);
setIdFields(Constant.es_fields);
setIndex(Constant.es_index);
setPassword(Constant.es_password);
setProxyMode(Constant.es_proxyMode);
setTypeFields(Constant.es_typeFields);
setUser(Constant.es_user);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
函数计算示例
public static void createFcConnector(String projectName,String topicName){
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkFcConfig config = new SinkFcConfig() {{
setEndpoint(Constant.fc_endpoint);
setAccessId(Constant.fc_accessId);
setAccessKey(Constant.fc_accessKey);
setAuthMode(AuthMode.AK);
setFunction(Constant.fc_function);
setService(Constant.fc_service);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
MySQL示例
public static void createMysqlConnector(String projectName,String topicName){
List<String> columnFields = Arrays.asList("field1", "field2");
final SinkMysqlConfig config = new SinkMysqlConfig() {{
setDatabase( Constant.mysql_database);
setHost(Constant.mysql_host);
setInsertMode(InsertMode.OVERWRITE);
setPassword(Constant.mysql_password);
setPort(Constant.mysql_port);
setTable(Constant.mysql_table);
setUser(Constant.mysql_user);
}};
try {
//创建Connector
datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
System.out.println("create connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
删除Connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
ConnectorType | ConnectorType | The type of connector which you want create. |
columnFields | List | Which fields you want synchronize. |
sinkStartTime | long | Start time to sink from datahub. Unit: Ms |
config | SinkConfig | Detail config of specified connector type. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码说明
public static void deleteConnector(String projectName,String topicName) {
try {
datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("delete connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询Connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
ConnectorType | ConnectorType | The type of connector which you want create. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void getConnector(String projectName,String topicName) {
try {
GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
for (String fieldName : getConnectorResult.getColumnFields()) {
System.out.println(fieldName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新Connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
ConnectorType | ConnectorType | The type of connector which you want create. |
config | SinkConfig | Detail config of specified connector type. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void updateConnector(String projectName,String topicName) {
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
//修改ak
config.setTimeRange(100);
config.setAccessId(accessId);
config.setAccessKey(accessKey);
//修改timestamp类型
config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
try {
datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
System.out.println("update connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更多操作
更新Connector Field
更新Connector state
更新 Connector offset
列出Connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void listConnector(String projectName,String topicName) {
try {
ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
for (String cName : listConnectorResult.getConnectorNames()) {
System.out.println(cName);
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Connector Shard 状态
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
connectorType | ConnectorType | The type of connector. |
shardId | String | The id of the shard. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
try {
ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println(connectorShardStatusEntry.getState() + "\t"
+ connectorShardStatusEntry.getCurrSequence() + "\t"
+ connectorShardStatusEntry.getDiscardCount() + "\t"
+ connectorShardStatusEntry.getUpdateTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void getConnectorShardStatus(String projectName,String topicName) {
try {
GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
+ entry.getValue().getCurrSequence() + "\t"
+ entry.getValue().getDiscardCount() + "\t"
+ entry.getValue().getUpdateTime());
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
重启 Connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
connectorType | ConnectorType | The type of connector. |
shardId | String | The id of the shard. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void reloadConnector(String projectName,String topicName ) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
public static void reloadConnectorByShard(String projectName,String topicName,String shardId) {
try {
datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
System.out.println("reload connector successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
查询 Connector 完成时间
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
connectorType | ConnectorType | The type of connector. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void getDoneTime(String projectName,String topicName ) {
try {
GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
System.out.println(getConnectorDoneTimeResult.getDoneTime());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
更新VPC白名单
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
vpcids | String | The vpcIds to modify. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void updateProjectVpcWhitelist(String projectName) {
String vpcid = "12345";
try {
datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
Connector添加新的Field
可以给Connector添加新的field,但是需要MaxCompute表中存在和datahub中对应的列。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | String | Topic名称。 |
connectorType | ConnectorType | The type of connector. |
fieldName | String | The field to append. Field value must allow null. |
异常说明
异常类名 | 错误码 | 异常说明 |
DatahubClientException | - | 并且是所有异常的基类 |
InvalidParameterException |
| 非法参数。 |
AuthorizationFailureException |
| Authorization 签名解析异常,检查AK是否填写正确。 |
ResourceNotFoundException |
| 访问的资源不存在。 说明 进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 |
代码示例
public static void appendConnectorField(String projectName,String topicName) {
String newField = "newfield";
try {
//要求topic的schema和MaxCompute的table中都存在列newfield,并且表结构完全一致
datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}
/