MaxCompute的非结构化框架支持您通过INSERT方式将MaxCompute的数据通过关联的OSS外部表直接输出到OSS。本文为您介绍如何将数据输出到OSS。

前提条件

在将数据输出到OSS前,请您确认已完成以下操作:

  • OSS授权。

    更多授权操作信息,请参见STS模式授权

  • 已准备好OSS存储空间(Bucket)、OSS目录及OSS数据文件。

    更多OSS存储空间操作信息,请参见创建存储空间

    更多新建OSS目录操作信息,请参见创建目录

    更多上传数据文件操作信息,请参见上传文件

背景信息

与访问OSS数据类似,MaxCompute支持通过内置Extractor或自定义Extractor,将MaxCompute内部表数据或MaxCompute处理外部表得到的数据输出到OSS:
  • 通过内置Extractor将数据输出到OSS

    使用MaxCompute内置的Extractor时,您需要通过建表语句创建OSS外部表,并通过INSERT操作非常方便地按照约定格式(TSV或CSV)将数据输出到OSS进行存储。

    MaxCompute内置的Extractor包含如下两种:
    • com.aliyun.odps.CsvStorageHandler:定义如何读写CSV格式的数据。数据各列以英文逗号(,)为分隔符,换行符为\n
    • com.aliyun.odps.TsvStorageHandler:定义如何读写TSV格式的数据。数据各列以\t为分隔符,换行符为\n
  • 通过自定义StorageHandler输出到OSS

    MaxCompute非结构化框架还提供了通用的SDK自定义Extractor,支持对外输出自定义数据格式文件。

    与内置Extractor一样,您需要先创建OSS外部表,再通过对外部表的INSERT操作实现将数据输出到OSS。不同点在于创建外部表时,stored by需要指定自定义的Extractor。
    说明 MaxCompute非结构化框架通过StorageHandler接口,来描述对各种数据存储格式的处理。StorageHandler作为一个wrapper class,允许您指定自定义的Extractor(用于数据的读入、解析、处理等)和Outputer(用于数据的处理和输出等)。自定义的StorageHandler应该继承OdpsStorageHandler,实现getExtractorClass和getOutputerClass接口。

更多创建OSS外部表语法信息,请参见创建外部表语法格式

使用限制

执行INSERT操作将数据输出至OSS时,单个文件的大小不能超过5 GB。

通过内置Extractor将数据输出到OSS

假设在OSS上准备的信息如下:
  • OSS内网域名:oss-cn-hangzhou-internal.aliyuncs.com,即华东1(杭州)。
  • 非分区表数据存储路径:oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/
  • 分区表数据存储路径:oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/

操作流程如下:

  1. 登录MaxCompute客户端,创建OSS外部表。

    命令示例如下。

    • 创建非分区表
      create external table if not exists mc_oss_csv_external4
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string,
      direction string
      )
      stored by 'com.aliyun.odps.CsvStorageHandler' 
      with serdeproperties (
       'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'
      ) 
      location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/';                       
    • 创建分区表
      create external table if not exists mc_oss_csv_external5
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string
      )
      partitioned by (
      direction string
      )
      stored by 'com.aliyun.odps.CsvStorageHandler' 
      with serdeproperties (
       'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'
      ) 
      location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/';                      
    说明 如果with serdeproperties中未设置odps.properties.rolearn属性,且授权方式是采用明文AccessKey,则location格式如下。
    location 'oss://<accessKeyId>:<accessKeySecret>@<oss_endpoint>/<Bucket名称>/<目录名称>/'
  2. 通过MaxCompute客户端对外部表执行insert overwriteinsert into命令,将数据输出到OSS。

    更多INSERT操作信息,请参见插入或覆写数据(INSERT INTO | INSERT OVERWRITE)插入或覆写动态分区数据(DYNAMIC PARTITION)

    命令示例如下。
    • 将非分区表数据输出到OSS
      insert into table mc_oss_csv_external4 select * from mc_oss_csv_external1;

      mc_oss_csv_external1的数据信息,请参见内置Extractor访问OSS

      执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下产生了一个.odps文件夹,包含.csv.meta文件。

      导出结果
    • 将分区表数据输出到OSS
      insert into table mc_oss_csv_external5 partition (direction) select * from mc_oss_csv_external2;

      mc_oss_csv_external2的数据信息,请参见内置Extractor访问OSS

      执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下根据INSERT语句指定的分区值生成对应的分区子目录。分区子目录中包括.odps文件夹。例如output/direction=N/.odps/20210330*********/M1_0_0-0_TableSink1-0-.csv

      导出结果
    说明
    • .odps文件夹中的.meta文件为MaxCompute额外输出的宏数据文件,用于记录当前文件夹中有效的数据。正常情况下,如果INSERT操作成功,可以认为当前文件夹的所有数据均是有效数据。只有在有作业失败的情况下,需要对该宏数据进行解析。即使是在作业中途失败或被中止的情况下,对于INSERT OVERWRITE操作,再运行一次即可。
    • 对于MaxCompute内置的TSV或CSV Extractor,处理产生的文件数目与对应SQL的并发度相同。
    • 如果insert overwrite ... select ... from ...;操作在源数据表from_tablename上分配了1000个Mapper,则最后将产生1000个TSV或CSV文件。
    • 如果您需要控制生成文件的数目,可以通过MaxCompute的各种灵活语义和配置来实现。如果Outputer在Mapper里,可以通过调整odps.stage.mapper.split.size的大小来控制Mapper的并发数,从而调整产生的文件数目。如果Outputer在Reducer或Joiner里,也可以分别通过odps.stage.reducer.numodps.stage.joiner.num来调整。

通过自定义StorageHandler输出到OSS

基于自定义Extractor访问OSS上的文本数据中的TextStorageHandler,为您介绍MaxCompute如何通过自定义Extractor将数据输出到OSS,以|为列分隔符,以\n为换行符。
说明 MaxCompute Studio配置好MaxCompute Java Module后,您可以在Examples文件夹中看到对应的示例代码,或通过源代码也可以看到完整代码。

操作流程如下:

  1. 通过IDEA工具定义一个Outputer类。
    输出逻辑都必须实现Outputer接口。
    package com.aliyun.odps.examples.unstructured.text;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.io.OutputStreamSet;
    import com.aliyun.odps.io.SinkOutputStream;
    import com.aliyun.odps.udf.DataAttributes;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.Outputer;
    import java.io.IOException;
    public class TextOutputer extends Outputer {
        private SinkOutputStream outputStream;
        private DataAttributes attributes;
        private String delimiter;
        public TextOutputer (){
            // default delimiter, this can be overwritten if a delimiter is provided through the attributes.
            this.delimiter = "|";
        }
        @Override
        public void output(Record record) throws IOException {
            this.outputStream.write(recordToString(record).getBytes());
        }
        // no particular usage of execution context in this example
        @Override
        public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) throws IOException {
            this.outputStream = outputStreamSet.next();
            this.attributes = attributes;
            this.delimiter = this.attributes.getValueByKey("delimiter");
            if ( this.delimiter == null)
            {
                this.delimiter=",";
            }
            System.out.println("Extractor using delimiter [" + this.delimiter + "].");
        }
        @Override
        public void close() {
            // no-op
        }
        private String recordToString(Record record){
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < record.getColumnCount(); i++)
            {
                if (null == record.get(i)){
                    sb.append("NULL");
                }
                else{
                    sb.append(record.get(i).toString());
                }
                if (i != record.getColumnCount() - 1){
                    sb.append(this.delimiter);
                }
            }
            sb.append("\n");
            return sb.toString();
        }
    }

    Outputer接口有三个:setup、output和close,与Extractor的setup、extract和close三个接口基本上对称。其中:setup()close()在一个Outputer中只会调用一次,可以在setup()里做初始化准备工作。通常,您需要把setup()传递进来的这三个参数保存成Outputer的class variable,方便在之后output()close()接口中使用。而close()接口用于代码的扫尾工作。

    通常,数据处理发生在output(Record)接口内。MaxCompute根据当前Outputer分配处理的每个输入Record调用一次output(Record)。假设,在一个output(Record)调用返回的时候,代码已经消费完该Record,则在当前output(Record)返回后,系统会将Record所使用的内存另作他用。因此,当Record中的信息在跨多个output()函数调用时,需要调用当前处理的record.clone()方法,将当前Record保存下来。
    说明 使用外部表自定义Extractor实现Outputer接口时,Outputer.output(Record record)中传入的Record是Outputer的上一个操作产生的记录,这个列名发生了变化。这些列名不保证固定,例如表达式some_function(column_a)产生的列名是一个临时列名。

    因此,当您使用record.get(列名)方式来获取列的内容时需特别注意,建议改用record.get(index)方式获取。

  2. 通过IDEA工具定义一个Extractor类。
    Extractor用于数据的读入、解析、处理等,如果输出的表最终不需要再通过MaxCompute进行读取等,无需定义Extractor。
    package com.aliyun.odps.examples.unstructured.text;
    import com.aliyun.odps.Column;
    import com.aliyun.odps.data.ArrayRecord;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.io.InputStreamSet;
    import com.aliyun.odps.udf.DataAttributes;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.Extractor;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    /**
     * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.)
     **/
    public class TextExtractor extends Extractor {
        private InputStreamSet inputs;
        private String columnDelimiter;
        private DataAttributes attributes;
        private BufferedReader currentReader;
        private boolean firstRead = true;
        public TextExtractor() {
            // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
            this.columnDelimiter = ",";
        }
        // no particular usage for execution context in this example
        @Override
        public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
            this.inputs = inputs;
            this.attributes = attributes;
            // check if "delimiter" attribute is supplied via SQL query
            String columnDelimiter = this.attributes.getValueByKey("delimiter");
            if ( columnDelimiter != null)
            {
                this.columnDelimiter = columnDelimiter;
            }
            System.out.println("TextExtractor using delimiter [" + this.columnDelimiter + "].");
            // note: more properties can be inited from attributes if needed
        }
        @Override
        public Record extract() throws IOException {
            String line = readNextLine();
            if (line == null) {
                return null;
            }
            return textLineToRecord(line);
        }
        @Override
        public void close(){
            // no-op
        }
        private Record textLineToRecord(String line) throws IllegalArgumentException
        {
            Column[] outputColumns = this.attributes.getRecordColumns();
            ArrayRecord record = new ArrayRecord(outputColumns);
            if (this.attributes.getRecordColumns().length != 0){
                // string copies are needed, not the most efficient one, but suffice as an example here
                String[] parts = line.split(columnDelimiter);
                int[] outputIndexes = this.attributes.getNeededIndexes();
                if (outputIndexes == null){
                    throw new IllegalArgumentException("No outputIndexes supplied.");
                }
                if (outputIndexes.length != outputColumns.length){
                    throw new IllegalArgumentException("Mismatched output schema: Expecting "
                            + outputColumns.length + " columns but get " + parts.length);
                }
                int index = 0;
                for(int i = 0; i < parts.length; i++){
                    // only parse data in columns indexed by output indexes
                    if (index < outputIndexes.length && i == outputIndexes[index]){
                        switch (outputColumns[index].getType()) {
                            case STRING:
                                record.setString(index, parts[i]);
                                break;
                            case BIGINT:
                                record.setBigint(index, Long.parseLong(parts[i]));
                                break;
                            case BOOLEAN:
                                record.setBoolean(index, Boolean.parseBoolean(parts[i]));
                                break;
                            case DOUBLE:
                                record.setDouble(index, Double.parseDouble(parts[i]));
                                break;
                            case DATETIME:
                            case DECIMAL:
                            case ARRAY:
                            case MAP:
                            default:
                                throw new IllegalArgumentException("Type " + outputColumns[index].getType() + " not supported for now.");
                        }
                        index++;
                    }
                }
            }
            return record;
        }
        /**
         * Read next line from underlying input streams.
         * @return The next line as String object. If all of the contents of input
         * streams has been read, return null.
         */
        private String readNextLine() throws IOException {
            if (firstRead) {
                firstRead = false;
                // the first read, initialize things
                currentReader = moveToNextStream();
                if (currentReader == null) {
                    // empty input stream set
                    return null;
                }
            }
            while (currentReader != null) {
                String line = currentReader.readLine();
                if (line != null) {
                    return line;
                }
                currentReader = moveToNextStream();
            }
            return null;
        }
        private BufferedReader moveToNextStream() throws IOException {
            InputStream stream = inputs.next();
            if (stream == null) {
                return null;
            } else {
                return new BufferedReader(new InputStreamReader(stream));
            }
        }
    }
  3. 通过IDEA工具定义一个StorageHandler类。
    package com.aliyun.odps.examples.unstructured.text;
    import com.aliyun.odps.udf.Extractor;
    import com.aliyun.odps.udf.OdpsStorageHandler;
    import com.aliyun.odps.udf.Outputer;
    public class TextStorageHandler extends OdpsStorageHandler {
        @Override
        public Class<? extends Extractor> getExtractorClass() {
            return TextExtractor.class;
        }
        @Override
        public Class<? extends Outputer> getOutputerClass() {
            return TextOutputer.class;
        }
    }

    若表无需读取,无需指定Extractor接口。

  4. 通过IDEA工具编译上述自定义类代码并打包为JAR包,通过MaxCompute客户端添加为MaxCompute资源。
    添加MaxCompute资源命令示例如下。
    add jar odps-TextStorageHandler.jar;
  5. 通过MaxCompute客户端创建外部表。
    与使用内置Extractor类似,需要创建OSS外部表,不同的是指定数据输出到OSS外部表时,使用自定义的Extractor。
    • 创建非分区表
      create external table if not exists output_data_txt_external1
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string,
      direction string
      )
      stored by 'com.aliyun.odps.examples.unstructured.text.TextStorageHandler'  
      with serdeproperties(
          'delimiter'='|'
          [,'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'])
      location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/output/'
      using 'odps-TextStorageHandler.jar';                     
    • 创建分区表
      create external table if not exists output_data_txt_external2
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string
      )
      partitioned by (
      direction string
      )
      stored by 'com.aliyun.odps.examples.unstructured.text.TextStorageHandler' 
      with serdeproperties(
          'delimiter'='|'
          [,'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'])
      location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/output/'
      using 'odps-TextStorageHandler.jar';                      
  6. 通过MaxCompute客户端对外部表执行insert overwriteinsert into命令,将数据输出到OSS。

    更多INSERT操作信息,请参见插入或覆写数据(INSERT INTO | INSERT OVERWRITE)插入或覆写动态分区数据(DYNAMIC PARTITION)

    命令示例如下。
    • 将非分区表数据输出到OSS
      insert into table output_data_txt_external1 select * from mc_oss_csv_external1;

      mc_oss_csv_external1的数据信息,请参见内置Extractor访问OSS

      执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下产生了一个.odps文件夹,包含.csv.meta文件。

    • 将分区表数据输出到OSS
      insert into table output_data_txt_external2 partition (direction) select * from mc_oss_csv_external2;

      mc_oss_csv_external2的数据信息,请参见内置Extractor访问OSS

      执行成功后,您可以在OSS路径下查看到导出的文件。output文件夹下根据INSERT语句指定的分区值生成对应的分区子目录。分区子目录中包括.odps文件夹。例如output/direction=N/.odps/20210330*********/M1_0_0-0_TableSink1-0-.csv

通过OSS分片上传功能输出到OSS

您还可以使用OSS的分片上传(Multipart Upload)功能通过INSERT操作向OSS外部表写入数据。该功能需要在Session级别或Project级别设置odps.sql.unstructured.oss.commit.mode属性进行开启或关闭。默认关闭。更多分片上传功能信息,请参见分片上传

注意 外部表的分片上传功能,只支持自定义的格式,即stored as xxx的外部表,更多stored as xxx信息,请参见支持开源格式数据。不支持自定义StorageHandler、Extractor以及SQL查询中携带UDF的场景。

在使用该功能前,您需要确认MaxCompute项目已启用jobconf2来生成作业执行计划,即odps.sql.jobconf.odps2属性值为True。

说明 odps.sql.jobconf.odps2属性默认值为True,如果取值非True,请在Session级别执行set odps.sql.jobconf.odps2=true;命令进行设置。
odps.sql.unstructured.oss.commit.mode属性设置不同取值时的实现原理如下:
  • 取值为False:MaxCompute写入到OSS外部表的数据,会存储在LOCATION目录下的.odps文件夹中。.odps文件夹中维护了一个.meta文件,用于保证MaxCompute数据的一致性。.odps的内容只有MaxCompute能正确处理,在其他数据处理引擎中可能无法正确解析,会导致报错。
  • 取值为True:MaxCompute使用分片上传功能,以two-phase commit的方式保证数据的一致性,同时也不会有.odps目录以及.meta文件。兼容其他数据处理引擎。

基于数据开放的需求,通过SQL语句写入OSS的数据是需要能够被其他引擎使用的。您可以设置odps.sql.unstructured.oss.commit.mode值为False,保留当前OSS外部表有.odps目录的机制;另外设置odps.sql.unstructured.oss.commit.mode值为True,启用不包含.odps目录的行为,兼容开源引擎。

注意 insert overwrite操作场景,在极端情况下如果作业运行失败会出现与预期不一致的问题,表现为旧数据被删除,但是新数据并没有写入。

问题原因:新写入的数据因为极低概率的硬件故障或元数据更新失败等原因没有成功写入目标表,且OSS的删除操作不支持回滚,导致被删除的旧数据无法恢复。

解决方案:
  • 如果基于旧数据再覆写OSS外部表,例如insert overwrite table T select * from table T;,需要提前做好OSS数据备份,作业运行失败时,再基于备份的旧数据覆写OSS外部表。
  • 如果insert overwrite作业可以重复提交,当作业运行失败时重新提交作业即可。