数据传输服务DTS提供流式数据ETL(Extract Transform Load)数据处理功能,详情请参见什么是ETL。ETL结合DTS的高效流数据复制能力,可以实现流式数据的抽取、数据转换、加工和数据装载,支持的典型场景有:数据过滤、数据脱敏、记录数据修改时间和数据变更审计。本文介绍在DTS链路内配置ETL。
背景信息
DTS是一个数据迁移和同步服务,通常用于数据搬迁或实时数据传输。但有时候用户有数据处理的需求,希望先对实时数据做一定转换或过滤,再写入库。为了满足此类需求,DTS提供了流式数据ETL数据处理功能,支持使用DSL(Domain Specific Language)脚本语言灵活地定义数据处理逻辑。DSL的介绍及配置语法,请参见数据处理DSL语法简介。
DTS支持通过以下两种方式配置ETL。
说明 DTS迁移任务和同步任务中均支持配置ETL,本文以同步任务为例,迁移任务的配置方法相同。
支持的数据库
ETL支持的源库和目标库如下表所示。
源库 | 目标库 |
---|---|
SQL Server |
|
MySQL |
|
自建Oracle |
|
PolarDB MySQL |
|
PolarDB O引擎 |
|
PolarDB-X 1.0 |
|
自建Db2 for LUW | MySQL |
自建Db2 for i | MySQL |
PolarDB PostgreSQL |
|
PostgreSQL |
|
TiDB |
|
在创建同步任务时配置ETL
注意事项
如果您配置的ETL脚本中,包含新增列操作,那么需要您手动在目标端添加列。否则ETL脚本不生效。例如script:e_set(`new_column`, dt_now())
,此处new_column
需要您手动在目标端添加。
在已有同步任务上修改ETL配置
修改已有同步任务的ETL配置包括:
- 如果已有同步任务未配置ETL,即创建同步任务时配置ETL功能设置为否,支持将否修改为是,并配置DSL脚本。
- 如果已有同步任务已配置ETL,支持修改已有的DSL脚本或将配置ETL功能修改为否。
注意事项
- 已有同步任务上修改ETL配置暂不支持对目标端表的表结构进行变更,如果需要变更,您需要在启动同步任务前在目标端变更表结构。
- 修改ETL配置可能造成链路中断,请谨慎操作。
- ETL配置的修改仅对启动同步任务后的增量数据生效,对修改ETL配置前的历史数据不生效。
数据处理DSL语法简介
典型场景示例
- 数据过滤:
- 按数值列条件过滤:如果id>10000,则丢弃这条记录,不同步到目标库:e_if(op_gt(`id`, 10000), e_drop)。
- 按字符串匹配条件过滤:如果name包含“hangzhou”,则丢弃这条记录:e_if(str_contains(`name`, "hangzhou"), e_drop)。
- 按日期过滤:如果订单时间早于某个时间,则不同步:e_if(op_lt(`order_timestamp`, "2015-02-23 23:54:55"), e_drop)。
- 按多条件过滤:
- 如果id>1000且name包含“hangzhou”,则丢弃这条记录:e_if(op_and(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())。
- 如果id>1000或name包含“hangzhou”,则丢弃这条记录:e_if(op_or(str_contains(`name`, "hangzhou"), op_gt(`id`, 1000)), e_drop())。
- 数据脱敏:
- 遮掩:将phone手机号列的后四位用星号替换,e_set(`phone`, str_mask(`phone`, 7, 10, '*'))。
- 记录数据修改时间:
- 对所有表新增列:__OPERATION__的值为INSERT或UPDATE或DELETE时新增1列“dts_sync_time”,值为日志提交时间(__COMMIT_TIMESTAMP__)。
e_if(op_or(op_or( op_eq(__OPERATION__, __OP_INSERT__), op_eq(__OPERATION__, __OP_UPDATE__)), op_eq(__OPERATION__, __OP_DELETE__)), e_set(dts_sync_time, __COMMIT_TIMESTAMP__))
- 对指定表“dts_test_table”新增列: __OPERATION__的值为INSERT或UPDATE或DELETE时新增1列“dts_sync_time”,
值为日志提交时间(__COMMIT_TIMESTAMP__)。
e_if(op_and( op_eq(__TB__,'dts_test_table'), op_or(op_or( op_eq(__OPERATION__,__OP_INSERT__), op_eq(__OPERATION__,__OP_UPDATE__)), op_eq(__OPERATION__,__OP_DELETE__))), e_set(dts_sync_time,__COMMIT_TIMESTAMP__))
说明 上述新增列操作需要您在任务启动前自行修改目标端表定义,添加“dts_sync_time”列。
- 对所有表新增列:__OPERATION__的值为INSERT或UPDATE或DELETE时新增1列“dts_sync_time”,值为日志提交时间(__COMMIT_TIMESTAMP__)。
- 数据变更审计:
- 记录表数据变化的类型和时间:在目标端的“operation_type”列记录数据变化类型;在目标端的“updated”列记录数据发生变化的时间。
e_compose( e_switch( op_eq(__OPERATION__,__OP_DELETE__), e_set(operation_type, 'DELETE'), op_eq(__OPERATION__,__OP_UPDATE__), e_set(operation_type, 'UPDATE'), op_eq(__OPERATION__,__OP_INSERT__), e_set(operation_type, 'INSERT')), e_set(updated, __COMMIT_TIMESTAMP__), e_set(__OPERATION__,__OP_INSERT__) )
说明 您需要在任务启动前在目标端表中添加“operation_type”列和“updated”列。
- 记录表数据变化的类型和时间:在目标端的“operation_type”列记录数据变化类型;在目标端的“updated”列记录数据发生变化的时间。
数据处理DSL语法
常量与变量
- 常量
类型 示例 int 123 float 123.4 string "hello1_world" boolean true或false datetime '2021-01-01 10:10:01' - 变量
变量 含义 数据类型 示例值 __TB__ 表名 string table __DB__ 库名 string mydb __OPERATION__ 操作类型 string __OP_INSERT__,__OP_UPDATE__,__OP_DELETE__ __COMMIT_TIMESTAMP__ 事务提交时间 datetime '2021-01-01 10:10:01' `column` 某条数据对应column的值 string `id`、`name`
表达式函数
- 数值运算
功能 语法 取值范围 返回值 示例 加法 op_sum(value1, value2) - value1:整数或浮点数
- value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。 op_sum(`col1`, 1.0) 减法 op_sub(value1, value2) - value1:整数或浮点数
- value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。 op_sub(`col1`, 1.0) 乘法 op_mul(value1, value2) - value1:整数或浮点数
- value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。 op_mul(`col1`, 1.0) 除法 op_div_true(value1, value2) - value1:整数或浮点数
- value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。 op_div_true(`col1`, 2.0), 若col1=15,则返回7.5。 取模 op_mod(value1, value2) - value1:整数或浮点数
- value2:整数或浮点数
若参数均为整数,则返回整数,否则返回浮点数。 op_mod(`col1`, 10),若col1=23,则返回3 - 逻辑运算
功能 语法 取值范围 返回值 示例 是否相等 op_eq(value1, value2) - value1:整数、浮点数、字符串
- value2:整数、浮点数、字符串
boolean类型,true或false op_eq(`col1`, 23) 是否大于 op_gt(value1, value2) - value1:整数、浮点数、字符串
- value2:整数、浮点数、字符串
boolean类型,true或false op_gt(`col1`, 1.0) 是否小于 op_lt(value1, value2) - value1:整数、浮点数、字符串
- value2:整数、浮点数、字符串
boolean类型,true或false op_lt(`col1`, 1.0) 是否大于等于 op_ge(value1, value2) - value1:整数、浮点数、字符串
- value2:整数、浮点数、字符串
boolean类型,true或false op_ge(`col1`, 1.0) 是否小于等于 op_le(value1, value2) - value1:整数、浮点数、字符串
- value2:整数、浮点数、字符串
boolean类型,true或false op_le(`col1`, 1.0) AND运算 op_and(value1, value2) - value1:boolean类型
- value2:boolean类型
boolean类型,true或false op_and(`is_male`, `is_student`) OR运算 op_or(value1, value2) - value1:boolean类型
- value2:boolean类型
boolean类型,true或false op_or(`is_male`, `is_student`) - 字符串函数
功能 语法 取值范围 返回值 示例 字符串格式化,字符串拼接 str_format(format, value1, value2, value3, ...) - format:字符串类型,以大括号作为占位符,如 "part1: {}, part2: {}"。
- value1:任意
- value2:任意
格式化好的字符串 str_format("part1: {}, part2: {}", `col1`, `col2`),若col1="ab", col2="12", 则返回"part1: ab, part2: 12"。 字符串替换 str_replace(original, oldStr, newStr, count) - original:原来的字符串
- oldStr:待替换的字符串
- newStr:替换后的字符串
- count:整数,最多替换次数。若设置为-1,则全部替换。
替换后的字符串 str_replace(`name`, "a", 'b', 1),若name="aba", 则返回"bba" ;str_replace(`name`, "a", 'b', -1);若name="aba", 则返回"bbb"。 移除字符串首尾的特定字符 str_strip(string_val, charSet) - string_val:原来的字符串
- char_set:待移除的字符集合
移除首尾字符后的字符串 str_strip(`name`, 'ab'),若name=axbzb, 则返回xbz。 字符串转小写 str_lower(value1) value1:字符串列或字符串常量 小写字符串 str_lower(`str_col`) 字符串转大写 str_upper(value1) value1:字符串列或字符串常量 大写字符串 str_upper(`str_col`) 字符串统计 str_count(str, pattern) - str:字符串列或字符串常量
- pattern:要查找的子串
子串出现的次数。 str_count(`str_col`, 'abc'), 若str_col="zabcyabcz",则返回2。 字符串查找 str_find(str, pattern)。 - str:字符串列或字符串常量
- pattern:要查找的子串
子串首次匹配的位置,没有则返回`-1`。 str_find(`str_col`, 'abc'), 若`str_col="xabcy"`,则返回`1`。 判断是否全是字母组成的字符串。 str_isalpha(str) str:字符串列或字符串常量 true或false str_isalpha(`str_col`) 判断是否全是数字组成的字符串 str_isdigit(str) str:字符串列或字符串常量 true或false。 str_isdigit(`str_col`)。 使用指定字符遮掩字符串的一部分,可用于数据脱敏,例如把手机号的后四位替换为星号。 str_mask(str, start, end, maskStr) - str:字符串列或字符串常量
- start:整数,遮掩的起始位置,最小值为0。
- end:整数,遮掩的结束位置,最大值为字符串长度减一。
- maskStr:字符串,长度为1的字符串,例如 '#'。
遮掩掉start至end后的字符串。 str_mask(`phone`, 7, 10, '#')。 获取当前时区的DateTime dt_now() 无 当前时区的DateTime e_set(`dt_col`, dt_now()) 获取UTC时区的DateTime dt_utcnow() 无 UTC时区的DateTime e_set(`dt_col`, dt_utcnow())
全局函数
- 流程控制函数
功能 语法 参数说明 示例 if语句 e_if(bool_expr, func_invoke) - bool_expr:bool常量或函数调用。常量:true或false。函数调用:op_gt(`id`, 10)。
- func_invoke:函数调用。e_drop,e_keep,e_set,e_if,e_compose
e_if(op_gt(`id`, 10), e_drop()), 如果ID大于10,则丢弃这条记录。 if else语句 e_if_else(bool_expr, func_invoke1, func_invoke2) - bool_expr:bool常量或函数调用。常量:true或false。函数调用:op_gt(`id`, 10)。
- func_invoke1:函数调用。条件为true时执行。
- func_invoke2:函数调用。条件为false时执行。
e_if_else(op_gt(`id`, 10), e_set(`tag`, 'large'), e_set(`tag`, 'small')),如果ID大于10,则设置tag列为"large", 否则设置为"small"。 类switch语句,进行多次条件判断,第一次满足条件时执行对应操作,如无匹配则执行默认操作。 s_switch(condition1, func1, condition2, func2, ..., default = default_func) - condition1:bool常量或函数调用。常量:true或false。函数调用:op_gt(`id`, 10)。
- func_invoke:函数调用。检查condition1,若为true则执行此函数,并退出整个switch,若为false则继续检查下一个条件。
- default_func:函数调用。当前面的所有condition都为false时,执行此默认函数。
e_switch(op_gt(`id`, 100), e_set(`str_col`, '>100'), op_gt(`id`, 90), e_set(`str_col`, '>90'), default=e_set(`str_col`, '<=90'))。 组合多个操作 e_compose(func1, func2, func3, ...) - func1:函数调用。可以为e_set, e_drop, e_if。
- func2:函数调用。可以为e_set, e_drop, e_if。
e_compose(e_set(`str_col`, 'test'), e_set(`dt_col`, dt_now())), 设置str_col列的值为test,并设置dt_col列的值为当前时间。 - 数据操作函数
功能 语法 参数说明 示例 丢弃此条数据,不同步 e_drop() 无 e_if(op_gt(`id`, 10), e_drop()),丢弃ID大于10的记录。 保留此条数据,同步到目标端 e_keep(condition) condition:boolean类型表达式 e_keep(op_gt(id, 1)) ,仅同步ID大于1的数据。 设置列值 e_set(`col`, val) - col:列名
- val:常量或函数调用。类型需要和col的类型匹配
- e_set(`dt_col`, dt_now()),设置dt_col为当前时间。
- e_set(`col1`, `col2` + 1),设置col1为col2+1。