Prerequisites
You have activated the DataHub service and created a Project, as described in the preparation steps.
Create a Topic
The test Topic uses the following schema (a code sketch of the same schema follows below the table):
| Field Name | Field Type |
| --- | --- |
| field1 | STRING |
| field2 | SMALLINT |
| field3 | BOOLEAN |
| field4 | TINYINT |
| field5 | INTEGER |
| field6 | BIGINT |
| field7 | FLOAT |
| field8 | DOUBLE |
| field9 | TIMESTAMP |
| field10 | DECIMAL |
| field11 | JSON |
For details about creating a Topic, see Topic operations.
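For reference, the same schema can also be described in code. The sketch below is illustrative only: it assumes the RecordSchema, Field, and FieldType model classes of the DataHub Java SDK, and it omits the JSON field because JSON type support depends on the SDK and server version.
// Illustrative sketch of the test Topic schema built in code.
// field11 (JSON) is omitted; create it in the console if your SDK version
// does not expose a JSON field type.
RecordSchema schema = new RecordSchema();
schema.addField(new Field("field1", FieldType.STRING));
schema.addField(new Field("field2", FieldType.SMALLINT));
schema.addField(new Field("field3", FieldType.BOOLEAN));
schema.addField(new Field("field4", FieldType.TINYINT));
schema.addField(new Field("field5", FieldType.INTEGER));
schema.addField(new Field("field6", FieldType.BIGINT));
schema.addField(new Field("field7", FieldType.FLOAT));
schema.addField(new Field("field8", FieldType.DOUBLE));
schema.addField(new Field("field9", FieldType.TIMESTAMP));
schema.addField(new Field("field10", FieldType.DECIMAL));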
Write example
Related dependencies
<!-- Zero-trust credential dependency -->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>credentials-java</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>datahub-client-library</artifactId>
    <version>1.4.11</version>
</dependency>
Write example (asynchronous write)
Replace endpoint, projectName, and topicName with the values for your own environment.
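The example also reads the AccessKey from environment variables through EnvironmentVariableCredentialProvider (from the credentials-java dependency above). The snippet below is an optional pre-flight check that can be placed at the top of the main method; the variable names follow the common Alibaba Cloud credentials convention and are an assumption here, so adjust them if your environment uses different names.
// Optional sanity check (illustrative only): verify that the AccessKey
// environment variables are present before creating the credential provider.
if (System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") == null
        || System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") == null) {
    throw new IllegalStateException("AccessKey environment variables are not set");
}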
public static void main(String[] args) throws InterruptedException {
    // Obtain the AccessKey from environment variables
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    // Initialize the Producer with the default configuration
    ProducerConfig config = new ProducerConfig(endpoint, provider);
    DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
    RecordSchema schema = producer.getTopicSchema();
    // If multi-version schemas are enabled, a specific schema version can also be fetched
    // RecordSchema schema = producer.getTopicSchema(3);
    // For asynchronous writes, registering a callback is optional
    WriteCallback callback = new WriteCallback() {
        @Override
        public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
            System.out.println("write success");
        }

        @Override
        public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
            System.out.println("write failed");
        }
    };
    for (int i = 0; i < 10000; ++i) {
        try {
            // Generate data according to the schema; the value type must match the field type
            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "hello");
            data.setField("field2", (short) 12);           // SMALLINT
            data.setField("field3", true);                 // BOOLEAN
            data.setField("field4", (byte) 12);            // TINYINT
            data.setField("field5", 103);                  // INTEGER
            data.setField("field6", 12L);                  // BIGINT
            // field7 (FLOAT) is intentionally left unset in this example
            data.setField("field8", 5.0);                  // DOUBLE
            data.setField("field9", 124455677L);           // TIMESTAMP, in microseconds
            data.setField("field10", BigDecimal.valueOf(240134L));  // DECIMAL
            data.setField("field11", " {\n"
                    + " \"name\":\"水浒传\",\n"
                    + " \"price\":19.9\n"
                    + " }");                               // JSON
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(data);
            producer.sendAsync(recordEntry, callback);
            // If you do not care whether the data is sent successfully, send without a callback:
            // producer.sendAsync(recordEntry, null);
        } catch (DatahubClientException e) {
            // TODO: handle the exception; this usually means a non-retryable error or that the retry limit was exceeded
            Thread.sleep(1000);
        }
    }
    // Make sure all buffered data has been sent before exiting
    producer.flush(true);
    producer.close();
}
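In practice, the onFailure callback usually does more than print a message. The sketch below shows one possible pattern and is not part of the SDK: it keeps failed records in a queue so the caller can decide whether to re-send them; the class name and the drain/retry policy are assumptions for illustration.
// Illustrative callback that collects failed records for a later re-send.
public class CollectingWriteCallback implements WriteCallback {
    private final java.util.concurrent.ConcurrentLinkedQueue<RecordEntry> failed =
            new java.util.concurrent.ConcurrentLinkedQueue<>();

    @Override
    public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
        // Nothing to do on success in this sketch.
    }

    @Override
    public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
        // Keep the failed records; the caller decides whether and when to re-send them.
        failed.addAll(records);
    }

    public List<RecordEntry> drainFailed() {
        List<RecordEntry> drained = new java.util.ArrayList<>();
        RecordEntry entry;
        while ((entry = failed.poll()) != null) {
            drained.add(entry);
        }
        return drained;
    }
}
Records returned by drainFailed() can then be re-sent with sendAsync, as in the loop above.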
DataHub supports multiple write methods; for more options, see the SDK introduction.
Check the write status
After the write completes, click the Shard data tab on the Topic page to see how data was written to each shard.
View metrics
Click the Metric statistics tab on the Topic page to view write-related QPS/RPS/throughput/network latency metrics.
For details about the metrics, see Metric viewing.
Read example
Related dependencies
<!-- Zero-trust credential dependency -->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>credentials-java</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>datahub-client-library</artifactId>
    <version>1.4.11</version>
</dependency>
Read example
Replace endpoint, projectName, topicName, and subId with the values for your own environment.
public static void main(String[] args) throws InterruptedException {
    // Obtain the AccessKey from environment variables
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "";
    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
                System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2")
                        + ", " + data.getField("field3") + ", " + data.getField("field4")
                        + ", " + data.getField("field5") + ", " + data.getField("field6")
                        + ", " + data.getField("field7") + ", " + data.getField("field8")
                        + ", " + data.getField("field9") + ", " + data.getField("field10"));
            }
        } catch (DatahubClientException e) {
            // TODO: handle the exception; this usually means a non-retryable error or that the retry limit was exceeded
        }
    }
}
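The loop above runs forever. In a real consumer you usually want a shutdown path that closes the consumer so that resources are released. The following is a minimal sketch, assuming DatahubConsumer exposes close() as in the client library's collaborative-consumption examples; maxEmptyReads is an illustrative stop condition, not an SDK setting.
// Illustrative bounded read loop: stop after several consecutive read timeouts,
// then close the consumer in a finally block.
int emptyReads = 0;
final int maxEmptyReads = 3;
try {
    while (emptyReads < maxEmptyReads) {
        RecordEntry recordEntry = consumer.read(5000);
        if (recordEntry == null) {
            // Nothing was returned within the 5000 ms timeout
            ++emptyReads;
            continue;
        }
        emptyReads = 0;
        TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
        // handle data as in the example above
    }
} finally {
    consumer.close();
}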
Check consumption offsets
Click the Subscription list tab on the Topic page to view the consumption progress and latency of each shard.
View metrics
Click the Metric statistics tab on the Topic page to view read-related QPS/RPS/throughput/network latency metrics.
For details about the metrics, see Metric viewing.
DataHub supports multiple read methods; for more options, see the SDK introduction.
CloudMonitor
DataHub supports CloudMonitor alerting. You can create alert rules to monitor DataHub metrics and trigger alerts; the supported monitoring items currently include readMetric (read), writeMetric (write), and subscription consumption metrics. For detailed steps, see CloudMonitor alerts.