Connect Apache NiFi to Hologres

This document provides an example of how to connect Apache NiFi to Hologres.

Background

Apache NiFi is an easy-to-use and reliable system for data processing and distribution. It is designed to automate the flow of data between systems. Apache NiFi provides a highly interactive and user-friendly web interface for managing and processing data streams between or within systems.

Prerequisites

Write a local JSON file to Hologres

The following workflow writes data from a local JSON file to Hologres.

  1. GetFile: Reads a JSON file.

  2. ConvertJSONToSQL: Converts the elements in the JSON file into a SQL INSERT statement.

  3. PutSQL: Executes the SQL statement from the previous processor to insert data into the database.

  1. Create a database and a table

    1. Log on to your Hologres instance and create a database named demo. For more information, see Create a database.

    2. Create a table.

      Run the following SQL statement to create the destination table.

      DROP TABLE IF EXISTS user_info;
      CREATE TABLE IF NOT EXISTS user_info (
          id int,
          first_name text,
          last_name text,
          email text
      );
  2. Configure the GetFile processor

    1. Add a GetFile processor.

      For more information, see Adding a Processor.

    2. Specify the path to the JSON file.

      On the PROPERTIES tab, enter the storage path of your JSON file in the Input Directory field. This example uses a JSON file named user_info.json, stored in the /opt/nifi/nifi-current/file_source directory on the Apache NiFi server. The file contains the following data:

      {
          "id": 1,
          "first_name": "Sig",
          "last_name": "Olivo",
          "email": "solivo0@blinklist.com"
      }

      In this example, the remaining properties keep their default values: File Filter is [^\.].*, Keep Source File is false (the source file is deleted after it is read), Batch Size is 10, Recurse Subdirectories is true, and Polling Interval is 0 sec.

    3. Click Apply to save the configuration.
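Before you start the flow, it can help to stage the sample file and confirm that its name passes the File Filter. The following is a minimal sketch and not part of the original walkthrough: the helper names `passes_file_filter` and `stage_sample_file` are hypothetical, and `re.fullmatch` is only an approximation of how GetFile applies the filter to a file name. A temporary directory stands in for /opt/nifi/nifi-current/file_source.

```python
import json
import re
import tempfile
from pathlib import Path

# Default File Filter value from the GetFile configuration above.
FILE_FILTER = r"[^\.].*"

# The sample record used in this example.
SAMPLE_RECORD = {
    "id": 1,
    "first_name": "Sig",
    "last_name": "Olivo",
    "email": "solivo0@blinklist.com",
}


def passes_file_filter(file_name: str, pattern: str = FILE_FILTER) -> bool:
    """Return True if the whole file name matches the filter pattern."""
    return re.fullmatch(pattern, file_name) is not None


def stage_sample_file(input_dir: Path, file_name: str = "user_info.json") -> Path:
    """Write the sample record into input_dir and return the file path."""
    if not passes_file_filter(file_name):
        raise ValueError(f"{file_name!r} would be skipped by the File Filter")
    input_dir.mkdir(parents=True, exist_ok=True)
    target = input_dir / file_name
    target.write_text(json.dumps(SAMPLE_RECORD, indent=4), encoding="utf-8")
    # Re-read the file to confirm that it contains valid JSON.
    json.loads(target.read_text(encoding="utf-8"))
    return target


if __name__ == "__main__":
    # A temporary directory stands in for the NiFi input directory.
    with tempfile.TemporaryDirectory() as tmp:
        print(stage_sample_file(Path(tmp) / "file_source"))
```

Because Keep Source File is false, GetFile removes the file after reading it, so keep a copy outside the input directory if you plan to rerun the flow.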

  3. Configure the ConvertJSONToSQL processor

    1. Add a ConvertJSONToSQL processor.

    2. For the JDBC Connection Pool property, create a new service. Set Compatible Controller Services to DBCPConnectionPool and the Controller Service Name to hologres.

    3. Click the Go To arrow (→) on the far right of the JDBC Connection Pool row to configure the connection string.

    4. On the CONTROLLER SERVICES tab, find the DBCPConnectionPool service named hologres and click the settings (gear) icon on its right.

    5. On the PROPERTIES tab of the settings page, configure the following parameters.

      Database Connection URL
        Description: The JDBC connection string for the Hologres instance, in the format jdbc:postgresql://<endpoint>:<port>/<database name>. Example: jdbc:postgresql://hgpostcn-cn-xxxxxxxxxxx-cn-shanghai.hologres.aliyuncs.com:80/demo.
        Notes: The endpoint must be a public or Virtual Private Cloud (VPC) endpoint. To get the endpoint, go to the instance details page in the Hologres console.

      Database Driver Class Name
        Description: org.postgresql.Driver
        Notes: N/A

      Database Driver Location(s)
        Description: The path where the PostgreSQL JDBC driver is stored. Example: /opt/nifi/nifi-current/jdbc_driver/postgresql-42.3.4.jar.
        Notes: You can download the JDBC driver from the official PostgreSQL website. We recommend using JDBC driver version 42.2.25 or later.

      Database User
        Description: The AccessKey ID of your Alibaba Cloud account.
        Notes: To get your AccessKey ID, go to AccessKey Management.

      Password
        Description: The AccessKey Secret of your Alibaba Cloud account.
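The connection string is the setting most likely to be mistyped, so the sketch below assembles it from its parts. It is only an illustration: the function name `build_jdbc_url` is hypothetical, and the endpoint shown is the masked placeholder from the example above, not a real instance.

```python
def build_jdbc_url(endpoint: str, port: int, database: str) -> str:
    """Assemble a PostgreSQL JDBC URL in the form used by the example above."""
    if not endpoint or not database:
        raise ValueError("endpoint and database must be non-empty")
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return f"jdbc:postgresql://{endpoint}:{port}/{database}"


if __name__ == "__main__":
    # Placeholder endpoint copied from the example; replace it with your own.
    print(
        build_jdbc_url(
            "hgpostcn-cn-xxxxxxxxxxx-cn-shanghai.hologres.aliyuncs.com", 80, "demo"
        )
    )
```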

    6. Click OK to complete the configuration.

    7. Click ENABLE to start the controller service.

    8. Return to the ConvertJSONToSQL processor configuration and set the following parameters. For more information, see the official NiFi documentation.

      Statement Type
        Description: The type of SQL statement to generate. In this example, use INSERT.

      Table Name
        Description: The name of the destination table. In this example, use user_info.

      Schema Name
        Description: The schema of the destination table. In this example, use public.

    9. Click Apply to complete the configuration.
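To make the effect of these settings concrete, the sketch below approximates what ConvertJSONToSQL produces for the sample record: a parameterized INSERT statement plus argument values carried in attributes named `sql.args.N.value`. This is a simplified illustration, not the processor's implementation. The function name `json_to_insert` is hypothetical, and the column list is passed in explicitly here, whereas the processor reads it from the table metadata.

```python
import json


def json_to_insert(json_text, table, columns, schema=None):
    """Build a parameterized INSERT and NiFi-style argument attributes.

    `columns` stands in for the column list that the processor would read
    from the table metadata; this sketch takes it as an argument.
    """
    record = json.loads(json_text)
    # Keep only the table columns that are present in the JSON object.
    used = [c for c in columns if c in record]
    if not used:
        raise ValueError("no JSON field matches a table column")
    target = f"{schema}.{table}" if schema else table
    placeholders = ", ".join("?" for _ in used)
    sql = f"INSERT INTO {target} ({', '.join(used)}) VALUES ({placeholders})"
    # Argument positions are 1-based, as in JDBC.
    attributes = {
        f"sql.args.{i}.value": str(record[c]) for i, c in enumerate(used, start=1)
    }
    return sql, attributes


if __name__ == "__main__":
    sample = (
        '{"id": 1, "first_name": "Sig", "last_name": "Olivo", '
        '"email": "solivo0@blinklist.com"}'
    )
    statement, args = json_to_insert(
        sample, "user_info", ["id", "first_name", "last_name", "email"], "public"
    )
    print(statement)
    print(args)
```

For the sample record, the generated statement is 83 characters long, which is consistent with the 83 bytes reported for the ConvertJSONToSQL output in the run described in step 5.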

  4. Configure the PutSQL processor

    1. Add a PutSQL processor.

    2. Set JDBC Connection Pool to the DBCPConnectionPool you configured in the previous step. In this example, the name of the DBCPConnectionPool is hologres.

    3. Set Support Fragmented Transactions to false.

    4. Click Apply to complete the configuration.
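PutSQL receives a statement with `?` placeholders and binds the values found in the `sql.args.N.value` attributes in positional order. The sketch below mimics that binding step under stated assumptions: an in-memory SQLite database stands in for Hologres so that the example runs locally, the table name is left unqualified because SQLite has no public schema, and the helper names `ordered_args` and `execute_statement` are hypothetical.

```python
import re
import sqlite3

# Matches attribute names such as sql.args.1.value and captures the position.
ARG_VALUE = re.compile(r"sql\.args\.(\d+)\.value")


def ordered_args(attributes):
    """Return argument values sorted by their 1-based position."""
    found = {}
    for name, value in attributes.items():
        match = ARG_VALUE.fullmatch(name)
        if match:
            found[int(match.group(1))] = value
    return [found[i] for i in sorted(found)]


def execute_statement(conn, sql, attributes):
    """Bind the attribute values to the placeholders and run the statement."""
    with conn:  # commits on success, rolls back on error
        conn.execute(sql, ordered_args(attributes))


if __name__ == "__main__":
    # SQLite is only a local stand-in for the Hologres connection.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE user_info (id int, first_name text, last_name text, email text)"
    )
    execute_statement(
        conn,
        "INSERT INTO user_info (id, first_name, last_name, email) VALUES (?, ?, ?, ?)",
        {
            "sql.args.1.value": "1",
            "sql.args.2.value": "Sig",
            "sql.args.3.value": "Olivo",
            "sql.args.4.value": "solivo0@blinklist.com",
        },
    )
    print(conn.execute("SELECT * FROM user_info").fetchall())
```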

  5. Start writing data

    The configuration is complete. Start all processors to read the JSON file and write its data to Hologres.

    The NiFi data flow consists of three processors connected in sequence: GetFile (reads the local JSON file; 105 bytes out) → ConvertJSONToSQL (converts the JSON to a SQL statement; 83 bytes out) → PutSQL (executes the SQL statement to write to the database). After the run, empty success and sql queues indicate that the data was written to Hologres.

  6. Query the data

    Run the following SQL statement in Hologres to query the user_info table and view the imported data.

    SELECT * FROM user_info;

    The query returns four columns (id, first_name, last_name, and email) and one row of data: id=1, first_name=Sig, last_name=Olivo, email=solivo0@blinklist.com. This result confirms that the data from the local JSON file was successfully written to Hologres.