数据总线DataHub

本文为您介绍数据总线DataHub连接器语法结构、WITH参数和使用示例等。

背景信息

阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用,详情请参见产品概述

说明

DataHub兼容Kafka协议,因此您可以使用Kafka连接器(不包括Upsert Kafka)来访问DataHub,详情请参见兼容Kafka

DataHub连接器支持的信息如下。

类别

详情

支持类型

结果表和源表

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

暂无

API种类

Datastream和SQL

是否支持更新或删除目标Topic数据

不支持更新和删除目标Topic数据,只支持插入数据。

语法结构

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence`  STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为datahub。

    endPoint

    消费端点信息。

    String

    不同地域DataHub有不同的EndPoint,详情请参见域名列表

    project

    项目。

    String

    创建project详情请参见快速入门

    topic

    主题。

    String

    创建topic详情请参见快速入门

    说明

    如果您填写的topic是blob类型(一种无类型的非结构化数据的存储方式),则在Flink消费时,表定义中必须有且只有一个VARBINARY类型的字段。

    accessId

    阿里云账号的AccessKey ID。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理

    accessKey

    阿里云账号的AccessKey Secret。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理

    retryTimeout

    最大持续重试时间。

    Integer

    1800000

    单位毫秒,通常不作修改。

    retryInterval

    重试间隔。

    Integer

    1000

    单位毫秒,通常不作修改。

    enableSchemaRegistry

    是否打开Schema注册。

    Boolean

    false

    您需要设置为true。

    CompressType

    读写的压缩策略。

    String

    lz4

    • lz4 (默认值):使用lz4压缩。

    • deflate:使用deflate压缩。

    • ""(空字符串):表示关闭数据压缩。

    说明

    仅VVR 6.0.5及以上版本支持指定CompressType参数。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    subId

    订阅ID。

    String

    如何创建DataHub订阅,详情请参见创建订阅

    maxFetchSize

    单次读取条数。

    Integer

    50

    影响读性能的参数,调大可以增加吞吐。

    maxBufferSize

    异步读取的最大缓存数据条数。

    Integer

    50

    影响读性能的参数,调大可以增加吞吐。

    fetchLatestDelay

    数据源没有数据时,sleep的时间。

    Integer

    500

    单位毫秒。在数据源频繁没有数据的情况下,影响吞吐,建议调小。

    lengthCheck

    单行字段条数检查策略。

    String

    NONE

    • NONE(默认值):

      • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。

      • 解析出的字段数小于定义字段数时,跳过该行数据。

    • SKIP:解析出的字段数和定义字段数不同时跳过该行数据。

    • EXCEPTION:解析出的字段数和定义字段数不同时提示异常。

    • PAD:按从左到右顺序填充。

      • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。

      • 解析出的字段数小于定义字段数时,按从左到右的顺序,在行尾用Null填充缺少的字段。

    columnErrorDebug

    是否打开调试开关。

    Boolean

    false

    • false(默认值):关闭调试功能。

    • true:打开调试开关,打印解析异常的日志。

    startTime

    消费日志的开始时间。

    String

    当前时间

    格式为yyyy-MM-dd hh:mm:ss。

    endTime

    消费日志的结束时间。

    String

    格式为yyyy-MM-dd hh:mm:ss。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    batchCount

    每次批量写入数据的数量。

    Integer

    500

    影响写性能,调大可以增加吞吐,但是会增大延迟。

    batchSize

    每次批量写入数据的大小。

    Integer

    512000

    单位Byte,影响写性能,调大可以增加吞吐,但是会增大延迟。

    flushInterval

    攒批写入数据的时间。

    Integer

    5000

    单位毫秒,影响写性能,调大可以增加吞吐,但是增大延迟。

    hashFields

    指定列名后,相同列的值会写入到同一个Shard。

    String

    null,即随机写

    可以指定多个列值,用逗号(,)分割,例如hashFields=a,b

    timeZone

    数据的时区。

    String

    影响TimeStamp等带时区数据的转换。

    schemaVersion

    向注册的Schema里写入的version。

    Integer

    -1

    您需要指定该参数。

类型映射

Flink字段类型

DataHub字段类型

TINYINT

TINYINT

BOOLEAN

BOOLEAN

INTEGER

INTEGER

BIGINT

BIGINT

BIGINT

TIMESTAMP

TIMESTAMP

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

VARCHAR

STRING

SMALLINT

SMALLINT

VARBINARY

BLOB

属性字段

字段名

字段类型

说明

shard-id

BIGINT METADATA VIRTUAL

Shard的ID。

sequence

STRING METADATA VIRTUAL

数据顺序。

system-time

TIMESTAMP METADATA VIRTUAL

系统时间。

说明

仅在VVR 3.0.1及以上版本支持获取以上DataHub属性字段。

使用示例

  • 源表

    CREATE TEMPORARY TABLE datahub_input (
      `time` BIGINT,
      `sequence`  STRING METADATA VIRTUAL,
      `shard-id` BIGINT METADATA VIRTUAL,
      `system-time` TIMESTAMP METADATA VIRTUAL
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}'
    );
    
    CREATE TEMPORARY TABLE test_out (
      `time` BIGINT,
      `sequence`  STRING,
      `shard-id` BIGINT,
      `system-time` TIMESTAMP
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO test_out
    SELECT
      `time`,
      `sequence` ,
      `shard-id`,
      `system-time`
    FROM datahub_input;
  • 结果表

    CREATE TEMPORARY table datahub_source(
      name VARCHAR
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'subId'='<yourSubId>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'startTime'='2018-06-01 00:00:00'
    );
    
    CREATE TEMPORARY table datahub_sink(
      name varchar
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'batchSize'='512000',
      'batchCount'='500'
    );
    
    INSERT INTO datahub_sink
    SELECT
      LOWER(name)
    from datahub_source;

Datastream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法

DataHub源表

VVR提供了SourceFunction的实现类DatahubSourceFunction来读取DataHub表数据。以下为读取DataHub表数据的示例。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataHub连接配置。
DatahubSourceFunction datahubSource =
    new DatahubSourceFunction(
    <yourEndPoint>,
    <yourProjectName>,
    <yourTopicName>,
    <yourSubId>,
    <yourAccessId>,
    <yourAccessKey>,
    "public",
    <enableSchemaRegistry>, // 是否开启schemaRegistry,一般填false即可。
    <yourStartTime>,
    <yourEndTime>
    );
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
    .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
    .print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
    Tuple2<String, Long> tuple2 = new Tuple2<>();
    TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
    tuple2.f0 = (String) recordData.getField(0);
    tuple2.f1 = (Long) recordData.getField(1);
    return tuple2;
}

DataHub结果表

VVR提供了OutputFormatSinkFunction的实现类DatahubSinkFunction将数据写入DataHub。以下为将数据写入DataHub的示例。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataHub连接配置。
env.generateSequence(0, 100)
    .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
    .addSink(
    new DatahubSinkFunction<>(
       <yourEndPoint>,
       <yourProjectName>,
       <yourTopicName>,
       <yourSubId>,
       <yourAccessId>,
       <yourAccessKey>,
       "public",
       <enableSchemaRegistry>, // 是否开启schemaRegistry,一般填false即可。
       <schemaVersion> // 如果开启了schemaRegistry,写入的时候需要指定schemaVersion,其他情况填0即可。
       );
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("f1", FieldType.STRING));
    recordSchema.addField(new Field("f2", FieldType.BIGINT));
    recordSchema.addField(new Field("f3", FieldType.DOUBLE));
    recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
    recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("f6", FieldType.DECIMAL));
    RecordEntry recordEntry = new RecordEntry();
    TupleRecordData recordData = new TupleRecordData(recordSchema);
    recordData.setField(0, s + message);
    recordData.setField(1, message);
    recordEntry.setRecordData(recordData);
    return recordEntry;
}

XML

Maven中央库中已经放置了DataHub DataStream连接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-datahub</artifactId>
    <version>${vvr-version}</version>
</dependency>

常见问题