当您需要对Transactional表或Delta表执行INSERT、UPDATE
、DELETE
操作时,可以通过MERGE INTO
功能将这些操作合并为一条SQL语句,根据与源表关联的结果,对目标Transactional表执行插入、更新或删除操作时,只需要执行一次全表扫描操作,以提高执行效率。
前提条件
执行MERGE INTO
操作前需要具备目标Transactional表的读取表数据权限(Select)及更新表数据权限(Update)。授权操作请参见MaxCompute权限。
功能介绍
MaxCompute支持了DELETE
、UPDATE
功能,但当您需要使用多个INSERT
、UPDATE
、DELETE
对目标表进行批量操作时,需要编写多条SQL语句,然后进行多次全表扫描才能完成操作。MaxCompute提供的MERGE INTO
功能,只需要进行一次全表扫描操作,就可以完成全部操作,执行效率要高于INSERT
+UPDATE
+DELETE
。
MERGE INTO
操作具备原子性,作业中的INSERT
、UPDATE
、DELETE
操作都执行成功时,作业才算执行成功;任一内部逻辑处理失败,则整体作业执行失败。
同时,MERGE INTO
可以为您避免分别执行INSERT
、UPDATE
、DELETE
操作时,可能导致部分操作执行成功,部分操作执行失败,其中成功部分无法回退的问题。
使用限制
不允许在同一条MERGE INTO
语句中对相同的行执行多次INSERT
或UPDATE操作。
命令格式
MERGE INTO <target_table> AS <alias_name_t> USING <source expression|table_name> AS <alias_name_s>
--从on开始对源表和目标表的数据进行关联判断。
ON <BOOLEAN expression1>
--when matched…then指定on的结果为True的行为。多个when matched…then之间的数据无交集。
WHEN matched [AND <BOOLEAN expression2>] THEN UPDATE SET <set_clause_list>
WHEN matched [AND <BOOLEAN expression3>] THEN DELETE
--when not matched…then指定on的结果为False的行为。
WHEN NOT matched [AND <BOOLEAN expression4>] THEN INSERT VALUES <value_list>
参数说明
参数名 | 是否必填 | 描述 |
target_table | 是 | 目标表名称,必须是实际存在的表。 |
alias_name_t | 是 | 目标表的别名。 |
source expression|table_name | 是 | 关联的源表名称、视图或子查询。 |
alias_name_s | 是 | 关联的源表、视图或子查询的别名 |
BOOLEAN expression1 | 是 | BOOLEAN类型判断条件,判断结果必须为True或False。 |
BOOLEAN expression2、BOOLEAN expression3、BOOLEAN expression4 | 否 |
|
set_clause_list | 否 | 待更新数据信息。当出现 更多 |
value_list | 否 | 待插入数据信息。当出现 更多 |
使用示例
示例1:创建目标表acid_address_book_base1及源表tmp_table1,并插入数据。执行
MERGE INTO
操作,对符合ON
条件的数据用源表的数据对目标表进行更新操作,对不符合ON
条件并且源表中满足_event_type_为I的数据插入目标表。命令示例如下:--创建目标表acid_address_book_base1。 CREATE TABLE IF NOT EXISTS acid_address_book_base1 (id BIGINT,first_name STRING,last_name STRING,phone STRING) PARTITIONED BY(year STRING, month STRING, day STRING, hour STRING) tblproperties ("transactional"="true"); --创建源表tmp_table1。 CREATE TABLE IF NOT EXISTS tmp_table1 (id BIGINT, first_name STRING, last_name STRING, phone STRING, _event_type_ STRING); --向目标表acid_address_book_base1插入测试数据。 INSERT OVERWRITE TABLE acid_address_book_base1 PARTITION(year='2020', month='08', day='20', hour='16') VALUES (4, 'nihaho', 'li', '222'), (5, 'tahao', 'ha', '333'), (7, 'djh', 'hahh', '555'); --查询目标表的数据确认插入测试数据的操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM acid_address_book_base1; --返回结果 +------------+------------+------------+------------+------------+------------+------------+------------+ | id | first_name | last_name | phone | year | month | day | hour | +------------+------------+------------+------------+------------+------------+------------+------------+ | 4 | nihaho | li | 222 | 2020 | 08 | 20 | 16 | | 5 | tahao | ha | 333 | 2020 | 08 | 20 | 16 | | 7 | djh | hahh | 555 | 2020 | 08 | 20 | 16 | +------------+------------+------------+------------+------------+------------+------------+------------+ --向源表tmp_table1插入测试数据。 INSERT OVERWRITE TABLE tmp_table1 VALUES (1, 'hh', 'liu', '999', 'I'), (2, 'cc', 'zhang', '888', 'I'), (3, 'cy', 'zhang', '666', 'I'),(4, 'hh', 'liu', '999', 'U'), (5, 'cc', 'zhang', '888', 'U'),(6, 'cy', 'zhang', '666', 'U'); --查询源表的数据确认插入测试数据的操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM tmp_table1; --返回结果 +------------+------------+------------+------------+--------------+ | id | first_name | last_name | phone | _event_type_ | +------------+------------+------------+------------+--------------+ | 1 | hh | liu | 999 | I | | 2 | cc | zhang | 888 | I | | 3 | cy | zhang | 666 | I | | 4 | hh | liu | 999 | U | | 5 | cc | zhang | 888 | U | | 6 | cy | zhang | 666 | U | +------------+------------+------------+------------+--------------+ --执行merge into操作。 MERGE INTO acid_address_book_base1 AS t USING tmp_table1 as s ON s.id = t.id AND t.year='2020' AND t.month='08' AND t.day='20' AND t.hour='16' WHEN MATCHED THEN UPDATE SET t.first_name = s.first_name, t.last_name = s.last_name, t.phone = s.phone WHEN NOT MATCHED AND (s._event_type_='I') THEN INSERT VALUES(s.id, s.first_name, s.last_name,s.phone,'2020','08','20','16'); --查询目标表的数据确认merge into操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM acid_address_book_base1; --返回结果 +------------+------------+------------+------------+------------+------------+------------+------------+ | id | first_name | last_name | phone | year | month | day | hour | +------------+------------+------------+------------+------------+------------+------------+------------+ | 4 | hh | liu | 999 | 2020 | 08 | 20 | 16 | | 5 | cc | zhang | 888 | 2020 | 08 | 20 | 16 | | 7 | djh | hahh | 555 | 2020 | 08 | 20 | 16 | | 1 | hh | liu | 999 | 2020 | 08 | 20 | 16 | | 2 | cc | zhang | 888 | 2020 | 08 | 20 | 16 | | 3 | cy | zhang | 666 | 2020 | 08 | 20 | 16 | +------------+------------+------------+------------+------------+------------+------------+------------+
示例2:创建目标表merge_acid_dp及源表merge_acid_source,并插入数据。以不指定分区方式执行
MERGE INTO
命令,进行更新或者插入数据,对目标表的所有分区生效。--创建目标表merge_acid_dp。 CREATE TABLE IF NOT EXISTS merge_acid_dp(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL) PARTITIONED BY (dd STRING, hh STRING) tblproperties ("transactional" = "true"); --创建源表merge_acid_source。 CREATE TABLE IF NOT EXISTS merge_acid_source(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL, c3 STRING, c4 STRING) lifecycle 30; --向目标表merge_acid_dp插入测试数据。 INSERT OVERWRITE TABLE merge_acid_dp PARTITION (dd='01', hh='01') VALUES (1, 1), (2, 2); INSERT OVERWRITE TABLE merge_acid_dp PARTITION (dd='02', hh='02') VALUES (4, 1), (3, 2); --查询目标表的数据确认插入测试数据的操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM merge_acid_dp; --返回结果 +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 1 | 1 | 01 | 01 | | 2 | 2 | 01 | 01 | | 4 | 1 | 02 | 02 | | 3 | 2 | 02 | 02 | +------------+------------+----+----+ --向源表merge_acid_source插入测试数据。 INSERT OVERWRITE TABLE merge_acid_source VALUES(8, 2, '03', '03'), (5, 5, '05', '05'), (6, 6, '02', '02'); --查询源表的数据确认插入测试数据的操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM merge_acid_source; --返回结果 +------------+------------+----+----+ | c1 | c2 | c3 | c4 | +------------+------------+----+----+ | 8 | 2 | 03 | 03 | | 5 | 5 | 05 | 05 | | 6 | 6 | 02 | 02 | +------------+------------+----+----+ --执行merge into操作。 SET odps.sql.allow.fullscan=true; MERGE INTO merge_acid_dp tar USING merge_acid_source src ON tar.c2 = src.c2 WHEN MATCHED THEN UPDATE SET tar.c1 = src.c1 WHEN NOT MATCHED THEN INSERT VALUES(src.c1, src.c2, src.c3, src.c4); --查询目标表的数据确认merge into操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM merge_acid_dp; --返回结果 +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 6 | 6 | 02 | 02 | | 5 | 5 | 05 | 05 | | 8 | 2 | 02 | 02 | | 8 | 2 | 01 | 01 | | 1 | 1 | 01 | 01 | | 4 | 1 | 02 | 02 | +------------+------------+----+----+
示例3:创建目标表merge_acid_sp及源表merge_acid_source,并插入数据。以指定分区方式执行
MERGE INTO
命令,进行更新或者插入数据,对目标表的指定分区生效。--创建目标表merge_acid_sp。 CREATE TABLE IF NOT EXISTS merge_acid_sp(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL) PARTITIONED BY (dd STRING, hh STRING) tblproperties ("transactional" = "true"); --创建源表merge_acid_source。 CREATE TABLE IF NOT EXISTS merge_acid_source(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL, c3 STRING, c4 STRING) lifecycle 30; --向目标表merge_acid_sp插入测试数据。 INSERT OVERWRITE TABLE merge_acid_sp PARTITION (dd='01', hh='01') VALUES (1, 1), (2, 2); INSERT OVERWRITE TABLE merge_acid_sp PARTITION (dd='02', hh='02') VALUES (4, 1), (3, 2); --查询目标表的数据确认插入测试数据的操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM merge_acid_sp; --返回结果 +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 1 | 1 | 01 | 01 | | 2 | 2 | 01 | 01 | | 4 | 1 | 02 | 02 | | 3 | 2 | 02 | 02 | +------------+------------+----+----+ --向源表merge_acid_source插入测试数据。 INSERT OVERWRITE TABLE merge_acid_source VALUES(8, 2, '03', '03'), (5, 5, '05', '05'), (6, 6, '02', '02'); --查询源表的数据确认插入测试数据的操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM merge_acid_source; --返回结果 +------------+------------+----+----+ | c1 | c2 | c3 | c4 | +------------+------------+----+----+ | 8 | 2 | 03 | 03 | | 5 | 5 | 05 | 05 | | 6 | 6 | 02 | 02 | +------------+------------+----+----+ --执行merge into操作,同时在on条件中指定只对目标表的dd = '01' and hh = '01'分区执行更新或者插入操作。 SET odps.sql.allow.fullscan=true; MERGE INTO merge_acid_sp tar USING merge_acid_source src ON tar.c2 = src.c2 AND tar.dd = '01' AND tar.hh = '01' WHEN MATCHED THEN UPDATE SET tar.c1 = src.c1 WHEN NOT MATCHED THEN INSERT VALUES(src.c1, src.c2, src.c3, src.c4); --查询目标表的数据确认merge into操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM merge_acid_sp; +------------+------------+----+----+ | c1 | c2 | dd | hh | +------------+------------+----+----+ | 5 | 5 | 05 | 05 | | 6 | 6 | 02 | 02 | | 8 | 2 | 01 | 01 | | 1 | 1 | 01 | 01 | | 4 | 1 | 02 | 02 | | 3 | 2 | 02 | 02 | +------------+------------+----+----+
示例4:创建Delta Table类型目标表mf_tt6及源表mf_delta,并插入数据。以指定分区方式执行
MERGE INTO
命令,进行更新、插入或删除数据,对目标表的指定分区生效。--创建Transaction Table2.0类型目标表mf_tt6。 CREATE TABLE IF NOT EXISTS mf_tt6 (pk BIGINT NOT NULL PRIMARY key, val BIGINT NOT NULL) PARTITIONED BY (dd STRING, hh STRING) tblproperties ("transactional"="true"); --向目标表mf_tt6插入测试数据。 INSERT OVERWRITE TABLE mf_tt6 PARTITION (dd='01', hh='02') VALUES (1, 1), (2, 2), (3, 3); INSERT OVERWRITE TABLE mf_tt6 PARTITION (dd='01', hh='01') VALUES (1, 10), (2, 20), (3, 30); --开启全表扫描,仅此Session有效。执行select语句查看表mf_tt6中的数据。 SET odps.sql.allow.fullscan=true; SELECT * FROM mf_tt6; --返回结果 +------------+------------+----+----+ | pk | val | dd | hh | +------------+------------+----+----+ | 1 | 10 | 01 | 01 | | 3 | 30 | 01 | 01 | | 2 | 20 | 01 | 01 | | 1 | 1 | 01 | 02 | | 3 | 3 | 01 | 02 | | 2 | 2 | 01 | 02 | +------------+------------+----+----+ --创建源表mf_delta,并插入测试数据。 CREATE TABLE IF NOT EXISTS mf_delta AS SELECT pk, val FROM VALUES (1, 10), (2, 20), (6, 60) t (pk, val); --查询源表的数据,确认插入测试数据的操作结果。 SELECT * FROM mf_delta; --返回结果 +------+------+ | pk | val | +------+------+ | 1 | 10 | | 2 | 20 | | 6 | 60 | +------+------+ --执行merge into操作,同时在on条件中指定只对目标表mf_tt6的dd = '01' and hh = '02'分区执行更新、插入或删除操作。 MERGE INTO mf_tt6 USING mf_delta ON mf_tt6.pk = mf_delta.pk AND mf_tt6.dd='01' AND mf_tt6.hh='02' WHEN MATCHED AND (mf_tt6.pk > 1) THEN UPDATE SET mf_tt6.val = mf_delta.val WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES (mf_delta.pk, mf_delta.val, '01', '02'); --查询目标表的数据确认merge into操作结果。 SET odps.sql.allow.fullscan=true; SELECT * FROM mf_tt6; --返回结果 +------------+------------+----+----+ | pk | val | dd | hh | +------------+------------+----+----+ | 1 | 10 | 01 | 01 | | 3 | 30 | 01 | 01 | | 2 | 20 | 01 | 01 | | 3 | 3 | 01 | 02 | | 6 | 60 | 01 | 02 | | 2 | 20 | 01 | 02 | +------------+------------+----+----+