当您需要对Transactional表或Delta Table执行insert
、update
、delete
操作时,可以通过merge into
功能将这些操作合并为一条SQL语句,根据与源表关联的结果,对目标Transactional表执行插入、更新或删除操作,只需要进行一次全表扫描操作,以提高执行效率。
前提条件
执行merge into
操作前需要具备目标Transactional表的读取表数据权限(Select)及更新表数据权限(Update)。授权操作请参见MaxCompute权限。
功能介绍
MaxCompute支持了delete
、update
功能,但当您需要使用多个insert
、update
、delete
对目标表进行批量操作时,需要编写多条SQL语句,然后进行多次全表扫描才能完成操作。MaxCompute提供的merge into
功能,只需要进行一次全表扫描操作,就可以完成全部操作,执行效率要高于insert
+update
+delete
。
merge into
操作具备原子性,作业中的insert
、update
、delete
操作都执行成功时,作业才算执行成功;任一内部逻辑处理失败,则整体作业执行失败。
同时,merge into
可以为您避免分别执行insert
、update
、delete
操作时,可能导致部分操作执行成功,部分操作执行失败,其中成功部分无法回退的问题。
使用限制
不允许在同一条merge into
语句中对相同的行执行多次insert
或update
操作。
命令格式
merge into <target_table> as <alias_name_t> using <source expression|table_name> as <alias_name_s>
--从on开始对源表和目标表的数据进行关联判断。
on <boolean expression1>
--when matched…then指定on的结果为True的行为。多个when matched…then之间的数据无交集。
when matched [and <boolean expression2>] then update set <set_clause_list>
when matched [and <boolean expression3>] then delete
--when not matched…then指定on的结果为False的行为。
when not matched [and <boolean expression4>] then insert values <value_list>
target_table:必填。目标表名称,必须是实际存在的表。
alias_name_t:必填。目标表的别名。
source expression|table_name:必填。关联的源表名称、视图或子查询。
alias_name_s:必填。关联的源表、视图或子查询的别名。
boolean expression1:必填。BOOLEAN类型判断条件,判断结果必须为True或False。
boolean expression2、boolean expression3、boolean expression4:可选。
update
、delete
、insert
操作相应的BOOLEAN类型判断条件。需要注意的是:当出现三个WHEN子句时,
update
、delete
、insert
都只能出现一次。如果
update
和delete
同时出现,出现在前的操作必须包括[and <boolean expression>]
。when not matched
只能出现在最后一个WHEN子句中,并且只支持insert
操作。
set_clause_list:当出现
update
操作时必填。待更新数据信息。更多update
信息,请参见更新数据(UPDATE)。value_list:当出现
insert
操作时必填。待插入数据信息。更多values
信息,请参见VALUES。
使用示例
示例1:创建目标表acid_address_book_base1及源表tmp_table1,并插入数据。执行
merge into
操作,对符合on
条件的数据用源表的数据对目标表进行更新操作,对不符合on
条件并且源表中满足event_type为I的数据插入目标表。命令示例如下:--创建目标表acid_address_book_base1。 create table if not exists acid_address_book_base1 (id bigint,first_name string,last_name string,phone string) partitioned by(year string, month string, day string, hour string) tblproperties ("transactional"="true"); --创建源表tmp_table1。 create table if not exists tmp_table1 (id bigint, first_name string, last_name string, phone string, _event_type_ string); --向目标表acid_address_book_base1插入测试数据。 insert overwrite table acid_address_book_base1 partition(year='2020', month='08', day='20', hour='16') values (4, 'nihaho', 'li', '222'), (5, 'tahao', 'ha', '333'), (7, 'djh', 'hahh', '555'); --查询目标表的数据确认插入测试数据的操作结果。 set odps.sql.allow.fullscan=true; select * from acid_address_book_base1; +------------+------------+------------+------------+------------+------------+------------+------------+ | id | first_name | last_name | phone | year | month | day | hour | +------------+------------+------------+------------+------------+------------+------------+------------+ | 4 | nihaho | li | 222 | 2020 | 08 | 20 | 16 | | 5 | tahao | ha | 333 | 2020 | 08 | 20 | 16 | | 7 | djh | hahh | 555 | 2020 | 08 | 20 | 16 | +------------+------------+------------+------------+------------+------------+------------+------------+ --向源表tmp_table1插入测试数据。 insert overwrite table tmp_table1 values (1, 'hh', 'liu', '999', 'I'), (2, 'cc', 'zhang', '888', 'I'), (3, 'cy', 'zhang', '666', 'I'),(4, 'hh', 'liu', '999', 'U'), (5, 'cc', 'zhang', '888', 'U'),(6, 'cy', 'zhang', '666', 'U'); --查询源表的数据确认插入测试数据的操作结果。 select * from tmp_table1; +------------+------------+------------+------------+--------------+ | id | first_name | last_name | phone | _event_type_ | +------------+------------+------------+------------+--------------+ | 1 | hh | liu | 999 | I | | 2 | cc | zhang | 888 | I | | 3 | cy | zhang | 666 | I | | 4 | hh | liu | 999 | U | | 5 | cc | zhang | 888 | U | | 6 | cy | zhang | 666 | U | +------------+------------+------------+------------+--------------+ --执行merge into操作。 merge into acid_address_book_base1 as t using tmp_table1 as s on s.id = t.id and t.year='2020' and t.month='08' and t.day='20' and t.hour='16' when matched then update set t.first_name = s.first_name, t.last_name = s.last_name, t.phone = s.phone when not matched and (s._event_type_='I') then insert values(s.id, s.first_name, s.last_name,s.phone,'2020','08','20','16'); --查询目标表的数据确认merge into操作结果。 select * from acid_address_book_base1; +------------+------------+------------+------------+------------+------------+------------+------------+ | id | first_name | last_name | phone | year | month | day | hour | +------------+------------+------------+------------+------------+------------+------------+------------+ | 4 | hh | liu | 999 | 2020 | 08 | 20 | 16 | | 5 | cc | zhang | 888 | 2020 | 08 | 20 | 16 | | 7 | djh | hahh | 555 | 2020 | 08 | 20 | 16 | | 1 | hh | liu | 999 | 2020 | 08 | 20 | 16 | | 2 | cc | zhang | 888 | 2020 | 08 | 20 | 16 | | 3 | cy | zhang | 666 | 2020 | 08 | 20 | 16 | +------------+------------+------------+------------+------------+------------+------------+------------+
示例2:创建目标表merge_acid_dp及源表merge_acid_source,并插入数据。以不指定分区方式执行
merge into
命令,进行更新或者插入数据,对目标表的所有分区生效。--创建目标表merge_acid_dp。 create table if not exists merge_acid_dp(c1 bigint not null, c2 bigint not null) partitioned by (dd string, hh string) tblproperties ("transactional" = "true"); --创建源表merge_acid_source。 create table if not exists merge_acid_source(c1 bigint not null, c2 bigint not null, c3 string, c4 string) lifecycle 30; --向目标表merge_acid_dp插入测试数据。 insert overwrite table merge_acid_dp partition (dd='01', hh='01') values (1, 1), (2, 2); insert overwrite table merge_acid_dp partition (dd='02', hh='02') values (4, 1), (3, 2); --查询目标表的数据确认插入测试数据的操作结果。 set odps.sql.allow.fullscan=true; select * from merge_acid_dp; +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 1 | 1 | 01 | 01 | | 2 | 2 | 01 | 01 | | 4 | 1 | 02 | 02 | | 3 | 2 | 02 | 02 | +------------+------------+----+----+ --向源表merge_acid_source插入测试数据。 insert overwrite table merge_acid_source values(8, 2, '03', '03'), (5, 5, '05', '05'), (6, 6, '02', '02'); --查询源表的数据确认插入测试数据的操作结果。 select * from merge_acid_source; +------------+------------+----+----+ | c1 | c2 | c3 | c4 | +------------+------------+----+----+ | 8 | 2 | 03 | 03 | | 5 | 5 | 05 | 05 | | 6 | 6 | 02 | 02 | +------------+------------+----+----+ --执行merge into操作。 merge into merge_acid_dp tar using merge_acid_source src on tar.c2 = src.c2 when matched then update set tar.c1 = src.c1 when not matched then insert values(src.c1, src.c2, src.c3, src.c4); --查询目标表的数据确认merge into操作结果。 select * from merge_acid_dp; +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 6 | 6 | 02 | 02 | | 5 | 5 | 05 | 05 | | 8 | 2 | 02 | 02 | | 8 | 2 | 01 | 01 | | 1 | 1 | 01 | 01 | | 4 | 1 | 02 | 02 | +------------+------------+----+----+
示例3:创建目标表merge_acid_sp及源表merge_acid_source,并插入数据。以指定分区方式执行
merge into
命令,进行更新或者插入数据,对目标表的指定分区生效。--创建目标表merge_acid_sp。 create table if not exists merge_acid_sp(c1 bigint not null, c2 bigint not null) partitioned by (dd string, hh string) tblproperties ("transactional" = "true"); --创建源表merge_acid_source。 create table if not exists merge_acid_source(c1 bigint not null, c2 bigint not null, c3 string, c4 string) lifecycle 30; --向目标表merge_acid_sp插入测试数据。 insert overwrite table merge_acid_sp partition (dd='01', hh='01') values (1, 1), (2, 2); insert overwrite table merge_acid_sp partition (dd='02', hh='02') values (4, 1), (3, 2); --查询目标表的数据确认插入测试数据的操作结果。 set odps.sql.allow.fullscan=true; select * from merge_acid_sp; +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 1 | 1 | 01 | 01 | | 2 | 2 | 01 | 01 | | 4 | 1 | 02 | 02 | | 3 | 2 | 02 | 02 | +------------+------------+----+----+ --向源表merge_acid_source插入测试数据。 insert overwrite table merge_acid_source values(8, 2, '03', '03'), (5, 5, '05', '05'), (6, 6, '02', '02'); --查询源表的数据确认插入测试数据的操作结果。 select * from merge_acid_source; +------------+------------+----+----+ | c1 | c2 | c3 | c4 | +------------+------------+----+----+ | 8 | 2 | 03 | 03 | | 5 | 5 | 05 | 05 | | 6 | 6 | 02 | 02 | +------------+------------+----+----+ --执行merge into操作,同时在on条件中指定只对目标表的dd = '01' and hh = '01'分区执行更新或者插入操作。 merge into merge_acid_sp tar using merge_acid_source src on tar.c2 = src.c2 and tar.dd = '01' and tar.hh = '01' when matched then update set tar.c1 = src.c1 when not matched then insert values(src.c1, src.c2, src.c3, src.c4); --查询目标表的数据确认merge into操作结果。 select * from merge_acid_sp; +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 5 | 5 | 05 | 05 | | 6 | 6 | 02 | 02 | | 8 | 2 | 01 | 01 | | 1 | 1 | 01 | 01 | | 4 | 1 | 02 | 02 | | 3 | 2 | 02 | 02 | +------------+------------+----+----+
示例4:创建Delta类型目标表mf_tt6及源表mf_delta,并插入数据。以指定分区方式执行
merge into
命令,进行更新、插入或删除数据,对目标表的指定分区生效。--创建Delta类型目标表mf_tt6。 create table if not exists mf_tt6 (pk bigint not null primary key, val bigint not null) partitioned by (dd string, hh string) tblproperties ("transactional"="true"); --向目标表mf_tt6插入测试数据。 insert overwrite table mf_tt6 partition (dd='01', hh='02') values (1, 1), (2, 2), (3, 3); insert overwrite table mf_tt6 partition (dd='01', hh='01') values (1, 10), (2, 20), (3, 30); --开启全表扫描,仅此Session有效。执行select语句查看表mf_tt6中的数据。 set odps.sql.allow.fullscan=true; select * from mf_tt6; +------------+------------+----+----+ | pk | val | dd | hh | +------------+------------+----+----+ | 1 | 10 | 01 | 01 | | 3 | 30 | 01 | 01 | | 2 | 20 | 01 | 01 | | 1 | 1 | 01 | 02 | | 3 | 3 | 01 | 02 | | 2 | 2 | 01 | 02 | +------------+------------+----+----+ --创建源表mf_delta,并插入测试数据。 create table if not exists mf_delta as select pk, val from values (1, 10), (2, 20), (6, 60) t (pk, val); --查询源表的数据,确认插入测试数据的操作结果。 select * from mf_delta; +------+------+ | pk | val | +------+------+ | 1 | 10 | | 2 | 20 | | 6 | 60 | +------+------+ --执行merge into操作,同时在on条件中指定只对目标表mf_tt6的dd = '01' and hh = '02'分区执行更新、插入或删除操作。 merge into mf_tt6 using mf_delta on mf_tt6.pk = mf_delta.pk and mf_tt6.dd='01' and mf_tt6.hh='02' when matched and (mf_tt6.pk > 1) then update set mf_tt6.val = mf_delta.val when matched then delete when not matched then insert values (mf_delta.pk, mf_delta.val, '01', '02'); --查询目标表的数据确认merge into操作结果。 set odps.sql.allow.fullscan=true; select * from mf_tt6; +------------+------------+----+----+ | pk | val | dd | hh | +------------+------------+----+----+ | 1 | 10 | 01 | 01 | | 3 | 30 | 01 | 01 | | 2 | 20 | 01 | 01 | | 3 | 3 | 01 | 02 | | 6 | 60 | 01 | 02 | | 2 | 20 | 01 | 02 | +------------+------------+----+----+