External Volume是MaxCompute提供的分布式文件系统和数据存储方案,为OSS路径在MaxCompute中的映射对象。MaxCompute支持通过创建External Volume去挂载OSS的一个路径,并利用MaxCompute权限管理系统对用户访问External Volume做细粒度的权限控制,同时利用MaxCompute引擎处理External Volume内部的文件数据。每个Project中可以有多个External Volume。本文为您介绍如何利用MaxCompute External Volume处理非结构化数据。
前提条件
申请开通External Volume,详情请参见新功能试用申请。
已安装v0.43.0或以上版本的MaxCompute客户端,详情请参见使用本地客户端(odpscmd)连接。
通过SDK操作时,Java SDK版本需为v0.43.0及以上版本,详情请参见版本更新记录。
已开通OSS服务并创建存储空间,同时授予MaxCompute项目访问权限,详情请参见STS模式授权。
说明External Volume中的数据存储在OSS上,MaxCompute侧不会对External Volume的数据重复收取存储费用。使用MaxCompute的各个计算引擎读取计算External Volume的数据,例如Spark on MaxCompute、MapReduce任务等会收取计算费用。 MaxCompute的引擎计算结果放在External Volume中,例如Proxima生成的索引数据,也由OSS收取存储费用。
快速使用
授权。
说明使用External Volume,您需要同时具有以下相关权限:CreateInstance、CreateVolume、List、Read、Write权限,详细内容请参见MaxCompute权限。
使用如下命令确认当前用户权限是否包含
CreateVolume
。SHOW grants FOR <user_name>;
如没有CreateVolume 权限,需执行下面命令进行授权。
GRANT CreateVolume ON project <project_name> TO USER <user_name>;
如需取消授权请执行如下命令。
REVOKE CreateVolume ON project <project_name> FROM USER <user_name>;
再次执行
SHOW GRANTS
命令,确认当前用户权限是否包含CreateVolume
权限。
创建External Volume。
使用如下命令创建External Volume。
vfs -create <volume_name> -storage_provider <oss> -url <oss://oss_endpoint/bucket/path> -acd <true|false> -role_arn <arn:aliyun:xxx/aliyunodpsdefaultrole>
参数说明及更多External Volume操作请参见External Volume操作。
创建完成的External Volume在MaxCompute中的路径为:
odps://[project_name]/[volume_name]
,其中project_name为MaxCompute项目名称;volume_name为External Volume名称。Spark引擎和MapReduce任务等都可以使用External Volume在MaxCompute中的路径。查看已经创建的External Volume。
使用如下命令查看已创建的External Volume。
vfs -ls /;
使用场景
利用Spark on MaxCompute通过External Volume引用或处理OSS数据
Spark on MaxCompute是MaxCompute提供的兼容开源Spark的计算服务。它在统一的计算资源和数据集权限体系之上,提供Spark计算框架,支持您以熟悉的开发使用方式提交运行Spark作业,满足更丰富的数据处理分析需求。Spark在运行过程中需要加载作业运行资源(File、Archive),其中一种方式是使用Spark直接访问OSS,详情请参见Spark访问OSS。如果需要对资源和数据做细粒度的权限控制,则使用External Volume的方式,通过数仓的权限体系,对资源做访问控制。
引用External Volume资源
Spark on MaxCompute支持在作业启动时直接引用External Volume资源,通过参数配置的External Volume资源在作业启动时会自动下载到作业工作目录,当前支持如下两种文件类型:
File:File可以是任意类型的文件(如
jar
或py
)。Archive:Archive必须是
zip
、tar.gz
、tar
这几种压缩类型。
二者的区别是File类型只会直接下载文件到任务的当前工作目录;Archive类型除了下载文件,还会在当前工作目录自动解压文件,此时需要用到两个 External Volume相关的参数来指引Spark程序处理External Volume对象包含的OSS数据:
以下参数需要配置在DataWorks的ODPS Spark节点配置项的参数中或配置在spark-defaults.conf
文件中,不能配置在代码中。
参数 | 说明 |
spark.hadoop.odps.cupid.volume.files | 该参数指定任务运行所需要的类型文件,任务可以同时指定多个,用逗号隔开,文件将会下载到Spark任务的当前工作目录。
|
spark.hadoop.odps.cupid.volume.archives | 该参数指定任务运行所需要的Archive类型文件,可以同时指定多个,用逗号隔开,文件将会下载到Spark的当前工作目录并进行解压。
|
处理External Volume OSS资源
Spark on MaxCompute支持在作业运行时通过代码获取External Volume资源,如需获取External Volume资源需在Spark作业代码中配置如下参数。
参数 | 说明 |
spark.hadoop.odps.volume.common.filesystem | Spark on MaxCompute识别External Volume开关,需要设置为 默认值为 |
spark.hadoop.odps.cupid.volume.paths | 指定需要访问的External Volume路径。
|
spark.hadoop.fs.odps.impl | Spark on MaxCompute访问OSS的实现类。 参数值固定: |
spark.hadoop.fs.AbstractFileSystem.odps.impl | Spark on MaxCompute访问OSS的实现类。 参数值固定: |
示例代码:利用Kmeans算法通过训练数据(odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt
)生成模型到odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel
路径下,再通过调用模型将目标数据做分类,将结果存入odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel/data
路径下。
-- 配置项
spark.hadoop.odps.cupid.volume.paths=odps://ms_proj1_dev/volume_yyy1/
spark.hadoop.odps.volume.common.filesystem=true
spark.hadoop.fs.odps.impl=org.apache.hadoop.fs.aliyun.volume.OdpsVolumeFileSystem
spark.hadoop.fs.AbstractFileSystem.odps.impl=org.apache.hadoop.fs.aliyun.volume.abstractfsimpl.OdpsVolumeFs
spark.hadoop.odps.access.id=xxxxxxxxx
spark.hadoop.odps.access.key=xxxxxxxxx
spark.hadoop.fs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com
spark.hadoop.odps.cupid.resources=ms_proj1_dev.jindofs-sdk-3.8.0.jar
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
spark.hadoop.odps.cupid.resources=public.python-2.7.13-ucs4.tar.gz
spark.pyspark.python=./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
-- 代码
from numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
if __name__ == "__main__":
sc = SparkContext(appName="KMeansExample") # SparkContext
# Load and parse the data
data = sc.textFile("odps://ms_proj1_dev/volume_yyy1/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: clusters.predict(feature)).collect())
sameModel = KMeansModel.load(sc, "odps://ms_proj1_dev/volume_yyy1/target/PythonKMeansExample/KMeansModel")
print(parsedData.map(lambda feature: sameModel.predict(feature)).collect())
sc.stop()
执行后在External Volume映射的OSS目录下可以看到结果数据。
利用Proxima CE在MaxCompute中做向量计算
使用Proxima CE在MaxCompute中做向量计算,使用说明和示例如下:
安装Proxima CE资源包。
详情请参见安装与运行。
运行任务。
使用限制:
Proxima Java SDK目前只支持在Linux和Mac系统下的MaxCompute客户端执行任务命令。
说明Proxima CE在运行时分为两部分:本地运行任务和MaxCompute任务。本地运行任务是指里面没有涉及到MaxCompute的SQL、MapReduce和Graph任务的功能模块部分;MaxCompute任务是指基于MaxCompute的SQL、MapReduce和Graph等引擎执行的任务,二者会交替执行。Proxima CE运行后,会先尝试在本地机器上(使用MaxCompute客户端运行Proxima CE的机器)加载Proxima内核,如果成功则会在本地运行某些模块调用基于Proxima内核的函数;如果加载失败会报错,但不影响后续运行,模块将调用其他函数替代。由于任务jar包内部为Linux相关依赖,因此不支持在Windows系统下的MaxCompute客户端运行。
暂不支持通过DataWorks的MapReduce节点执行任务。因为MapReduce节点集成的底层MaxCompute客户端版本正在升级中,任务会执行失败,请您暂时用MaxCompute客户端提交任务。
数据准备:
-- 创建输入表 CREATE TABLE doc_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING); CREATE TABLE query_table_float_smoke(pk STRING, vector STRING) PARTITIONED BY (pt STRING); -- 插入doc数据(底库表) ALTER TABLE doc_table_float_smoke add PARTITION(pt='20230116'); INSERT OVERWRITE TABLE doc_table_float_smoke PARTITION (pt='20230116') VALUES ('1.nid','1~1~1~1~1~1~1~1'), ('2.nid','2~2~2~2~2~2~2~2'), ('3.nid','3~3~3~3~3~3~3~3'), ('4.nid','4~4~4~4~4~4~4~4'), ('5.nid','5~5~5~5~5~5~5~5'), ('6.nid','6~6~6~6~6~6~6~6'), ('7.nid','7~7~7~7~7~7~7~7'), ('8.nid','8~8~8~8~8~8~8~8'), ('9.nid','9~9~9~9~9~9~9~9'), ('10.nid','10~10~10~10~10~10~10~10'); -- 插入query数据(查询表) ALTER TABLE query_table_float_smoke add PARTITION(pt='20230116'); INSERT OVERWRITE TABLE query_table_float_smoke PARTITION (pt='20230116') VALUES ('q1.nid','1~1~1~1~2~2~2~2'), ('q2.nid','4~4~4~4~3~3~3~3'), ('q3.nid','9~9~9~9~5~5~5~5');
示例任务代码:
jar -libjars proxima-ce-aliyun-1.0.0.jar -classpath proxima-ce-aliyun-1.0.0.jar com.alibaba.proxima2.ce.ProximaCERunner -doc_table doc_table_float_smoke -doc_table_partition 20230116 -query_table query_table_float_smoke -query_table_partition 20230116 -output_table output_table_float_smoke -output_table_partition 20230116 -data_type float -dimension 8 -topk 1 -job_mode train:build:seek:recall -external_volume shanghai_vol_ceshi -owner_id 1248953xxx ;
示例结果:使用
select * from output_table_float_smoke where pt='20230116';
命令查询结果表。+------------+------------+------------+------------+ | pk | knn_result | score | pt | +------------+------------+------------+------------+ | q1.nid | 2.nid | 4.0 | 20230116 | | q1.nid | 1.nid | 4.0 | 20230116 | | q1.nid | 3.nid | 20.0 | 20230116 | | q2.nid | 4.nid | 4.0 | 20230116 | | q2.nid | 3.nid | 4.0 | 20230116 | | q2.nid | 2.nid | 20.0 | 20230116 | | q3.nid | 7.nid | 32.0 | 20230116 | | q3.nid | 8.nid | 40.0 | 20230116 | | q3.nid | 6.nid | 40.0 | 20230116 | +------------+------------+------------+------------+