全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
流计算

数据源表概念

更新时间:2018-03-28 15:43:33

流计算的源表是指流式数据存储,流式数据存储驱动流计算的运行。因此,每个流计算子作业必须提供至少一个流式数据存储!

语法:

  1. CREATE TABLE tableName
  2. (columnName dataType [, columnName dataType ]*)
  3. [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];

示例:

  1. create table datahub_stream(
  2. name varchar,
  3. age BIGINT,
  4. birthday BIGINT
  5. ) with (
  6. type='datahub',
  7. endPoint='http://dh-et2.aliyun-inc.com',
  8. project='blink_xxx',
  9. topic='test_xxx',
  10. accessId='0i70Rxxxxx',
  11. accessKey='yF60EwUxxxx',
  12. startTime='2017-07-21 00:00:00'
  13. );

Watermark 定义

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,watermark的定义是source表DDL定义的一部分。Blink提供了如下语法定义 watermark:

  1. WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
  • watermarkName 标识了这个 watermark 的名字,可选。
  • <rowtime_field> 必须是表中已定义的一列(当前仅支持为Timestamp类型),含义是基于该列生成 watermark,并且标识该列为 Event Time 列,可以在后续 query 中用来定义窗口。
  • withOffset 是目前提供的watermark的生成策略,是根据<rowtime_field> - offset生成watermark的值。withOffset的第一个参数必须是<rowtime_field>
  • offset 单位为毫秒,含义为watermark值与event time值的偏移量。

通常一条记录中的某个字段就代表了该记录的发生时间,比如表中有个 rowtime 字段,类型为Timestamp,其中某一值为1501750584000(2017-08-03 08:56:24.000)。那么如果用户需要定义一个基于该rowtime列的watermark,watermark策略为偏移4秒,需要如下定义。

  1. WATERMARK FOR rowtime AS withOffset(rowtime, 4000)

那么,这条数据的watermark时间就是 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据的watermark时间是什么含义呢?即:timestamp小于1501750580000(2017-08-03 08:56:20.000)的数据,都已经到达了。

计算列

计算列是虚拟列,并非实际存储在表中。 计算列的表达式可以使用其他列中的数据来计算其所属列的值,可以使用表达式、内置函数、或是自定义函数,灵活度与 SELECT 中的表达式一样。计算列在 Blink 中可以像普通字段一样被使用。

用途:目前 watermark 的 rowtime 列,只支持 Timestamp 类型(未来会支持 Long 类型),watermark 只能定义在源表 DDL 中,如果用户源表中没有 Timestamp 类型的列,需要从其他类型的字段转换而来,可以使用计算列来转换。

语法:

  1. <computed_column_definition> ::= column_name AS computed_column_expression

示例:

  1. #如datahub的TIME字段是微妙级别的(16位Unix时间戳),可以用计算列来转换。
  2. CREATE TABLE sls_stream(
  3. a INT,
  4. b BIGINT,
  5. TIME BIGINT,
  6. ts AS TO_TIMESTAMP(TIME/1000),
  7. WATERMARK FOR ts AS withOffset(ts, 1000)
  8. ) with (
  9. type = 'DATAHUB',
  10. ...
  11. );

如上示例中所示,源表数据中的字段 TIME 包含时间信息,但是BIGINT类型。用计算列的功能将字段 TIME 转换成了 Timestamp 类型的 ts 字段,并将 ts 字段作为 watermark 的 rowtime 字段。

本文导读目录