Access AnalyticDB for PostgreSQL


This tutorial shows how to use AnalyticDB for PostgreSQL as both a dimension table and a result table in a Flink SQL job, a common pattern for real-time enrichment pipelines.

By the end, you will have a running Flink job that reads from a Datagen source, looks up user information from an AnalyticDB for PostgreSQL dimension table, and writes the enriched records to an AnalyticDB for PostgreSQL result table.

Limitations

  • Realtime Compute for Apache Flink cannot read from AnalyticDB for PostgreSQL in serverless mode.

  • The AnalyticDB for PostgreSQL connector requires Ververica Runtime (VVR) 6.0.0 or later.

  • AnalyticDB for PostgreSQL V7.0 requires VVR 8.0.1 or later.

To use a custom connector instead, see Manage custom connectors.

Prerequisites

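Before you begin, ensure that you have:

  • A fully managed Flink workspace.

  • An AnalyticDB for PostgreSQL instance in the same virtual private cloud (VPC) as the Flink workspace.

If they are in different VPCs, see How does fully managed Flink access a service across VPCs?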

Step 1: Configure a whitelist and prepare data

  1. Log on to the AnalyticDB for PostgreSQL console.

  2. Add the CIDR block of the fully managed Flink workspace to the whitelist of the AnalyticDB for PostgreSQL instance.

    1. Find the CIDR block of the vSwitch that your fully managed Flink workspace uses. See How do I configure a whitelist?

    2. Add the CIDR block to the whitelist of the AnalyticDB for PostgreSQL instance. See Procedure.

    If you access the instance over the Internet, add the public IP address instead.

  3. On the instance details page, click Log On to Database in the upper-right corner and enter your username and password. See Use client tools to connect to an instance for details.

  4. Create a dimension table named adbpg_dim_table and insert 50 rows of sample data.

    -- Create the dimension table
    CREATE TABLE adbpg_dim_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    
    -- Insert 50 rows: id ranges from 1 to 50, username is "username" followed by the row number
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);

    Run SELECT * FROM adbpg_dim_table ORDER BY id; to verify the inserted data.

  5. Create a result table named adbpg_sink_table for Flink to write output to.

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );
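
Before moving on, a quick sanity check can confirm that both tables are in the expected state. The queries below are a minimal sketch that assumes the two tables were just created with the statements above and that nothing else has written to them. The column aliases (row_count, min_id, max_id) are illustrative names only.

```sql
-- Summarize the dimension table. With the INSERT above, this should report
-- 50 rows with ids running from 1 to 50.
SELECT count(*) AS row_count,
       min(id)  AS min_id,
       max(id)  AS max_id
FROM adbpg_dim_table;

-- Confirm that the result table exists and is still empty before the Flink job starts.
SELECT count(*) AS row_count
FROM adbpg_sink_table;
```

If the second query fails with a "relation does not exist" error, the result table was probably created in a different database or schema than the one you are connected to.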

Step 2: Create a stream draft

  1. Log on to the Realtime Compute for Apache Flink console, find your workspace, and click Console in the Actions column.

  2. In the left-side navigation pane, go to Development > ETL. In the upper-left corner of the SQL Editor page, click + and select New Blank Stream Draft.

  3. In the New Draft dialog box, configure the following parameters.

    | Parameter | Description | Example |
    | --- | --- | --- |
    | Name | The name of the draft. Must be unique within the project. | adbpg-test |
    | Location | The folder where the draft is saved. Click the icon next to an existing folder to create a subfolder. | Draft |
    | Engine version | The Flink engine version. See Engine versions for version details and lifecycle. | vvr-8.0.1-flink-1.17 |

  4. Click Create.

Step 3: Write and deploy the draft

  1. Copy the following SQL into the code editor. The SQL defines three tables and a lookup join that enriches the Datagen stream with user data from AnalyticDB for PostgreSQL.

    -- Source table: Datagen generates sequential IDs (1-50) and random scores (70-100).
    -- No changes needed in the WITH clause for this example.
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      score INT
    ) WITH (
      'connector' = 'datagen',
      'fields.id.kind' = 'sequence',
      'fields.id.start' = '1',
      'fields.id.end' = '50',
      'fields.score.kind' = 'random',
      'fields.score.min' = '70',
      'fields.score.max' = '100'
    );
    
    -- Dimension table: backed by AnalyticDB for PostgreSQL.
    -- Flink queries this table at processing time to look up usernames by ID.
    -- Replace the WITH clause values with your actual connection details.
    CREATE TEMPORARY TABLE dim_adbpg(
      id int,
      username varchar,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector' = 'adbpg',
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_dim_table',
      'username' = 'flinktest',
      'password' = '${secret_values.adb_password}',
      'maxRetryTimes' = '2',
      'cache' = 'lru',
      'cacheSize' = '100'
    );
    
    -- Result table: Flink writes enriched records here.
    -- Replace the WITH clause values with your actual connection details.
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg',
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',
      'username' = 'flinktest',
      'password' = '${secret_values.adb_password}',
      'maxRetryTimes' = '2',
      'conflictMode' = 'ignore',
      'retryWaitTime' = '200'
    );
    
    -- Lookup join: for each record from datagen_source, Flink looks up the matching
    -- row in dim_adbpg at the time the record is processed (PROCTIME()).
    INSERT INTO sink_adbpg
    SELECT ts.id, ts.username, ds.score
    FROM datagen_source AS ds
    JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    ON ds.id = ts.id;

    About the lookup join syntax: FOR SYSTEM_TIME AS OF PROCTIME() tells Flink to look up the dimension table at the point in time when each source record is processed. This means each record is enriched with the dimension data that exists at processing time, and already-written results are not updated if the dimension table changes later.

  2. Update the connection parameters for the dimension table and result table. Replace the placeholder values in the WITH clauses with your actual AnalyticDB for PostgreSQL connection details. The Datagen source table does not require changes. For the full parameter reference and data type mappings, see AnalyticDB for PostgreSQL connector.

    | Parameter | Required | Default | Description |
    | --- | --- | --- | --- |
    | url | Yes | | JDBC URL in the format jdbc:postgresql://<Internal endpoint>:<Port>/<Database name>. Find this on the Database Connection page of the instance in the AnalyticDB for PostgreSQL console. |
    | tablename | Yes | | The table name in the AnalyticDB for PostgreSQL database. |
    | username | Yes | | The username for accessing the database. |
    | password | Yes | | The password for the database account. |
    | targetSchema | No | public | The schema name. Specify this only if your table is not in the public schema. |
    | maxRetryTimes | No | | Maximum number of retries after a write failure. |
    | cache | No | | Cache policy for dimension table lookups. Set to lru to keep recently accessed entries in memory. LRU caching reduces database traffic and improves lookup throughput, but cached entries may be stale. This trades data freshness for throughput, so tune cacheSize and consider your tolerance for stale data before enabling it. |
    | cacheSize | No | | Maximum number of entries to cache. Larger values reduce database requests but consume more memory. |
    | conflictMode | No | | Action taken when a write conflicts with an existing primary key or index. Set to ignore to skip conflicting rows. |
    | retryWaitTime | No | | Time in milliseconds to wait between write retries. |

  3. In the upper-right corner of the SQL Editor page, click Validate to check the syntax.

  4. Click Deploy.

  5. On the O&M > Deployments page, find your deployment and click Start in the Actions column.
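
The lookup join in step 1 is an inner join, so a source record with no matching row in the dimension table is dropped. As an illustration only (it is not part of the tutorial job), the sketch below shows a LEFT JOIN variant of the same INSERT statement, reusing the table names defined in step 1. It selects ds.id rather than ts.id because the dimension-side columns are NULL when no match is found.

```sql
-- Variant of the INSERT statement from step 1, shown for illustration.
-- LEFT JOIN keeps every record from datagen_source; username is NULL
-- when dim_adbpg has no row with the same id.
INSERT INTO sink_adbpg
SELECT ds.id, ts.username, ds.score
FROM datagen_source AS ds
LEFT JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
ON ds.id = ts.id;
```

With the sample data in this tutorial, every id from 1 to 50 has a match, so both forms should write the same rows. The difference only matters when the source can emit ids that are missing from the dimension table.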

Step 4: Verify the result

  1. Log on to the AnalyticDB for PostgreSQL console.

  2. Click Log On to Database. See Connect to an instance from a client for details.

  3. Run the following query to view the records that Flink wrote to the result table.

    SELECT * FROM adbpg_sink_table ORDER BY id;

    The result should contain 50 rows, each with a user ID, the matching username from the dimension table, and a random score between 70 and 100.
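
    For a quicker check than scanning all rows, an aggregate query can summarize the output. This is a minimal sketch that assumes the job has run exactly once against an initially empty result table. The column aliases are illustrative names only.

```sql
-- Summarize the result table after the job finishes.
-- Expected under the assumptions above: 50 rows, no missing usernames,
-- and scores within the 70 to 100 range configured for the Datagen source.
SELECT count(*)                   AS row_count,
       count(*) - count(username) AS missing_usernames,
       min(score)                 AS min_score,
       max(score)                 AS max_score
FROM adbpg_sink_table;
```

    A row_count above 50 usually means the job was started more than once, because adbpg_sink_table has no primary key to reject repeated ids.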

