
Writing Data to AnalyticDB

Last updated: 2019-04-12 10:17:05

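This topic describes how to write data to AnalyticDB through DLA. As an example, it reads data stored in OSS from DLA (tables dla_table_1 and dla_table_2) and uses INSERT…SELECT to write the rows that meet the filter conditions to AnalyticDB (tables shipping and order_table).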

Procedure

Step 1: Create an OSS schema

    CREATE SCHEMA oss_data_schema WITH DBPROPERTIES (
      catalog = 'oss',
      location = 'oss://oss_bucket_name/table/'
    );

Step 2: Create the OSS tables

dla_table_1

    -- Create this table in oss_data_schema; later steps reference it as oss_data_schema.dla_table_1.
    CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_1 (
      id bigint NOT NULL COMMENT '',
      origin_state varchar NOT NULL COMMENT '',
      origin_zip varchar NOT NULL COMMENT '',
      destination_state varchar NOT NULL COMMENT '',
      destination_zip varchar NOT NULL COMMENT '',
      package_weight int NOT NULL COMMENT ''
    )
    -- The source files are plain text with fields separated by '|'.
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION 'oss://oss_bucket_name/table/';

dla_table_2

    -- Create this table in oss_data_schema; later steps reference it as oss_data_schema.dla_table_2.
    CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_2 (
      customer_id bigint NOT NULL COMMENT '',
      order_id varchar NOT NULL COMMENT '',
      order_time date NOT NULL COMMENT '',
      order_amount double NOT NULL COMMENT '',
      order_type varchar NOT NULL COMMENT '',
      address varchar NOT NULL COMMENT '',
      city varchar NOT NULL COMMENT '',
      order_season bigint COMMENT '',
      PRIMARY KEY (customer_id)
    )
    -- The source files are plain text with fields separated by '|'.
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION 'oss://oss_bucket_name/table/';
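Before loading anything into AnalyticDB, it can help to confirm that DLA parses the OSS files as expected. The query below is a minimal sketch of such a check, assuming the tables above were created in oss_data_schema; the column subset and the LIMIT value are arbitrary choices for illustration.

```sql
-- Preview a few rows that would pass the filter used later in step 5.
SELECT customer_id, order_id, order_time, order_amount
FROM oss_data_schema.dla_table_2
WHERE order_amount > 2
LIMIT 10;
```

If the columns come back shifted or NULL, the field delimiter or the column order in the table definition probably does not match the files.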

Step 3: Create an AnalyticDB schema

    CREATE SCHEMA ads_database_schema WITH DBPROPERTIES (
      CATALOG = 'ads',
      LOCATION = 'jdbc:mysql://ads-database-********-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/ads_database',
      USER = 'AnalyticDB AccessKey ID',
      PASSWORD = 'AnalyticDB AccessKey Secret'
    );

Step 4: Create the AnalyticDB tables

shipping

    -- Create this table in ads_database_schema; step 5 references it as ads_database_schema.shipping.
    CREATE EXTERNAL TABLE shipping (
      id bigint NOT NULL COMMENT '',
      origin_state varchar NOT NULL COMMENT '',
      origin_zip varchar NOT NULL COMMENT '',
      destination_state varchar NOT NULL COMMENT '',
      destination_zip varchar NOT NULL COMMENT '',
      package_weight int NOT NULL COMMENT '',
      PRIMARY KEY (id)
    );

order_table1

    -- Create this table in ads_database_schema; step 5 references it as ads_database_schema.order_table1.
    CREATE EXTERNAL TABLE order_table1 (
      customer1_id bigint NOT NULL COMMENT '',
      order1_id bigint NOT NULL COMMENT '',
      order1_time date NOT NULL COMMENT '',
      order1_amount double NOT NULL COMMENT '',
      order1_type varchar NOT NULL COMMENT '',
      address1 varchar NOT NULL COMMENT '',
      city1 varchar NOT NULL COMMENT '',
      order1_season bigint COMMENT '',
      PRIMARY KEY (customer1_id)
    )
    tblproperties (
      -- Map the DLA table order_table1 to the AnalyticDB table ads_database.order_table.
      table_mapping = 'ads_database.order_table',
      -- Each pair is 'DLA column,AnalyticDB column'; pairs are separated by semicolons.
      column_mapping = 'customer1_id,customer_id; order1_id,order_id; order1_time,order_time;
      order1_amount,order_amount; order1_type,order_type; address1,address;
      city1,city; order1_season,order_season'
    );
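The mapping means that statements issued in DLA use the DLA-side names (order_table1, customer1_id, and so on), while the data lives in the AnalyticDB table ads_database.order_table under its own column names. A minimal sketch of a query that exercises the mapping, assuming the schema from step 3 exists and order_table already exists in AnalyticDB:

```sql
-- DLA-side names on the left of each column_mapping pair are used here;
-- they are expected to resolve to customer_id, order_id, and order_amount in AnalyticDB.
SELECT customer1_id, order1_id, order1_amount
FROM ads_database_schema.order_table1
LIMIT 10;
```

An error about an unknown column at this point usually indicates a typo in one of the column_mapping pairs.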

Step 5: Run INSERT…SELECT to insert the OSS data into AnalyticDB

Run the following SQL asynchronously to insert the data from the dla_table_1 file in the table directory in OSS into the shipping table of the ads_database database in AnalyticDB.

    -- Full load of the data from OSS into AnalyticDB
    /*+run-async=true*/
    INSERT INTO ads_database_schema.shipping
    SELECT * FROM oss_data_schema.dla_table_1;
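Because the statement carries the run-async hint, it is submitted as a background task and the client gets control back before the load has finished. The following is a sketch of how the task might then be checked, assuming that DLA returns a task ID for the asynchronous statement and that SHOW QUERY_TASK is available in your DLA version; the ID shown is a placeholder, not a real value.

```sql
-- Replace the placeholder with the task ID returned by the asynchronous INSERT.
SHOW QUERY_TASK WHERE ID = '<async_task_id>';
```

Query the target table only after the task reports that it has completed; otherwise partial results may be visible.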

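Run the following SQL asynchronously to insert up to 10,000 rows with order_amount > 2 from the dla_table_2 file in the table directory in OSS into the order_table table of the ads_database database in AnalyticDB.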

    -- Load data from OSS into AnalyticDB, filtering the OSS rows first
    /*+run-async=true*/
    INSERT INTO ads_database_schema.order_table1
      (customer1_id, order1_id, order1_time, order1_amount, order1_type, address1, city1, order1_season)
    SELECT customer_id, order_id, order_time, order_amount, order_type, address, city, order_season
    FROM oss_data_schema.dla_table_2
    WHERE order_amount > 2
    LIMIT 10000;
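Once the asynchronous task has finished, a simple row count on both sides gives a rough check that the load behaved as intended. This is a sketch only, assuming order_table held no rows before the insert; because of the LIMIT clause, the statement above adds at most 10,000 rows even if more source rows match the filter.

```sql
-- Number of source rows that satisfy the filter.
SELECT COUNT(*) FROM oss_data_schema.dla_table_2 WHERE order_amount > 2;

-- Number of rows now visible through the mapped AnalyticDB table.
SELECT COUNT(*) FROM ads_database_schema.order_table1;
```

The second count should equal the first when the first is 10,000 or fewer, and 10,000 otherwise.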