PySpark

The code examples in this topic use spark-3.x.

Download the template project

If you have already downloaded the project, skip this step. For download instructions, see Using the Spark client in local mode.

The template project provides sample code for spark-1.x, spark-2.x, and spark-3.x; the examples below are based on spark-3.x.
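
The project appears to be the aliyun/MaxCompute-Spark repository on GitHub (an assumption inferred from the paths used in the submission steps below; the download guide linked above is authoritative), so fetching it typically amounts to:

    git clone https://github.com/aliyun/MaxCompute-Spark.git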

Example 1: Reading and writing MaxCompute tables

  • Detailed code

    # -*- coding: utf-8 -*-
    import sys
    from pyspark.sql import SparkSession
    
    try:
      # Python 2 only: reset the default encoding to UTF-8
      reload(sys)
      sys.setdefaultencoding('utf8')
    except NameError:
      # Python 3: reload() was removed and the default encoding is already UTF-8
      pass
    
    if __name__ == '__main__':
      spark = SparkSession.builder\
        .appName("spark sql")\
        .config("spark.sql.broadcastTimeout", 20 * 60)\
        .config("spark.sql.crossJoin.enabled", True)\
        .getOrCreate()
    
      tableName = "mc_test_table"
      ptTableName = "mc_test_pt_table"
      data = [i for i in range(0, 100)]
    
      # Drop the tables if they exist, then recreate them
      spark.sql("DROP TABLE IF EXISTS %s" % tableName)
      spark.sql("DROP TABLE IF EXISTS %s" % ptTableName)
    
      spark.sql("CREATE TABLE %s (name STRING, num BIGINT)" % tableName)
      spark.sql("CREATE TABLE %s (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING)" % ptTableName)
    
      df = spark.sparkContext.parallelize(data, 2).map(lambda s: ("name-%s" % s, s)).toDF("name: string, num: int")
      pt_df = spark.sparkContext.parallelize(data, 2).map(lambda s: ("name-%s" % s, s, "2018", "0601")).toDF("name: string, num: int, pt1: string, pt2: string")
    
      # Write to the non-partitioned table
      df.write.insertInto(tableName) # insertInto semantics
      df.writeTo(tableName).overwritePartitions() # insertOverwrite semantics via the DataSource V2 API
    
      # Write to the partitioned table
      # DataFrameWriter cannot target a specific partition directly; register a temp view and write into the partition with SQL
      df.createOrReplaceTempView("%s_tmp_view" % ptTableName)
      spark.sql("insert into table %s partition (pt1='2018', pt2='0601') select * from %s_tmp_view" % (ptTableName, ptTableName))
      spark.sql("insert overwrite table %s partition (pt1='2018', pt2='0601') select * from %s_tmp_view" % (ptTableName, ptTableName))
    
      pt_df.write.insertInto(ptTableName) # dynamic partitioning, insertInto semantics
      pt_df.write.insertInto(ptTableName, True) # dynamic partitioning, insertOverwrite semantics
    
      # Read the non-partitioned table
      rdf = spark.sql("select name, num from %s" % tableName)
      print("rdf count, %s\n" % rdf.count())
      rdf.printSchema()
    
      # Read the partitioned table
      rptdf = spark.sql("select name, num, pt1, pt2 from %s where pt1 = '2018' and pt2 = '0601'" % ptTableName)
      print("rptdf count, %s" % (rptdf.count()))
      rptdf.printSchema()
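
    As a follow-up sketch, the partitioned read can also be expressed through the DataFrame API instead of SQL (reusing ptTableName from the code above):

      # Equivalent to the SQL query above: filter on the partition columns, then project
      rptdf2 = spark.table(ptTableName).where("pt1 = '2018' and pt2 = '0601'").select("name", "num")
      rptdf2.printSchema()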
  • Submission

    1. Add the dependencies for your target Python version and modify the configuration file accordingly. For details, see PySpark Python versions and dependency support.

    2. Run the following commands:

      cd $SPARK_HOME
      bin/spark-submit {{PATH_TO_PROJECT}}/MaxCompute-Spark/spark-3.x/src/main/python/spark_sql.py
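
    As a rough illustration of step 1 (the archive name and environment layout are assumptions; the dependency-support guide referenced above is authoritative), shipping a packaged Python environment and pointing the job at it typically looks like:

      # Hypothetical archive and path -- verify against the dependency guide
      bin/spark-submit \
        --archives python-3.x-env.tar.gz#pyenv \
        --conf spark.pyspark.python=./pyenv/bin/python \
        {{PATH_TO_PROJECT}}/MaxCompute-Spark/spark-3.x/src/main/python/spark_sql.py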

Example 2: Alibaba Cloud OSS

  • Detailed code

    # -*- coding: utf-8 -*-
    import sys
    from pyspark.sql import SparkSession
    
    try:
      # Python 2 only: reset the default encoding to UTF-8
      reload(sys)
      sys.setdefaultencoding('utf8')
    except NameError:
      # Python 3: reload() was removed and the default encoding is already UTF-8
      pass
    
    if __name__ == '__main__':
      spark = SparkSession.builder\
        .appName("spark write df to oss")\
        .getOrCreate()
    
      data = [i for i in range(0, 100)]
    
      df = spark.sparkContext.parallelize(data, 2).map(lambda s: ("name-%s" % s, s)).toDF("name: string, num: int")
    
      df.show(n=10)
    
      # Write the DataFrame to OSS as CSV
      pathout = 'oss://<your-bucket>/test.csv'
      df.write.csv(pathout)
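
    To sanity-check the job, the written files can be read back in the same session (reusing pathout from above; note that df.write.csv writes no header by default):

      # Read the CSV files back and show a sample
      readback = spark.read.csv(pathout)
      readback.show(n=10)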
  • Submission

    1. Add the dependencies for your target Python version and modify the configuration file accordingly. For details, see PySpark Python versions and dependency support.

    2. Add the OSS dependencies. For details, see Accessing Alibaba Cloud OSS.

    3. Run the following commands:

      cd $SPARK_HOME
      bin/spark-submit {{PATH_TO_PROJECT}}/MaxCompute-Spark/spark-3.x/src/main/python/spark_oss.py
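
    OSS access itself is configured through Spark properties rather than code. A minimal sketch of the usual keys (assumed here from the hadoop-aliyun connector; verify them against the Accessing Alibaba Cloud OSS guide referenced above):

      # Illustrative configuration only -- key names and values must be checked
      spark.hadoop.fs.oss.endpoint=<your-oss-endpoint>
      spark.hadoop.fs.oss.accessKeyId=<your-access-key-id>
      spark.hadoop.fs.oss.accessKeySecret=<your-access-key-secret>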