本文以一个示例为您介绍Apache Nifi如何连接Hologres。
背景信息
Apache NiFi是一个易用、可靠的数据处理与分发系统,Apache NiFi的设计目标是自动化管理系统间的数据流。Apache Nifi是一个基于WEB-UI用户界面,具有很强交互性和易用性,为不同系统间或系统内提供数据流管理与处理的系统。
前提条件
开通Hologres,详情请参见购买Hologres。
安装Apache Nifi,详情请参见How to install and start NiFi。
获取本地JSON文件写入Hologres
获取本地JSON文件写入Hologres的流程如下图所示。
GetFile:读取JSON格式的文件。
ConvertJSONToSQL:将JSON中的元素转化为SQL中的Insert语句。
PutSQL:执行上一个Processor生成的SQL语句,将JSON中的元素插入到数据库中。
创建数据库和表
登录Hologres实例,并且建立数据库,命名为demo,详情请参见创建数据库。
创建数据表。
使用如下SQL创建数据表,之后会将数据写入该表中。
DROP TABLE IF EXISTS user_info; CREATE TABLE IF NOT EXISTS user_info ( id int, first_name text, last_name text, email text );
配置GetFile Processor
添加一个GetFile Processor。
详情请参见Adding a Processor。
填写JSON文件存放的地址。
在PROPERTIES选项卡中的Input Directory中填写JSON文件存放的地址,例如我们将JSON文件存放在Apache Nifi服务器的
/opt/nifi/nifi-current/file_source
位置,文件名为user_info.json文件内容如下。{ "id": 1, "first_name": "Sig", "last_name": "Olivo", "email": "solivo0@blinklist.com" }
配置样例如下所示。
单击APPLY保存配置。
配置ConvertJSONToSQL Processor
添加一个ConvertJSONToSQL Processor。
在JDBC Connection Pool中增加一个Service,选择Compatible Controller Services为DBCPConnectionPool,并且设置Controller Service Name,此处设置为hologres。
单击JDBC Connection Pool行最右侧的向右箭头按钮,开始配置连接字符串。
从中找到刚才创建的DBCPConnectionPool,单击设置按钮。
在设置页面的PROPERTIES选项卡中配置如下参数。
参数名称
描述
说明
Database Connection URL
连接Hologres实例的JDBC连接字符串格式为:
jdbc:postgresql://<endpoint>/<database name>
,例如jdbc:postgresql://hgpostcn-cn-xxxxxxxxxxx-cn-shanghai.hologres.aliyuncs.com:80/demo
。其中Endpoint需要使用公网或者VPC的Endpoint,进入Hologres管理控制台的实例详情页获取Endpoint。
Database Driver Class Name
org.postgresql.Driver
不涉及
Database Driver Location(s)
PostgreSQL的JDBC存放地址,例如
/opt/nifi/nifi-current/jdbc_driver/postgresql-42.3.4.jar
。您可以从PostgreSQL官网下载JDBC,推荐使用42.2.25以上版本的JDBC。
Database User
当前阿里云账号的AccessKey ID。
您可以单击AccessKey 管理,获取AccessKey ID。
Password
当前账号的AccessKey Secret。
单击OK完成配置。
单击ENABLE启动该服务。
回到最初的ConvertJSONToSQL Processor,选择修改如下参数,更多说明请参见官方手册。
参数名称
描述
Statement Type
需要执行的SQL语句,该示例中使用
INSERT
。Table Name
需要写入数据的表名称,该示例中使用
user_info
。Schema Name
需要写入数据的表的Schema名称,该示例中使用
public
。单击APPLY完成配置。
配置PutSQL Processor
添加一个PutSQL Processor。
将JDBC Connection Pool设置为上一步配置的DBCPConnectionPool,该案例中DBCPConnectionPool名称为
hologres
。将Support Fragmented Transactions置为false。
单击APPLY完成配置。
开始写入数据
至此,您就完成了所有配置,将所有节点置为运行状态,Nifi即开始读取JSON文件写入Hologres。
查询数据
使用如下命令在Hologres中查询user_info表,即可看到写入的数据。
SELECT * FROM user_info;
查询结果如下。