使用自定义算法上传功能时,您需要先开发算法包,基于开发好的算法包创建并发布算法。本文为您介绍如何开发算法包。

操作步骤

  1. 准备开发环境。
    1. 下载并解压Spark客户端
      tar xzvf spark-2.3.0-odps0.32.0.tar.gz
    2. 可选:设置JAVA_HOME和SPARK_HOME环境变量。
      说明 如果本地设备已安装Java环境,可跳过该步骤。
      • JAVA_HOME
        export JAVA_HOME=/path/to/jdk
        export PATH=$JAVA_HOME/bin/:$PATH
      • SPARK_HOME
        export SPARK_HOME=/path/to/spark_extracted_package
        export PATH=$SPARK_HOME/bin/:$PATH
  2. 设置spark-defaults.conf。
    1. 初始化配置文件。命令示例如下。
      cd $SPARK_HOME/conf
      cp spark-defaults.conf.template spark-defaults.conf
    2. 配置账号。命令示例如下。
      spark.hadoop.odps.project.name=MaxCompute项目名称
      spark.hadoop.odps.access.id=账号的AccessKey ID
      spark.hadoop.odps.access.key=账号的AccessKey Secret
      spark.hadoop.odps.end.point=MaxCompute项目的Endpoint
    3. 可选:使用SparkSQL访问MaxCompute,需要配置如下信息。
      spark.sql.catalogImplementation=odps
    4. 配置资源。命令示例如下。
      说明 您可以根据需要自行调整配置。
      spark.executor.instances=
      spark.executor.cores=
      spark.executor.memory=
      spark.driver.cores=
      spark.driver.memory=
  3. 编写代码。
    以PySpark为例,代码命名为read_example.py,命令示例如下。
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext, DataFrame, SparkSession
    import sys
    def mainFunc():
        # 处理输入参数
        arg_dict = {}
        for arg in sys.argv:
            argParam = arg.split('=', 1)
            if len(argParam) > 1:
                arg_dict[argParam[0]] = argParam[1]
        #定义输入节点
        INPUT_TABLE = arg_dict["inputTable1"]
        OUTPUT_TABLE = arg_dict["outputTable1"]
        ID_COL = arg_dict["idCol"]
        CONTENT_COL = arg_dict["contentCol"]
        conf = SparkConf().setAppName("odps_pyspark")
        sc = SparkContext(conf=conf)
        sql_context = SQLContext(sc)
        # 清理老数据表
        spark = SparkSession.builder.appName("spark sql").getOrCreate()
        spark.sql("DROP TABLE IF EXISTS " + OUTPUT_TABLE)
        spark.sql("CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE + "(id STRING,content STRING)")
        print ("Create odps table finished")
        normal_df = spark.sql("SELECT * FROM " + INPUT_TABLE)
        print ("Read normal odps table finished")
        spark.sql("INSERT INTO " + OUTPUT_TABLE + " SELECT " + ID_COL + " as id," + CONTENT_COL + " as content FROM " + INPUT_TABLE)
        print ("Write normal odps table finished")
        result_df = spark.sql("SELECT * FROM " + OUTPUT_TABLE)
        for i in result_df.collect():
            print (i)
    • 入口参数
      read_example.mainFunc
    • 用户输入参数
      • inputTable1:输入表,对应算法配置的输入桩。
      • outputTable1:结果输出表,对应算法配置输出桩。
      • idCol:输入表的ID列,需要为STRING数据类型。
      • contentCol:输入表的内容列,需要为STRING数据类型。
      • currentProject:项目名称。
  4. 调试代码。以PySpark为例。
    1. 将read_example.py保存到本地设备。并新建a.py,输入如下启动内容。
      from read_example import mainFunc
      if __name__ == '__main__':
          mainFunc()
    2. 在spark-defaults.conf文件中增加以下配置。
      spark.master=local[4]
    3. 运行如下命令调试代码。
      cd $SPARK_HOME
      ./bin/spark-submit --driver-class-path cupid/odps-spark-datasource_2.11-3.3.8.jar --py-files python/lib/pyspark.zip,python/lib/py4j-0.10.6-src.zip ~/Desktop/a.py(文件a.py的路径) inputTable1=输入表名 outputTable1=输出表名 idCol=id列名 contentCol=内容列名