DataScience lets you convert a custom DAG into a Pipeline and run it on KubeFlow. This topic walks through an example of converting a custom DAG into a Pipeline.
Procedure
- Prepare the data. For example, the following JSON describes a DAG to be run in KubeFlow. With dag2pipeline, you can define highly complex spark-sql feature-engineering jobs and configure their scheduling options.
{ "name": "DataScience", "link": "https://emr.console.aliyun.com/#/cn-beijing/cluster/create", "nodes": [{ "name": "rec_tem_behavior_table_test_preprocess", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": -4, "relative_end_date": 0, "dependencies": [], "sqlfile": "test/feature/rec_tem_behavior_table_test_preprocess.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }, { "name": "rec_tem_user_table_test_preprocess", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": -4, "relative_end_date": 0, "dependencies": [], "sqlfile": "test/feature/rec_tem_user_table_test_preprocess.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }, { "name": "rec_tem_item_table_test_preprocess", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": -4, "relative_end_date": 0, "dependencies": [], "sqlfile": "test/feature/rec_tem_item_table_test_preprocess.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }, { "name": "rec_tem_behavior_table_test_preprocess_wide", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": -4, "relative_end_date": 0, "dependencies": [ "rec_tem_item_table_test_preprocess", "rec_tem_user_table_test_preprocess", "rec_tem_behavior_table_test_preprocess" ], "sqlfile": "test/feature/rec_tem_behavior_table_test_preprocess_wide.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }, { "name": "rec_tem_user_table_test_preprocess_all_feature", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": -1, "relative_end_date": 0, "dependencies": [ "rec_tem_behavior_table_test_preprocess_wide", "rec_tem_user_table_test_preprocess" ], "sqlfile": "test/feature/rec_tem_user_table_test_preprocess_all_feature.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }, { "name": "rec_tem_item_table_test_preprocess_all_feature", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": -1, "relative_end_date": 0, "dependencies": [ "rec_tem_behavior_table_test_preprocess_wide", "rec_tem_item_table_test_preprocess" ], "sqlfile": "test/feature/rec_tem_item_table_test_preprocess_all_feature.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }, { "name": "rec_tem_behavior_table_test_preprocess_rank_sample", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": 0, "relative_end_date": 0, "dependencies": [ "rec_tem_item_table_test_preprocess_all_feature", "rec_tem_behavior_table_test_preprocess", "rec_tem_user_table_test_preprocess_all_feature" ], "sqlfile": "test/rank/rec_tem_behavior_table_test_preprocess_rank_sample.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }, { "name": "insert_rec_tem_user_table_test_preprocess_all_feature_holo", "type": "DI", "dependencies": [ "rec_tem_user_table_test_preprocess_all_feature" ], "dataxjson": "test/feature/1.json", "comment": "DATAX, for more detail please access https://github.com/alibaba/datax" }, { "name": "hadoop_mr_job1", "type": "HADOOP_MR", "dependencies": [ "insert_rec_tem_user_table_test_preprocess_all_feature_holo" ], "jar": "hadoop-mapreduce-examples-2.8.5.jar", "classname": "wordcount", "args": [ "/wordcount/input/", "/wordcount/output%%DATE%%" ], "comment": "jar file should be placed in bigdata directoray." } ] }
Depending on the node type, place the corresponding packages in the appropriate directory, as described for each node type below.
- SPARKSQL_PARALLEL
Executes spark-sql nodes in parallel. The node definition is as follows.
{ "name": "rec_tem_behavior_table_test_preprocess", "database": "xy_rec_sln_test", "type": "SPARKSQL_PARALLEL", "relative_start_date": -4, "relative_end_date": 0, "dependencies": [], "sqlfile": "test/feature/rec_tem_behavior_table_test_preprocess.sql", "args": [ "--num-executors", "1" ], "comment": "sparksql parallel" }
In this example, the test directory is placed under the bigdata/ directory.
The parameters are described as follows:
- relative_start_date and relative_end_date: the start and end dates, relative to the run date, over which rec_tem_behavior_table_test_preprocess.sql is executed in parallel (see the sketch after this list).
- args: spark-sql arguments.
- database: the name of the database that spark-sql runs against.
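With relative_start_date set to -4 and relative_end_date set to 0, the window covers the five days ending on the run date, and the SQL file is executed once per date in that window. The sketch below only illustrates how such a window expands; the expand_window helper and the %Y%m%d date format are assumptions, not DataScience APIs.
from datetime import date, timedelta

def expand_window(run_date, relative_start_date, relative_end_date):
    """List each date in the relative window, inclusive at both ends."""
    return [
        run_date + timedelta(days=offset)
        for offset in range(relative_start_date, relative_end_date + 1)
    ]

# With relative_start_date=-4 and relative_end_date=0, the node runs the
# SQL file once for each of five dates, in parallel.
for d in expand_window(date(2021, 6, 5), -4, 0):
    print(d.strftime("%Y%m%d"))  # the date format is an assumption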
- HADOOP_MR
The node definition is as follows.
{ "name": "hadoop_mr_job1", "type": "HADOOP_MR", "dependencies": [ "insert_rec_tem_user_table_test_preprocess_all_feature_holo" ], "jar": "hadoop-mapreduce-examples-2.8.5.jar", "classname": "wordcount", "args": [ "/wordcount/input/", "/wordcount/output%%DATE%%" ], "comment": "jar file should be placed in bigdata directoray." }
In this example, hadoop-mapreduce-examples-2.8.5.jar is placed under the bigdata/ directory.
The parameters are described as follows:
- args: the arguments of the Hadoop job (see the %%DATE%% sketch after this list).
- jar: the JAR package of the Hadoop job.
- classname: the class name inside the JAR package.
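The %%DATE%% token in args is presumably replaced with the run date when the job is submitted, so that each run writes to a distinct output directory. Below is a hypothetical sketch of that substitution; the render_args helper and the %Y%m%d format are assumptions.
from datetime import date

def render_args(args, run_date):
    """Replace the %%DATE%% placeholder in each job argument (format assumed)."""
    stamp = run_date.strftime("%Y%m%d")
    return [arg.replace("%%DATE%%", stamp) for arg in args]

args = ["/wordcount/input/", "/wordcount/output%%DATE%%"]
print(render_args(args, date(2021, 6, 5)))
# ['/wordcount/input/', '/wordcount/output20210605']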
- DI/DATAX
The node definition is as follows.
{ "name": "insert_rec_tem_user_table_test_preprocess_all_feature_holo", "type": "DI", "dependencies": [ "rec_tem_user_table_test_preprocess_all_feature" ], "dataxjson": "test/feature/1.json", "comment": "DATAX, for more detail please access https://github.com/alibaba/datax" }
In this example, the test directory is placed under bigdata/.
The dataxjson parameter specifies the configuration file of the DataX job. For more information, see DataX.
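The contents of test/feature/1.json are not shown here. For orientation only, a DataX job file follows the job/content/reader/writer layout sketched below; the plugin names and empty parameter blocks are placeholders, and the authoritative plugin parameters are documented in the DataX repository.
import json

# Hypothetical skeleton of a DataX job definition such as test/feature/1.json.
# "hdfsreader" and "postgresqlwriter" are placeholders; consult
# https://github.com/alibaba/datax for each plugin's real parameters.
datax_job = {
    "job": {
        "setting": {"speed": {"channel": 1}},
        "content": [
            {
                "reader": {"name": "hdfsreader", "parameter": {}},        # source side
                "writer": {"name": "postgresqlwriter", "parameter": {}},  # sink side
            }
        ],
    }
}

print(json.dumps(datax_job, indent=2))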
- Run the following command to compile the DAG and generate bigdata_pipeline.tar.gz.
python3.7 tools/convertdag2pipeline.py bigdata/test.json
- Upload and run the Pipeline. For more information, see the Kubeflow Easyrec Pipeline example.