通过DataWorks控制台,您可以在MaxCompute中使用merge_udf.jar包将表格存储的增量数据转换为全量数据格式。
前提条件
- 已导出表格存储全量数据到MaxCompute,且已配置同步表格存储增量数据到MaxCompute。具体操作,请分别参见全量导出(脚本模式)和增量同步(脚本模式)。
- 已下载merge_udf.jar包。具体下载路径请参见merge_udf.jar。
步骤一:新建JAR资源
通过新建JAR资源,将下载的merge_udf.jar包上传到MaxCompute中。
步骤二:新建并注册函数
步骤三:编写ODPS SQL并运行
附录:模式选择
模式包括单版本模式、多版本模式V1和多版本模式V2,请根据表类型选择合适的模式。
- 单版本模式
- 类名:com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
- 参数列表
参数列表的格式为
pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo
。具体参数说明请参见下表,请根据实际填写pknum、colnum、colnames和pknames,其他参数保持参数名称即可。参数 类型 描述 INT pknum 常量 表格存储数据表的主键个数。 INT colnum 常量 表格存储数据表中的属性列个数。 List<String> colnames 常量 要合并增量变更的属性列名称。 List<String> pknames 变量 表格存储数据表的主键列表。 STRING colname 变量 属性列。 BIGINT version 变量 版本号。 STRING colvalue 变量 增量值。 STRING optype 变量 增量操作类型。 STRING sequenceinfo 变量 自增保序sequenceid。 - 示例
当表格存储数据表的主键为
pk1,pk2
且属性列为col1,col2,col3
时,则在MaxCompute中创建的增量表Schema为pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,如果要合并增量变更的属性列为col1,col2,col3
,设置参数列表为2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,ODPS SQL语句示例如下:select mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo) as (pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted) from( select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceInfo )t;
执行ODPS SQL语句后的输出结果请参见下表。pk1 pk2 col1 co1_is_deleted col2 col2_is_deleted col3 col3_is_deleted test 0 \N \N 20 \N \N true 输出结果的说明如下:- 当col列和col_is_deleted列均为
\N
时,表示col列无任何增量操作。 - 当col列为具体的值且col_is_deleted列为
\N
时,表示col列的值被修改为对应值。 - 当col列为
\N
且col_is_deleted列为true时,表示col列被删除。
- 当col列和col_is_deleted列均为
- 多版本模式V1
- 类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1
- 参数列表
参数列表的格式为
pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo
。具体参数说明请参见下表,请根据实际填写pknum、colnum、colnames和pknames,其他参数保持参数名称即可。参数 类型 描述 INT pknum 常量 表格存储数据表的主键个数。 INT colnum 常量 表格存储数据表中的属性列个数。 List<String> colnames 常量 要合并增量变更的属性列名称。 List<String> pknames 变量 表格存储数据表的主键列表。 STRING colname 变量 属性列。 BIGINT version 变量 版本号。 STRING colvalue 变量 增量值。 STRING optype 变量 增量操作类型。 STRING sequenceinfo 变量 自增保序sequenceid。 - 示例
当表格存储数据表的主键为
pk1,pk2
且属性列为col1,col2,col3
时,则在MaxCompute中创建的增量表Schema为pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,如果要合并增量变更的属性列为col1,col2,col3
,设置参数列表为2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,ODPS SQL语句示例如下:select mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo) as (pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted) from( select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceinfo )t;
执行ODPS SQL语句后的输出结果请参见下表。pk1 pk2 version col1 co1_is_deleted col2 col2_is_deleted col3 col3_is_deleted test 0 123 \N \N 20 \N \N true 输出结果的说明如下:- 当version列为具体的值,且col列和col_is_deleted列均为
\N
时,表示col列对应版本没有任何增量操作。 - 当version列和col列均为具体的值,且col_is_deleted列为
\N
时,表示col列对应版本的值被修改为具体的值。 - 当version列为具体的值,col列为
\N
,且col_is_deleted列为true时,表示col列对应版本被删除。 - 当version列和col列均为
\N
,且col_is_deleted列为true时,表示存在删除一列所有版本的操作。
- 当version列为具体的值,且col列和col_is_deleted列均为
- 多版本模式V2
- 类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
- 参数列表
参数列表的格式为
pknum,colnum,maxversion,colnames,pknames,colname,version,colvalue,optype,sequenceinfo
。具体参数说明请参见下表,请根据实际填写pknum、colnum、maxversion、colnames和pknames,其他参数保持参数名称即可。参数 类型 描述 BIGINT pknum 常量 表格存储数据表的主键个数。 BIGINT colnum 常量 表格存储数据表中的属性列个数。 BIGINT maxversion 常量 最大版本数。 List<String> colnames 常量 要合并增量变更的属性列名称。 List<String> pknames 变量 表格存储数据表的主键列表。 STRING colname 变量 属性列。 BIGINT version 变量 版本号。 STRING colvalue 变量 增量值。 STRING optype 变量 增量操作类型。 STRING sequenceinfo 变量 自增保序sequenceid。 - 示例
当表格存储数据表的主键为
pk1,pk2
,属性列为col1,col2,col3
且最大版本数为3时,则在MaxCompute中创建的增量表Schema为pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,如果要合并增量变更的属性列为col1,col2,col3
,设置参数列表为2,3,3,"col1","col2","col3",pk1,pk2,colName,version,colValue,opType,sequenceInfo
,ODPS SQL语句示例如下:select mergeCell(2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,opyype,sequenceinfo) as (pk1,pk2,col1,col2,col3) from( select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceinfo )t;
执行ODPS SQL语句后的输出结果请参见下表。pk1 pk2 col1 col2 col3 test 02 {"data":[{"version":1621330803390,"value":"value001"},{"version":1621330795198,"value":"value002"},{"version":1621330785936,"value":"value003"}],"needDeleteAllVersionFirst":true,"deleteVersions":[]} \N \N 输出结果的说明如下:- data表示新写入的数据列表,按照版本号降序排序。最多保留maxversion个版本的数据。
- needDeleteAllVersionFirst表示该列是否需要删除原有全部版本。当出现删除一行DeleteRow或删除一列的所有版本DeleteColumns时,该值为true,否则为false。
- deleteVersions表示该列需要删除的版本列表,按照版本号降序排序。最多保留maxversion个版本。
deleteVersions中的版本号不会与data中的版本号相同,当needDeleteAllVersionFirst为true时,deleteVersions为空列表。