Alink是基于实时计算引擎Flink研发的机器学习算法平台,提供了分类、聚类、回归、推荐、图算法、NLP、异常检测、关联规则和相似度计算等几十种常见的机器学习算法。在此基础上,Alink也为Python用户提供了PyAlink。本文为您介绍如何在DataScience上使用基于KubernetesFlink集群运行PyAlink任务。

背景信息

创建DataSicienc集群后,您开启一个终端可以查看到已经安装的K8s集群,您需要在K8s集群上进一步部署Flink集群,并让PyAlink在该Flink集群上运行。您需要安装PyAlink,还需要在K8s部署Flink Service。Flink Service通过标签关联一组Pod(一个JobManager角色和若干TaskManager角色的Pod),该Service的类型为NodePort,您可以在浏览器通过<公网IP地址>:nodePort访问Flink UI。PyAlink

前提条件

已在E-MapReduce控制台上创建DataScience集群,且打开了挂载公网开关,详情请参见创建集群

使用限制

MasterCore实例组都需要有公网IP地址,Core实例组请在ECS控制台上绑定IP地址,详情请参见绑定和解绑弹性公网IP

操作步骤

  1. 开启32211端口,详情请参见添加安全组规则
  2. 登录集群的Master节点,下载deploy_alink.zip并上传至任意目录下。
    本文示例是上传至/root/deploy_alink/目录下。登录集群详情,请参见登录集群
  3. 执行以下命令,安装PyAlink。
    pip3 install pyalink -i https://pypi.tuna.tsinghua.edu.cn/simple
  4. 解压缩ZIP文件。
    1. 执行以下命令,进入ZIP文件上传的目录。
      cd /root/deploy_alink/
    2. 执行以下命令,解压缩上传的ZIP文件。
      unzip deploy_alink.zip
  5. 执行以下命令,启动服务。
    sh start-flink.sh
    该步骤会创建一个TaskManagerFlink集群。您可以手动增加TaskManager节点。例如,本文示例增加TaskManager节点为3个。
    kubectl scale deployment flink-taskmanager --replicas 3
  6. 执行以下命,检查服务状态。
    kubectl get pods
    STATUS显示为Running时,表示服务已成功启动。Start  Service
  7. 编写PyAlink脚本并运行。
    以下是一个KMeans算法的示例。代码示例中的32211是默认端口号,如果您没有在jobmanager-service.yml文件中修改该端口,则使用默认端口号。
    1. 编写脚本文件demo.py
      from pyalink.alink import *
      import pandas as pd
      
      # useRemoteEnv参数为:Flink集群地址、端口和并发度。
      useRemoteEnv("localhost",32211,1)
      
      df = pd.DataFrame([
          [0, "0 0 0"],
          [1, "0.1,0.1,0.1"],
          [2, "0.2,0.2,0.2"],
          [3, "9 9 9"],
          [4, "9.1 9.1 9.1"],
          [5, "9.2 9.2 9.2"]
      ])
      
      data = BatchOperator.fromDataframe(df, schemaStr='id int, vec string')
      
      kmeans = KMeansTrainBatchOp()\
          .setVectorCol("vec")\
          .setK(2)\
          .linkFrom(data)
      
      predictBatch = KMeansPredictBatchOp()\
          .setPredictionCol("pred")\
          .linkFrom(kmeans, data)
      
      predictBatch.print()
                                      
    2. 执行以下命令,运行脚本文件。
      python3 demo.py
      返回以下信息。
      JVM listening on 127.0.0.1:36576
          id          vec pred
      0 0  0        0 0 0    1
      1 0  1  0.1,0.1,0.1    1
      2 0  2  0.2,0.2,0.2    1
      3 0  3        9 9 9    0
      4 0  4  9.1 9.1 9.1    0
      5 0  5  9.2 9.2 9.2    0
    3. 在浏览器中输入<公网IP地址>:32211
      即可查看Flink集群状态。<公网IP地址>可以在集群基础信息页面的主机信息区域获取。
  8. 执行以下命令,停止服务。
    sh stop-flink.sh