快速入门(读写示例)

前提条件

  1. 已完成准备工作中 DataHub服务开通和Project创建

  2. 创建Topic

测试Topic对应Schema为以下内容

字段名称

字段类型

field1

STRING

field2

SMALLINT

field3

BOOLEAN

field4

TINYINT

field5

INTEGER

field6

BIGINT

field7

FLOAT

field8

DOUBLE

field9

TIMESTAMP

field10

DECIMAL

field11

JSON

创建Topic操作详情见Topic操作

写入示例

相关依赖

<!-- 零信任凭证相关 -->
<dependency>
	<groupId>com.aliyun</groupId>
	<artifactId>credentials-java</artifactId>
	<version>1.0.2</version>
</dependency>

<dependency>
	<groupId>com.aliyun.datahub</groupId>
	<artifactId>datahub-client-library</artifactId>
	<version>1.4.11</version>
</dependency>

写入示例(异步写入)

endpoint、projectName、topicName需替换为您使用的相关信息

public static void main(String[] args) throws InterruptedException {
	// 通过环境变量获取AK信息
	EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

	String endpoint ="https://dh-cn-hangzhou.aliyuncs.com";
	String projectName = "test_project";
	String topicName = "test_topic";

	// 初始化Producer,这里直接使用默认配置
	ProducerConfig config = new ProducerConfig(endpoint, provider);
	DatahubProducer producer = new DatahubProducer(projectName, topicName, config);

	RecordSchema schema = producer.getTopicSchema();
	// 如果开启了多version schema,这里也可以获取指定version的schema
	// RecordSchema schema = producer.getTopicSchema(3);

	// 对于异步写入,可以根据需要来选择是否注册回调函数
	WriteCallback callback = new WriteCallback() {
		@Override
		public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
			System.out.println("write success");
		}

		@Override
		public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
			System.out.println("write failed");
		}
	};

	for (int i = 0; i < 10000; ++i) {
		try {
            // generate data by schema
            TupleRecordData data = new TupleRecordData(schema);
    				data.setField("field1", "hello");
    				data.setField("field2", 12);
    				data.setField("field3", Boolean.parseBoolean("true"));
    				data.setField("field4", 12);
    				data.setField("field5", 103);
    				data.setField("field6", 12);
    				data.setField("field8", 5.0);
    				data.setField("field9", Long.parseLong("124455677"));
    				data.setField("field10", BigDecimal.valueOf(Long.valueOf(240134)));
    				data.setField("field11", "    {\n"
						+ "        \"name\":\"水浒传\",\n"
						+ "        \"price\":19.9\n"
						+ "    }");
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(data);

            producer.sendAsync(recordEntry, callback);
            // 如果不需要关心数据是否发送成功,那么就不需要注册回调,直接发送
            // producer.sendAsync(recordEntry, null);
        } catch (DatahubClientException e) {
            // TODO 处理异常,一般是不可重试错误或者超过重试次数;
            Thread.sleep(1000);
        }
	}

	// 保证退出前,数据全部被发送完
	producer.flush(true);
	producer.close();
}

DataHub支持多种写入方式,更多方式请参考SDK介绍

查看数据写入情况

写入完成后,可点击Topic页面 shard数据标签查看数据写入shard情况

image.png

指标查看

点击Topic页面 Metric统计页签查看写数据相关QPS/RPS/吞吐/网络延迟指标

指标详情说明见指标查看

image.png

读取示例

相关依赖

<!-- 零信任凭证相关 -->
<dependency>
	<groupId>com.aliyun</groupId>
	<artifactId>credentials-java</artifactId>
	<version>1.0.2</version>
</dependency>

<dependency>
	<groupId>com.aliyun.datahub</groupId>
	<artifactId>datahub-client-library</artifactId>
	<version>1.4.11</version>
</dependency>

读取示例

endpoint、projectName、topicName、subId需替换为您正在使用的相关信息

public static void main(String[] args) throws InterruptedException {
    // 通过环境变量获取AK信息
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "";

    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);

    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
  					System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2") + ", " + 
							data.getField("field3") + ", " + data.getField("field4") +
							", " + data.getField("field5") + ", " +data.getField("field6") +"," +
							data.getField("field7") +data.getField("field8") +"," 
							+ data.getField("field9") +"," +data.getField("field10"));            }
        } catch (DatahubClientException e) {
            // TODO 处理异常,一般是不可重试错误或者超过重试次数;
        }
    }
}

查看消费点位

点击Topic页面 订阅列表页签

image.png

查看各个shard消费进度以及延迟情况

image.png

指标查看

点击Topic页面 Metric统计页签查看读数据相关QPS/RPS/吞吐/网络延迟指标

指标详情说明见指标查看

image.png

DataHub支持多种读取方式,更多方式请参考SDK介绍

云监控

DataHub目前已经支持云监控报警,你可以通过创建报警规则方式对DataHub相关指标进行监控并报警,目前支持readMetric(读取)、writeMetric(写入)、订阅消费指标监控项 您可以通过云监控对读写相应指标进行报警,详细操作见云监控报警