EMR Spark功能增强

阿里云E-MapReduce产品构建于阿里云云服务器ECS上,基于开源的Apache Hadoop和Apache Spark,做了大量优化。本文为您介绍E-MapReduce(简称EMR)Spark相对开源增强的功能。

背景信息

阿里云EMR 100%采用社区开源组件,随开源版本升级迭代,基于开源组件,优化和增强阿里云部署环境。

功能增强

Spark针对开源功能增强的功能如下表。

EMR-5.x系列

EMR版本

组件版本

功能增强

EMR-5.17.0

Spark 3.4.2

Spark3升级至3.4.2版本。

EMR-5.16.0

Spark 3.3.1

修复了Commons Text漏洞。

EMR-5.15.1

Spark 3.3.1

移除了hive-site.xml中与jdo相关的配置。

EMR-5.12.1

Spark 3.3.1

  • Spark History Server支持默认使用OSS-HDFS存储。

  • Spark3 Native Engine支持使用OSS和OSS-HDFS存储。

EMR-5.10.0

Spark 3.3.1

升级至3.3.1版本。

EMR-5.9.0

Spark 3.3.0

  • 升级至3.3版本。

  • 支持开启Kerberos身份认证。

EMR-5.8.0

Spark 3.2.1

支持一键对接LDAP。

EMR-5.6.0

Spark 3.2.1

升级至3.2.1版本。

EMR-5.5.0

Spark 3.2.0

  • COUNT DISTINCT函数支持IF语句以及优化Case when的用法。

    设置参数spark.sql.optimizer.rewriteConditionalDistinctAggregates为true。

  • Shuffle Hash Join支持fallback到Sort Merge Join。

    设置参数spark.sql.join.preferSortMergeJoin为false,设置参数spark.sql.join.enableShuffledHashJoinFallback为true。

  • 支持非动态分区自动合并小文件。

    设置spark.sql.adaptive.merge.output.small.files.enabled参数为true。

  • GroupingSet以及Distinct等场景会自动调整并发度。

    设置参数spark.sql.execution.optimizeExpand为true。

  • 优化了Hive on Spark。

  • 支持Time Travel语法。

  • 适配JindoSDK。

EMR-5.4.0

Spark 3.1.2

  • 升级Spark至3.1.2版本。

  • Spark 3.x系列,SparkSQL优化了Distinct计算性能,即当聚合算子中包含多个count(distinct case ... when ...)时会触发优化功能。

  • 修复了AQE在Stats缺失情况下数组越界的问题。

  • 修复了AQE和Cache在特定场景下报错的问题。

EMR-5.3.0

Spark 3.1.1

修复了与Delta Lake兼容性的问题。

EMR-5.2.1

Spark 3.1.1

重要

EMR-5.2.1版本的Spark(3.1.1)与Kudu(1.11.1)不兼容。

  • 支持数据湖格式Delta Lake和Hudi。

  • 支持Remote Shuffle Service。

  • 支持Livy 。

  • 优化E-MapReduce控制台上,Spark服务配置页面的spark-defaults页签的配置项名称。

  • 优化CBO(Cost-Based Optimization)、DPP(DynamicPartitionPruning)以及Z-Order等功能,性能比开源Spark 3版本提升50%。

  • 支持阿里云Log Service、DataHub和消息队列RocketMQ版(简称ONS)等数据源。

EMR-4.x系列

EMR版本

组件版本

功能增强

EMR-4.10.0

Spark 2.4.8

  • 升级至2.4.8版本。

  • 修复了Adaptive Execution部分场景无法生效的问题。

  • 修复了统计聚合函数行为和Hive不一致的问题。

  • 修复了读取Hive ORC表char类型数据正确性问题。

  • 优化了Thriftserver的默认配置。

  • 优化E-MapReduce控制台上,Spark服务配置页面的spark-defaults页签的配置项名称。

  • 优化了Hive on Spark。

  • 修复了AQE在Stats缺失情况下数组越界的问题。

  • 修复了AQE和Cache在特定场景下报错的问题。

  • 移除了无效配置Log4j MetricsAppender。

  • 修复了SparkContext启动过程中空指针异常的问题。

  • 支持ZSTD(Zstandard)压缩格式。

EMR-4.9.0

Spark 2.4.7

  • 修复Adaptive Execution部分场景无法生效的问题。

  • 修复统计聚合函数行为和Hive不一致的问题。

  • 修复读取Hive ORC表char类型数据正确性的问题。

EMR-4.8.0

Spark 2.4.7

  • 优化了部分默认配置。

  • 性能优化:支持Window TopK下推。

  • 增强Hive读写CSV或JSON表的兼容性。

  • ANALYZE语句支持省略全表列名。

  • 支持一键开启或关闭LDAP功能。

  • 改进Spark Beeline工具的易用性。

EMR-4.6.0

Spark 2.4.7

  • 升级至2.4.7版本。

  • 升级jQuery至3.5.1版本。

  • 兼容Hive方式自动更新表和分区大小。

  • 支持Spark元数据和作业运行信息输出至DataWorks。

EMR-4.5.0

Spark 2.4.5

支持数据湖构建(DLF)元数据。

EMR-4.3.0

Spark 2.4.5

  • 升级至2.4.5版本。

  • 升级关联的Delta Lake至0.6.0版本。

  • 修复开启Ranger Hive后,Pyspark无法正常运行的缺陷。

EMR-3.x系列

EMR版本

组件版本

功能增强

EMR-3.51.0

Spark 3.4.2

Spark3升级至3.4.2版本。

EMR-3.50.0

Spark 3.3.1

修复了Commons Text漏洞。

EMR-3.49.0

Spark 3.3.1

移除了hive-site.xml中与jdo相关的配置。

EMR-3.46.1

Spark 3.3.1

  • Spark History Server支持默认使用OSS-HDFS存储。

  • Spark3 Native Engine支持使用OSS和OSS-HDFS存储。

EMR-3.44.0

Spark 3.3.1

升级至3.3.1版本。

EMR-3.43.0

Spark 3.3.0

  • 升级至3.3版本。

  • 支持开启Kerberos身份认证。

EMR-3.40.0

Spark 3.2.1

升级至3.2.1版本。

EMR-3.39.1

Spark 2.4.8

  • 优化了Hive on Spark。

  • 适配JindoSDK。

EMR-3.38.1

Spark 2.4.8

  • 移除了无效配置Log4j MetricsAppender。

  • 修复了SparkContext启动过程中空指针异常的问题。

EMR-3.38.0

Spark 2.4.8

  • 升级Spark至2.4.8版本。

  • 同时支持Spark 2.4.8和Spark 3.1.2。

    说明

    Spark3暂不支持Delta和Remote Shuffle Service。

  • Spark 3.x系列,SparkSQL优化了Distinct计算性能,即当聚合算子中包含多个count(distinct case ... when ...)时会触发优化功能。

  • 修复了AQE在Stats缺失情况下数组越界的问题。

  • 修复了AQE和Cache在特定场景下报错的问题。

EMR-3.37.0

Spark 2.4.7

修复了与Delta Lake兼容性的问题。

EMR-3.36.1

Spark 2.4.7

  • 优化E-MapReduce控制台上,Spark服务配置页面的spark-defaults页签的配置项名称。

  • 优化输出日志性能。

  • 支持ZSTD(Zstandard)压缩格式。

EMR-3.35.0

Spark 2.4.7

  • 修复Adaptive Execution部分场景无法生效的问题。
  • 修复统计聚合函数行为和Hive不一致的问题。
  • 修复读取Hive ORC表char类型数据正确性的问题。

EMR-3.34.0

Spark 2.4.7

  • 优化了部分默认配置。
  • 性能优化:支持Window TopK下推。
  • 增强Hive读写CSV或JSON表的兼容性。
  • ANALYZE语句支持省略全表列名。
  • 支持一键开启或关闭LDAP功能。
  • 改进Spark Beeline工具的易用性。

EMR-3.33.0

Spark 2.4.7

  • 升级至2.4.7版本。
  • 升级jQuery至3.5.1版本。
  • 兼容Hive方式,自动更新表和分区大小。
  • 支持Spark元数据和作业运行信息输出至DataWorks。

EMR-3.32.0

Spark 2.4.5

JindoTable支持打开或关闭数据采集功能。

EMR-3.30.0

Spark 2.4.5

  • 支持阿里云DLF(Data Lake Formation)元数据。
  • 升级HAS依赖至2.0.1。
  • 修复Streaming SQL反引号问题。
  • 移除Delta的JAR包,修改为Delta单独部署。
  • 修改日志路径统一写至HDFS下。

EMR-3.29.0

Spark 2.4.5

  • Spark升级至2.4.5.2.0。
  • 支持第三方Metastore的功能。
  • 增加datalake metastore-client。

EMR-3.28.0

Spark 2.4.5

  • 升级至2.4.5版本。
  • 兼容DataFactory的streaming-sql脚本。
  • 支持Delta 0.6.0版本。

EMR-3.27.0

Spark 2.4.3

  • CUBE中支持日期类型分区字段。
  • 调大Spark-Submit的stack深度。

EMR-3.25.0

Spark 2.4.3

  • 支持在控制台配置spark.sql.extensions等Delta相关参数。
  • 支持Hive读取Delta table,避免set inputformat。
  • 支持ALTER TABLE SET TBLPROPERTIES和UNSET TBLPROPERTIES语句。

EMR-3.24.0

Spark 2.4.3

  • 增加Delta相关参数支持。
  • 增加对Ranger spark plugin配置的支持。
  • JindoCube升级到0.3.0版本。

EMR-3.23.0

Spark 2.4.3

  • 更新spark thriftserver,解决class loader问题。
  • 重构spark事务相关代码,提升稳定性。
  • 解决升builtin hive至2.3版本后orc格式读写问题。
  • 支持merge into语法。
  • 支持scan和stream语法。
  • Structured Streaming Kafka sink支持EOS。
  • delta更新至0.4.0。

EMR-3.22.0

Spark 2.4.3

  • Relational Cache

    支持Relational Cache,Relational Cache通过预计算加速用户查询。用户可以创建Relational Cache对数据进行预计算,在执行用户查询时,Spark Optimizer自动发现合适的Cache,并改写SQL执行计划,基于Cache的数据继续计算,从而提升查询速度,适用于报表、Dashboard、数据同步和多维分析等场景。

    • 通过DDL,进行CACHE、UNCACHE、ALTER、SHOW等操作,Cache的数据支持Spark的所有数据源和数据格式。
    • 支持自动的Cache数据更新以及通过REFRESH命令更新Cache数据,支持基于分区的增量更新。
    • 支持基于Relational Cache的执行计划优化。
  • Streaming SQL
    • 规范Stream Query Writer的参数配置。
    • 优化Kafka数据表Schema兼容性检查。
    • Kafka数据表Schema不存在时自动创建到SchemaRegistry。
    • 优化Kafka Schema不兼容时的日志信息。
    • 修复查询结果写Kafka表时必须显式指定列名的问题。
    • 去掉流式SQL查询只支持Kafka和Loghub数据输入源的限制。
  • Delta

    新增Delta,用户可使用Spark创建Delta datasource,以支持流式数据写入、事务性读写、数据校验和数据回溯等应用场景。详情请参见Delta详细信息

    • 支持使用DataFrame API从Delta读取数据或者写入数据到Delta。
    • 支持使用Stuctured Streaming API以Delta作为source或者sink进行数据的读或写。
    • 支持使用Delta API对数据进行update、delete、merge、vaccum、optimize等操作。
    • 支持使用SQL创建基于Delta的表、导入数据到Delta和读取Delta表等操作。
  • Others
    • constraint feature,支持主键和外键。
    • 解决servlet等jar冲突问题。