本文为您介绍数据总线DataHub连接器语法结构、WITH参数和使用示例等。
背景信息
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用,详情请参见产品概述。
DataHub兼容Kafka协议,因此您可以使用Kafka连接器(不包括Upsert Kafka)来访问DataHub,详情请参见兼容Kafka。
DataHub连接器支持的信息如下。
类别 | 详情 |
支持类型 | 结果表和源表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | Datastream和SQL |
是否支持更新或删除目标Topic数据 | 不支持更新和删除目标Topic数据,只支持插入数据。 |
语法结构
CREATE TEMPORARY TABLE datahub_input (
`time` BIGINT,
`sequence` STRING METADATA VIRTUAL,
`shard-id` BIGINT METADATA VIRTUAL,
`system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
'connector' = 'datahub',
'subId' = '<yourSubId>',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'topic' = '<yourTopicName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为datahub。
endPoint
消费端点信息。
String
是
无
不同地域DataHub有不同的EndPoint,详情请参见域名列表。
project
项目。
String
是
无
创建project详情请参见快速入门。
topic
主题。
String
是
无
创建topic详情请参见快速入门。
说明如果您填写的topic是blob类型(一种无类型的非结构化数据的存储方式),则在Flink消费时,表定义中必须有且只有一个VARBINARY类型的字段。
accessId
阿里云账号的AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理。
accessKey
阿里云账号的AccessKey Secret。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理。
retryTimeout
最大持续重试时间。
Integer
否
1800000
单位毫秒,通常不作修改。
retryInterval
重试间隔。
Integer
否
1000
单位毫秒,通常不作修改。
enableSchemaRegistry
是否打开Schema注册。
Boolean
否
false
您需要设置为true。
CompressType
读写的压缩策略。
String
否
lz4
lz4 (默认值):使用lz4压缩。
deflate:使用deflate压缩。
""(空字符串):表示关闭数据压缩。
说明仅VVR 6.0.5及以上版本支持指定CompressType参数。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
subId
订阅ID。
String
是
无
如何创建DataHub订阅,详情请参见创建订阅。
maxFetchSize
单次读取条数。
Integer
否
50
影响读性能的参数,调大可以增加吞吐。
maxBufferSize
异步读取的最大缓存数据条数。
Integer
否
50
影响读性能的参数,调大可以增加吞吐。
fetchLatestDelay
数据源没有数据时,sleep的时间。
Integer
否
500
单位毫秒。在数据源频繁没有数据的情况下,影响吞吐,建议调小。
lengthCheck
单行字段条数检查策略。
String
否
NONE
NONE(默认值):
解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
解析出的字段数小于定义字段数时,跳过该行数据。
SKIP:解析出的字段数和定义字段数不同时跳过该行数据。
EXCEPTION:解析出的字段数和定义字段数不同时提示异常。
PAD:按从左到右顺序填充。
解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
解析出的字段数小于定义字段数时,按从左到右的顺序,在行尾用Null填充缺少的字段。
columnErrorDebug
是否打开调试开关。
Boolean
否
false
false(默认值):关闭调试功能。
true:打开调试开关,打印解析异常的日志。
startTime
消费日志的开始时间。
String
否
当前时间
格式为yyyy-MM-dd hh:mm:ss。
endTime
消费日志的结束时间。
String
否
无
格式为yyyy-MM-dd hh:mm:ss。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
batchCount
每次批量写入数据的数量。
Integer
否
500
影响写性能,调大可以增加吞吐,但是会增大延迟。
batchSize
每次批量写入数据的大小。
Integer
否
512000
单位Byte,影响写性能,调大可以增加吞吐,但是会增大延迟。
flushInterval
攒批写入数据的时间。
Integer
否
5000
单位毫秒,影响写性能,调大可以增加吞吐,但是增大延迟。
hashFields
指定列名后,相同列的值会写入到同一个Shard。
String
否
null,即随机写
可以指定多个列值,用逗号(,)分割,例如
hashFields=a,b
。timeZone
数据的时区。
String
否
无
影响TimeStamp等带时区数据的转换。
schemaVersion
向注册的Schema里写入的version。
Integer
否
-1
您需要指定该参数。
类型映射
Flink字段类型 | DataHub字段类型 |
TINYINT | TINYINT |
BOOLEAN | BOOLEAN |
INTEGER | INTEGER |
BIGINT | BIGINT |
BIGINT | TIMESTAMP |
TIMESTAMP | |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
VARCHAR | STRING |
SMALLINT | SMALLINT |
VARBINARY | BLOB |
属性字段
字段名 | 字段类型 | 说明 |
shard-id | BIGINT METADATA VIRTUAL | Shard的ID。 |
sequence | STRING METADATA VIRTUAL | 数据顺序。 |
system-time | TIMESTAMP METADATA VIRTUAL | 系统时间。 |
仅在VVR 3.0.1及以上版本支持获取以上DataHub属性字段。
使用示例
源表
CREATE TEMPORARY TABLE datahub_input ( `time` BIGINT, `sequence` STRING METADATA VIRTUAL, `shard-id` BIGINT METADATA VIRTUAL, `system-time` TIMESTAMP METADATA VIRTUAL ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}' ); CREATE TEMPORARY TABLE test_out ( `time` BIGINT, `sequence` STRING, `shard-id` BIGINT, `system-time` TIMESTAMP ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO test_out SELECT `time`, `sequence` , `shard-id`, `system-time` FROM datahub_input;
结果表
CREATE TEMPORARY table datahub_source( name VARCHAR ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'subId'='<yourSubId>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'startTime'='2018-06-01 00:00:00' ); CREATE TEMPORARY table datahub_sink( name varchar ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'batchSize'='512000', 'batchCount'='500' ); INSERT INTO datahub_sink SELECT LOWER(name) from datahub_source;
Datastream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。
DataHub源表
VVR提供了SourceFunction的实现类DatahubSourceFunction来读取DataHub表数据。以下为读取DataHub表数据的示例。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataHub连接配置。
DatahubSourceFunction datahubSource =
new DatahubSourceFunction(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<enableSchemaRegistry>, // 是否开启schemaRegistry,一般填false即可。
<yourStartTime>,
<yourEndTime>
);
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
.map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
.print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
Tuple2<String, Long> tuple2 = new Tuple2<>();
TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
tuple2.f0 = (String) recordData.getField(0);
tuple2.f1 = (Long) recordData.getField(1);
return tuple2;
}
DataHub结果表
VVR提供了OutputFormatSinkFunction的实现类DatahubSinkFunction将数据写入DataHub。以下为将数据写入DataHub的示例。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataHub连接配置。
env.generateSequence(0, 100)
.map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
.addSink(
new DatahubSinkFunction<>(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<enableSchemaRegistry>, // 是否开启schemaRegistry,一般填false即可。
<schemaVersion> // 如果开启了schemaRegistry,写入的时候需要指定schemaVersion,其他情况填0即可。
);
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
RecordSchema recordSchema = new RecordSchema();
recordSchema.addField(new Field("f1", FieldType.STRING));
recordSchema.addField(new Field("f2", FieldType.BIGINT));
recordSchema.addField(new Field("f3", FieldType.DOUBLE));
recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
recordSchema.addField(new Field("f6", FieldType.DECIMAL));
RecordEntry recordEntry = new RecordEntry();
TupleRecordData recordData = new TupleRecordData(recordSchema);
recordData.setField(0, s + message);
recordData.setField(1, message);
recordEntry.setRecordData(recordData);
return recordEntry;
}
XML
Maven中央库中已经放置了DataHub DataStream连接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-datahub</artifactId>
<version>${vvr-version}</version>
</dependency>