DataScience allows you to convert a custom DAG into a Pipeline and run it on Kubeflow. This topic uses an example to describe how to convert a custom DAG into a Pipeline.

Prerequisites

  • A DataScience cluster is created with the Kubeflow service selected. For more information, see Create a cluster.
  • The DataScience cluster is connected over SSH. For more information, see Log on to a cluster.

Procedure

  1. Prepare the data.
    For example, you want to run the DAG described by the following JSON on Kubeflow. With dag2pipeline, you can define arbitrarily complex spark-sql jobs for feature engineering and configure their scheduling options.
    {
        "name": "DataScience",
        "link": "https://emr.console.aliyun.com/#/cn-beijing/cluster/create",
        "nodes": [{
                "name": "rec_tem_behavior_table_test_preprocess",
                "database": "xy_rec_sln_test",
                "type": "SPARKSQL_PARALLEL",
                "relative_start_date": -4,
                "relative_end_date": 0,
                "dependencies": [],
                "sqlfile": "test/feature/rec_tem_behavior_table_test_preprocess.sql",
                "args": [
                    "--num-executors",
                    "1"
                ],
                "comment": "sparksql parallel"
            },
            {
                "name": "rec_tem_user_table_test_preprocess",
                "database": "xy_rec_sln_test",
                "type": "SPARKSQL_PARALLEL",
                "relative_start_date": -4,
                "relative_end_date": 0,
                "dependencies": [],
                "sqlfile": "test/feature/rec_tem_user_table_test_preprocess.sql",
                "args": [
                    "--num-executors",
                    "1"
                ],
                "comment": "sparksql parallel"
            },
            {
                "name": "rec_tem_item_table_test_preprocess",
                "database": "xy_rec_sln_test",
                "type": "SPARKSQL_PARALLEL",
                "relative_start_date": -4,
                "relative_end_date": 0,
                "dependencies": [],
                "sqlfile": "test/feature/rec_tem_item_table_test_preprocess.sql",
                "args": [
                    "--num-executors",
                    "1"
                ],
                "comment": "sparksql parallel"
            },
            {
                "name": "rec_tem_behavior_table_test_preprocess_wide",
                "database": "xy_rec_sln_test",
                "type": "SPARKSQL_PARALLEL",
                "relative_start_date": -4,
                "relative_end_date": 0,
                "dependencies": [
                    "rec_tem_item_table_test_preprocess",
                    "rec_tem_user_table_test_preprocess",
                    "rec_tem_behavior_table_test_preprocess"
                ],
                "sqlfile": "test/feature/rec_tem_behavior_table_test_preprocess_wide.sql",
                "args": [
                    "--num-executors",
                    "1"
                ],
                "comment": "sparksql parallel"
            },
            {
                "name": "rec_tem_user_table_test_preprocess_all_feature",
                "database": "xy_rec_sln_test",
                "type": "SPARKSQL_PARALLEL",
                "relative_start_date": -1,
                "relative_end_date": 0,
                "dependencies": [
                    "rec_tem_behavior_table_test_preprocess_wide",
                    "rec_tem_user_table_test_preprocess"
                ],
                "sqlfile": "test/feature/rec_tem_user_table_test_preprocess_all_feature.sql",
                "args": [
                    "--num-executors",
                    "1"
                ],
                "comment": "sparksql parallel"
            },
            {
                "name": "rec_tem_item_table_test_preprocess_all_feature",
                "database": "xy_rec_sln_test",
                "type": "SPARKSQL_PARALLEL",
                "relative_start_date": -1,
                "relative_end_date": 0,
                "dependencies": [
                    "rec_tem_behavior_table_test_preprocess_wide",
                    "rec_tem_item_table_test_preprocess"
                ],
                "sqlfile": "test/feature/rec_tem_item_table_test_preprocess_all_feature.sql",
                "args": [
                    "--num-executors",
                    "1"
                ],
                "comment": "sparksql parallel"
            },
            {
                "name": "rec_tem_behavior_table_test_preprocess_rank_sample",
                "database": "xy_rec_sln_test",
                "type": "SPARKSQL_PARALLEL",
                "relative_start_date": 0,
                "relative_end_date": 0,
                "dependencies": [
                    "rec_tem_item_table_test_preprocess_all_feature",
                    "rec_tem_behavior_table_test_preprocess",
                    "rec_tem_user_table_test_preprocess_all_feature"
                ],
                "sqlfile": "test/rank/rec_tem_behavior_table_test_preprocess_rank_sample.sql",
                "args": [
                    "--num-executors",
                    "1"
                ],
                "comment": "sparksql parallel"
            },
            {
                "name": "insert_rec_tem_user_table_test_preprocess_all_feature_holo",
                "type": "DI",
                "dependencies": [
                    "rec_tem_user_table_test_preprocess_all_feature"
                ],
                "dataxjson": "test/feature/1.json",
                "comment": "DATAX, for more detail please access https://github.com/alibaba/datax"
            },
            {
                "name": "hadoop_mr_job1",
                "type": "HADOOP_MR",
                "dependencies": [
                    "insert_rec_tem_user_table_test_preprocess_all_feature_holo"
                ],
                "jar": "hadoop-mapreduce-examples-2.8.5.jar",
                "classname": "wordcount",
                "args": [
                    "/wordcount/input/",
                    "/wordcount/output%%DATE%%"
                ],
                "comment": "jar file should be placed in bigdata directoray."
            }
        ]
    }
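    The nodes field defines the DAG: each node lists the nodes it depends on in its dependencies field. Before compiling, you can sanity-check the definition yourself. The following is a minimal sketch, not part of dag2pipeline, that verifies every dependency refers to a defined node and that the graph is acyclic (bigdata/test.json matches the compile command in step 2):
    import json

    # Load the DAG definition (the same file passed to convertdag2pipeline.py).
    with open("bigdata/test.json") as f:
        dag = json.load(f)

    nodes = {node["name"]: set(node["dependencies"]) for node in dag["nodes"]}

    # Every dependency must refer to a defined node.
    for name, deps in nodes.items():
        missing = deps - nodes.keys()
        if missing:
            raise ValueError(f"{name} depends on undefined nodes: {missing}")

    # Kahn's algorithm: if every node can be scheduled, the graph is acyclic.
    order = []
    while nodes:
        ready = [name for name, deps in nodes.items() if not deps]
        if not ready:
            raise ValueError(f"cycle detected among: {sorted(nodes)}")
        for name in ready:
            del nodes[name]
            order.append(name)
        for deps in nodes.values():
            deps.difference_update(ready)

    print("a valid execution order:", order)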

    In the preceding example, you place the required files in the corresponding directory based on the node type:

    • SPARKSQL_PARALLEL
      Executes spark-sql nodes in parallel. The code details are as follows.
      {
          "name": "rec_tem_behavior_table_test_preprocess",
          "database": "xy_rec_sln_test",
          "type": "SPARKSQL_PARALLEL",
          "relative_start_date": -4,
          "relative_end_date": 0,
          "dependencies": [],
          "sqlfile": "test/feature/rec_tem_behavior_table_test_preprocess.sql",
          "args": [
              "--num-executors",
              "1"
          ],
          "comment": "sparksql parallel"
      }

      In this code example, the test directory is placed in the bigdata/ directory.

      The parameters are described as follows:
      • relative_start_date and relative_end_date: the start and end dates, relative to the run date, over which rec_tem_behavior_table_test_preprocess.sql is executed in parallel (see the sketch after this list).
      • args: the spark-sql parameters.
      • database: the name of the database that spark-sql runs against.
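      The following minimal sketch illustrates one plausible reading of the date range: relative_start_date of -4 and relative_end_date of 0 cover five days, from four days before the run date through the run date. The exact date semantics and format that dag2pipeline uses are assumptions here.
      import datetime

      def expand_dates(relative_start_date, relative_end_date, run_date=None):
          """Expand a relative date range into concrete dates (assumed semantics)."""
          run_date = run_date or datetime.date.today()
          return [
              run_date + datetime.timedelta(days=offset)
              for offset in range(relative_start_date, relative_end_date + 1)
          ]

      # With relative_start_date=-4 and relative_end_date=0, the SQL file is
      # executed once per day in this five-day window, in parallel.
      for day in expand_dates(-4, 0, run_date=datetime.date(2022, 1, 5)):
          print(day.strftime("%Y%m%d"))  # 20220101 ... 20220105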
    • HADOOP_MR
      Runs a Hadoop MapReduce job. The code details are as follows.
      {
          "name": "hadoop_mr_job1",
          "type": "HADOOP_MR",
          "dependencies": [
              "insert_rec_tem_user_table_test_preprocess_all_feature_holo"
          ],
          "jar": "hadoop-mapreduce-examples-2.8.5.jar",
          "classname": "wordcount",
          "args": [
              "/wordcount/input/",
              "/wordcount/output%%DATE%%"
          ],
          "comment": "jar file should be placed in bigdata directoray."
      }

      In this code example, hadoop-mapreduce-examples-2.8.5.jar is placed in the bigdata/ directory.

      The parameters are described as follows:
      • args: the arguments passed to the Hadoop job (see the sketch after this list).
      • jar: the JAR package that contains the Hadoop job.
      • classname: the class name within the JAR package.
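      For reference, the following minimal sketch shows how such a node could translate into a hadoop jar invocation. The substitution of the %%DATE%% placeholder with the run date, including its format, is an assumption; the actual command construction is internal to dag2pipeline.
      import datetime

      node = {
          "jar": "hadoop-mapreduce-examples-2.8.5.jar",
          "classname": "wordcount",
          "args": ["/wordcount/input/", "/wordcount/output%%DATE%%"],
      }

      # Substitute the %%DATE%% placeholder (the date format is an assumption).
      run_date = datetime.date.today().strftime("%Y%m%d")
      args = [arg.replace("%%DATE%%", run_date) for arg in node["args"]]

      # Roughly equivalent command line, for example:
      # hadoop jar hadoop-mapreduce-examples-2.8.5.jar wordcount /wordcount/input/ /wordcount/output20220105
      print(" ".join(["hadoop", "jar", node["jar"], node["classname"], *args]))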
    • DI/DATAX
      Runs a DataX data-integration job. The code details are as follows.
      {
          "name": "insert_rec_tem_user_table_test_preprocess_all_feature_holo",
          "type": "DI",
          "dependencies": [
              "rec_tem_user_table_test_preprocess_all_feature"
          ],
          "dataxjson": "test/feature/1.json",
          "comment": "DATAX, for more detail please access https://github.com/alibaba/datax"
      }

      In this code example, the test directory is placed in the bigdata/ directory.

      The dataxjson parameter specifies the configuration file of the DataX job. For more information, see DataX. A hypothetical skeleton of such a file is sketched below.
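      The following sketch writes a hypothetical skeleton for a file such as test/feature/1.json. The hdfsreader and mysqlwriter plugin names are examples only; the reader and writer plugins and their parameters depend on your actual source and sink, as documented in the DataX repository.
      import json

      # Hypothetical DataX job skeleton; the empty parameter dicts must be
      # filled in according to the chosen reader and writer plugins.
      datax_job = {
          "job": {
              "setting": {"speed": {"channel": 1}},
              "content": [{
                  "reader": {"name": "hdfsreader", "parameter": {}},
                  "writer": {"name": "mysqlwriter", "parameter": {}},
              }],
          }
      }

      with open("test/feature/1.json", "w") as f:
          json.dump(datax_job, f, indent=4)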

  2. Run the following command to compile the DAG and generate bigdata_pipeline.tar.gz:
    python3.7 tools/convertdag2pipeline.py bigdata/test.json
  3. Upload and run the Pipeline.
    For more information, see the Kubeflow Easyrec Pipeline example.