本文介绍如何将Doris数据迁移至云原生数据仓库AnalyticDB PostgreSQL版。
准备工作
已开通IDC专线服务。具体内容,请参见通过物理专线实现本地IDC与云上VPC互通。
已开通阿里云对象存储服务(OSS)。具体内容,请参见什么是对象存储OSS。
已创建OSS存储空间。具体内容,请参见创建存储空间。
已创建云原生数据仓库AnalyticDB PostgreSQL版实例。具体内容,请参见创建实例。
操作步骤
步骤一:创建用于装载数据的目标表
在云原生数据仓库AnalyticDB PostgreSQL版实例中创建用于装载Doris数据的目标表。目标表结构需与源表结构对应。建表语法,请参见建表语句。
步骤二:将Doris的数据导入到OSS
Doris支持使用S3协议将数据导出至对象存储,OSS同样支持S3协议。您可以将数据直接上传至OSS,但针对不同的Doris版本其支持的导出能力也有所不同:
Doris2.0及以上版本支持使用
EXPORT TABLE
语句导出数据,导出格式为csv、text、orc和parquet。其中以parquet格式导出的数据与主流parquet格式并不兼容,并且使用csv或text格式不支持导出数据中含有换行符等特殊字符,因此更推荐使用orc格式,该格式导出速度较快。Doris1.2版本的
EXPORT TABLE
语句仅支持导出格式为csv,导出格式较为单一,不推荐使用。推荐使用SELECT INTO OUTFILE
语句,导出格式为orc,速度相比EXPORT TABLE
语句稍慢一些,但支持使用WHERE
子句进行过滤。
使用示例
Doris2.0和Doris1.2版本的导出语句如下:
------ Doris 2.0
EXPORT TABLE s3_test TO "s3://bucket/dir/"
PROPERTIES (
"format"="orc"
)
WITH s3 (
"AWS_ENDPOINT" = "oss-cn-shanghai-internal.aliyuncs.com",
"AWS_ACCESS_KEY" = "LTA****",
"AWS_SECRET_KEY" = "lQEI1TSJIY0******",
"AWS_REGION" = "shanghai"
)
---- Doris 1.2 c列 为datetime格式
SELECT a, b, CAST(c AS string) AS c FROM s3_test INTO OUTFILE "s3://bucket/dir/"
FORMAT AS orc
PROPERTIES
(
"AWS_ENDPOINT" = "oss-cn-shanghai-internal.aliyuncs.com",
"AWS_ACCESS_KEY" = "LTA****",
"AWS_SECRET_KEY" = "lQEI1TSJIY0******",
"AWS_REGION" = "shanghai"
);
如果表中含有DATETIME类型的列,则只能使用SELECT INTO FILE语句。因为Doris的DATETIME类型导出的格式与主流产品不兼容,需要转换为STRING类型。
步骤三:将OSS数据导入到AnalyticDB PostgreSQL版
您可以通过COPY命令或使用OSS外表将数据导入云原生数据仓库AnalyticDB PostgreSQL版:
使用COPY命令导入OSS数据的方法,请参见使用COPY或UNLOAD命令导入或导出数据到OSS。
使用OSS外表导入OSS数据的方法,请参见使用OSS Foreign Table进行数据湖分析。
使用示例
使用COPY命令导入OSS的语句如下:
COPY test1 FROM 'oss://bucket/dir/' ACCESS_KEY_ID 'LTAI5t********' SECRET_ACCESS_KEY 'lQEI1T*******'
FORMAT AS orc ENDPOINT 'oss-*****-
internal.aliyuncs.com' FDW 'oss_fdw' ;
语法转换
数据类型
Doris | AnalyticDB PostgreSQL版 | 备注 |
BOOLEAN | BOOLEAN | 无 |
TINYINT | SMALLINT | 云原生数据仓库AnalyticDB PostgreSQL版没有TINYINT。 |
SMALLINT | SMALLINT | 无 |
INT | INT | 无 |
BIGINT | BIGINT | 无 |
LARGEINT | DECIMAL | 无 |
FLOAT | FLOAT | 无 |
DOUBLE | DOUBLE | 无 |
DECIMAL | DECIMAL | 无 |
DATE | DATE | 无 |
DATETIME | TIMESTAMP/TIMSTAMPTZ | 无 |
CHAR | CHAR | 无 |
VARCHAR | VARCHAR | 无 |
STRING | TEXT | 无 |
HLL | / |
|
BITMAP | / | BITMAP类型的列可以在Aggregate表或Unique表中使用。 |
QUANTILE_STATE | / | 无 |
ARRAY | [ ] | 无 |
MAP | 自定义复合类型 | 无 |
STRUCT | 自定义复合类型 | 无 |
JSON | JSON | 无 |
AGG_STATE | / | 无 |
VARIANT | 自定义复合类型 | 无 |
建表语句
以下为几种常见的建表语句模型。
模型一:明细模型
明细模型没有主键和聚合列限制,在建表语句中指定的DUPLICATE KEY,是用来指明底层数据按照哪些列进行排序。在云原生数据仓库AnalyticDB PostgreSQL版中对应转换为AOCS或BEAM表,并使用ORDER BY语句指定排序键,启动AUTOMERGE可以定期进行数据自动排序。
使用示例
CREATE TABLE IF NOT EXISTS example_tbl_by_default
(
`timestamp` DATETIME NOT NULL COMMENT "日志时间",
`type` INT NOT NULL COMMENT "日志类型",
`error_code` INT COMMENT "错误码",
`error_msg` VARCHAR(1024) COMMENT "错误详细信息",
`op_id` BIGINT COMMENT "负责人id",
`op_time` DATETIME COMMENT "处理时间"
)
DUPLICATE KEY (`timestamp`,`type`,`error_code`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_tbl_by_default
(
"timestamp" TIMESTAMP NOT NULL ,
"type" INT NOT NULL ,
error_code INT ,
error_msg VARCHAR(1024),
op_id BIGINT,
op_time TIMESTAMP
)
WITH(appendonly = true, orientation = column)
DISTRIBUTED BY("type")
ORDER BY("timestamp","type",error_code);
COMMENT ON COLUMN example_tbl_by_default.timestamp IS '日志时间';
模型二:主键模型
主键模型的主要目的是为了确保数据主键的唯一性,使用UNIQUE KEY来指定唯一性约束,在云原生数据仓库AnalyticDB PostgreSQL版中对应heap表,并使用PRIMARY KEY来指定唯一键。
使用示例
CREATE TABLE IF NOT EXISTS example_tbl_unique
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`phone` LARGEINT COMMENT "用户电话",
`address` VARCHAR(500) COMMENT "用户地址",
`register_time` DATETIME COMMENT "用户注册时间"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_tbl_unique
(
user_id BIGINT NOT NULL,
username VARCHAR(50) NOT NULL,
city VARCHAR(20),
age SMALLINT,
sex SMALLINT,
phone BIGINT,
address VARCHAR(500),
register_time TIMESTAMP,
PRIMARY KEY (user_id, username)
)
DISTRIBUTED BY (user_id);
COMMENT ON COLUMN example_tbl_unique.user_id IS '用户id';
模型三:聚合模型
聚合模型导入数据时,会将Aggregate Key列相同的行聚合成一行,而将Value列按照设置的 AggregationType
进行聚合。在云原生数据仓库AnalyticDB PostgreSQL版中对应使用heap表 ,对于Aggregate Key创建唯一索引,同时插入数据时使用UPSERT方式进行。具体内容,请参见使用INSERT ON CONFLICT覆盖写入数据。
使用示例
CREATE TABLE IF NOT EXISTS example_tbl_agg1
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
-----AnalyticDB PostgreSQL 不支持自动预聚合
CREATE TABLE IF NOT EXISTS example_tbl_agg1
(
user_id BIGINT NOT NULL,
"date" DATE NOT NULL,
city VARCHAR(20),
age SMALLINT,
sex SMALLINT,
last_visit_date TIMESTAMP DEFAULT '1970-01-01 00:00:00',
cost BIGINT DEFAULT 0,
max_dwell_time INT DEFAULT 0,
min_dwell_time INT DEFAULT 99999,
UNIQUE (user_id, "date", city, age, sex)
)
DISTRIBUTED BY(user_id);
INSERT INTO example_tbl_agg1 VALUES (10000,'2024-08-22','beijing', 18, 0, '2024-08-22 12:00:00', 20, 1000, 1000) ON CONFLICT (user_id, "date", city, age, sex) DO UPDATE SET last_visit_date = excluded.last_visit_date, cost = example_tbl_agg1.cost + excluded.cost, max_dwell_time = GREATEST(example_tbl_agg1.max_dwell_time, excluded.max_dwell_time), min_dwell_time = LEAST(example_tbl_agg1.min_dwell_time, excluded.min_dwell_time);
分区分桶
Doris通过PARTITION BY进行分区,DISTRIBUTED BY进行分桶,使用BUCKETS指定分桶数量,在云原生数据仓库AnalyticDB PostgreSQL版中对应分区键PARTITION BY和分布键DISTRIBUTED BY。
使用示例
CREATE TABLE IF NOT EXISTS example_range_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
PARTITION `p201703` VALUES LESS THAN ("2017-04-01"),
PARTITION `p2018` VALUES [("2018-01-01"), ("2019-01-01"))
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "1"
);
----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_range_tbl
(
user_id BIGINT NOT NULL,
"date" DATE NOT NULL,
city VARCHAR(20),
age SMALLINT,
sex SMALLINT,
visit_date TIMESTAMP DEFAULT '1970-01-01 00:00:00',
a_cost BIGINT DEFAULT 0,
dwell_time INT DEFAULT 0
)
PARTITION BY RANGE("date")
(
PARTITION p201701 VALUES START ("2017-02-01") INCLUSIVE,
PARTITION p201702 VALUES START ("2017-03-01") INCLUSIVE,
PARTITION p201703 VALUES START ("2017-04-01") INCLUSIVE,
PARTITION p2018 VALUES START ("2018-01-01") INCLUSIVE END ("2019-01-01") EXCLUSIVE
)
DISTRIBUTED BY (user_id);