全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
流计算

创建数据总线源表

更新时间:2018-05-17 10:05:06

什么是数据总线(DATAHUB)

DataHub作为一个流式数据总线,为阿里云数加平台提供了大数据的入口服务。结合阿里云众多云产品,可以构建一站式的数据处理平台。流计算通常使用DataHub作为流式数据存储头和输出目的端。同时,上游众多流式数据,包括DTS、IOT等均选择DataHub作为大数据平台的数据入口。DataHub本身是流数据存储,流计算只能将其作为流式数据输入。示例如下:

  1. create table datahub_stream(
  2. name varchar,
  3. age BIGINT,
  4. birthday BIGINT
  5. ) with (
  6. type='datahub',
  7. endPoint='http://dXXXXXXXX.com',
  8. project='blink_datahub_test',
  9. topic='test_topic_1',
  10. accessId='0i70XXXXXXXXs',
  11. accessKey='yF60EwXXXXXXXXXnvQPJ2zhCfHU',
  12. startTime='2017-07-21 00:00:00'
  13. );

属性字段

BlinkSQL支持获取datahub的属性字段,每条记录入datahub的系统时间。

如图所示:12421

字段名 注释说明
timestamp 每条记录入datahub的systemtime

示例

通过 HEADER 关键字获取属性字段(例如:想取每条记录入datahub的systemtime,属性字段并不存在于DATAHUB的字段声明里。可以用timestamp当做字段名然后在后面加上HEADER就可以取出想要的属性值)

测试数据

name(VARCHAT) MsgID(VARCHAT)
ens_altar_flow ems0a

测试案例

  1. CREATE TABLE datahub_log (
  2. `timestamp` varchar HEADER,
  3. result varchar
  4. MsgID varchar
  5. )
  6. WITH
  7. (
  8. type ='datahub'
  9. );
  10. CREATE TABLE RDS_out (
  11. `timestamp` varchar,
  12. MsgID varchar,
  13. Version varchar
  14. )
  15. WITH
  16. (
  17. type ='RDS'
  18. );
  19. INSERT INTO RDS_out
  20. SELECT
  21. `timestamp`,
  22. result,
  23. MsgID
  24. FROM
  25. datahub_log;

测试结果

TIME(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
1522652455625 ems0a 0.0.1

```

WITH参数

目前只支持tuple模式的topic

参数 注释说明 备注
endPoint 消费端点信息 DATAHUB的Endpoint地址
accessId 读取的accessId
accessKey 读取的密钥
project 读取的项目
topic project下的具体的topic
startTime 启动位点的时间 格式为”yyyy-MM-dd hh:mm:ss”
maxRetryTimes 读取最大尝试次数 可选,默认为20
retryIntervalMs 重试间隔 可选,默认为1000
batchReadSize 单次读取条数 可选,默认为10
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION,PAD,“SKIP” 字段数目不符合时跳过 ,“EXCEPTION”:字段数目不符合时抛出异常,“PAD”:按顺序填充,不存在的置为null
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

类型映射

DataHub和流计算字段类型对应关系,强烈建议用户使用该对应关系进行DDL声明:

DataHub字段类型 流计算字段类型
BIGINT BIGINT
STRING VARCHAR
DOUBLE DOUBLE
TIMESTAMP BIGINT
BOOLEAN BOOLEAN
DECIMAL DECIMAL

注意:

DataHub的TIMESTAMP是精确到微妙级别的,在Unix时间戳里是16位的。而流计算定义的TIMESTAMP是精确到毫秒级别的,在Unix时间戳里是13位的所以建议大家使用BIGINT来映射。如果一定是要用TIMESTAMP建议使用计算列来做转换。

本文导读目录