Spark是用于大规模数据处理的统一分析引擎,Hologres已与Spark社区版及EMR Spark版实现高效集成,迅速帮助企业构建数据仓库。Hologres提供的Spark Connector支持在Spark集群中创建Hologres Catalog,以外部表的形式实现高性能的数据批量读取和导入,相较于原生JDBC,具有更为卓越的性能。
使用限制
仅V1.3及以上版本的Hologres实例支持Spark Connector。您可以在Hologres管理控制台的实例详情页查看当前实例版本。若您的实例是V1.3以下版本,请使用实例升级或通过搜索(钉钉群号:32314975)加入实时数仓Hologres交流群申请升级实例。
准备工作
- 您需要安装对应版本的Spark环境,能够运行spark-sql、spark-shell或pyspark命令,建议使用Spark 3.3.0及以上版本,以避免依赖问题,并获得更丰富的功能体验。 - 您可以使用阿里云EMR Spark来快速构建Spark环境对接Hologres实例,详情请参见EMR Spark功能。 
- 您也可以独立在所需环境中搭建Spark环境,详情请参见Apache Spark。 
 
- Spark读写Hologres需要引用的连接器JAR包 - hologres-connector-spark-3.x。本文使用当前最新版本1.5.2。您可以通过Maven中央仓库进行下载。更多关于Connector的资源都已开源,详情请参见Hologres-Connectors。
- 若您需要使用Java语言开发Spark作业,并在如IntelliJ IDEA工具进行本地调试时,可直接引入下方Maven依赖进行pom.xml文件配置。 - <dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> <classifier>jar-with-dependencies</classifier> </dependency>
Hologres Catalog
Spark Connector从1.5.2版本开始支持Hologres Catalog,您可以通过外部表的方式方便地读写Hologres。
Spark中每个Hologres Catalog对应Hologres中的各个Database,每个Hologres Catalog中的Namespace对应Database中的各个Schema。以下部分将展示如何在Spark中使用Hologres Catalog。
目前Hologres Catalog暂不支持创建表。
本文Hologres实例中对应数据库和表名如下:
test_db --数据库
  public.test_table1 --public模式下表
  public.test_table2
  test_schema.test_table3  -- test_schema模式下表 Hologres Catalog初始化
在Spark集群中启动spark-sql,加载Hologres Connector并指定Catalog参数。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbHologres Catalog常用命令
- 加载Hologres Catalog - Spark中的Hologres Catalog完全对应一个Hologres的Database,使用过程中无法更改。 - USE hologres_external_test_db;
- 查询所有Namespace。 - Spark中的Namespace,对应Hologres中的Schema,默认为public,使用过程中可以使用 - USE指令调整默认的Schema。- -- 查看Hologres Catalog中的所有Namespace, 即Hologres中所有的Schema。 SHOW NAMESPACES;
- 查询Namespace下的表。 - 查询所有表。 - SHOW TABLES;
- 查询指定Namespace下的表。 - USE test_schema; SHOW TABLES; -- 或者使用 SHOW TABLES IN test_schema;
 
- 读取和写入表。 - 使用SELECT和INSERT语句对Catalog中的Hologres外部表进行读写。 - -- 读取表。 SELECT * FROM public.test_table1; -- 写入表。 INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
导入Hologres
本节测试Hologres数据来源TPC-H数据集中的customer表。Spark可以通过CSV格式读取Hologres表数据。您可以直接下载customer数据。customer表结构创建SQL如下。
CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);使用Spark-SQL导入
在使用Spark-SQL时,借助Catalog加载Hologres表的元数据更加便捷,同时也可以通过创建临时表的方式来声明一个Hologres表。
- Hologres-Connector-Spark 1.5.2以下版本,不支持Catalog,只能通过创建临时表的方式声明Hologres表。 
- Hologres-Connector-Spark更多参数信息,详情请参见参数说明。 
- 初始化Hologres Catalog。 - spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
- 从CSV源表导入数据至Hologres外表。 说明- Spark的INSERT INTO语法不支持通过 - column_list指定部分列进行写入,例如使用- INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable来表示只写入c_custkey这一个字段。- 若您希望写入部分所需字段,可以使用 - CREATE TEMPORARY VIEW的方式声明仅所需字段的Hologres临时表。- CATALOG写入- -- 加载Hologres Catalog。 USE hologres_external_test_db; -- 创建csv数据源 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," -- 本地测试,直接用文件所在绝对路径。 ); -- 将csv表的数据写入到Hologres中 INSERT INTO public.customer_holo_table SELECT * FROM csvTable;- TEMPORARY VIEW写入- -- 创建csv数据源 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," ); -- 创建hologres临时表 CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_phone STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", table "customer_holo_table" ); INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;
使用DataFrame导入
使用spark-shell、pyspark等开发Spark作业,您也可以调用DataFrame的write接口来进行写入。不同的开发语言会将读取到CSV文件数据转为DataFrame后,再写入Hologres实例,相关示例代码如下。Hologres-Connector-Spark更多参数信息,详情请参见参数说明。
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// csv源的schema
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))
// 从csv文件读取数据为DataFrame
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// 将读取到的DataFrame写入到Hologres中
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
    public static void main(String[] args) {
        // csv源的schema
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);
        // 使用本地模式运行
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
        // 从csv文件读取数据为DataFrame
        // 本地测试采用customer数据的绝对路径
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
        // 将读取到的DataFrame写入到Hologres中
        csvDf.write.format("hologres").option(
           "username", "***").option(
           "password", "***").option(
           "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
           "table", "customer_holo_table").mode(
           "append").save()
    }
}Maven文件所需的配置如下。
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>Python
from pyspark.sql.types import *
# csv源的schema
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])
# 从csv文件读取数据为DataFrame
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# 将读取到的DataFrame写入到Hologres中
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()Spark执行不同语言的作业,具体操作如下:
- Scala - 您可以用示例代码生成sparktest.scala文件,通过如下方式执行作业。 - -- 加载依赖 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- 本地测试使用绝对路径加载文件 scala> :load D:/sparktest.scala
- 您也可以在加载依赖完成后,直接将示例代码粘贴进去执行。 
 
- Java - 您可以使用开发工具引入示例代码,通过Maven工具完成打包。例如包名spark_test.jar。通过下方代码执行作业。 - -- 作业jar包采用绝对路径 spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar
- Python - 您可以在下方代码执行完成后,直接将示例代码粘贴进去执行。 - pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
读取Hologres
- 从spark-connector1.3.2版本起,支持读取Hologres。相比Spark默认的 - jdbc-connector,- spark-connector可以按照Hologres表的shard进行并发读取,性能更好。读取的并发与表的shard数有关,- spark-connector可以通过参数- read.max_task_count进行限制,最终作业会生成- Min(shardCount, max_task_count)个读取Task。而且也支持Schema推断,不传入Schema时,会根据Hologres表的Schema推断出Spark侧的Schema。
- 从spark-connector1.5.0版本起,读取Hologres表支持了谓词下推,LIMIT下推以及字段裁剪。同时,也支持传入Hologres的 - SELECT QUERY来读取数据。此版本开始支持了批量模式读取,相比之前版本,读取性能提升3-4倍。
使用Spark-SQL读取
使用Spark-SQL时,通过Catalog加载Hologres表的元数据更加方便,您也可以通过创建临时表的方式声明一个Hologres表。
- Hologres-Connector-Spark 1.5.2以下版本,不支持Catalog,只能通过创建临时表的方式声明Hologres表。 
- Hologres-Connector-Spark更多参数信息,详情请参见参数说明。 
- 初始化Hologres Catalog。 - spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
- 读取Hologres数据。 - 通过Catalog读取。 - -- 加载Hologres Catalog。 USE hologres_external_test_db; -- 读取Hologres表,支持字段裁剪和谓词下推。 SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
- 通过创建临时表读取。 - CREATE TEMPORARY VIEW(table)- CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.max_task_count "80", // Hologres表最多分为多少个task进行读取 table "customer_holo_table" ); -- 支持字段裁剪和谓词下推 SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;- CREATE TEMPORARY VIEW(query)- CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10" ); SELECT * FROM hologresTable LIMIT 5;
 
读取Hologres数据为DataFrame
使用spark-shell、pyspark等开发Spark作业,可以调用Spark的Read接口将数据读取为DataFrame以进行后续的计算。不同语言读取Hologres表为DataFrame的示例如下。Hologres-Connector-Spark更多参数信息,详情请参见参数说明。
Scala
val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // Hologres表最多分为多少个task进行读取
    .load()
    .filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
    public static void main(String[] args) {
        
        // 使用本地模式运行
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // Hologres表最多分为多少个task进行读取
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}Maven文件所需的配置如下。
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)Spark执行不同语言的作业,具体操作如下:
- Scala - 您可以用示例代码生成sparkselect.scala文件,通过如下方式执行作业。 - -- 加载依赖 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- 本地测试使用绝对路径加载文件 scala> :load D:/sparkselect.scala
- 您也可以在加载依赖完成后,直接将示例代码粘贴进去执行。 
 
- Java - 您可以使用开发工具引入示例代码,通过Maven工具完成打包。例如包名spark_select.jar。通过下方代码执行作业。 - -- 作业jar包采用绝对路径 spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar
- Python - 您可以在下方代码执行完成后,直接将示例代码粘贴进去执行。 - pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
参数说明
通用参数
| 参数 | 默认值 | 是否必填 | 说明 | 
| username | 无 | 是 | 
 | 
| password | 无 | 是 | 
 | 
| table | 无 | 是 | Hologres读写数据的表名称。 说明  读取数据时也可以选择使用 | 
| jdbcurl | 无 | 是 | Hologres实时数据API的jdbcUrl,格式为 | 
| enable_serverless_computing | false | 否 | 是否使用Serverless资源。仅对读取和 | 
| serverless_computing_query_priority | 3 | 否 | Serverless Computing执行优先级。 | 
| statement_timeout_seconds | 28800(8 小时) | 否 | 单位为s,表示QUERY执行的超时时间。 | 
| retry_count | 3 | 否 | 当连接故障时的重试次数。 | 
| direct_connect | 对于可以直连的环境会默认使用直连。 | 否 | 批量数据读写的瓶颈通常是Endpoint的网络吞吐,因此我们会测试当前环境能否直连Hologres Frontend(接入节点),支持的情况下默认使用直连。此参数设置为 | 
写入参数
Hologres Connector支持Spark的SaveMode参数。对SQL来说,即INSERT INTO或者 INSERT OVERWRITE。对DataFrame来说,即WRITE时设置SaveMode为Append或者Overwrite。其中Overwrite会创建临时表进行写入,并在写入成功之后替换原始表,请在必要时使用。
| 参数名 | 参数曾用名 | 默认值 | 是否必填 | 说明 | 
| write.mode | copy_write_mode | auto | 否 | 写入的模式,取值如下,各写入模式之间的对比请参见批量写入模式对比。 
 | 
| write.copy.max_buffer_size | max_cell_buffer_size | 52428800(50MB) | 否 | 使用COPY模式写入时,本地buffer的最大长度通常无需调整,但在写入字段较大(如超长字符串)导致buffer溢出时,可以调大。 | 
| write.copy.dirty_data_check | copy_write_dirty_data_check | false | 否 | 是否进行脏数据校验。如开启此功能,一旦出现脏数据,能够精确定位到写入失败的具体行。然而,这会对写入性能产生一定影响,因此在非排查环节,建议不予开启。 | 
| write.on_conflict_action | write_mode | INSERT_OR_REPLACE | 否 | 当INSERT目标表为有主键的表时,采用不同策略: 
 | 
以下参数仅write.mode为insert时生效。
| 参数名 | 参数曾用名 | 默认值 | 是否必填 | 说明 | 
| write.insert.dynamic_partition | dynamic_partition | false | 否 | 
 | 
| write.insert.batch_size | write_batch_size | 512 | 否 | 每个写入线程的最大批次大小。在经过WriteMode合并后的Put数量达到WriteBatchSize时进行一次批量提交。 | 
| write.insert.batch_byte_size | write_batch_byte_size | 2097152(2 * 1024 * 1024) | 否 | 每个写入线程的最大批次大小,单位为Byte,默认2MB。在经过WriteMode合并后的Put数据字节数达到WriteBatchByteSize时进行一次批量提交 | 
| write.insert.max_interval_ms | write_max_interval_ms | 10000 | 否 | 距离上次提交超过WriteMaxIntervalMs的时间会触发一次批量提交。 | 
| write.insert.thread_size | write_thread_size | 1 | 否 | 写入并发线程数(每个并发占用1个数据库连接)。 | 
读取参数
| 参数名 | 参数曾用名 (1.5.0及之前版本) | 默认值 | 是否必填 | 说明 | 
| read.mode | bulk_read | auto | 否 | 读取的模式,取值如下: 
 | 
| read.max_task_count | max_partition_count | 80 | 否 | 将读取的Hologres表分为多个,每个分区对应一个Spark Task。如果Hologres表的ShardCount小于此参数,分区数量最多为ShardCount。 | 
| read.copy.max_buffer_size | / | 52428800(50MB) | 否 | 使用COPY模式读取时,本地Buffer的最大长度,在字段较大时出现异常应调大长度。 | 
| read.push_down_predicate | push_down_predicate | true | 否 | 是否进行谓词下推,例如在查询时应用的一些过滤条件。目前支持常见Filter过滤条件的下推,以及列裁剪。 | 
| read.push_down_limit | push_down_limit | true | 否 | 是否进行Limit下推。 | 
| read.select.batch_size | scan_batch_size | 256 | 否 | 
 | 
| read.select.timeout_seconds | scan_timeout_seconds | 60 | 否 | 
 | 
| read.query | query | 无 | 否 | 使用传入的 说明  
 | 
数据类型映射
| Spark类型 | Hologres类型 | 
| ShortType | SMALLINT | 
| IntegerType | INT | 
| LongType | BIGINT | 
| StringType | TEXT | 
| StringType | JSON | 
| StringType | JSONB | 
| DecimalType | NUMERIC(38, 18) | 
| BooleanType | BOOL | 
| DoubleType | DOUBLE PRECISION | 
| FloatType | FLOAT | 
| TimestampType | TIMESTAMPTZ | 
| DateType | DATE | 
| BinaryType | BYTEA | 
| BinaryType | ROARINGBITMAP | 
| ArrayType(IntegerType) | INT4[] | 
| ArrayType(LongType) | INT8[] | 
| ArrayType(FloatType) | FLOAT4[] | 
| ArrayType(DoubleType) | FLOAT8[] | 
| ArrayType(BooleanType) | BOOLEAN[] | 
| ArrayType(StringType) | TEXT[] | 
连接数计算
Hologres-Connector-Spark在进行读写时,会使用一定的JDBC连接数。可能受到如下因素影响:
- Spark的并发,在作业运行时在Spark UI处可以看到的同时运行的Task数量。 
- Connector每个并发使用的连接数: - COPY方式写入,每个并发仅使用一个JDBC连接。 
- INSERT方式写入,每个并发会使用 - write_thread_size个JDBC连接。
- 读取时,每个并发使用一个JDBC连接。 
 
- 其他方面可能使用的连接数:作业启动时,将执行Schema获取等操作,可能会短暂建立一个连接。 
因此作业使用的总的连接数可以通过如下公式计算:
| 工作项 | 使用连接数 | 
| Catalog查询元数据 | 1 | 
| 读取数据 | parallelism * 1 + 1 | 
| 写入COPY模式 | parallelism * 1 + 1 | 
| 写入INSERT模式 | parallelism * write_thread_size + 1 | 
以上连接数计算假设Spark可同时运行的Task数大于任务生成Task数。
Spark同时可以运行的Task并发可能受到用户设置的参数影响,如spark.executor.instances,也可能受到Hadoop对文件分块策略的影响,详情请参见Apache Hadoop。