读写PostgreSQL

Spark原生支持通过JDBC连接器访问PostgreSQL。Serverless Spark的部分版本在启动时会自动加载PostgreSQL JDBC驱动,因此您可以直接通过Serverless SparkSQL会话、Spark批处理任务或Notebook等方式进行连接,以便进行数据的读取与写入操作。

前提条件

  • 已创建Serverless Spark工作空间,详情请参见创建工作空间

  • 已创建PostgreSQL实例。

    您可以选择自建的PostgreSQL实例,或选择阿里云提供的RDS PostgreSQLPolarDB PostgreSQL数据库。

    本文以阿里云的RDS PostgreSQL为例,详情请参见快速创建RDS PostgreSQL实例

注意事项

  • JDBC驱动版本要求:

    • 如果您使用的Serverless Spark引擎是以下版本,则无需准备PostgreSQL JDBC Driver,因为Serverless Spark已内置该驱动(版本号为42.7.6)。

      • esr-4.4.0及后续版本

      • esr-3.4.0及后续版本

      • esr-2.8.0及后续版本

    • 如果使用的是低于上述版本的引擎,则需要手动下载PostgreSQL JDBC Driver并上传至OSS,同时在会话管理Spark配置中填写以下参数。

      spark.emr.serverless.user.defined.jars oss://<bucket>/path/to/postgresql-<version>.jar
  • 网络配置:确保Serverless Spark能够与PostgreSQL服务之间的网络互通。具体配置请参见EMR Serverless Spark与其他VPC间网络互通

    说明

    配置安全组规则时,请根据实际需求选择性开放必要的端口范围(1~65535)。本文示例需开启TCP 5432端口。

操作步骤

方式一:使用SQL会话

  1. 创建SQL会话。 在会话管理中创建SQL会话,并选择预先配置的网络连接。具体配置请参见创建SQL会话

  2. 创建SQL任务。 在数据开发中创建SQL > SparkSQL类型的任务,使用以下SQL进行测试。

    CREATE TEMPORARY VIEW test
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url 'jdbc:postgresql://<jdbc_url>/<database>',
      dbtable '<schema>.<table>',
      user '<username>',
      password '<password>'
    );
    
    SELECT * FROM test;

    本文涉及参数说明如下所示。

    参数

    说明

    url

    JDBC连接字符串,包含PostgreSQL主机地址、端口和数据库名。

    填写格式为jdbc:postgresql://<jdbc_url>/<database>,需替换为实际值。

    dbtable

    待读取的数据库表名,格式为<schema>.<table> 。

    user

    PostgreSQL数据库用户名。

    说明

    需具备目标表的读取权限。

    password

    PostgreSQL数据库密码。

    如果能够正确输出表的内容,则说明连接成功。

    image

  3. 插入数据。 请使用以下命令向表中插入数据。

    INSERT INTO test VALUES(4, 'd'),(5, 'e');
    SELECT * FROM test;

    如果能正确查询到插入的数据,说明写入功能正常。

    image

方式二:使用Notebook会话

  1. 创建Notebook会话。 在会话管理中创建Notebook会话,并选择预先配置的网络连接。具体配置请参见创建Notebook会话

  2. 创建Notebook任务。 在数据开发中创建Python > Notebook类型的任务,使用以下Python代码进行测试。

    df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \
      .option("dbtable", "<schema>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .load()
    df.show()

    如果能够正确输出表的内容,说明连接成功。

    image

  3. 插入数据。 请使用以下代码向表中插入数据。

    df = spark.createDataFrame([(6, 'f'), (7, 'g')], ["id", "name"])
    df.write \
      .format("jdbc") \
      .mode("append") \
      .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \
      .option("dbtable", "<schema>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .save()
    df.show()

    涉及参数mode("append"),该参数指定写入模式为追加模式,确保新数据被追加至目标表中,而不会覆盖或删除已存数据。

    如果能正确返回插入的数据,说明写入功能正常。

    image

方式三:使用Spark批任务

  1. 编写测试代码。 使用以下Scala代码编译并打包为JAR文件。

    package spark.test
    
    import org.apache.spark.sql.SparkSession
    
    object Main {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("test")
          .getOrCreate()
          
        val newRows = spark.createDataFrame(Seq((6, "f"), (7, "g"))).toDF("id", "name")
        newRows.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:postgresql://<jdbc_url>/<database>")
          .option("dbtable", "<schema>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .save()
    
        spark.read.format("jdbc")
          .option("url", "jdbc:postgresql://<jdbc_url>/<database>")
          .option("dbtable", "<schema>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .load()
          .show()
    
        spark.stop()
      }
    }
  2. 创建批任务。 在数据开发中创建批任务 > JAR类型的任务,然后配置以下参数进行测试。

    • jar资源:选择或者填写打包好的JAR文件地址。

    • Main Classspark.test.Main

    • 网络连接:选择预先配置的网络连接。

  3. 查看验证结果 任务执行后,您可以单击下方运行记录区域中的日志探查,在Driver日志Stdout页签,查看到PostgreSQL对应表中的内容。