MaxCompute支持您通过INSERT方式将MaxCompute项目中的数据通过映射的OSS外部表直接输出到OSS目录。本文为您介绍向OSS写入数据的方法并提供示例。

背景信息

与读取OSS数据类似,MaxCompute支持将内部表数据或处理外部表得到的数据写入OSS:
  • 通过内置文本或开源数据解析器将数据写入OSS

    当需要将数据以CSV、TSV、开源格式或MaxCompute支持的压缩格式写入OSS时,您可以通过基于MaxCompute内置文本数据解析器或开源数据解析器创建的OSS外部表,执行INSERT操作将数据写入OSS。

  • 通过自定义解析器将数据写入OSS

    当需要将数据以自定义格式文件写入OSS时,您可以通过基于自定义解析器创建的OSS外部表,执行INSERT操作将数据写入OSS。

    实现示例请参见示例:通过自定义解析器将数据写入OSS

  • 通过OSS分片上传功能将数据写入OSS

    当需要将数据以开源格式写入OSS时,您可以通过基于开源数据解析器创建的OSS外部表,及OSS的分片上传功能,执行INSERT操作将数据写入OSS。该功能需要在Session级别或Project级别设置odps.sql.unstructured.oss.commit.mode属性进行开启或关闭。默认关闭。更多分片上传功能信息,请参见分片上传

    在使用该功能前,您需要确认MaxCompute项目已启用jobconf2来生成作业执行计划,即odps.sql.jobconf.odps2属性值为True。

    说明 odps.sql.jobconf.odps2属性默认值为True,如果取值非True,请在Session级别执行set odps.sql.jobconf.odps2=true;命令进行设置。
    odps.sql.unstructured.oss.commit.mode属性设置不同取值时的实现原理如下:
    • 取值为False:MaxCompute写入到OSS外部表的数据,会存储在LOCATION目录下的.odps文件夹中。.odps文件夹中维护了一个.meta文件,用于保证MaxCompute数据的一致性。.odps的内容只有MaxCompute能正确处理,在其他数据处理引擎中可能无法正确解析,会导致报错。
    • 取值为True:MaxCompute使用分片上传功能,以two-phase commit的方式保证数据的一致性,同时也不会有.odps目录以及.meta文件。兼容其他数据处理引擎。
    注意 insert overwrite操作场景,在极端情况下如果作业运行失败会出现与预期不一致的问题,表现为旧数据被删除,但是新数据并没有写入。

    问题原因:新写入的数据因为极低概率的硬件故障或元数据更新失败等原因没有成功写入目标表,且OSS的删除操作不支持回滚,导致被删除的旧数据无法恢复。

    解决方案:
    • 如果基于旧数据再覆写OSS外部表,例如insert overwrite table T select * from table T;,需要提前做好OSS数据备份,作业运行失败时,再基于备份的旧数据覆写OSS外部表。
    • 如果insert overwrite作业可以重复提交,当作业运行失败时重新提交作业即可。

前提条件

在向OSS写入数据前,请您确认已完成如下操作:

  • (可选)已准备好OSS存储空间(Bucket)、OSS目录及OSS数据文件。
    说明 MaxCompute已支持在OSS侧自动创建目录,对于携带外部表及UDF的SQL语句,您可以通过一条SQL语句执行读写外部表及UDF的操作。原手动创建目录方式仍然支持。

    更多OSS存储空间、OSS目录、上传数据文件操作信息,请参见创建存储空间创建目录上传文件

示例:通过内置文本数据解析器将数据写入OSS-非分区路径

读取示例:通过内置文本数据解析器创建OSS外部表-非分区表中创建的OSS外部表mc_oss_csv_external1的数据,并以CSV格式写入OSS目录oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/

操作流程如下:

  1. 登录MaxCompute客户端,执行如下命令创建OSS外部表映射目标OSS目录。
    create external table if not exists mc_oss_csv_external4
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    stored by 'com.aliyun.odps.CsvStorageHandler' 
    with serdeproperties (
     'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'
    ) 
    location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/';                       
  2. 通过MaxCompute客户端对外部表执行insert overwriteinsert into命令,将数据写入OSS。
    insert into table mc_oss_csv_external4 select * from mc_oss_csv_external1;

    执行成功后,您可以在OSS目录下查看到导出的文件。output文件夹下会产生一个.odps文件夹,包含.meta文件和保存.csv文件的文件夹。例如output/.odps/20210330*********/R2_1_0_0-0_TableSink1-0-.csv

    导出结果
    说明
    • .odps文件夹中的.meta文件为MaxCompute额外输出的宏数据文件,用于记录当前文件夹中有效的数据。正常情况下,如果INSERT操作成功,可以认为当前文件夹的所有数据均是有效数据。只有在有作业失败的情况下,需要对该宏数据进行解析。即使是在作业中途失败或被中止的情况下,对于INSERT OVERWRITE操作,再运行一次即可。
    • 对于MaxCompute内置的TSV或CSV解析器,处理产生的文件数目与对应SQL的并发度相同。
    • 如果insert overwrite ... select ... from ...;操作在源数据表from_tablename上分配了1000个Mapper,则最后将产生1000个TSV或CSV文件。
    • 如果您需要控制生成文件的数目,可以通过MaxCompute的各种灵活语义和配置来实现。如果Outputer在Mapper里,可以通过调整odps.stage.mapper.split.size的大小来控制Mapper的并发数,从而调整产生的文件数目。如果Outputer在Reducer或Joiner里,也可以分别通过odps.stage.reducer.numodps.stage.joiner.num来调整。

示例:通过内置文本数据解析器将数据写入OSS-分区路径

读取示例:通过内置文本数据解析器创建OSS外部表-分区表中创建的OSS外部表mc_oss_csv_external2的数据,并以CSV格式写入OSS目录oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/

操作流程如下:

  1. 登录MaxCompute客户端,执行如下命令创建OSS外部表映射目标OSS目录。
    create external table if not exists mc_oss_csv_external5
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string
    )
    partitioned by (
    direction string
    )
    stored by 'com.aliyun.odps.CsvStorageHandler' 
    with serdeproperties (
     'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'
    ) 
    location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/';                      
  2. 通过MaxCompute客户端对外部表执行insert overwriteinsert into命令,将数据写入OSS。
    insert into table mc_oss_csv_external5 partition (direction) select * from mc_oss_csv_external2;

    执行成功后,您可以在OSS目录下查看到导出的文件。output文件夹下根据INSERT语句指定的分区值生成对应的分区子目录。分区子目录中会产生一个.odps文件夹,包含.meta文件和保存.csv文件的文件夹。例如output/direction=N/.odps/20210330*********/R2_1_0_0-0_TableSink1-0-.csv

    导出结果
    说明
    • .odps文件夹中的.meta文件为MaxCompute额外输出的宏数据文件,用于记录当前文件夹中有效的数据。正常情况下,如果INSERT操作成功,可以认为当前文件夹的所有数据均是有效数据。只有在有作业失败的情况下,需要对该宏数据进行解析。即使是在作业中途失败或被中止的情况下,对于INSERT OVERWRITE操作,再运行一次即可。
    • 对于MaxCompute内置的TSV或CSV解析器,处理产生的文件数目与对应SQL的并发度相同。
    • 如果insert overwrite ... select ... from ...;操作在源数据表from_tablename上分配了1000个Mapper,则最后将产生1000个TSV或CSV文件。
    • 如果您需要控制生成文件的数目,可以通过MaxCompute的各种灵活语义和配置来实现。如果Outputer在Mapper里,可以通过调整odps.stage.mapper.split.size的大小来控制Mapper的并发数,从而调整产生的文件数目。如果Outputer在Reducer或Joiner里,也可以分别通过odps.stage.reducer.numodps.stage.joiner.num来调整。

示例:通过内置文本数据解析器将数据以压缩方式写入OSS

读取示例:通过内置文本数据解析器创建OSS外部表-非分区表中创建的OSS外部表mc_oss_csv_external1的数据,并以GZIP压缩格式写入OSS目录oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo3/output/

操作流程如下:

  1. 登录MaxCompute客户端,执行如下命令创建OSS外部表映射目标OSS目录。
    create external table if not exists mc_oss_csv_external6
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    stored by 'com.aliyun.odps.CsvStorageHandler' 
    with serdeproperties (
     'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole',
     'odps.text.option.gzip.input.enabled'='true',
     'odps.text.option.gzip.output.enabled'='true' 
    ) 
    location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo3/output/';
  2. 通过MaxCompute客户端对外部表执行insert overwriteinsert into命令,将数据写入OSS。
    insert into table mc_oss_csv_external6 select * from mc_oss_csv_external1;

    执行成功后,您可以在OSS目录下查看到导出的文件。output文件夹下会产生一个.odps文件夹,包含.meta文件和保存.gz文件的文件夹。例如output/.odps/20220413*********/M1_0_0-0_TableSink1-0-.csv.gz

    导出结果
    说明
    • .odps文件夹中的.meta文件为MaxCompute额外输出的宏数据文件,用于记录当前文件夹中有效的数据。正常情况下,如果INSERT操作成功,可以认为当前文件夹的所有数据均是有效数据。只有在有作业失败的情况下,需要对该宏数据进行解析。即使是在作业中途失败或被中止的情况下,对于INSERT OVERWRITE操作,再运行一次即可。
    • 对于MaxCompute内置的TSV或CSV解析器,处理产生的文件数目与对应SQL的并发度相同。
    • 如果insert overwrite ... select ... from ...;操作在源数据表from_tablename上分配了1000个Mapper,则最后将产生1000个TSV或CSV文件。
    • 如果您需要控制生成文件的数目,可以通过MaxCompute的各种灵活语义和配置来实现。如果Outputer在Mapper里,可以通过调整odps.stage.mapper.split.size的大小来控制Mapper的并发数,从而调整产生的文件数目。如果Outputer在Reducer或Joiner里,也可以分别通过odps.stage.reducer.numodps.stage.joiner.num来调整。

示例:通过内置开源数据解析器将数据写入OSS

读取示例:通过内置文本数据解析器创建OSS外部表-非分区表中创建的OSS外部表mc_oss_csv_external1的数据,并以ORC格式写入OSS目录oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo4/output/

操作流程如下:

  1. 登录MaxCompute客户端,执行如下命令创建OSS外部表映射目标OSS目录。
    create external table if not exists mc_oss_orc_external
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    stored as orc  
    location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo4/output/';
  2. 通过MaxCompute客户端对外部表执行insert overwriteinsert into命令,将数据写入OSS。
    insert into table mc_oss_orc_external select * from mc_oss_csv_external1;

    执行成功后,您可以在OSS目录下查看到导出的文件。output文件夹下会产生一个.odps文件夹,包含.meta文件和保存ORC文件的文件夹。例如output/.odps/20220413*********/M1_0_0-0_TableSink1

    导出结果
    说明
    • .odps文件夹中的.meta文件为MaxCompute额外输出的宏数据文件,用于记录当前文件夹中有效的数据。正常情况下,如果INSERT操作成功,可以认为当前文件夹的所有数据均是有效数据。只有在有作业失败的情况下,需要对该宏数据进行解析。即使是在作业中途失败或被中止的情况下,对于INSERT OVERWRITE操作,再运行一次即可。
    • 对于MaxCompute内置的TSV或CSV解析器,处理产生的文件数目与对应SQL的并发度相同。
    • 如果insert overwrite ... select ... from ...;操作在源数据表from_tablename上分配了1000个Mapper,则最后将产生1000个TSV或CSV文件。
    • 如果您需要控制生成文件的数目,可以通过MaxCompute的各种灵活语义和配置来实现。如果Outputer在Mapper里,可以通过调整odps.stage.mapper.split.size的大小来控制Mapper的并发数,从而调整产生的文件数目。如果Outputer在Reducer或Joiner里,也可以分别通过odps.stage.reducer.numodps.stage.joiner.num来调整。

示例:通过自定义解析器将数据写入OSS

基于示例:通过自定义解析器创建OSS外部表中开发的自定义解析器,为您介绍MaxCompute如何将数据输出到OSS。

操作流程如下:

  1. 登录MaxCompute客户端,执行如下命令创建OSS外部表映射目标OSS目录。
    create external table if not exists mc_oss_external 
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler' 
      with serdeproperties (
    'delimiter'='|',  
    'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole'
    )
    location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/SampleData/output'
    using 'javatest-1.0-SNAPSHOT.jar'; 
  2. 通过MaxCompute客户端对外部表执行insert overwriteinsert into命令,将数据写入OSS。
    insert into table mc_oss_external select * from mc_oss_csv_external1;

    执行成功后,您可以在OSS目录下查看到导出的文件。output文件夹下会产生一个.odps文件夹,包含.meta文件和保存TEXT文件的文件夹。例如output/.odps/20220413*********/M1_0_0-0_TableSink1-0

    导出结果
    说明
    • .odps文件夹中的.meta文件为MaxCompute额外输出的宏数据文件,用于记录当前文件夹中有效的数据。正常情况下,如果INSERT操作成功,可以认为当前文件夹的所有数据均是有效数据。只有在有作业失败的情况下,需要对该宏数据进行解析。即使是在作业中途失败或被中止的情况下,对于INSERT OVERWRITE操作,再运行一次即可。
    • 对于MaxCompute内置的TSV或CSV解析器,处理产生的文件数目与对应SQL的并发度相同。
    • 如果insert overwrite ... select ... from ...;操作在源数据表from_tablename上分配了1000个Mapper,则最后将产生1000个TSV或CSV文件。
    • 如果您需要控制生成文件的数目,可以通过MaxCompute的各种灵活语义和配置来实现。如果Outputer在Mapper里,可以通过调整odps.stage.mapper.split.size的大小来控制Mapper的并发数,从而调整产生的文件数目。如果Outputer在Reducer或Joiner里,也可以分别通过odps.stage.reducer.numodps.stage.joiner.num来调整。