Dataphin集成任务中数据过滤场景以及对应功能解决方案

更新时间:
复制为 MD 格式

背景

Dataphin集成任务中数据过滤场景以及对应功能解决方案

场景一、输入端数据是关系型数据库,如mysql 、oracle等传统关系型数据库,Dataphin集成任务对接的是jdbc接口提交sql查询获取数据

场景二、输入端数据是非关系型数据库中的Hadoop系列,如MaxComputer 、hive 等传统Hadoop系列的大数据平台等,Dataphin对接的是对应的分布式文件存储系统的文件获取数据

场景三、输入端是对接非关系型数据库中的半结构独立类型数据库,如MongoDB ,ElasticSearch

原理

Dataphin集成任务本质是一个运行在 liunx shell 任务内的jvm程序,输入组件对接对应数据库客户端或文件系统客户端,过滤组件是集成任务dlink jvm应用程序内的一个组件

Dataphin集成任务对接数据库获取数据目前有以下三大类方式

  • 关系型数据库,如mysql 、oracle等传统关系型数据库,Dataphin集成任务对接的是jdbc接口提交sql查询获取数据,针对这类直接读取表或分区表的文件的形式,用户只能用集成任务自带的过滤组件进一步过滤所需的数据(过滤)

  • 非关系型数据库中的Hadoop系列,如MaxComputer 、hive 等传统Hadoop系列的大数据平台等,Dataphin对接的是对应的分布式文件存储系统的文件获取数据

  • 除开以上两类的其它非关系型数据库,比如MongoDB ,ElasticSearch等,有自成一派的sdk或客户端对接,查询语言也是类json的,可能sdk和客户端因为版本不一样支持的语法范围也不一样。

解决方案(产品实操)

场景一、数据过滤主要用输入组件的【输入过滤】,以下以mysql为例

image图中是构造的一个例子,输入组件的过滤条件设置可以支持

(复杂场景的调试建议用对应数据库客户端调试出需要的sql,语法也是参考对应数据库的语法手册)

  • 对应数据库的支持函数

  • 对应数据库支持的where 子查询

  • 可以拼接Dataphin定义的本地变量或者全局变量参数(这样最终拼接成提交给数据库的完整sql前会渲染出实际参数拼到sql中提交到对应数据库服务端)

场景二、数据过滤依赖输入组件分区参数设置或集成任务过滤组件功能,以下以MaxComputer为例

  • 2.1 用户希望过滤MaxComputer分区表中的范围分区数据,比如分区是ds字段名,值样本形如20200202, 用户希望自动调度过滤每天最近2天的分区数据出来,设置如下图/*query*/ds >=${yesterday} and ds <= ${bizdate} (/*query*/是这个功能点的hint语法前缀必须保留)

b68d556dc574453193a39eabd4b97fbc

其中 ${yesterday} 需要设置为Dataphin的本地变量或者全局变量赋值,以下以设置为本地变量为例其它需求可以手动编辑这个值加偏移量

image

  • 2.2 用户希望过滤MaxComputer多级分区表中的多级分区数据,比如一级分区是ds字段名,值样本形如20200202, 二级分区是class字段名,值样本形如a,b,c,d用户希望自动调度过滤每天最近1天的一级分区,二级分区是 a分区的数据出来,设置如下图ds=${bizdate},class=a

image

不过一般需求是需要 class的所有分区数据,此需求下可以设置为ds=${bizdate},class=*

  • 2.3 用户希望过滤MaxComputer分区表中,比如一级分区是ds字段名,值样本形如20200202, 非分区字段中字段名是event_time,类型是datetime类型,值样本形如 2020-02-02 00:00:00 中最近3个月数据。

因为MaxComputer输入组件对接的是底层文件系统获取数据,所以不能写sql的where语句过滤,加上这个字段不是分区字段,也不能用以上2中分支场景的分区值设置来过滤,所以只能用集成任务过滤组件过滤,步骤如下:

①设置一个时间类型而且输出为时间戳的全局变量,变量名和偏移量可按需设置,输出的时间戳为utc时间戳

image

  ②输入组件MaxComputer后接过滤组件,设置好字段后点击切换脚本模式

image

image

③转脚本模式后编辑栏内 把全局变量和000拼接上去并确认(因为过滤组件这边没有类型转换,集成任务dlink jvm应用程序的底层对datetime类型对比是用毫秒时间戳,所以必须把全局变量的秒级时间戳再拼三个零出来)

image

④配置完毕后点击预览成功的话会如下图,正常运行且配置生效。

image

image

场景三、过滤MongoDB库中集合中最近24小时的数据

以下是一个对应MongoDB集合在 compass客户端查询最近24小时的语法

image

但是Dataphin的语法支持范围受限于Dataphin目前应用的MongoDB sdk版本支持范围,所以以上语句不能直接设置到对应组件内,因为gettime函数不支持image

可以设置为

{ 
"code": "C021011003000031004",
"time": {
"$gte": 1745118234000
}
}

但是需求是任务对应的最近一天,所以 1745118234000时间戳部分需要用全局变量的时间戳替换,这里不做全局变量的设置赘述,可以参考本文中的场景二分支情况3的全局变量设置

最后设置改成,补三个零也是为了对齐毫秒

{ 
"code": "C021011003000031004",
"time": {
"$gte": "${全局变量}000"
}
}

这里补充下MongoDB数据库输入组件常见注意点:

  • MongoDB因为是文档数据库记录的是半结构json数据,所以每条数据记录的字段情况是不完全一致的,查询数据是没有对应的元数据来校验的,所以新建输入组件的对接字段名称必须对其大小写,对不齐的时候会导致返回空数据,而且请求提交一个不存在的集合名称,那么sdk和客户端也会返回空数据集合导致任务成功,这个是这个数据库的特性导致的不是bug

  • MongoDB侧数据可能因为数据业务上中途修改过,导致集合的底层_id主键不唯一,最后集成任务并发的时候根据这个主键切分会导致数据膨胀写入下游,这个场景要设置1的并发

更多信息

  • 全局变量设置说明

https://help.aliyun.com/zh/Dataphin/fullmanaged/user-guide/new-global-variable?spm=a2c4g.11186623.help-menu-search-87584.d_0

  • 本地变量的值配置说明

https://help.aliyun.com/zh/Dataphin/fullmanaged/user-guide/configure-offline-pipeline-operation-parameters?spm=a2c4g.11186623.help-menu-search-87584.d_9

  • MongoDB过滤mql语法参考

https://www.mongodb.com/zh-cn/docs/manual/reference/mql/

适用于

  • 产品Dataphin中的集成任务过滤数据场景