Spark是用于大规模数据处理的统一分析引擎,Hologres已与Spark社区版及EMR Spark版实现高效集成,迅速帮助企业构建数据仓库。Hologres提供的Spark Connector支持在Spark集群中创建Hologres Catalog,以外部表的形式实现高性能的数据批量读取和导入,相较于原生JDBC,具有更为卓越的性能。
使用限制
仅V1.3及以上版本的Hologres实例支持Spark Connector。您可以在Hologres管理控制台的实例详情页查看当前实例版本。若您的实例是V1.3以下版本,请使用实例升级或通过搜索(钉钉群号:32314975)加入实时数仓Hologres交流群申请升级实例。
准备工作
您需要安装对应版本的Spark环境,能够运行spark-sql、spark-shell或pyspark命令,建议使用Spark 3.3.0及以上版本,以避免依赖问题,并获得更丰富的功能体验。
您可以使用阿里云EMR Spark来快速构建Spark环境对接Hologres实例,详情请参见EMR Spark功能。
您也可以独立在所需环境中搭建Spark环境,详情请参见Apache Spark。
Spark读写Hologres需要引用的连接器JAR包
hologres-connector-spark-3.x
。本文使用当前最新版本1.5.2。您可以通过Maven中央仓库进行下载。更多关于Connector的资源都已开源,详情请参见Hologres-Connectors。若您需要使用Java语言开发Spark作业,并在如IntelliJ IDEA工具进行本地调试时,可直接引入下方Maven依赖进行pom.xml文件配置。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> <classifier>jar-with-dependencies</classifier> </dependency>
Hologres Catalog
Spark Connector从1.5.2版本开始支持Hologres Catalog,您可以通过外部表的方式方便地读写Hologres。
Spark中每个Hologres Catalog对应Hologres中的各个Database,每个Hologres Catalog中的Namespace对应Database中的各个Schema。以下部分将展示如何在Spark中使用Hologres Catalog。
目前Hologres Catalog暂不支持创建表。
本文Hologres实例中对应数据库和表名如下:
test_db --数据库
public.test_table1 --public模式下表
public.test_table2
test_schema.test_table3 -- test_schema模式下表
Hologres Catalog初始化
在Spark集群中启动spark-sql,加载Hologres Connector并指定Catalog参数。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Hologres Catalog常用命令
加载Hologres Catalog
Spark中的Hologres Catalog完全对应一个Hologres的Database,使用过程中无法更改。
USE hologres_external_test_db;
查询所有Namespace。
Spark中的Namespace,对应Hologres中的Schema,默认为public,使用过程中可以使用
USE
指令调整默认的Schema。-- 查看Hologres Catalog中的所有Namespace, 即Hologres中所有的Schema。 SHOW NAMESPACES;
查询Namespace下的表。
查询所有表。
SHOW TABLES;
查询指定Namespace下的表。
USE test_schema; SHOW TABLES; -- 或者使用 SHOW TABLES IN test_schema;
读取和写入表。
使用SELECT和INSERT语句对Catalog中的Hologres外部表进行读写。
-- 读取表。 SELECT * FROM public.test_table1; -- 写入表。 INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
导入Hologres
本节测试Hologres数据来源TPC-H数据集中的customer
表。Spark可以通过CSV格式读取Hologres表数据。您可以直接下载customer数据。customer
表结构创建SQL如下。
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);
使用Spark-SQL导入
在使用Spark-SQL时,借助Catalog加载Hologres表的元数据更加便捷,同时也可以通过创建临时表的方式来声明一个Hologres表。
Hologres-Connector-Spark 1.5.2以下版本,不支持Catalog,只能通过创建临时表的方式声明Hologres表。
Hologres-Connector-Spark更多参数信息,详情请参见参数说明。
初始化Hologres Catalog。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
从CSV源表导入数据至Hologres外表。
Spark的INSERT INTO语法不支持通过
column_list
指定部分列进行写入,例如使用INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable
来表示只写入c_custkey这一个字段。若您希望写入部分所需字段,可以使用
CREATE TEMPORARY VIEW
的方式声明仅所需字段的Hologres临时表。CATALOG写入TEMPORARY VIEW写入-- 加载Hologres Catalog。 USE hologres_external_test_db; -- 创建csv数据源 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," -- 本地测试,直接用文件所在绝对路径。 ); -- 将csv表的数据写入到Hologres中 INSERT INTO public.customer_holo_table SELECT * FROM csvTable;
-- 创建csv数据源 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," ); -- 创建hologres临时表 CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_phone STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", table "customer_holo_table" ); INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;
使用DataFrame导入
使用spark-shell、pyspark等开发Spark作业,您也可以调用DataFrame的write接口来进行写入。不同的开发语言会将读取到CSV文件数据转为DataFrame后,再写入Hologres实例,相关示例代码如下。Hologres-Connector-Spark更多参数信息,详情请参见参数说明。
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// csv源的schema
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// 从csv文件读取数据为DataFrame
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// 将读取到的DataFrame写入到Hologres中
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// csv源的schema
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// 使用本地模式运行
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// 从csv文件读取数据为DataFrame
// 本地测试采用customer数据的绝对路径
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// 将读取到的DataFrame写入到Hologres中
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
}
}
Maven文件所需的配置如下。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
from pyspark.sql.types import *
# csv源的schema
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# 从csv文件读取数据为DataFrame
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# 将读取到的DataFrame写入到Hologres中
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
Spark执行不同语言的作业,具体操作如下:
Scala
您可以用示例代码生成sparktest.scala文件,通过如下方式执行作业。
-- 加载依赖 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- 本地测试使用绝对路径加载文件 scala> :load D:/sparktest.scala
您也可以在加载依赖完成后,直接将示例代码粘贴进去执行。
Java
您可以使用开发工具引入示例代码,通过Maven工具完成打包。例如包名spark_test.jar。通过下方代码执行作业。
-- 作业jar包采用绝对路径 spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar
Python
您可以在下方代码执行完成后,直接将示例代码粘贴进去执行。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
读取Hologres
从spark-connector1.3.2版本起,支持读取Hologres。相比Spark默认的
jdbc-connector
,spark-connector
可以按照Hologres表的shard进行并发读取,性能更好。读取的并发与表的shard数有关,spark-connector
可以通过参数read.max_task_count
进行限制,最终作业会生成Min(shardCount, max_task_count)
个读取Task。而且也支持Schema推断,不传入Schema时,会根据Hologres表的Schema推断出Spark侧的Schema。从spark-connector1.5.0版本起,读取Hologres表支持了谓词下推,LIMIT下推以及字段裁剪。同时,也支持传入Hologres的
SELECT QUERY
来读取数据。此版本开始支持了批量模式读取,相比之前版本,读取性能提升3-4倍。
使用Spark-SQL读取
使用Spark-SQL时,通过Catalog加载Hologres表的元数据更加方便,您也可以通过创建临时表的方式声明一个Hologres表。
Hologres-Connector-Spark 1.5.2以下版本,不支持Catalog,只能通过创建临时表的方式声明Hologres表。
Hologres-Connector-Spark更多参数信息,详情请参见参数说明。
初始化Hologres Catalog。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
读取Hologres数据。
通过Catalog读取。
-- 加载Hologres Catalog。 USE hologres_external_test_db; -- 读取Hologres表,支持字段裁剪和谓词下推。 SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
通过创建临时表读取。
CREATE TEMPORARY VIEW(table)CREATE TEMPORARY VIEW(query)CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.max_task_count "80", // Hologres表最多分为多少个task进行读取 table "customer_holo_table" ); -- 支持字段裁剪和谓词下推 SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10" ); SELECT * FROM hologresTable LIMIT 5;
读取Hologres数据为DataFrame
使用spark-shell、pyspark等开发Spark作业,可以调用Spark的Read接口将数据读取为DataFrame以进行后续的计算。不同语言读取Hologres表为DataFrame的示例如下。Hologres-Connector-Spark更多参数信息,详情请参见参数说明。
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres表最多分为多少个task进行读取
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// 使用本地模式运行
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres表最多分为多少个task进行读取
.load()
.filter("c_custkey < 500")
);
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}
Maven文件所需的配置如下。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Spark执行不同语言的作业,具体操作如下:
Scala
您可以用示例代码生成sparkselect.scala文件,通过如下方式执行作业。
-- 加载依赖 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- 本地测试使用绝对路径加载文件 scala> :load D:/sparkselect.scala
您也可以在加载依赖完成后,直接将示例代码粘贴进去执行。
Java
您可以使用开发工具引入示例代码,通过Maven工具完成打包。例如包名spark_select.jar。通过下方代码执行作业。
-- 作业jar包采用绝对路径 spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar
Python
您可以在下方代码执行完成后,直接将示例代码粘贴进去执行。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
参数说明
通用参数
参数 | 默认值 | 是否必填 | 说明 |
参数 | 默认值 | 是否必填 | 说明 |
username | 无 | 是 |
|
password | 无 | 是 |
|
table | 无 | 是 | Hologres读写数据的表名称。 读取数据时也可以选择使用 |
jdbcurl | 无 | 是 | Hologres实时数据API的jdbcUrl,格式为 |
enable_serverless_computing | false | 否 | 是否使用Serverless资源。仅对读取和 |
serverless_computing_query_priority | 3 | 否 | Serverless Computing执行优先级。 |
statement_timeout_seconds | 28800(8 小时) | 否 | 单位为s,表示QUERY执行的超时时间。 |
retry_count | 3 | 否 | 当连接故障时的重试次数。 |
direct_connect | 对于可以直连的环境会默认使用直连。 | 否 | 批量数据读写的瓶颈通常是Endpoint的网络吞吐,因此我们会测试当前环境能否直连Hologres Frontend(接入节点),支持的情况下默认使用直连。此参数设置为 |
写入参数
Hologres Connector支持Spark的SaveMode
参数。对SQL来说,即INSERT INTO
或者 INSERT OVERWRITE
。对DataFrame来说,即WRITE时设置SaveMode为Append或者Overwrite。其中Overwrite会创建临时表进行写入,并在写入成功之后替换原始表,请在必要时使用。
参数名 | 参数曾用名 | 默认值 | 是否必填 | 说明 |
参数名 | 参数曾用名 | 默认值 | 是否必填 | 说明 |
write.mode | copy_write_mode | auto | 否 | 写入的模式,取值如下:
|
write.copy.max_buffer_size | max_cell_buffer_size | 52428800(50MB) | 否 | 使用COPY模式写入时,本地buffer的最大长度通常无需调整,但在写入字段较大(如超长字符串)导致buffer溢出时,可以调大。 |
write.copy.dirty_data_check | copy_write_dirty_data_check | false | 否 | 是否进行脏数据校验。如开启此功能,一旦出现脏数据,能够精确定位到写入失败的具体行。然而,这会对写入性能产生一定影响,因此在非排查环节,建议不予开启。 |
write.on_conflict_action | write_mode | NSERT_OR_REPLACE | 否 | 当INSERT目标表为有主键的表时,采用不同策略:
|
以下参数仅write.mode
为insert
时生效。
参数名 | 参数曾用名 | 默认值 | 是否必填 | 说明 |
参数名 | 参数曾用名 | 默认值 | 是否必填 | 说明 |
write.insert.dynamic_partition | dynamic_partition | false | 否 |
|
write.insert.batch_size | write_batch_size | 512 | 否 | 每个写入线程的最大批次大小。在经过WriteMode合并后的Put数量达到WriteBatchSize时进行一次批量提交。 |
write.insert.batch_byte_size | write_batch_byte_size | 2097152(2 * 1024 * 1024) | 否 | 每个写入线程的最大批次大小,单位为Byte,默认2MB。在经过WriteMode合并后的Put数据字节数达到WriteBatchByteSize时进行一次批量提交 |
write.insert.max_interval_ms | write_max_interval_ms | 10000 | 否 | 距离上次提交超过WriteMaxIntervalMs的时间会触发一次批量提交。 |
write.insert.thread_size | write_thread_size | 1 | 否 | 写入并发线程数(每个并发占用1个数据库连接)。 |
读取参数
参数名 | 参数曾用名 (1.5.0及之前版本) | 默认值 | 是否必填 | 说明 |
参数名 | 参数曾用名 (1.5.0及之前版本) | 默认值 | 是否必填 | 说明 |
read.mode | bulk_read | auto | 否 | 读取的模式,取值如下:
|
read.max_task_count | max_partition_count | 80 | 否 | 将读取的Hologres表分为多个,每个分区对应一个Spark Task。如果Hologres表的ShardCount小于此参数,分区数量最多为ShardCount。 |
read.copy.max_buffer_size | / | 52428800(50MB) | 否 | 使用COPY模式读取时,本地Buffer的最大长度,在字段较大时出现异常应调大长度。 |
read.push_down_predicate | push_down_predicate | true | 否 | 是否进行谓词下推,例如在查询时应用的一些过滤条件。目前支持常见Filter过滤条件的下推,以及列裁剪。 |
read.push_down_limit | push_down_limit | true | 否 | 是否进行Limit下推。 |
read.select.batch_size | scan_batch_size | 256 | 否 |
|
read.select.timeout_seconds | scan_timeout_seconds | 60 | 否 |
|
read.query | query | 无 | 否 | 使用传入的
|
数据类型映射
Spark类型 | Hologres类型 |
Spark类型 | Hologres类型 |
ShortType | SMALLINT |
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT |
StringType | JSON |
StringType | JSONB |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA |
BinaryType | ROARINGBITMAP |
ArrayType(IntegerType) | INT4[] |
ArrayType(LongType) | INT8[] |
ArrayType(FloatType) | FLOAT4[] |
ArrayType(DoubleType) | FLOAT8[] |
ArrayType(BooleanType) | BOOLEAN[] |
ArrayType(StringType) | TEXT[] |
连接数计算
Hologres-Connector-Spark在进行读写时,会使用一定的JDBC连接数。可能受到如下因素影响:
Spark的并发,在作业运行时在Spark UI处可以看到的同时运行的Task数量。
Connector每个并发使用的连接数:
COPY方式写入,每个并发仅使用一个JDBC连接。
INSERT方式写入,每个并发会使用
write_thread_size
个JDBC连接。读取时,每个并发使用一个JDBC连接。
其他方面可能使用的连接数:作业启动时,将执行Schema获取等操作,可能会短暂建立一个连接。
因此作业使用的总的连接数可以通过如下公式计算:
工作项 | 使用连接数 |
工作项 | 使用连接数 |
Catalog查询元数据 | 1 |
读取数据 | parallelism * 1 + 1 |
写入COPY模式 | parallelism * 1 + 1 |
写入INSERT模式 | parallelism * write_thread_size + 1 |
以上连接数计算假设Spark可同时运行的Task数大于任务生成Task数。
Spark同时可以运行的Task并发可能受到用户设置的参数影响,如spark.executor.instances
,也可能受到Hadoop对文件分块策略的影响,详情请参见Apache Hadoop。
- 本页导读 (1)
- 使用限制
- 准备工作
- Hologres Catalog
- Hologres Catalog初始化
- Hologres Catalog常用命令
- 导入Hologres
- 使用Spark-SQL导入
- 使用DataFrame导入
- 读取Hologres
- 使用Spark-SQL读取
- 读取Hologres数据为DataFrame
- 参数说明
- 通用参数
- 写入参数
- 读取参数
- 数据类型映射
- 连接数计算