全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
流计算

物理执行图

更新时间:2017-06-07 13:26:11

不同于业务数据拓扑表现的业务上下游拓扑信息,系统拓扑反映的是底层计算节点拓扑关系图,表现经过流计算SQL引擎处理后的底层流式Task处理关系图。

组件分类

目前流计算对于底层运行节点(Task)进行分类,不同的Task属于不同类型(负责完成同一个计算逻辑的多个并发Task称之为组件),目前主要分为四类:

组件分类图

  • Source

    读取流式数据存储节点,类似Storm的Spout,主要负责向源头请求数据,并提交给下游计算处理节点。

  • Map

    类似Hadoop/ODPS的Map节点,主要负责处理数据Map阶段操作,并根据数据分区(Shuffle)提交给下游Reduce节点。

  • Reduce

    类似Hadoop/ODPS的Reduce节点,主要负责数据Reduce阶段操作,从上游Map收集数据进行计算,并提交给下游Merge节点。

  • Merge

    阿里云流计算StreamSQL为增量计算专门设计的增量计算节点,主要负责将状态进行增量计算并进行持久化操作的节点。

组件拓扑

组件拓扑关系反映的是底层不同组件上下游数据流依赖关系,类似Hadoop/ODPS的MapReduce模型,组件拓扑同样反映出StreamSQL经过引擎处理后底层运行的物理节点实际分布图。如下图:

系统拓扑

  • 组件图上,展示不同组件的名称(节点上半部分)和类型(节点下半部分)。

  • 点击特定组件,流计算将在组件拓扑右侧提供组件详情信息,包括:

    • 指标详情,提供该组件的运行指标详情,包括名称、类型等等。(详情参看下文的表格信息)

    • Task流量分布,展示组件下不同Task流量分布情况,以辅助用户判断数据倾斜问题

  • 点击组件与组件之间的连线,流计算将显示最近十分钟组件之间的流量,这个信息非常有助于我们系统定位数据缺失问题,即数据在某个组件的处理后数据全被过滤或者JOIN全部失败情况。这个功能非常有用!

针对不同的组件类型,流计算提供不同的指标详情,分别为:

  • Source类型
名称 解释
名称 组件名称
类型 组件类型,Source
并发量 组件下属Task并发个数
启动时延 一批数据到来后,Source节点开始处理的时间
读取耗时 从源头读取一批数据平均耗时
处理耗时 一批源头数据解析、处理的时间
读取速率 单位为RPS,反映Source节点读取速度
读取流量 单位为Bytes/s,反映Source节点读取流速
  • Map类型
名称 解释
名称 组件名称
类型 组件类型,Mapper
并发量 组件下属Task并发个数
启动时延 一批数据到来后,Mapper节点开始处理的时间
处理耗时 一批数据进行Map处理的时间
  • Reduce类型
名称 解释
名称 组件名称
类型 组件类型,Reducer
并发量 组件下属Task并发个数
启动时延 一批数据到来后,Mapper节点开始处理的时间
收集时延 启动后,从上游收集齐数据,直到收集齐数据的时延
处理耗时 一批数据进行Reduce处理的时间

注,通常流计算会对Reduce阶段和Merge阶段进行优化,合并为一个节点操作。

  • Merge类型
名称 解释
名称 组件名称
类型 组件类型,Merger
并发量 组件下属Task并发个数
启动时延 一批数据到来后,Mapper节点开始处理的时间
收集时延 启动后,从上游收集齐数据,直到收集齐数据的时延
处理耗时 一批数据进行Merge的时间
状态持久化耗时 一批数据进行Reduce阶段后,需要进行增量计算和状态持久化操作耗时

注,通常流计算会对Reduce阶段和Merge阶段进行优化,合并为一个节点操作。

关键路径

为方便用户定位作业运行瓶颈,流计算专门提供了关键路径分析,协助用户追踪作业时延最大的关键路径。用法如下:

  1. 选择一个需要进行关键路径定位的作业,点击”系统拓扑”,选择一个叶子节点(所谓叶子节点就是没有下游的节点,通常都是输出节点)。

  2. 点击右下角的追踪关键路径,流计算系统产出追踪出的关键路径信息,并以时间线维度展示。通过观察时间线图,用户即可查看造成该计算最大时延的关键路径。

如下图:

关键路径

  • 关键路径追踪算法是根据用户指定的输出叶子节点,追踪系统拓扑中每层最大耗时节点,并以时间线维度展现

  • 时间线图从顶至下,是这批数据从最开始的Source节点到最末端的Output节点(通常是Mapper或者Merger节点)处理Timeline结构图。横坐标单位为ms,表示各个节点的各个阶段耗时分布;纵坐标是组件名称。

  • 横坐标起点是一批数据开始进入流计算系统后,流计算作业各个组件开始计时,到这个组件收集上游数据,直至最终这个组件处理完毕这批数据,该矩形框结束。因此,对于不同类型的组件存在不同处理步骤,基本上都是:

    1. 等待上游数据处理完并开始发送到该节点(启动时延),使用蓝色表示;

    2. 读取数据/收集数据的时间,使用绿色或者黄色表示;

    3. 实际计算处理的耗时,使用紫色表示。 一般用户实际业务需要关心的是处理耗时,这部分执行的是用户实际业务SQL代码。

  • 用户找到占比最大的耗时,通常是一些Join、GroupBy操作,定位出具体”计算慢”的原因,可以进行下一步调优工作。

SQL和组件对应关系

SQL经过流计算系统编译为底层执行的组件进行物理执行,但通常流计算会对SQL进行深度的优化编译,导致最终的组件和业务SQL难以存在一对一的映射关系。用户时常需要根据组件寻找如何修改SQL,用以解决SQL业务问题。此时,流计算提供了强大的映射关系图,方便用户寻找组件和SQL对应关系。

点击任意一个组件图,点击下属的对应关系按钮,查看对应关系:

对应关系

弹出的文本框是JSON内容,里面记录的是当前组件下运行了哪几类Operator,每个Operator从语义上可以对应出具体的SQL含义,目前有几类常见的Operator如下:

  • ProducerOperator: 源头数据读取的Operator

  • OutputOperator: 目的端数据写出的Operator

  • MapperOperator: 执行Map操作的Operator

  • GroupingOperator: 执行GroupBy操作的Operator

  • MergerOperator: 执行Merger操作的Operator

  • ReducerOperator: 执行Reducer操作的Operator

以一个具体的实例讲解如何查看SQL和组件对应关系,SQL如下:

  1. CREATE STREAM TABLE stream_sls (
  2. `create` STRING
  3. );
  4. CREATE RESULT TABLE stream_result (
  5. id STRING,
  6. counter BIGINT
  7. );
  8. REPLACE INTO stream_result
  9. SELECT
  10. date_format(`create`, 'yyyy-MM-dd HH:mm:ss.S', 'yyyy-MM-dd HH:mm'),
  11. count(1)
  12. FROM
  13. stream_sls
  14. GROUP BY
  15. '1';

此时生成了的底层组件图为:

底层组件图

对应的JSON内容依次为:

  • MODEL0_stream_sls_source
  1. 代表从内存产生数据的源头Operator
  2. {
  3. "RandomProducerOperator-207": [
  4. "stream_sls"
  5. ]
  6. }
  • MODEL1_mrm_mapper
  1. 代表执行了一系列MAP操作,包括GroupingOperator操作(以下第一个)、获取常量表达式(以下第二个)、date_format格式化操作(以下第三个)、count操作(以下第四个)
  2. {
  3. "GroupingOperator-211": [
  4. "InAdapterProcessor{Schema[GroupingOperator-211; in:g_127,bayes_1020_test_stream_result_0,bayes_1020_test_stream_result_1;out:g_127,bayes_1020_test_stream_result_0,bayes_1020_test_stream_result_1}"
  5. ],
  6. "MapperOperator-208": [
  7. "InAdapterProcessor{Schema[MapperOperator-208; in:bayes_1020_test_stream_sls_0;out:bayes_1020_test_stream_sls_0,g_127}",
  8. "EvaluatorProcessor{ConstantValueEvaluator['1']}"
  9. ],
  10. "MapperOperator-209": [
  11. "InAdapterProcessor{Schema[MapperOperator-209; in:bayes_1020_test_stream_sls_0,g_127;out:bayes_1020_test_stream_sls_0,g_127,bayes_1020_test_stream_result_0}",
  12. "EvaluatorProcessor{GenericUDFEvaluator[date_format(bayes_1020_test_stream_sls_0, 'yyyy-MM-dd HH:mm:ss.S', 'yyyy-MM-dd HH:mm')]}"
  13. ],
  14. "MapperOperator-210": [
  15. "InAdapterProcessor{Schema[MapperOperator-210; in:g_127,bayes_1020_test_stream_result_0;out:g_127,bayes_1020_test_stream_result_0,bayes_1020_test_stream_result_1}",
  16. "EvaluatorProcessor{GenericUDAFMapEvaluator[count(1)]}"
  17. ]
  18. }
  • MODEL1_mrm_reducer_merger
  1. 代表一系列reducermerge操作,最为重要的是第二个Count操作。
  2. {
  3. "MergerOperator-213": [
  4. "InAdapterProcessor{Schema[MapperOperator-208; in:bayes_1020_test_stream_sls_0;out:bayes_1020_test_stream_sls_0,g_127}",
  5. "EvaluatorProcessor{ConstantValueEvaluator['1']}"
  6. ],
  7. "ReducerOperator-212": [
  8. "InAdapterProcessor{Schema[ReducerOperator-212; in:bayes_1020_test_stream_result_0,bayes_1020_test_stream_result_1;out:bayes_1020_test_stream_result_0,bayes_1020_test_stream_result_1}",
  9. "EvaluatorProcessor{GenericUDAFReduceEvaluator[count(bayes_1020_test_stream_result_1)]}"
  10. ]
  11. }
  • MODEL2_stream_result_mapper
  1. 代表输出的Map节点,注意该节点实际上并没有实际上输出,为下游实际输出的Reduce节点发送数据。
  2. {
  3. "OutputOperator": [
  4. "stream_result"
  5. ]
  6. }
  • MODEL2_stream_result_reducer
  1. 代表输出的Reduce节点,实际负责对外写数据,即将流计算数据写出到外部存储。
  2. {
  3. "OutputOperator": [
  4. "stream_result"
  5. ]
  6. }
本文导读目录