全部产品
云市场

算法本地调试说明

更新时间:2019-07-24 13:50:23

1 开发环境准备

1.1 Spark客户端下载

  1. 本页面下载并解压spark客户端。
  1. tar xzvf spark-2.3.0-odps0.32.0.tar.gz

1.2 设置环境变量(如果本地已经安装了JAVA环境,请直接跳到1.3)

JAVA_HOME

  1. export JAVA_HOME=/path/to/jdk
  2. export PATH=$JAVA_HOME/bin/:$PATH

SPARK_HOME

  1. export SPARK_HOME=/path/to/spark_extracted_package
  2. export PATH=$SPARK_HOME/bin/:$PATH

1.3 设置spark-defaults.conf

初始化配置文件

  1. cd $SPARK_HOME/conf
  2. cp spark-defaults.conf.template spark-defaults.conf

配置账号

  1. spark.hadoop.odps.project.name=MAXCOMPUTE的项目
  2. spark.hadoop.odps.access.id=阿里云accessid
  3. spark.hadoop.odps.access.key=阿里云accesskey
  4. spark.hadoop.odps.end.point= http://service.odps.aliyun.com/api

使用SparkSQL访问ODPS的必要配置

  1. spark.sql.catalogImplementation=odps

配置资源(这部分已经有默认的了,也可以根据自己需要调整)

  1. spark.executor.instances=
  2. spark.executor.cores=
  3. spark.executor.memory=
  4. spark.driver.cores=
  5. spark.driver.memory=

2 编写PySpark代码

下面是一个读取输入表,新建输出表。将用户选择id列和内容列写入输出表的代码示例

  1. from pyspark import SparkContext, SparkConf
  2. from pyspark.sql import SQLContext, DataFrame, SparkSession
  3. import sys
  4. def mainFunc():
  5. # 处理输入参数
  6. arg_dict = {}
  7. for arg in sys.argv:
  8. argParam = arg.split('=', 1)
  9. if len(argParam) > 1:
  10. arg_dict[argParam[0]] = argParam[1]
  11. #定义输入节点
  12. INPUT_TABLE = arg_dict["inputTable1"]
  13. OUTPUT_TABLE = arg_dict["outputTable1"]
  14. ID_COL = arg_dict["idCol"]
  15. CONTENT_COL = arg_dict["contentCol"]
  16. conf = SparkConf().setAppName("odps_pyspark")
  17. sc = SparkContext(conf=conf)
  18. sql_context = SQLContext(sc)
  19. # 清理老数据表
  20. spark = SparkSession.builder.appName("spark sql").getOrCreate()
  21. spark.sql("DROP TABLE IF EXISTS " + OUTPUT_TABLE)
  22. spark.sql("CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE + "(id STRING,content STRING)")
  23. print ("Create odps table finished")
  24. normal_df = spark.sql("SELECT * FROM " + INPUT_TABLE)
  25. print ("Read normal odps table finished")
  26. spark.sql("INSERT INTO " + OUTPUT_TABLE + " SELECT " + ID_COL + " as id," + CONTENT_COL + " as content FROM " + INPUT_TABLE)
  27. print ("Write normal odps table finished")
  28. result_df = spark.sql("SELECT * FROM " + OUTPUT_TABLE)
  29. for i in result_df.collect():
  30. print (i)
  • 入口参数:

      1. read_example.mainFunc
  • 用户输入参数

    • inputTable1 — 用户输入表,对应算法配置的输入桩1
    • outputTable1 — 结果输出表,对应算法配置输出桩2
    • idCol — 输入表ID列选择,需为String类型
    • contentCol — 输入表内容列选择,需为String类型
    • currentProject — 当前项目名称

3 本地调试PySpark

将样例代码命名为read_example.py,并保存到桌面。在桌面新建a.py,填入如下启动内容

  1. from read_example import mainFunc
  2. if __name__ == '__main__':
  3. mainFunc()

在spark-defaults.conf中加以下配置

  1. spark.master=local[4]

运行命令

  1. cd $SPARK_HOME
  2. ./bin/spark-submit --driver-class-path cupid/odps-spark-datasource_2.11-3.3.8.jar --py-files python/lib/pyspark.zip,python/lib/py4j-0.10.6-src.zip ~/Desktop/a.py(文件a.py的路径) inputTable1=输入表名 outputTable1=输出表名 idCol=id列名 contentCol=内容列名