本文为您介绍如何通过Spark Connector及JDBC导入Spark中的数据至Hologres。
背景信息
Spark是处理大规模数据的统一分析引擎。Hologres与Spark深度连通,内置Spark Connector,支持使用流方式(Stream)或批方式(Batch)导入Spark的数据至Hologres,帮助您快速搭建数据仓库。
说明 Hologres支持的Spark类型包括开源Spark和EMR Spark。
导入Spark数据的方式如下:
通过Spark Connector导入数据至Hologres
Hologres支持使用内置的Spark Connector将Spark数据写入Hologres,相比其他写入方式,调用Connector的方式性能更优,推荐您使用。具体操作步骤如下:
使用示例
通过JDBC导入数据至Hologres
Hologres兼容PostgreSQL生态,提供JDBC/ODBC Driver,因此Spark也可以通过JDBC的方式写入数据至Hologres。
使用JDBC方式导入数据之前,您需要前往官网下载PostgresSQL JDBC JAR文件(请下载42.2.18及以上版本),在Spark Shell中执行如下命令启动该JAR。
./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar
使用JDBC导入数据分为Batch方式和Stream方式,具体如下:
- Batch方式
在Spark中准备数据并配置批量导入数据至Hologres的信息。语句如下。
参数说明如下表所示。//准备数据。 import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val data = Seq( Row(1L, "test"), Row(2L, "test2"), Row(3L, "test2"), Row(4L, "test2"), Row(5L, "test2"), Row(6L, "test2") ) val schema = StructType(Array( StructField("a", LongType), StructField("b", StringType) )) val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema) // 配置导入数据的信息。 .option("checkpointLocation", checkpointLocation) df.write.format("jdbc") .option("url", "jdbc:postgresql://endpoint:port/database") .option("driver", "org.postgresql.Driver") .option("dbtable", "table")//Hologres用于接收数据的表名称。 .option("user", "accesskey id")//当前阿里云账号的AccessKey ID。 .option("password", "accesskey secret")//当前阿里云账号的Accesskey Secret。 .option("isolationLevel", "NONE") .save()
参数 描述 jdbc:postgresql://endpoint:port/database
- endpoint:Hologres实例的网络地址。
- port:Hologres实例的端口。
- database:Hologres的数据库名称。
table Hologres用于接收数据的表名称。 accesskey id 当前阿里云账号的AccessKey ID。 您可以单击用户信息管理,获取AccessKey ID。
accesskey secret 当前阿里云账号的AccessKey Secret。 您可以单击用户信息管理,获取AccessKey Secret。
- Stream方式
- 生成JDBC JAR文件。
开源Spark或EMR Spark均可以使用如下命令获取JDBC Driver,并编译生成
emr-jdbc_2.11-2.1.0-SNAPSHOT.jar
文件。./bin/spark-shell --jars /path_to/emr-jdbc_2.11-2.1.0-SNAPSHOT.jar,postgresql-42.2.18.jar --driver-class-path /path_to/emr-jdbc_2.11-2.1.0-SNAPSHOT.jar,postgresql-42.2.18.jar
- 在Spark Shell中执行如下命令使用JDBC JAR文件。
./bin/spark-shell --jars /path_to/emr-jdbc_2.11-2.1.0-SNAPSHOT.jar,postgresql-42.2.18.jar --driver-class-path /path_to/emr-jdbc_2.11-2.1.0-SNAPSHOT.jar,postgresql-42.2.18.jar
- 准备Spark数据并导入至Hologres。
在Spark中创建一张表并写入数据。配置导入数据至Hologres的信息,并使用流方式实时导入数据至Hologres。
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("jdbc2") .option("url", "jdbc:postgresql://endpoint:port/database")//配置为相应Hologres的实例信息。 .option("driver", "org.postgresql.Driver") .option("dbtable", "table")//Hologres用于接收数据的表名称。 .option("user", "accesskey id")//当前阿里云账号的AccessKey ID。 .option("password", "accesskey secret")//当前阿里云账号的AccessKey Secret。 .option("batchsize", 100) //配置批量导入数据的条数,此处示例配置为100条。 .option("isolationLevel", "NONE") .option("checkpointLocation", checkpointLocation) .start() query.awaitTermination()
说明- 合理设置
batchsize
参数可以提高导入数据的性能。 - 当Provider的名称配置为jdbc2时,流计算模式的EMR-SDK性能更好。开源Spark和EMR Spark均可以直接使用jdbc2。
- 合理设置
- 生成JDBC JAR文件。
数据类型映射
Spark和Hologres的数据类型映射如下表所示。
Spark | Hologres |
---|---|
LongType | BIGINT |
StringType | TEXT |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
TimestampType | TIMESTAMPTZ |
在文档使用中是否遇到以下问题
更多建议
匿名提交