更新时间:2019-09-27 16:55
批量归档目前支持从 Mysql、POLARDB 等数据库把业务数据批量归档到 Spark。本文将以 MySQL 为例详细介绍如何使用本工具。
如已经购买了 MySQL 数据库则本小结可以忽略,如未购买可以到 https://www.aliyun.com/product/rds/mysql 页面购买,比如本文购买好的 MySQL 数据库如下:注意:购买的 MySQL 实例的 VPC 需要和 Spark 集群一致。
X-Pack Spark 集群主要用于合并 WAL 数据,把 HBase 列数据转换成行数据,归档后的数据就存储在 X-Pack Spark 集群中。本文购买好的 X-Pack Spark 集群如下:注意图中的 VPC ID 和 VSwitch ID,和上面 MySQL 需要一样,这样才可以连接。
在创建归档任务之前,我们先到 MySQL 里面创建好相关的测试库和表,如下:
MySQL [(none)]> create database spark_archives_db;
Query OK, 1 row affected (0.00 sec)
MySQL [(none)]> use spark_archives_db;
Database changed
MySQL [spark_archives_db]> create table orders(
-> id INT UNSIGNED AUTO_INCREMENT,
-> name VARCHAR(100) NOT NULL,
-> tele VARCHAR(11) NOT NULL,
-> email VARCHAR(30) NOT NULL,
-> order_no VARCHAR(19) NOT NULL,
-> create_time TIMESTAMP,
-> PRIMARY KEY (id)
-> );
Query OK, 0 rows affected (0.00 sec)
上面我们创建了一个 spark_archives_db 库以及 orders 测试表。我们现在往这张表里面插入一些测试数据:
insert into orders values (1, 'zhangsan', '13188888888', 'zhangsan@aliyun.com', '2019091712234223242', '2019-09-17 12:23:42');
insert into orders values (2, 'lisi', '13212345678', 'lisi@aliyun.com', '2019091714122244232', '2019-09-17 14:12:22');
insert into orders values (3, 'wangwu', '13166666666', 'wangwu@aliyun.com', '2019091720113243213', '2019-09-17 20:11:32');
insert into orders values (4, 'zhaoliu', '13288888888', 'zhaoliu@aliyun.com', '2019091809234223242', '2019-09-18 09:23:42');
insert into orders values (5, 'liqi', '13121233453', 'liqi@aliyun.com', '2019091819113412345', '2019-09-18 19:11:34');
insert into orders values (6, 'wuba', '18888888888', 'wuba@aliyun.com', '2019091822430167854', '2019-09-18 22:43:01');
insert into orders values (7, 'spark', '16666666666', 'spark@aliyun.com', '2019091822554298765', '2019-09-18 22:55:42');
insert into orders values (8, 'hbase', '15555555555', 'hbase@aliyun.com', '2019091823123423242', '2019-09-18 23:12:34');
insert into orders values (9, 'cassandra', '13333333333', 'cassandra@aliyun.com', '2019091901024566666', '2019-09-19 01:02:45');
insert into orders values (10, 'hadoop', '13434343434', 'hadoop@aliyun.com', '2019091917552788888', '2019-09-19 17:55:27');
MySQL [spark_archives_db]> select * from orders;
+----+-----------+-------------+----------------------+---------------------+---------------------+
| id | name | tele | email | order_no | create_time |
+----+-----------+-------------+----------------------+---------------------+---------------------+
| 1 | zhangsan | 13188888888 | zhangsan@aliyun.com | 2019091712234223242 | 2019-09-17 12:23:42 |
| 2 | lisi | 13212345678 | lisi@aliyun.com | 2019091714122244232 | 2019-09-17 14:12:22 |
| 3 | wangwu | 13166666666 | wangwu@aliyun.com | 2019091720113243213 | 2019-09-17 20:11:32 |
| 4 | zhaoliu | 13288888888 | zhaoliu@aliyun.com | 2019091809234223242 | 2019-09-18 09:23:42 |
| 5 | liqi | 13121233453 | liqi@aliyun.com | 2019091819113412345 | 2019-09-18 19:11:34 |
| 6 | wuba | 18888888888 | wuba@aliyun.com | 2019091822430167854 | 2019-09-18 22:43:01 |
| 7 | spark | 16666666666 | spark@aliyun.com | 2019091822554298765 | 2019-09-18 22:55:42 |
| 8 | hbase | 15555555555 | hbase@aliyun.com | 2019091823123423242 | 2019-09-18 23:12:34 |
| 9 | cassandra | 13333333333 | cassandra@aliyun.com | 2019091901024566666 | 2019-09-19 01:02:45 |
| 10 | hadoop | 13434343434 | hadoop@aliyun.com | 2019091917552788888 | 2019-09-19 17:55:27 |
+----+-----------+-------------+----------------------+---------------------+---------------------+
10 rows in set (0.00 sec)
到这里,我们已经准备好所有的测试环境,现在我们可以到数据工作台创建相关的归档任务了,具体步骤如下:
点击 确认 之后我们就进入到归档编辑页面中,里面我们可以填写相关的信息,具体如下:
图中序号说明如下:
归档配置里面对应的是我们归档类型的相关配置,目前批量归档的归档类型支持 一次全量、每天全量、每天增量。分别的含义为:
归档配置里面的分区字段 对应的就是每天增量数据存放到 Spark 对应表的分区名字;增量字段代表的是按照 MySQL 对应表中的那个字段来增量抽取数据,一般都会选择表的创建时间或者修改时间来作为增量字段;高级属性选项可以参照 https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html 文档里的说明;
源库名代表的是需要归档的 MYSQL 表所在的数据库名字;
点击完提交按钮之后,会在数据工作台里面的工作流创建相关的工作流,并且会在数据工作台里面的作业管理创建相关作业,具体如下:
同时也会在 X-Pack Spark 里面创建 orders 表,如下:
归档的正常逻辑是按照指定定时调度时间周期性的把数据ETL处理后写入到spark表,但是在测试阶段,可以写入数据,然后手动的运行一下作业看看效果。我们在上一步骤往 orders 表插入了一些测试数据,我们可以手动执行上面创建好的 mysql2spark_archive_test 归档任务,如下:
我们进行测试的日期是 2019-09-19,当我们按照上面的配置来手动运行归档任务的时候,实际上是执行了 T+1 形式的归档,也就是归档了 2019-09-18 这天的数据。我们可以到运行记录里面查看归档的进度。
等作业运行完成之后,我们就可以查到刚刚归档的数据(创建sql类型的交互式查询):
可以先执行
show partitions orders
查看下生成的分区情况:正如图中显示的,X-Pack Spark 正确的归档了 2019-09-18 这一天 MySQL 新增的数据。我们在查看一下里面的数据情况:
可以看到我们正确的将 2019-09-18 这天 MySQL 的数据归档到 Spark 数据仓库的 orders 表里面了。
目前 Spark 支持以下 MySQL 的类型:
MySQL | Spark |
---|---|
TINYINT(1), BOOL, BOOLEAN, BIT(1) | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
MEDIUMINT, INT, INTEGER | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME, TIMESTAMP | TIMESTAMP |
CHAR | CHAR |
VARCHAR | VARCHAR |
TEXT | TEXT |
BINARY | BINARY |
在文档使用中是否遇到以下问题
更多建议
匿名提交