当OSS中的数据格式比较复杂,内置的Extractor无法满足需求时,您可以自定义Extractor来读取OSS上的非结构化数据(文本数据和非文本数据)。本文为您介绍如何自定义Extractor,并基于自定义Extractor来创建OSS外部表,实现访问存储在OSS上的数据。
前提条件
使用限制
- 本文中自定义Extractor的方法不支持访问OSS文本文件中的DATETIME类型数据。如果有此需求,请参见MaxCompute自定义Extractor访问OSS文本文件DateTime类型数据。
- OSS外部表的结构必须与OSS上的数据文件结构一致。
- 您只能通过MaxCompute SQL操作OSS外部表。
注意事项
使用OSS外部表时,您需要注意:
- 如果您创建的OSS外部表为分区表,分区数据在OSS上的存放路径必须符合一定的格式要求。更多格式要求,请参见OSS外部表的数据分区。
- OSS外部表只是在系统中记录与OSS目录的关联关系。当删除外部表时,不会删除对应OSS目录下的数据。
- 如果OSS上的数据文件类型为归档文件,需要先解冻文件。更多解冻操作,请参见解冻文件。
创建外部表语法格式
create external table [if not exists] <mc_oss_extable_name>
(
<col_name> <date_type>,
...
)
[partitioned by (<col_name> <data_type>, ...)]
--指定自定义的Extractor。
stored by '<StorageHandler>'
--指定外部表相关参数。
with serdeproperties (
'delimiter'='<delimiter>',
'odps.properties.rolearn'='<ram_arn>'
--OSS文件为GZIP压缩格式时需要配置。
[,'odps.text.option.gzip.input.enabled'='true']
--其他OSS外部表相关属性,根据实际需要配置。
[,'<property_name>'='<property_value>'[,'<property_name>'='<property_value>'...]]
)
location '<oss_location>'
using '<jar_name>';
- if not exists:可选。如果不指定if not exists选项而存在同名表,会报错。如果指定if not exists,无论是否存在同名表,即使原表结构与要创建的目标表结构不一致,均返回成功。已存在的同名表的元数据信息不会被改动。
- mc_oss_extable_name:必填。待创建的OSS外部表的名称。
- col_name:必填。OSS外部表的列名称。
- date_type:必填。OSS外部表的列的数据类型。
- partitioned by (<col_name> <data_type>, ...):可选。指定OSS外部表为分区表时的分区信息。
- col_name:必填。分区列的名称。
- date_type:必填。分区列的数据类型。
- StorageHandler:必填。指定自定义的StorageHandler的类名称。
- odps.properties.rolearn'='<ram_arn>:必填。指定RAM中AliyunODPSDefaultRole的ARN信息。您可以通过RAM控制台中的角色详情获取。
- location:必填。指定数据文件的OSS路径。OSS目录格式为
oss://<oss_endpoint>/<Bucket名称>/<目录名称>/
。系统会默认读取该目录下的所有文件。- oss_endpoint:OSS访问域名信息。建议您使用OSS提供的内网域名,否则将产生OSS流量费用。更多OSS内网域名信息,请参见访问域名和数据中心。建议数据存放的OSS区域与MaxCompute项目所在区域保持一致。由于MaxCompute只在部分区域部署,跨区域的数据连通性可能存在问题。
- Bucket名称:OSS存储空间名称,即Bucket名称。查看存储空间名称操作,请参见列举存储空间。
- 目录名称:指定OSS目录名称。目录后不需要指定文件名,错误用法如下:
http://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持HTTP连接。 https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持HTTPS连接。 oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo -- 连接地址错误。 oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv -- 不需要指定文件名。
- 'odps.text.option.gzip.input.enabled'='true':当OSS数据文件为GZIP压缩格式时,必须配置。
- <property_name>'='<property_value>':可选。property_name为属性名称,property_value为属性值。更多属性信息,请参见内置Extractor访问OSS。
- USING:指定Extractor类定义所在的JAR包。
自定义Extractor访问OSS上的文本数据
以文本数据文件为例,记录之间的列通过
|
分隔。存储路径为/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv
,数据内容如下。1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N
操作流程如下:
- 通过IDEA工具定义一个Extractor类。
编写一个通用的Extractor,将分隔符作为参数传入,可以处理所有类似格式的TEXT文件。代码示例如下。
/** * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.) **/ public class TextExtractor extends Extractor { private InputStreamSet inputs; private String columnDelimiter; private DataAttributes attributes; private BufferedReader currentReader; private boolean firstRead = true; public TextExtractor() { // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes) this.columnDelimiter = ","; } // no particular usage for execution context in this example @Override public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) { this.inputs = inputs; //inputs是一个InputStreamSet,每次调用next()返回一个InputStream,这个InputStream可以读取一个OSS文件的所有内容。 this.attributes = attributes; // check if "delimiter" attribute is supplied via SQL query String columnDelimiter = this.attributes.getValueByKey("delimiter"); //delimiter通过DDL语句传参。 if ( columnDelimiter != null) { this.columnDelimiter = columnDelimiter; } // note: more properties can be inited from attributes if needed } @Override public Record extract() throws IOException {//extactor()调用返回一条Record,代表外部表中的一条记录。 String line = readNextLine(); if (line == null) { return null; // 返回NULL来表示这个表中已经没有记录可读。 } return textLineToRecord(line); // textLineToRecord将一行数据按照delimiter分割为多个列。 } @Override public void close(){ // no-op } }
说明 更多使用TextLineToRecord将数据分割的完整实现,请参见TextExtractor.java。 - 通过IDEA工具定义一个StorageHandler类。
StorageHandler是外部表自定义逻辑的统一入口。代码示例如下。
package com.aliyun.odps.udf.example.text; public class TextStorageHandler extends OdpsStorageHandler { @Override public Class<? extends Extractor> getExtractorClass() { return TextExtractor.class; } @Override public Class<? extends Outputer> getOutputerClass() { return TextOutputer.class; } }
- 通过IDEA工具编译上述自定义类代码并打包为JAR包,通过MaxCompute客户端添加为MaxCompute资源。
添加MaxCompute资源命令示例如下。
add jar odps-udf-example.jar;
- 通过MaxCompute客户端创建外部表。
命令示例如下,其中delimeter是OSS数据的分隔符名称。
create external table if not exists ambulance_data_txt_external ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string, direction string ) stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler' with serdeproperties ( 'delimiter'='\\|', 'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole' ) location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/' using 'odps-udf-example.jar';
- 通过MaxCompute客户端执行如下SQL语句查询外部表。
select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;
自定义Extractor访问OSS上的非文本数据
以语音数据(WAV格式文件)为例,为您介绍如何通过自定义的Extractor访问并处理OSS上的非文本数据文件。
操作流程如下:
- 自定义Extractor、StorageHandler类并编译、打包为JAR包。更多逻辑实现,请参见SpeechSentenceSnrExtractor。
- 通过MaxCompute客户端执行如下SQL语句创建外部表。
create external table if not exists speech_sentence_snr_external ( sentence_snr double, id string ) stored by 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler' with serdeproperties ( 'mlfFileName'='sm_random_5_utterance.text.label' , 'speechSampleRateInKHz' = '16' ) location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/' using 'odps-udf-example.jar,sm_random_5_utterance.text.label';
- com.aliyun.odps.udf.example.speech.SpeechStorageHandler:封装的Extractor可计算语音文件的平均有效语句信噪比,并将抽取出来的结构化数据直接进行SQL运算。
- location:
oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/
存储了多个原始的WAV格式的语音文件,MaxCompute框架会读取该地址上的所有文件,并按照文件级别分片,自动将文件分配给多个计算节点处理。
- 通过MaxCompute客户端执行如下SQL语句查询结果。
select sentence_snr, id from speech_sentence_snr_external where sentence_snr > 10.0;
返回结果如下。-------------------------------------------------------------- | sentence_snr | id | -------------------------------------------------------------- | 34.4703 | J310209090013_H02_K03_042 | -------------------------------------------------------------- | 31.3905 | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 | -------------------------------------------------------------- | 35.4774 | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0 | -------------------------------------------------------------- | 16.0462 | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0 | -------------------------------------------------------------- | 14.5568 | tsh_148_3013_5_13_47_3d5008d792408f81_0 | --------------------------------------------------------------
通过自定义Extractor,可以在SQL语句上分布式地处理多个OSS上的语音数据文件。您可以使用同样的方法利用MaxCompute的大规模计算能力,处理图像、视频等各种类型的非结构化数据。
在文档使用中是否遇到以下问题
更多建议
匿名提交