Fusion引擎

Fusion引擎是EMR Serverless Spark内置的高性能向量化SQL执行引擎,相比开源Spark在TPC-DS基准测试上有2倍性能提升。Fusion引擎与开源Spark完全兼容,您无需对现有代码做任何修改。在EMR Serverless Spark中,只需在创建会话时开启使用Fusion加速开关,即可启用该引擎。

注意事项

Fusion引擎使用堆外内存,您需要在创建会话时,在Spark配置中添加配置项spark.memory.offHeap.enabled=true以开启Spark的堆外内存。同时,合理配置堆外内存spark.memory.offHeap.size

Fusion加速适用场景

Fusion引擎加速适用于Spark SQL和DataFrame任务,支持大部分的算子、表达式和数据类型实现性能提升,而对于RDD任务或者包含用户自定义函数(UDF)的执行阶段暂不支持加速效果。

存储格式

Fusion引擎支持的数据存储格式,如下所示。

  • Parquet

  • Paimon

  • ORC(partial)

算子

Fusion引擎为大部分常见算子提供加速,具体分类如下所示。

类型

算子列表

Source

  • FileSourceScanExec

  • HiveTableScanExec

  • BatchScanExec

  • InMemoryTableScanExec

Sink

DataWritingCommandExec

常见操作

  • FilterExec

  • ProjectExec

  • SortExec

  • UnionExec

聚合

HashAggregateExec

Join

  • BroadcastHashJoinExec

  • ShuffledHashJoinExec

  • SortMergeJoinExec

  • BroadcastNestedLoopJoinExec

  • CartesianProductExec

窗口

  • WindowExec

  • WindowTopK

Exchange

  • ShuffleExchangeExec

  • ReusedExchangeExec

  • BroadcastExchangeExec

  • CoalesceExec

Limit

  • GlobalLimitExec

  • LocalLimitExec

  • TakeOrderedAndProjectExec

Subquery

SubqueryBroadcastExec

其他

  • ExpandExec

  • GenerateExec

表达式

Fusion引擎目前支持的表达式,如下所示。

类型

表达式列表

比较/逻辑

!、!=、<、<=、>、>=、<=>、<>、=、==、||、and、between、is not null、is null、negative、null if、or

算术

%、+、-、*、/、isnan、mod、negative、not、positive、abs、acos、acosh、asin、asinh、atan、atan2、atanh、cbrt、ceil、ceiling、cos、cosh、degrees、e、exp、floor、ln、log、log10、log2、pi、pmod、pow、power、radians、rand、random、rint、round、shiftleft、shiftright、sign、signum、sin、sqrt、tan、tanh

位运算

^、|、&、~、bit_and、bit_count、bit_or、bit_xor、bit_length

条件表达式

case、if、when

集合

in、find_in_set

String计算

ascii、char、chr、char_length、character_length、concat、instr、lcase、lower、length、locate、lower、lpad、ltrim、

overlay、replace、reverse、rtrim、split、split_part、substr、substring、trim、ucase、upper、like、regexp、regexp_extract、regexp_extract_all、regexp_like、regexp_replace、rlike

聚合

aggregate、approx_count_distinct、avg、collect_list、collect_set、corr、count、covar_pop、covar_samp、first、first_value、kurtosis、last、last_value、max、max_by、mean、min、regr_avgx、regr_avgy、regr_count、regr_r2、

regr_intercept、regr_slope、regr_sxy、regr_sxx、regr_syy、skewness、std、stddev、stddev_pop、stddev_samp、sum、var_pop、var_samp、variance

窗口

cume_dist、dense_rank、lag、lead、nth_value、ntile、percent_rank、rank、row_number

时间

add_months、current_date、current_timestamp、current_timezone、date、date_add、date_format、date_from_unix_date、date_sub、datediff、day、dayofmonth、dayofweek、dayofyear、from_unixtime、from_utc_timestamp、hour、last_day、make_date、minute、month、next_day、now、quarter、second、timestamp_micros、timestamp_millis、to_date、to_unix_timestamp、unix_seconds、unix_millis、unix_micros、weekday、weekofyear、year

json

get_json_object、json_array_length

array

array、array_contains、array_distinct、array_except、array_intersect、array_join、array_max、array_min、array_position、array_remove、array_repeat、array_sort、arrays_overlap、arrays_zip、element_at、exists、filter、forall、flatten、shuffle、size、sort_array

map

map、get_map_value、map_from_arrays、map_keys、map_values、map_zip_with、named_struct、struct、str_to_map

编码

crc32、hash、md5、sha1、sha2

其他

current_catalog、current_database、greatest、least、monotonically_increasing_id、nanvl、spark_partition_id、stack、uuid、rand

数据类型

Fusion引擎支持的数据类型,如下所示。

  • Byte、Short、Int、Long

  • Boolean

  • String、Binary

  • Decimal

  • Float、Double

  • Date、Timestamp

Fusion加速暂不支持的场景

算子

类型

算子

聚合

  • ObjectHashAggregateExec

  • SortAggregateExec

Exchange

CustomShuffleReaderExec

Pandas

  • AggregateInPandasExec

  • FlatMapGroupsInPandasExec

  • ArrowEvalPythonExec

  • MapInPandasExec

  • WindowInPandasExec

其他

  • CollectLimitExec

  • RangeExec

  • SampleExec

数据类型

  • Struct

  • Array

  • Map