将表格存储的增量数据转换为全量数据格式

通过DataWorks大数据开发治理平台,您可以使用ODPS SQLMaxCompute表格存储的增量数据转换为全量数据格式。

前提条件

步骤一:新建JAR资源

新建JAR资源,并上传merge_udf.jar包到DataWorks。

数据开发(Data Studio)旧版

  1. 进入数据开发页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入数据开发

  2. 新建JAR资源。

    1. DataStudio控制台的数据开发页面,单击新建,然后选择新建资源 > MaxCompute > JAR

      您也可以打开相应的业务流程,右键单击MaxCompute,选择新建资源 > JAR

    2. 在新建资源对话框,配置资源的相关参数,然后单击新建

      重要
      • 如果资源未在MaxCompute客户端上传过,则必须勾选上传为ODPS资源;如果资源已上传至MaxCompute客户端,则必须取消勾选上传为ODPS资源

      • 如果上传时勾选了上传为ODPS资源,则在DataWorksMaxCompute中均会存储该资源。

      • 资源名称必须以.jar作为后缀。

  3. 提交JAR资源。

    1. 单击image图标。

    2. 提交对话框,根据需要填写变更描述。

    3. 单击确认

数据开发(Data Studio)新版

  1. 进入资源管理页面。

    1. 登录DataWorks控制台

    2. 在页面上方,选择资源组和地域。

    3. 在左侧导航栏,单击数据开发与运维 > 数据开发

    4. 数据开发页面的下拉框中,选择对应工作空间后单击进入Data Studio

    5. 在左侧导航栏单击资源管理image图标,进入资源管理页面。

  2. 新建JAR资源。

    1. 资源管理页面,单击image图标,然后选择新建资源 > MaxCompute Jar

    2. 新建资源和函数对话框,选择路径并填写资源名称,然后单击确认

      重要

      资源名称必须以.jar作为后缀。

    3. 在新建的资源中,根据下表配置上传资源的相关参数。

      参数

      说明

      文件来源

      目标文件的来源,包括本地OSS两种来源。

      文件内容

      • 如果选择本地,在上传文件中单击点击上传即可上传本地文件。

      • 如果选择OSS,在选择文件下拉框中选择对应的OSS文件。

      数据源

      上传的MaxCompute资源所属的数据源。

    4. 单击保存,保存配置。

  3. 发布JAR资源。

    1. 单击发布

    2. 在资源的发布页签,根据需要输入发布描述,然后单击开始发布生产

    3. 根据发布流程引导,单击确认发布

步骤二:新建函数

数据开发(Data Studio)旧版

  1. 新建函数。

    1. DataStudio控制台的数据开发页面,右键单击目标业务流程,然后选择新建函数 > MaxCompute > 函数

    2. 新建函数对话框,选择路径并填写函数名称。

    3. 单击新建

  2. 注册函数。

    根据下表配置函数的相关参数,然后单击image.png图标,保存配置。

    参数

    说明

    函数类型

    选择其他函数

    类名

    根据Tablestore源表的类型和模式,填写相应的类名。

    说明

    您可通过Tablestore源表的最大版本数(MaxVersion)属性判断其为单版本表或多版本表。

    • 单版本表

      • 单版本模式

        类名:com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell

    • 多版本表

      • 多版本模式V1

        类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1

      • 多版本模式V2

        类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2

      重要

      多版本模式V1V2是两种不同的策略。

      多版本模式V1,以行级为单位管理版本,在输出结果中,每个版本均保留整行数据的完整快照。

      多版本模式V2,以列级为单位管理版本,在输出结果中,不同列的版本独立,仅保留列的增量变化。

    资源列表

    选择步骤一新增的资源名称。

  3. 提交函数。

    1. 单击image图标。

    2. 提交对话框,根据需要填写变更描述。

    3. 单击确认

数据开发(Data Studio)新版

  1. 新建函数。

    1. 资源管理页面,单击image图标,然后选择新建函数 > MaxCompute Function

    2. 新建资源和函数对话框,选择路径并填写函数名称,然后单击确认

    3. 在新建的函数中,根据下表配置函数的相关参数。

      参数

      说明

      函数类型

      选择OTHER(其他函数)。

      数据源

      MaxCompute函数所属的数据源。

      类名

      根据Tablestore源表的类型和模式,填写相应的类名。

      说明

      您可通过Tablestore源表的最大版本数(MaxVersion)属性判断其为单版本表或多版本表。

      • 单版本表

        • 单版本模式

          类名:com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell

      • 多版本表

        • 多版本模式V1

          类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1

        • 多版本模式V2

          类名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2

        重要

        多版本模式V1V2是两种不同的策略。

        多版本模式V1,以行级为单位管理版本,在输出结果中,每个版本均保留整行数据的完整快照。

        多版本模式V2,以列级为单位管理版本,在输出结果中,不同列的版本独立,仅保留列的增量变化。

      类型

      选择资源函数

      资源列表

      选择步骤一新增的资源名称。

    4. 单击保存,保存配置。

  2. 发布函数。

    1. 单击发布

    2. 在函数的发布页签,根据需要输入发布描述,然后单击开始发布生产

    3. 根据发布流程引导,单击确认发布

步骤三:编写ODPS SQL并运行

数据开发(Data Studio)旧版

  1. 新建ODPS SQL节点。

    1. DataStudio控制台的数据开发页面,单击新建,然后选择新建节点 > MaxCompute > ODPS SQL

      您也可以右键单击目标业务流程,然后选择新建节点 > MaxCompute > ODPS SQL

    2. 新建节点对话框,选择路径并填写名称。

    3. 单击确认

  2. 在节点编辑页面,编写ODPS SQL语句。

    SQL语句的格式如下:

    SELECT function_name(para_list) 
    AS (custom_para_list)
    FROM(
        SELECT * FROM stream_table_name 
        DISTRIBUTE BY primary_keys 
        SORT by primary_keys, SequenceID
    )t;

    参数说明如下:

    参数

    说明

    function_name

    函数名称。填写步骤二新增的函数名称。

    para_list

    参数列表,配置Tablestore源表和MaxCompute增量表的相关参数。

    custom_para_list

    自定义参数列表,定义返回结果中的字段名称。

    stream_table_name

    MaxCompute中存储Tablestore增量数据的表名称。

    primary_keys

    Tablestore源表的主键列表。

    SequenceID

    时序信息。填写sequenceInfo。

    重要

    ODPS SQL示例及参数配置,请根据步骤二中确定的模式(单版本模式多版本模式V1多版本模式V2),参见附录:模式选择的相关内容。

  3. 运行ODPS SQL。

    单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标,运行结果将显示在结果页签下。

数据开发(Data Studio)新版

  1. DataStudio控制台的数据开发页面,单击image图标,然后选择新建节点 > MaxCompute > MaxCompute SQL

  2. 新建节点对话框,选择路径并填写名称,然后单击确认

  3. 在节点编辑页面,编写ODPS SQL语句。

    SQL语句的格式如下:

    SELECT function_name(para_list) 
    AS (custom_para_list)
    FROM(
        SELECT * FROM stream_table_name 
        DISTRIBUTE BY primary_keys 
        SORT by primary_keys, SequenceID
    )t;

    参数说明如下:

    参数

    说明

    function_name

    函数名称。填写步骤二新增的函数名称。

    para_list

    参数列表,配置Tablestore源表和MaxCompute增量表的相关参数。

    custom_para_list

    自定义参数列表,定义返回结果中的字段名称。

    stream_table_name

    MaxCompute中存储Tablestore增量数据的表名称。

    primary_keys

    Tablestore源表的主键列表。

    SequenceID

    时序信息。填写sequenceInfo。

    重要

    ODPS SQL示例及参数配置,请根据步骤二中确定的模式(单版本模式多版本模式V1多版本模式V2),参见附录:模式选择的相关内容。

  4. 运行ODPS SQL。

    单击运行,运行结果将显示在结果页签下。

附录:模式选择

模式包括单版本模式、多版本模式V1和多版本模式V2,请根据Tablestore表类型选择合适的模式。

单版本模式

类名

com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell

参数列表

参数列表的格式为pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo,详细信息请参见下表。

参数

类型

说明

pknum

INT(常量)

Tablestore源表的主键数量。

colnum

INT(常量)

需合并增量变更的Tablestore属性列数量。

colnames

List<String>(常量)

需合并增量变更的Tablestore属性列名称,使用逗号(,)分隔。

pknames

List<String>(常量)

Tablestore源表的主键列表,使用逗号(,)分隔。

colname

STRING(变量)

属性列(由增量表字段colname提供)。

version

BIGINT(变量)

版本号(由增量表字段version提供)。

colvalue

STRING(变量)

增量值(由增量表字段colvalue提供)。

optype

STRING(变量)

增量操作类型(由增量表字段optype提供)。

sequenceinfo

STRING(变量)

时序信息(由增量表字段sequenceInfo提供)。

示例

假设Tablestore源表和MaxCompute增量表结构如下:

  • Tablestore源表结构:主键为pk1,pk2,属性列为col1,col2,col3

  • MaxCompute增量表结构:pk1,pk2,colname,version,colvalue,optype,sequenceinfo

如果要将MaxCompute增量表中的增量数据转换为全量数据格式,则需设置参数列表2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo,并设置自定义参数列表pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted

ODPS SQL语句示例如下:

SELECT mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
FROM(
    SELECT * FROM stream_table_name 
    DISTRIBUTE BY pk1,pk2 
    SORT by pk1,pk2,sequenceInfo
)t;

执行ODPS SQL语句后,输出结果的示例数据请参见下表。

pk1

pk2

col1

co1_is_deleted

col2

col2_is_deleted

col3

col3_is_deleted

test

0

\N

\N

20

\N

\N

True

输出结果的说明如下:

  • col列和col_is_deleted列均为\N时,表示col列无任何增量操作。

  • col列为具体的值且col_is_deleted列为\N时,表示col列的值被修改为对应值。

  • col列为\Ncol_is_deleted列为true时,表示col列被删除。

多版本模式V1

类名

com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1

参数列表

参数列表的格式为pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo,详细信息请参见下表。

参数

类型

说明

pknum

INT(常量)

Tablestore源表的主键数量。

colnum

INT(常量)

需合并增量变更的Tablestore属性列数量。

colnames

List<String>(常量)

需合并增量变更的Tablestore属性列名称,使用逗号(,)分隔。

pknames

List<String>(常量)

Tablestore源表的主键列表,使用逗号(,)分隔。

colname

STRING(变量)

属性列(由增量表字段colname提供)。

version

BIGINT(变量)

版本号(由增量表字段version提供)。

colvalue

STRING(变量)

增量值(由增量表字段colvalue提供)。

optype

STRING(变量)

增量操作类型(由增量表字段optype提供)。

sequenceinfo

STRING(变量)

时序信息(由增量表字段sequenceInfo提供)。

示例

假设Tablestore源表和MaxCompute增量表结构如下:

  • Tablestore源表结构:主键为pk1,pk2,属性列为col1,col2,col3

  • MaxCompute增量表结构:pk1,pk2,colname,version,colvalue,optype,sequenceinfo

如果要将MaxCompute增量表中的增量数据转换为全量数据格式,则需设置参数列表2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo,并设置自定义参数列表pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted

ODPS SQL语句示例如下:

SELECT mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted)
FROM(
    SELECT * FROM stream_table_name 
    DISTRIBUTE BY pk1,pk2 
    SORT by pk1,pk2,sequenceInfo
)t;

执行ODPS SQL语句后,输出结果的示例数据请参见下表。

pk1

pk2

version

col1

co1_is_deleted

col2

col2_is_deleted

col3

col3_is_deleted

test

0

123

\N

\N

20

\N

\N

True

输出结果的说明如下:

  • version列为具体的值,且col列和col_is_deleted列均为\N时,表示col列对应版本没有任何增量操作。

  • version列和col列均为具体的值,且col_is_deleted列为\N时,表示col列对应版本的值被修改为具体的值。

  • version列为具体的值,col列为\N,且col_is_deleted列为true时,表示col列对应版本被删除。

  • version列和col列均为\N,且col_is_deleted列为true时,表示存在删除一列所有版本的操作。

多版本模式V2

类名

com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2

参数列表

参数列表的格式为pknum,colnum,maxversion,colnames,pknames,colname,version,colvalue,optype,sequenceinfo,详细信息请参见下表。

参数

类型

说明

pknum

INT(常量)

Tablestore源表的主键数量。

colnum

INT(常量)

需合并增量变更的Tablestore属性列数量。

maxversion

BIGINT(常量)

Tablestore源表的最大版本数。

colnames

List<String>(常量)

需合并增量变更的Tablestore属性列名称,使用逗号(,)分隔。

pknames

List<String>(常量)

Tablestore源表的主键列表,使用逗号(,)分隔。

colname

STRING(变量)

属性列(由增量表字段colname提供)。

version

BIGINT(变量)

版本号(由增量表字段version提供)。

colvalue

STRING(变量)

增量值(由增量表字段colvalue提供)。

optype

STRING(变量)

增量操作类型(由增量表字段optype提供)。

sequenceinfo

STRING(变量)

时序信息(由增量表字段sequenceInfo提供)。

示例

假设Tablestore源表和MaxCompute增量表结构如下:

  • Tablestore源表结构:主键为pk1,pk2,属性列为col1,col2,col3且最大版本数为3。

  • MaxCompute增量表结构:pk1,pk2,colname,version,colvalue,optype,sequenceinfo

如果要将MaxCompute增量表中的增量数据转换为全量数据格式,则需设置参数列表2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo,并设置自定义参数列表pk1,pk2,col1,col2,col3

ODPS SQL语句示例如下:

SELECT mergeCell(2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo)
AS (pk1,pk2,col1,col2,col3)
FROM(
    SELECT * FROM stream_table_name 
    DISTRIBUTE BY pk1,pk2 
    SORT by pk1,pk2,sequenceInfo
)t;

执行ODPS SQL语句后,输出结果的示例数据请参见下表。

pk1

pk2

col1

col2

col3

test

02

{"data":[{"version":1621330803390,"value":"value001"},{"version":1621330795198,"value":"value002"},{"version":1621330785936,"value":"value003"}],"needDeleteAllVersionFirst":true,"deleteVersions":[]}

\N

\N

输出结果的说明如下:

  • data表示新写入的数据列表,按照版本号降序排序。最多保留maxversion个版本的数据。

  • needDeleteAllVersionFirst表示该列是否需要删除原有全部版本。当出现删除一行DeleteRow或删除一列的所有版本DeleteColumns时,该值为true,否则为false。

  • deleteVersions表示该列需要删除的版本列表,按照版本号降序排序。最多保留maxversion个版本。

    deleteVersions中的版本号不会与data中的版本号相同,当needDeleteAllVersionFirsttrue时,deleteVersions为空列表。

常见问题

执行SQL语句进行表格存储数据格式转换时出现类型转换错误问题

  • 问题现象

    DataWorks中通过数据开发执行SQL语句进行表格存储数据格式转换时出现如下错误:

    FAILED ODPS -0010000:System internal error -fuxi job failed, causer by: Failed in UDF/UDTF/UDAF  com.aliyun.otsstream.utils.mergecell.oneversion.MergeCell class, at query location of line 1, column 8
  • 可能原因

    odps.sql.type.system.odps2参数的配置不正确(即odps.sql.type.system.odps2=true)。

  • 解决方案

    SQL语句前添加set odps.sql.type.system.odps2=false; ,并与SQL语句一起提交运行。

属性列映射不到值且未查到值的属性列均显示被删除

  • 问题现象

    执行SQL语句后,返回结果中存在属性列无法映射到实际值,并且与该属性列对应的is_deleted字段值为True

    例如,属性列col1的值均为\Ncol1_is_deleted列值均为True,表示col1列已被删除,但实际col1列的值不为空。

  • 可能原因

    在配置离线同步任务时,导出时序信息的参数配置错误(即"isExportSequenceInfo": false),导致系统误判属性列已被删除。

    说明

    sequenceInfo字段用于存储属性列的历史版本信息。如果未导出sequenceInfo,在将增量数据转换为全量数据格式时,可能无法正确解析版本信息,从而导致相应版本的数据无法映射至目标行中。

  • 解决方案

    步骤三:配置离线同步任务中,修改导出时序信息的参数配置,随后重新同步Tablestore增量数据,最后再执行SQL语句进行数据格式转换。

    • 向导模式

      配置任务步骤的配置数据来源与去向区域,请勾选导出时序信息的单选框,然后保存和提交任务。

    • 脚本模式

      修改Tablestore Stream ReaderisExportSequenceInfo参数的值为true,然后保存和提交任务。