利用MaxCompute External Volume处理非结构化数据

External VolumeMaxCompute提供的分布式文件系统和数据存储方案,为OSS路径在MaxCompute中的映射对象。MaxCompute支持通过创建External Volume去挂载OSS的一个路径,并利用MaxCompute权限管理系统对用户访问External Volume做细粒度的权限控制,同时利用MaxCompute引擎处理External Volume内部的文件数据。每个Project中可以有多个External Volume。本文为您介绍如何利用MaxCompute External Volume处理非结构化数据。

前提条件

  • 申请开通External Volume,详情请参见新功能试用申请

  • 已安装v0.43.0或以上版本的MaxCompute客户端,详情请参见使用本地客户端(odpscmd)连接

    通过SDK操作时,Java SDK版本需为v0.43.0及以上版本,详情请参见版本更新记录

  • 已开通OSS服务并创建存储空间,同时授予MaxCompute项目访问权限,详情请参见STS模式授权

    说明

    External Volume中的数据存储在OSS上,MaxCompute侧不会对External Volume的数据重复收取存储费用。使用MaxCompute的各个计算引擎读取计算External Volume的数据,例如Spark on MaxCompute、MapReduce任务等会收取计算费用。 MaxCompute的引擎计算结果放在External Volume中,例如Proxima生成的索引数据,也由OSS收取存储费用。

快速使用

  1. 授权。

    说明

    使用External Volume,您需要同时具有以下相关权限:CreateInstance、CreateVolume、List、Read、Write权限,详细内容请参见MaxCompute权限

    1. 使用如下命令确认当前用户权限是否包含CreateVolume

      SHOW grants FOR <user_name>;
    2. 如没有CreateVolume 权限,需执行下面命令进行授权。

      GRANT CreateVolume ON project <project_name> TO USER <user_name>;

      如需取消授权请执行如下命令。

      REVOKE CreateVolume ON project <project_name> FROM USER <user_name>;
    3. 再次执行SHOW GRANTS命令,确认当前用户权限是否包含CreateVolume权限。

  2. 创建External Volume。

    使用如下命令创建External Volume。

    vfs -create <volume_name>  
        -storage_provider <oss> 
        -url <oss://oss_endpoint/bucket/path>
        -acd <true|false>
        -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole> 

    参数说明及更多External Volume操作请参见External Volume操作

    创建完成的External VolumeMaxCompute中的路径为:odps://[project_name]/[volume_name],其中project_nameMaxCompute项目名称;volume_nameExternal Volume名称。Spark引擎和MapReduce任务等都可以使用External VolumeMaxCompute中的路径。

  3. 查看已经创建的External Volume。

    使用如下命令查看已创建的External Volume。

    vfs -ls /;

使用场景

利用Spark on MaxCompute通过External Volume引用或处理OSS数据

Spark on MaxComputeMaxCompute提供的兼容开源Spark的计算服务。它在统一的计算资源和数据集权限体系之上,提供Spark计算框架,支持您以熟悉的开发使用方式提交运行Spark作业,满足更丰富的数据处理分析需求。Spark在运行过程中需要加载作业运行资源(File、Archive),其中一种方式是使用Spark直接访问OSS,详情请参见Spark访问OSS。如果需要对资源和数据做细粒度的权限控制,则使用External Volume的方式,通过数仓的权限体系,对资源做访问控制。

引用External Volume资源

Spark on MaxCompute支持在作业启动时直接引用External Volume资源,通过参数配置的External Volume资源在作业启动时会自动下载到作业工作目录,当前支持如下两种文件类型:

  • File:File可以是任意类型的文件(如jarpy)。

  • Archive:Archive必须是ziptar.gztar这几种压缩类型。

二者的区别是File类型只会直接下载文件到任务的当前工作目录;Archive类型除了下载文件,还会在当前工作目录自动解压文件,此时需要用到两个 External Volume相关的参数来指引Spark程序处理External Volume对象包含的OSS数据:

说明

以下参数需要配置在DataWorksODPS Spark节点配置项的参数中或配置在spark-defaults.conf文件中,不能配置在代码中。

参数

说明

spark.hadoop.odps.cupid.volume.files

该参数指定任务运行所需要的类型文件,任务可以同时指定多个,用逗号隔开,文件将会下载到Spark任务的当前工作目录。

  • 参数值格式:

    odps://[project_name]/[volume_name]/[path_to_file],[path_to_file]

    其中project_nameMaxCompute项目名称;volume_nameExternal Volume名称;path_to_file为文件名称。

    重要

    参数值可以包含多级目录,但必须要指定到具体文件,不能是目录。

  • 参数配置示例:

    spark.hadoop.odps.cupid.volume.files=
    odps://mc_project/external_volume/data/mllib/kmeans_data.txt,
    odps://mc_project/external_volume/target/PythonKMeansExample/KMeansModel/data/part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

    配置后Spark任务当前工作目录下将会生成两个文件:kmeans_data.txt

    part-00000-a2d44ac5-54f6-49fd-b793-f11e6a189f90-c000.snappy.parquet

spark.hadoop.odps.cupid.volume.archives

该参数指定任务运行所需要的Archive类型文件,可以同时指定多个,用逗号隔开,文件将会下载到Spark的当前工作目录并进行解压。

  • 参数值格式:

    odps://[project_name]/[volume_name]/[archive_file_name],[archive_file_name]

    其中project_nameMaxCompute项目名称;volume_nameExternal Volume名称;archive_file_nameArchive类型文件名称。

    重要

    参数值可以包含多级目录,但必须要指定到具体文件,不能是目录。

  • 默认值:空。

  • 参数配置示例:

    spark.hadoop.odps.cupid.volume.archives = 
    odps://spark_test_wj2/external_volume/pyspark-3.1.1.zip,
    odps://spark_test_wj2/external_volume/python-3.7.9-ucs4.tar.gz

    配置后Spark任务启动时会在当前工作目录下自动生成两个目录:pyspark-3.1.1.zip

    python-3.7.9-ucs4.tar.gz

处理External Volume OSS资源

Spark on MaxCompute支持在作业运行时通过代码获取External Volume资源,如需获取External Volume资源需在Spark作业代码中配置如下参数。

参数

说明

spark.hadoop.odps.volume.common.filesystem

Spark on MaxCompute识别External Volume开关,需要设置为true

默认值为false,即默认不识别External Volume。

spark.hadoop.odps.cupid.volume.paths

指定需要访问的External Volume路径。

  • 参数格式:

    odps://[project_name]/[volume_name]/

    其中project_nameMaxCompute项目名称;volume_nameExternal Volume名称。

  • 默认值:空。

spark.hadoop.fs.odps.impl

Spark on MaxCompute访问OSS的实现类。

参数值固定:org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem

spark.hadoop.fs.AbstractFileSystem.odps.impl

Spark on MaxCompute访问OSS的实现类。

参数值固定org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs

示例代码:利用Kmeans算法通过训练数据(odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt)生成模型到odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel路径下,再通过调用模型将目标数据做分类,将结果存入odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/data路径下。

-- 配置项
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs

spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem

spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0

-- 代码
from numpy import array
from math import sqrt

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")  # SparkContext

    # Load and parse the data
    data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # Build the model (cluster the data)
    clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

    # Evaluate clustering by computing Within Set Sum of Squared Errors
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE))

    # Save and load model
    clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")

    print(parsedData.map(lambda feature: clusters.predict(feature)).collect())

    sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
    
    print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
    sc.stop()

执行后在External Volume映射的OSS目录下可以看到结果数据。

利用Proxima CEMaxCompute中做向量计算

使用Proxima CEMaxCompute中做向量计算,使用说明和示例如下:

  1. 安装Proxima CE资源包。

    详情请参见安装与运行

  2. 运行任务。

    • 使用限制:

      • Proxima Java SDK目前只支持在LinuxMac系统下的MaxCompute客户端执行任务命令。

        说明

        Proxima CE在运行时分为两部分:本地运行任务和MaxCompute任务。本地运行任务是指里面没有涉及到MaxComputeSQL、MapReduceGraph任务的功能模块部分;MaxCompute任务是指基于MaxComputeSQL、MapReduceGraph等引擎执行的任务,二者会交替执行。Proxima CE运行后,会先尝试在本地机器上(使用MaxCompute客户端运行Proxima CE的机器)加载Proxima内核,如果成功则会在本地运行某些模块调用基于Proxima内核的函数;如果加载失败会报错,但不影响后续运行,模块将调用其他函数替代。由于任务jar包内部为Linux相关依赖,因此不支持在Windows系统下的MaxCompute客户端运行。

      • 暂不支持通过DataWorksMapReduce节点执行任务。因为MapReduce节点集成的底层MaxCompute客户端版本正在升级中,任务会执行失败,请您暂时用MaxCompute客户端提交任务。

    • 数据准备:

      -- 创建输入表
      CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING);
      
      -- 插入doc数据(底库表)
      ALTER TABLE doc_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES
      ('1.nid','1~1~1~1~1~1~1~1'),
      ('2.nid','2~2~2~2~2~2~2~2'),
      ('3.nid','3~3~3~3~3~3~3~3'),
      ('4.nid','4~4~4~4~4~4~4~4'),
      ('5.nid','5~5~5~5~5~5~5~5'),
      ('6.nid','6~6~6~6~6~6~6~6'),
      ('7.nid','7~7~7~7~7~7~7~7'),
      ('8.nid','8~8~8~8~8~8~8~8'),
      ('9.nid','9~9~9~9~9~9~9~9'),
      ('10.nid','10~10~10~10~10~10~10~10');
      
      -- 插入query数据(查询表)
      ALTER TABLE query_table_float_smoke add PARTITION(pt='20230116');
      INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES
      ('q1.nid','1~1~1~1~2~2~2~2'),
      ('q2.nid','4~4~4~4~3~3~3~3'),
      ('q3.nid','9~9~9~9~5~5~5~5');
    • 示例任务代码:

      jar -libjars proxima-ce-aliyun-1.0.0.jar 
      -classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner 
      -doc_table doc_table_float_smoke 
      -doc_table_partition 20230116 
      -query_table query_table_float_smoke 
      -query_table_partition 20230116 
      -output_table output_table_float_smoke 
      -output_table_partition 20230116 
      -data_type float 
      -dimension 8 
      -topk 1 
      -job_mode train:build:seek:recall 
      -external_volume shanghai_vol_ceshi
      -owner_id 1248953xxx
      ;
    • 示例结果:使用select * from output_table_float_smoke where pt='20230116';命令查询结果表。

      +------------+------------+------------+------------+
      | pk         | knn_result | score      | pt         |
      +------------+------------+------------+------------+
      | q1.nid     | 2.nid      | 4.0        | 20230116   |
      | q1.nid     | 1.nid      | 4.0        | 20230116   |
      | q1.nid     | 3.nid      | 20.0       | 20230116   |
      | q2.nid     | 4.nid      | 4.0        | 20230116   |
      | q2.nid     | 3.nid      | 4.0        | 20230116   |
      | q2.nid     | 2.nid      | 20.0       | 20230116   |
      | q3.nid     | 7.nid      | 32.0       | 20230116   |
      | q3.nid     | 8.nid      | 40.0       | 20230116   |
      | q3.nid     | 6.nid      | 40.0       | 20230116   |
      +------------+------------+------------+------------+