数据总线DataHub
本文为您介绍如何使用数据总线DataHub连接器。
背景信息
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用,详情请参见产品概述。
DataHub连接器支持的信息如下。
类别 | 详情 |
支持类型 | 结果表和源表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 |
语法结构
CREATE TEMPORARY TABLE datahub_input (
`time` BIGINT,
`sequence` STRING METADATA VIRTUAL,
`shard-id` BIGINT METADATA VIRTUAL,
`system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
'connector' = 'datahub',
'subId' = '<yourSubId>',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'topic' = '<yourTopicName>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessKey>'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为datahub。
endPoint
消费端点信息。
String
是
无
不同地域DataHub有不同的EndPoint,具体请参见详情请参见域名列表。
project
项目。
String
是
无
创建project详情请参见快速入门。
topic
主题。
String
是
无
创建topic详情请参见快速入门。
accessId
阿里云账号的AccessKey ID。
String
是
无
获取方法请参见查看AccessKey ID。
accessKey
阿里云账号的AccessKey Secret。
String
是
无
获取方法请参见查看AccessSecret。
retryTimeout
最大持续重试时间。
Integer
否
1800000
单位毫秒,通常不作修改。
retryInterval
重试间隔。
Integer
否
1000
单位毫秒,通常不作修改。
enableSchemaRegistry
是否打开Schema注册。
Boolean
否
false
您需要设置为true。
runMode
运行在弹内还是公有云。
String
否
public
您需要设置为inner。
CompressType
读写的压缩策略。
String
否
lz4
lz4 (默认值):使用lz4压缩。
deflate:使用deflate压缩。
""(空字符串):表示关闭数据压缩。
说明仅VVR 6.0.5及以上版本支持指定CompressType参数。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
subId
订阅ID。
String
是
无
如何创建DataHub订阅,详情请参见创建订阅。
maxFetchSize
单次读取条数。
Integer
否
50
影响读性能的参数,调大可以增加吞吐。
maxBufferSize
异步读取的最大缓存数据条数。
Integer
否
50
影响读性能的参数,调大可以参加吞吐。
fetchLatestDelay
数据源没有数据时,sleep的时间。
Integer
否
500
单位毫秒。在数据源频繁没有数据的情况下,影响吞吐,建议调小。
lengthCheck
单行字段条数检查策略。
String
否
NONE
NONE(默认值):
解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
解析出的字段数小于定义字段数时,跳过该行数据。
SKIP:解析出的字段数和定义字段数不同时跳过该行数据。
EXCEPTION:解析出的字段数和定义字段数不同时提示异常。
PAD:按从左到右顺序填充。
解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
解析出的字段数小于定义字段数时,按从左到右的顺序,在行尾用Null填充缺少的字段。
columnErrorDebug
是否打开调试开关。
Boolean
否
false
false(默认值):关闭调试功能。
true:打开调试开关,打印解析异常的日志。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
batchCount
每次批量写入数据的数量。
Integer
否
500
影响写性能,调大可以增加吞吐,但是会增大延迟。
batchSize
每次批量写入数据的大小。
Integer
否
512000 Byte
影响写性能,调大可以增加吞吐,但是会增大延迟。
flushInterval
攒批写入数据的时间。
Integer
否
5000
单位毫秒,影响写性能,调大可以增加吞吐,但是增大延迟。
hashFields
指定列名后,相同列的值会写入到同一个Shard。
String
否
null,即随机写
可以指定多个列值,用逗号(,)分割,例如
hashFields=a,b
。timeZone
数据的时区。
String
否
无
影响TimeStamp等带时区数据的转换。
schemaVersion
向注册的Schema里写入的version。
Integer
否
-1
您需要指定该参数。
类型映射
Flink字段类型 | DataHub字段类型 |
TINYINT | TINYINT |
BOOLEAN | BOOLEAN |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BIGINT | TIMESTAMP |
VARCHAR | STRING |
SMALLINT | SMALLINT |
VARBINARY | BLOB |
属性字段
字段名 | 字段类型 | 说明 |
shard-id | BIGINT METADATA VIRTUAL | Shard的ID。 |
sequence | STRING METADATA VIRTUAL | 数据顺序。 |
system-time | TIMESTAMP METADATA VIRTUAL | 系统时间。 |
仅在VVR 3.0.1及以上版本支持获取以上DataHub属性字段。
使用示例
源表
CREATE TEMPORARY TABLE datahub_input ( `time` BIGINT, `sequence` STRING METADATA VIRTUAL, `shard-id` BIGINT METADATA VIRTUAL, `system-time` TIMESTAMP METADATA VIRTUAL ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessKey>' ); CREATE TEMPORARY TABLE test_out ( `time` BIGINT, `sequence` STRING, `shard-id` BIGINT, `system-time` TIMESTAMP ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO test_out SELECT `time`, `sequence` , `shard-id`, `system-time` FROM datahub_input;
结果表
CREATE TEMPORARY table datahub_source( name VARCHAR ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'subId'='<yourSubId>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessKey>', 'startTime'='2018-06-01 00:00:00' ); CREATE TEMPORARY table datahub_sink( name varchar ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessKey>', 'batchSize'='512000', 'batchCount'='500' ); INSERT INTO datahub_sink SELECT LOWER(name) from datahub_source;
Datastream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了DataHub DataStream连接器。
DataHub源表
VVR提供了SourceFunction的实现类DatahubSourceFunction来读取DataHub表数据。以下为读取DataHub表数据的示例。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //DataHub连接配置。 DatahubSourceFunction datahubSource = new DatahubSourceFunction( <yourEndPoint>, <yourProjectName>, <yourTopicName>, <yourSubId>, <yourAccessId>, <yourAccessKey>, "public", <enableSchemaRegistry>, // 是否开启schemaRegistry,一般填false即可。 <yourStartTime>, <yourEndTime> ); datahubSource.setRequestTimeout(30 * 1000); datahubSource.enableExitAfterReadFinished(); env.addSource(datahubSource) .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2) .print(); env.execute(); private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) { Tuple2<String, Long> tuple2 = new Tuple2<>(); TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData()); tuple2.f0 = (String) recordData.getField(0); tuple2.f1 = (Long) recordData.getField(1); return tuple2; }
DataHub结果表
VVR提供了OutputFormatSinkFunction的实现类DatahubSinkFunction将数据写入DataHub。以下为将数据写入DataHub的示例。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //DataHub连接配置。 env.generateSequence(0, 100) .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:")) .addSink( new DatahubSinkFunction<>( <yourEndPoint>, <yourProjectName>, <yourTopicName>, <yourSubId>, <yourAccessId>, <yourAccessKey>, "public", <enableSchemaRegistry>, // 是否开启schemaRegistry,一般填false即可。 <schemaVersion> // 如果开启了schemaRegistry,写入的时候需要指定schemaVersion,其他情况填0即可。 ); env.execute(); private RecordEntry getRecordEntry(Long message, String s) { RecordSchema recordSchema = new RecordSchema(); recordSchema.addField(new Field("f1", FieldType.STRING)); recordSchema.addField(new Field("f2", FieldType.BIGINT)); recordSchema.addField(new Field("f3", FieldType.DOUBLE)); recordSchema.addField(new Field("f4", FieldType.BOOLEAN)); recordSchema.addField(new Field("f5", FieldType.TIMESTAMP)); recordSchema.addField(new Field("f6", FieldType.DECIMAL)); RecordEntry recordEntry = new RecordEntry(); TupleRecordData recordData = new TupleRecordData(recordSchema); recordData.setField(0, s + message); recordData.setField(1, message); recordEntry.setRecordData(recordData); return recordEntry; }