Hologres: Real-time data warehouse

Use the Hologres connector to read from and write to Hologres tables in stream and batch modes, with support for CDC, binlog consumption, and partial updates.

Overview

Hologres is a unified real-time data warehouse that supports standard SQL (PostgreSQL-compatible), petabyte-scale OLAP, ad hoc analysis, and high-concurrency low-latency data serving. It integrates with MaxCompute, Realtime Compute for Apache Flink, and DataWorks. The following table describes the connector capabilities.

| Category | Description |
|---|---|
| Supported types | source, dimension, and sink tables |
| Execution mode | stream and batch modes |
| Data format | Not supported |
| Monitoring metrics | Source table: numRecordsIn, numRecordsInPerSecond. Sink table: numRecordsOut, numRecordsOutPerSecond, currentSendTime. For metric details, see Monitoring metrics. |
| API types | DataStream and SQL |
| Supports data updates or deletions in sink tables | Yes |

Features

Feature

Description

Real-time consumption of Hologres data

Supports reading Hologres data with or without binlog in both CDC and non-CDC modes.

Unified full and incremental consumption

Supports full, incremental, and unified full and incremental consumption.

Primary key conflict handling

Supports ignoring new data, replacing entire rows, or updating only specific fields.

Multi-stream merge and partial updates

Updates only the modified columns instead of the entire row.

Consume binlog from partitioned tables (Beta)

Supports consuming binlog from both physical and logical partitioned tables. For physical partitioned tables, a single job can monitor all partitions, including newly added ones.

Write to partitioned tables

Supports writing to a parent table to automatically create the corresponding child partitions.

Real-time synchronization for a single table or an entire database

Supports real-time synchronization of a single table or an entire database, with the following key features:

  • Automatic detection of source table schema evolution: When the schema of a source table changes, Hologres synchronizes these changes to the sink table in real time.

  • Automatic handling of schema changes: When new data is ingested, Flink first modifies the sink table schema before writing the data.

For more information, see CREATE TABLE AS (CTAS) statement and Quick Start for real-time database synchronization.

Limitations and recommendations

Limitations

  • Foreign tables not supported: The Hologres connector does not support Hologres foreign tables, such as MaxCompute foreign tables.

  • Time type restriction: The Hologres connector does not currently support real-time consumption of TIMESTAMP data. When creating tables, use the TIMESTAMPTZ type exclusively.

  • Source table scan mode (Ververica Runtime (VVR) 8 and earlier): By default, the connector reads data in batch mode, scanning the full table only once. Consequently, it does not consume newly added data.

  • Watermark restriction (Ververica Runtime (VVR) 8 and earlier): CDC mode does not support watermark definitions. To perform windowed aggregation, use a non-windowed aggregation solution instead.

  • Behavior of time functions in COPY mode: When you use the COPY_STREAM or COPY_BULK_LOAD write mode, the values of table columns that use CURRENT_TIMESTAMP or NOW() as the default are fixed to the connection start time and do not update for each row. The hg_binlog_timestamp_us binlog metadata column provides the actual ingestion time.

Recommendations

  • Storage format selection:

    • For dimension table point lookups: Use row-oriented storage. You must set a primary key and a clustering key.

    • For one-to-many queries on dimension tables: Use column-oriented storage. For optimal performance, properly configure the distribution key and segment key.

    • For tables requiring frequent updates and analytical queries: If a table must support both real-time binlog consumption and OLAP analysis, we strongly recommend using row-column hybrid storage.

    Important

    If you do not specify a storage format when you create a table in Hologres, it defaults to column-oriented storage. You cannot change the storage format after table creation. For more information, see Create a Table in Hologres and Table Storage Formats: Row-oriented, Column-oriented, and Row-column Hybrid.

  • Job parallelism settings: Set the Flink job parallelism to match the number of shards in the Hologres table.

    -- In HoloWeb, run the following statement to find the number of shards in a table. Replace <tablename> with your table name.
    select tg.property_value
    from hologres.hg_table_properties tb
    join hologres.hg_table_group_properties tg
      on tb.property_value = tg.tablegroup_name
    where tb.property_key = 'table_group'
      and tg.property_key = 'shard_count'
      and table_name = '<tablename>';
  • Versions and features: Regularly check the Hologres connector Release Notes for known issues, feature updates, and version compatibility information.
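The storage format and binlog recommendations above can be combined when the table is created. The following is a minimal sketch, not a definitive template: orders_hybrid and its columns are hypothetical, and it assumes that the orientation value 'row,column' selects row-column hybrid storage on your Hologres version. Check the Table Storage Formats topic before relying on it.

```sql
-- Hypothetical table; the name and columns are for illustration only.
begin;
create table orders_hybrid (
  id bigint primary key,
  status text,
  amount numeric(18, 2)
);
-- Assumption: 'row,column' requests row-column hybrid storage.
call set_table_property('orders_hybrid', 'orientation', 'row,column');
-- Enable binlog so that Flink can consume changes from the table.
call set_table_property('orders_hybrid', 'binlog.level', 'replica');
-- Keep binlog records for one day (value in seconds).
call set_table_property('orders_hybrid', 'binlog.ttl', '86400');
commit;
```

Because the storage format cannot be changed after creation, choosing hybrid storage up front avoids rebuilding the table later.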

Usage notes

  • Compatibility and limitations for Hologres and VVR consumption modes

    Source table

    • For VVR 8 and earlier, select the consumption mode using the sdkMode parameter.

    • For VVR 11 and later, select the consumption mode using the source.binlog.read-mode parameter.

    | VVR version | Hologres version | Default/recommended value | Actual consumption mode | Notes |
    |---|---|---|---|---|
    | ≥ 6.0.7 | < 2.0 | Custom | holohub (default) | We recommend setting the value to jdbc. |
    | 6.0.7 to 8.0.4 | ≥ 2.0 | jdbc (automatic switchover, no configuration required) | jdbc (forced) | The holohub service is deprecated in Hologres 2.0 and later. The system automatically switches to the jdbc mode, which may cause permission issues. For permission configuration, see Permission issues. |
    | ≥ 8.0.5 | ≥ 2.1 | jdbc (automatic switchover, no configuration required) | jdbc (forced) | No permission issues. For Hologres 2.1.27 and later, the mode switches to jdbc_fixed. |
    | ≥ 11.1 | Any version | AUTO (default) | Automatically selected based on the Hologres version | Hologres 2.1.27 and later: the jdbc mode is selected with lightweight connections enabled by default (the connection.fixed.enabled parameter is set to true). Hologres 2.1.0 to 2.1.26: the jdbc mode is selected. Hologres 2.0 and earlier: the holohub mode is selected. |

    Important

    In VVR 11.1 and later, the connector consumes binlog data by default. Ensure you have enabled binlog. Otherwise, errors may occur.

    Permission issues

    If you are not a superuser, you must grant the required permissions to consume binlogs in JDBC mode.

    user_name is the ID of your Alibaba Cloud account or RAM user. For details, see Account overview.

    -- In the standard PostgreSQL authorization model, grant the CREATE permission to the user and grant the replication role on the instance to the user.
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- If the database uses the simple permission model (SPM), you cannot run the GRANT statement. Use spm_grant to grant the Admin permission on the database to the user. You can also grant the permission in the HoloWeb console.
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    Sink table

    • For VVR 8 and earlier, select the data writing mode using the sdkMode parameter.

    • For VVR 11 and later, select the data writing mode using the sink.write-mode parameter.

    | VVR version | Hologres version | RPC mode affected | Actual write mode | Default/recommended value | Notes |
    |---|---|---|---|---|---|
    | 6.0.4 to 8.0.2 | < 2.0 | No | rpc | Custom | N/A |
    | 6.0.4 to 8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | To prevent deduplication, set 'jdbcWriteBatchSize'='1'. |
    | ≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you configure the mode as rpc, the system automatically switches the mode to jdbc_fixed and sets 'jdbcWriteBatchSize'='1' to prevent deduplication. |
    | ≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you configure the mode as rpc, the system automatically switches the mode to jdbc_fixed and sets 'deduplication.enabled'='false' to prevent deduplication. |

    Important
    • The rpc service is deprecated in Hologres 2.0 and later. If you set this parameter to rpc, Flink automatically switches the value to jdbc_fixed. If you set the parameter to a different value, Flink uses the specified value.

    • The rpc mode is removed from VVR 11.1 and later. We recommend using the jdbc mode for connections.

    • For write operations in high-concurrency scenarios, we recommend that you use the jdbc_copy or COPY_STREAM mode.

    Dimension table

    | VVR version | Hologres version | RPC mode affected | Actual consumption mode | Default/recommended value | Notes |
    |---|---|---|---|---|---|
    | 6.0.4 to 8.0.2 | < 2.0 | No | rpc | Custom | N/A |
    | 6.0.4 to 8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | The rpc service is deprecated for Hologres instances of version 2.0 or later. If you set this parameter to rpc, Flink automatically switches the value to jdbc_fixed. However, if you set the parameter to a different value, Flink uses the value that you specified. |
    | ≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | |
    | ≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | |

    Important

    The rpc mode is removed from VVR 11.1 and later. By default, the jdbc mode is used for connections. You can also enable the lightweight connection mode by setting the connection.fixed.enabled parameter.

  • To read JSONB data from a binlog source table in JDBC mode, you must enable a GUC parameter at the database level.

    -- Enable the GUC parameter at the database level. Only a superuser can execute this command. You need to run this command only once for each database.
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • An UPDATE operation generates two consecutive binlog records: the update_before record for the old data, followed by the update_after record for the new data.

  • Avoid running TRUNCATE or other table rebuild operations on a binlog source table. For more information, see FAQ.

  • Ensure that the precision of the DECIMAL type is consistent between Flink and Hologres to avoid errors. For more information, see the FAQ.

  • When you use the initial mode for unified full and incremental data consumption from a source table, global ordering is not guaranteed. If downstream systems depend on time-based calculations, use a different, binlog-only consumption mode.

Enable binlog

For new tables

The real-time data read feature is disabled by default. Therefore, when you create a table in HoloWeb by using a DDL statement, you must set the binlog.level and binlog.ttl parameters. The following is an example.

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');--Create a row-oriented table named test_table.
call set_table_property('test_table', 'clustering_key', 'id');--Create a clustering key on the id column.
call set_table_property('test_table', 'binlog.level', 'replica');--Enable the binlog feature.
call set_table_property('test_table', 'binlog.ttl', '86400');--Set the time-to-live (TTL) for the binlog in seconds.
commit;

For existing tables

In HoloWeb, run the following statements to enable binlog for an existing table and set the binlog TTL. Replace <table_name> with the name of the table for which you want to enable binlog.

-- Enable the binlog feature.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- Set the binlog time-to-live (TTL) in seconds.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;
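
To confirm that the settings took effect, you can inspect the table properties. This is a sketch under one assumption: that the binlog settings are stored in hologres.hg_table_properties under the same keys passed to set_table_property (binlog.level and binlog.ttl). The view and its property_key, property_value, and table_name columns are the ones used by the shard count query in the Recommendations section.

```sql
-- Assumption: the property keys match those passed to set_table_property.
select property_key, property_value
from hologres.hg_table_properties
where table_name = '<table_name>'
  and property_key in ('binlog.level', 'binlog.ttl');
```

If the query returns no row for binlog.level, binlog is most likely not enabled for the table.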

WITH parameters

VVR 11 renamed or removed some Hologres connector options but remains backward compatible with VVR 8. Refer to the parameter documentation for your version.

Type mapping

For data type mapping between Flink and Hologres, see Data type mapping between Flink and Hologres.

Note

Hologres supports the GENERATED ALWAYS AS syntax to define a generated column. For example:

ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STORED

When Flink maps such a table, the NOT NULL constraint on a generated column becomes a nullable field. This is intentional: Hologres computes the value of a generated column, so Flink does not write a value for it, and preserving NOT NULL would cause the write to fail validation in the Hologres client. NOT NULL constraints on regular columns (for example, ds TIMESTAMP NOT NULL) are not affected.

Examples

Source table examples

Binlog source table

CDC mode

In this mode, the source consumes binlog data and automatically sets the appropriate Flink RowKind type for each row based on the hg_binlog_event_type, eliminating the need for explicit declarations. These types include INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. This enables table data mirroring, similar to the Change Data Capture (CDC) functionality in databases such as MySQL or PostgreSQL. The following is an example DDL for the source table.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            --Use variables for your AccessKey pair to prevent key leakage. 
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  --Reads all changelog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
  'retry-count'='10',                     --The number of retries on a binlog read error.
  'retry-sleep-step-ms'='5000',           --The incremental backoff time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
  'source.binlog.batch-size'='512'        --The batch size for reading binlog data.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Use variables for your AccessKey pair to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     --The number of retries on a binlog read error.
  'binlogRetryIntervalMs' = '500',  --The retry interval in milliseconds after a binlog read error.
  'binlogBatchReadSize' = '100'     --The batch size for reading binlog data.
);
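
Because CDC mode already assigns the RowKind of each row, a mirroring job can forward the changelog to another table without extra handling. The following is a sketch under stated assumptions: test_message_mirror_sink is a hypothetical sink name, <yourMirrorTablename> is a placeholder for an existing Hologres table with the same columns and a primary key on id, and only connection options that appear in both the VVR 11 and VVR 8 examples are used.

```sql
-- Hypothetical mirror sink; it declares the same columns as the binlog source.
CREATE TEMPORARY TABLE test_message_mirror_sink (
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourMirrorTablename>',
  'username'='${secret_values.ak_id}',
  'password'='${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);

-- INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER rows flow through unchanged.
INSERT INTO test_message_mirror_sink
SELECT id, title, body
FROM test_message_src_binlog_table;
```

How deletions are applied on the sink side depends on the delete handling options of your connector version.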

Non-CDC mode

In this mode, the source passes consumed binlog data to downstream nodes as regular Flink data, treating all records as the INSERT type. You can handle records with a specific hg_binlog_event_type based on your business requirements. The following is an example DDL for the source table.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Use variables for your AccessKey pair to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  --All changelog types are treated as INSERT operations.
  'retry-count'='10',                     --The number of retries on a binlog read error.
  'retry-sleep-step-ms'='5000',           --The incremental backoff time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
  'source.binlog.batch-size'='512'        --The batch size for reading binlog data.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Use variables for your AccessKey pair to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     --The number of retries on a binlog read error.
  'binlogRetryIntervalMs' = '500',  --The retry interval in milliseconds after a binlog read error.
  'binlogBatchReadSize' = '100'     --The batch size for reading binlog data.
);
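
In non-CDC mode the job itself decides what each record means. The following sketch, written against the VVR 8 DDL above (which declares the hg_binlog_* columns), labels records by hg_binlog_event_type and drops the update_before image of each UPDATE. The numeric codes (5 for INSERT, 2 for DELETE, 3 for BEFORE_UPDATE, 7 for AFTER_UPDATE) are assumed from the Hologres binlog documentation and should be verified for your version; binlog_audit_sink is a hypothetical table.

```sql
-- Hypothetical sink used only to make the statement complete.
CREATE TEMPORARY TABLE binlog_audit_sink (
  id INTEGER,
  title VARCHAR,
  change_type STRING,
  change_time_us BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO binlog_audit_sink
SELECT
  id,
  title,
  CASE hg_binlog_event_type   -- Assumed codes: 5 INSERT, 2 DELETE, 7 AFTER_UPDATE.
    WHEN 5 THEN 'INSERT'
    WHEN 2 THEN 'DELETE'
    WHEN 7 THEN 'UPDATE_AFTER'
    ELSE 'OTHER'
  END AS change_type,
  hg_binlog_timestamp_us AS change_time_us
FROM test_message_src_binlog_table
WHERE hg_binlog_event_type <> 3;  -- Assumed code 3: BEFORE_UPDATE, skipped here.
```

Every row still reaches the sink as an INSERT, so downstream operators see an append-only stream.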

Non-binlog source table

VVR 11+

Important

Starting with VVR 11.1, the connector consumes binlog data by default. To read from a non-binlog source table, you must explicitly set 'source.binlog' to 'false'. For more information, see Binlog source table.

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Use variables for your AccessKey pair to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      --Specifies whether to consume binlog data.
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Use variables for your AccessKey pair to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

Sink table

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Use variable management to prevent AK/SK key leakage.
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
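
For high-concurrency writes, the usage notes recommend a copy-based write mode. The following variant of the sink above is a sketch for VVR 11 and later. It assumes that COPY_STREAM is accepted as a value of sink.write-mode with exactly this spelling, so check the WITH parameters reference for your version. hologres_sink_copy is a hypothetical table name, and the statement reuses datagen_source from the example above.

```sql
CREATE TEMPORARY TABLE hologres_sink_copy (
  name varchar,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.write-mode'='COPY_STREAM'   -- Assumed value spelling; VVR 11 and later only.
);
INSERT INTO hologres_sink_copy SELECT * FROM datagen_source;
```

Keep in mind the limitation on COPY modes: columns that default to CURRENT_TIMESTAMP or NOW() are fixed to the connection start time rather than evaluated per row.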

Dimension table example

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Use variables for your AccessKey pair to prevent key leakage.
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
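
In VVR 11.1 and later, the dimension table uses the jdbc mode by default, and lightweight connections can be enabled with connection.fixed.enabled. The following sketch shows that variant. hologres_dim_fixed is a hypothetical table name, and the join reuses datagen_source and blackhole_sink from the example above.

```sql
CREATE TEMPORARY TABLE hologres_dim_fixed (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'connection.fixed.enabled'='true'   -- Lightweight connections; VVR 11.1 and later.
);
INSERT INTO blackhole_sink SELECT T.a, H.b
FROM datagen_source AS T JOIN hologres_dim_fixed FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
```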

Advanced features

Unified full and incremental ingestion

Scenarios

  • This feature applies only to source tables that have a primary key. It is recommended for Hologres source tables that use CDC mode.

  • Hologres allows you to enable Binlog on demand. You can enable Binlog for existing tables that already contain data.

Code example

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   --Reads all historical data, then incrementally consumes the Binlog.
  'retry-count'='10',                         --Number of retries if an error occurs while reading Binlog data.
  'retry-sleep-step-ms'='5000',               --Incremental wait time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
  'source.binlog.batch-size'='512'            --Number of rows to read from the Binlog in a single batch.
  );
Note
  • Set source.binlog.startup-mode to INITIAL to perform an initial full read of the table before switching to incremental consumption of the Binlog.

  • If the startTime parameter is set, either directly or by selecting a start time in the startup UI, it takes precedence: the startup mode is forced to timestamp mode and any other mode setting is ignored.

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --Reads all historical data, then incrementally consumes the Binlog.
  'binlogMaxRetryTimes' = '10',     --Number of retries if an error occurs while reading Binlog data.
  'binlogRetryIntervalMs' = '500',  --Retry interval in milliseconds after a Binlog read error.
  'binlogBatchReadSize' = '100'     --Number of rows to read from the Binlog in a single batch.
  );
Note
  • Set binlogStartUpMode to initial to perform an initial full read of the table before switching to incremental consumption of the Binlog.

  • If the startTime parameter is set, either directly or by selecting a start time in the startup UI, it takes precedence: binlogStartUpMode is forced to timestamp mode and any other mode setting is ignored.

Primary key conflict resolution

The connector offers three strategies for handling duplicate primary keys on write.

VVR 11+

You can specify a strategy by setting the sink.on-conflict-action parameter.

| Value | Description |
|---|---|
| INSERT_OR_IGNORE | Keeps the first-arriving record and ignores subsequent duplicates. |
| INSERT_OR_REPLACE | Overwrites the existing record with the new one. |
| INSERT_OR_UPDATE (Default) | Updates only the columns provided in the sink table, leaving other columns in the existing record unchanged. |

VVR 8+

You can specify a strategy by setting the mutatetype parameter.

| Value | Description |
|---|---|
| insertorignore (Default) | Keeps the first-arriving record and ignores subsequent duplicates. |
| insertorreplace | Overwrites the existing record with the new one. |
| insertorupdate | Updates only the columns provided in the sink table, leaving other columns in the existing record unchanged. |

For example, assume a table has columns a, b, c, and d, and a is the primary key. If the sink table provides only columns a and b, setting the strategy to INSERT_OR_UPDATE updates only column b. Columns c and d remain unchanged.

Note

Any column in the physical table that is omitted from the sink table must be nullable. Otherwise, the write operation fails.
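
The a, b, c, d example can be sketched as follows for VVR 11 and later. This is an illustration under assumptions: datagen_ab and hologres_sink_ab are hypothetical names, and <yourTablename> stands for a physical Hologres table with columns a, b, c, and d whose primary key is a. For VVR 8, the equivalent setting is 'mutatetype'='insertorupdate'.

```sql
-- Hypothetical source that produces only columns a and b.
CREATE TEMPORARY TABLE datagen_ab (
  a BIGINT,
  b STRING
) WITH (
  'connector' = 'datagen'
);

-- The sink declares only a and b; c and d exist only in the physical table.
CREATE TEMPORARY TABLE hologres_sink_ab (
  a BIGINT,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE'  -- On a key conflict, update b only.
);

INSERT INTO hologres_sink_ab SELECT a, b FROM datagen_ab;
```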

Write to partitioned tables

By default, the Hologres sink writes data to a single, non-partitioned table. To write to a partitioned table by targeting its parent table, you must enable the following options.

VVR 11+

To allow the connector to automatically create a child partition if it does not exist, set sink.create-missing-partition to true.

Note
  • VVR 11.1 and later versions support writing to partitioned tables by default and automatically route data to the correct child partitions.

  • Set the tablename parameter to the name of the parent table.

  • If a required child partition does not exist and sink.create-missing-partition=true is not set, the write operation will fail.
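
A minimal sketch of a VVR 11 sink that targets a parent table is shown below. hologres_partitioned_sink, its columns, and the <yourParentTablename> placeholder are hypothetical; the parent table is assumed to be partitioned by the dt column.

```sql
CREATE TEMPORARY TABLE hologres_partitioned_sink (
  id INTEGER,
  title VARCHAR,
  dt VARCHAR   -- Assumed partition column of the parent table.
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourParentTablename>',      -- Name of the parent table.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.create-missing-partition'='true'    -- Create the child partition if it does not exist.
);

-- The row is routed to the child partition for dt = '20250512'.
INSERT INTO hologres_partitioned_sink VALUES (1, 'hello', '20250512');
```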

VVR 8+

  • To automatically route data to the corresponding child partitions, set partitionRouter to true.

  • To allow the connector to automatically create a child partition if it does not exist, set createparttable to true.

Note
  • Set the tablename parameter to the name of the parent table.

  • If a required child partition does not exist and createparttable=true is not set, the write operation will fail.
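
The VVR 8 settings can be sketched in the same way. hologres_partitioned_sink_v8, its columns, and the <yourParentTablename> placeholder are hypothetical; the parent table is assumed to be partitioned by the dt column.

```sql
CREATE TEMPORARY TABLE hologres_partitioned_sink_v8 (
  id INTEGER,
  title VARCHAR,
  dt VARCHAR   -- Assumed partition column of the parent table.
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourParentTablename>',   -- Name of the parent table.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'partitionRouter'='true',              -- Route rows to the matching child partition.
  'createparttable'='true'               -- Create the child partition if it does not exist.
);

INSERT INTO hologres_partitioned_sink_v8 VALUES (1, 'hello', '20250512');
```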

Merge streams and partial updates

When writing multiple streams to a single Hologres wide table, the connector merges records with the same primary key. Partial updates write only modified columns instead of replacing the entire row, improving write performance and data consistency.

Limitations

  • The wide table must have a primary key.

  • Each data stream must include all columns that constitute the primary key.

  • For wide tables that use column-oriented storage, merging streams at a high rate of requests per second (RPS) can cause high CPU usage. To mitigate this, consider disabling dictionary encoding for the table's columns.

Example

Assume you have two Flink data streams. The first stream contains columns a, b, and c. The second stream contains columns a, d, and e. The Hologres wide table, WIDE_TABLE, contains columns a, b, c, d, and e, where column a is the primary key.

VVR 11+

-- source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink ( -- Declare columns a, b, c, d, and e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- The Hologres wide table, which contains columns a, b, c, d, and e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- Update specific columns based on the primary key.
  'sink.delete-strategy'='IGNORE_DELETE',         -- Strategy for handling retraction messages. IGNORE_DELETE is suitable for append-only or upsert streams where delete operations are not required.
  'sink.partial-insert.enabled'='true'            -- Enables partial updates. Only the columns specified in the INSERT statement are sent to the connector.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Declare that only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Declare that only columns a, d, and e are inserted.
END;

VVR 8+

-- source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink ( -- Declare columns a, b, c, d, and e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- The Hologres wide table, which contains columns a, b, c, d, and e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- Update specific columns based on the primary key.
  'ignoredelete'='true',            -- Ignore DELETE requests generated by retraction messages.
  'partial-insert.enabled'='true'   -- Enable partial updates to update only the columns declared in the INSERT statement.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Declare that only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Declare that only columns a, d, and e are inserted.
END;
Note

Set ignoredelete to true to ignore Delete requests that are generated by retraction messages. For VVR 8.0.8 and later, we recommend that you use sink.delete-strategy to configure various strategies for partial updates.
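
Both examples assume that source1 and source2 already exist. For local experiments, they can be stubbed with the datagen connector. This is only a sketch: the sequence range and rate are arbitrary choices made so that both streams emit the same key values for column a, and real jobs would read from actual upstream systems.

```sql
-- Stub for the first stream: columns a, b, and c.
CREATE TEMPORARY TABLE source1 (
  a BIGINT,
  b STRING,
  c STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.a.kind' = 'sequence',   -- Emit keys 1 to 100 in order.
  'fields.a.start' = '1',
  'fields.a.end' = '100'
);

-- Stub for the second stream: columns a, d, and e, with the same key range.
CREATE TEMPORARY TABLE source2 (
  a BIGINT,
  d STRING,
  e STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.a.kind' = 'sequence',
  'fields.a.start' = '1',
  'fields.a.end' = '100'
);
```

With these stubs, each key from 1 to 100 is written once by each stream, so every row in the wide table ends up with all five columns populated.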

Consume binlog from partitioned tables (Beta)

The Hologres connector supports consuming binlog from both physical and logical partitioned tables. For the differences between them, see CREATE LOGICAL PARTITION TABLE.

Consume binlog from physical partitioned tables

The Hologres connector can consume binlog from a partitioned table and dynamically monitor new partitions within a single job. This significantly improves real-time data processing efficiency and usability.

Usage notes

  • This feature is available only for binlog source tables in JDBC mode. It requires VVR 8.0.11 or later and a Hologres instance of version 2.1.27 or later.

  • The partition name must follow the dynamic partitioning naming convention: {parent_table}_{partition_value}. Non-conforming partitions might not be consumed.

    Important
    • For DYNAMIC mode, VVR 8.0.11 does not support partition columns with a - delimiter (such as YYYY-MM-DD).

    • Starting from VVR 11.1, you can consume data from partition columns that use a custom format.

    • This restriction does not apply when you write to partitioned tables.

  • When declaring a Hologres source table in Flink, you must include its partition columns.

  • In DYNAMIC mode, a partitioned table must have dynamic partitioning enabled. In addition, the partition pre-creation parameter auto_partitioning.num_precreate must be greater than 1. Otherwise, the job will throw an exception when it attempts to consume the latest partition.

  • In DYNAMIC mode, after a new partition is added, the connector no longer consumes subsequent data changes from older partitions.

Examples

| Mode | Features | Description |
|---|---|---|
| DYNAMIC | Dynamic partition consumption | Automatically monitors and consumes new partitions in chronological order. This mode is suitable for real-time data streaming scenarios. |
| STATIC | Static partition consumption | Consumes only existing partitions (or manually specified ones) and does not automatically discover new partitions. This mode is suitable for processing historical data within a fixed range. |

Dynamic mode

VVR 11+

Assume a Hologres partitioned table is created with the following DDL, and that binlog and dynamic partitioning are enabled.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Enables dynamic partitioning.
    auto_partitioning_time_unit = 'DAY',  -- Partitions are created daily. Example partition names: test_message_src1_20250512, test_message_src1_20250513.
    auto_partitioning_num_precreate = '2' -- Pre-creates two partitions.
);
-- For an existing partitioned table, you can also enable dynamic partitioning using ALTER TABLE.

In Flink, use the following SQL statement to consume the partitioned table test_message_src1 in DYNAMIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- The parent table for which dynamic partitioning is enabled.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Dynamically monitors the latest partitions.
  'source.binlog.startup-mode' = 'INITIAL'           -- Consumes all existing data, then consumes incremental data from the binlog.
);

VVR 8.0.11

Assume a Hologres partitioned table is created with the following DDL, and that binlog and dynamic partitioning are enabled.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica',
    auto_partitioning_enable = 'true',    -- Enables dynamic partitioning.
    auto_partitioning_time_unit = 'DAY',  -- Partitions are created daily. Example partition names: test_message_src1_20241027, test_message_src1_20241028.
    auto_partitioning_num_precreate = '2' -- Pre-creates two partitions.
);

-- For an existing partitioned table, you can also enable dynamic partitioning using ALTER TABLE.

In Flink, use the following SQL statement to consume data from the partitioned table test_message_src1 in DYNAMIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- The parent table for which dynamic partitioning is enabled.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- Dynamically monitors the latest partitions.
  'binlogstartUpMode' = 'initial',      -- Consumes all existing data, then consumes incremental data from the binlog.
  'sdkMode' = 'jdbc_fixed'              -- Use this mode to avoid connection limit issues.
);

Static mode

VVR 11+

Assume a Hologres partitioned table is created with the following DDL, and binlog is enabled.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

In Flink, use the following SQL statement to consume the partitioned table test_message_src2 in STATIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- The partitioned table.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- Consumes a fixed set of partitions.
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- Consumes only the three specified partitions. The 'black' partition is not consumed. New partitions are also not consumed. If this option is not set, the job consumes all partitions of the parent table.
  'source.binlog.startup-mode' = 'initial'  -- Consumes all existing data, then consumes incremental data from the binlog.
);
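The selection rule described in the comments above (consume only the listed partition values, or every existing partition when the option is not set) can be sketched in Java as follows. This is an illustration of the rule, not the connector's implementation. The class StaticPartitionSelection and its select method are hypothetical, and trimming whitespace around values is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: which partitions a STATIC-mode job would consume. */
class StaticPartitionSelection {

    /**
     * @param existing     partition values that exist when the job starts
     * @param valuesToRead comma-separated option value, or null if the option is not set
     * @return the partition values to consume, in the order of `existing`
     */
    static List<String> select(List<String> existing, String valuesToRead) {
        // Option not set: all existing partitions of the parent table are consumed.
        if (valuesToRead == null || valuesToRead.trim().isEmpty()) {
            return new ArrayList<>(existing);
        }
        List<String> wanted = new ArrayList<>();
        for (String value : valuesToRead.split(",")) {
            wanted.add(value.trim()); // Assumption: surrounding whitespace is ignored.
        }
        List<String> selected = new ArrayList<>();
        for (String partition : existing) {
            if (wanted.contains(partition)) {
                selected.add(partition);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("red", "blue", "green", "black");
        System.out.println(select(existing, "red,blue,green")); // 'black' is left out.
        System.out.println(select(existing, null));             // All four partitions.
    }
}
```

In both cases the set is fixed at startup, so partitions created later are not part of the result.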

VVR 8.0.11

Assume a Hologres partitioned table is created with the following DDL, and binlog is enabled.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

In Flink, use the following SQL statement to consume data from the partitioned table test_message_src2 in STATIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- The partitioned table.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- Consumes a fixed set of partitions.
  'partition-values-to-read' = 'red,blue,green',  -- Consumes only the three specified partitions. The 'black' partition is not consumed. New partitions are also not consumed. If this option is not set, the job consumes all partitions of the parent table.
  'binlogstartUpMode' = 'initial',  -- Consumes all existing data, then consumes incremental data from the binlog.
  'sdkMode' = 'jdbc_fixed' -- Use this mode to avoid connection limit issues.
);

Consume binlog from logical partitioned tables

The Hologres connector supports consuming binlog from logical partitioned tables. You can use connector options to specify which partitions to consume.

Usage notes

  • Consuming binlog from specific partitions of a logical partitioned table requires VVR 11.0.0 or later and a Hologres instance of version V3.1 or later.

  • Consuming binlog from all partitions of a logical partitioned table works the same way as consuming binlog from a non-partitioned table. For details, see Source tables.

Examples

Parameter

Description

Example

source.binlog.logical-partition-filter-column-names

The partition column names that specify which partitions to consume. Enclose column names in double quotation marks ("). Separate multiple column names with commas (,). If a column name contains a double quotation mark, escape it with an additional double quotation mark.

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

Two partition columns are used: Pt and id.

source.binlog.logical-partition-filter-column-values

The partition values that specify which partitions to consume. A partition is specified by a set of values, one for each partition column. Enclose each value in double quotation marks ("). Separate values for the same partition with a comma (,). Separate partitions with a semicolon (;). If a value contains a double quotation mark, escape it with an additional double quotation mark.

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

This specifies two partitions to consume. The first partition value is (20240910, 0), and the second is (special"value, 9).
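The quoting rules in the table above are easy to get wrong by hand. The following Java sketch builds both option values from plain lists by wrapping each item in double quotation marks, doubling any embedded double quotation mark, joining the items of one partition with commas, and joining partitions with semicolons. The class LogicalPartitionFilterValues and its methods are hypothetical helpers written for this example and are not part of the connector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Illustration only: builds the logical-partition filter option strings. */
class LogicalPartitionFilterValues {

    /** Wraps one item in double quotes and escapes embedded quotes by doubling them. */
    static String quote(String raw) {
        return "\"" + raw.replace("\"", "\"\"") + "\"";
    }

    /** Quotes every item and joins the items with commas. */
    static String joinQuoted(List<String> items) {
        return items.stream()
                .map(LogicalPartitionFilterValues::quote)
                .collect(Collectors.joining(","));
    }

    /** Value for source.binlog.logical-partition-filter-column-names. */
    static String columnNames(List<String> names) {
        return joinQuoted(names);
    }

    /** Value for source.binlog.logical-partition-filter-column-values. */
    static String columnValues(List<List<String>> partitions) {
        return partitions.stream()
                .map(LogicalPartitionFilterValues::joinQuoted)
                .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        System.out.println(columnNames(Arrays.asList("Pt", "id")));
        System.out.println(columnValues(Arrays.asList(
                Arrays.asList("20240910", "0"),
                Arrays.asList("special\"value", "9"))));
    }
}
```

With the inputs in main, the two printed strings match the example values shown in the table.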

Assume that you have created the following table in Hologres.

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

To consume the binlog of this table in Flink:

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  -- Reads all changelog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
  'retry-count'='10',                     -- The number of retries for binlog read errors.
  'retry-sleep-step-ms'='5000',           -- The incremental backoff time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
  'source.binlog.batch-size'='512'        -- The number of rows to read from the binlog in a single batch.
);
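To get a feel for how retry-count and retry-sleep-step-ms interact, the following Java sketch computes the wait before each retry and the total wait across all retries. It assumes the backoff grows linearly (retry number multiplied by the step), which is how the comment above describes the first two retries. The class LinearRetryBackoff is hypothetical and only models the arithmetic.

```java
/** Illustration only: linear backoff arithmetic for retry-count and retry-sleep-step-ms. */
class LinearRetryBackoff {

    /** Wait before the given retry (1-based), assuming delay = retry * step. */
    static long delayMs(int retry, long stepMs) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry numbers start at 1");
        }
        return retry * stepMs;
    }

    /** Total time spent waiting if all `retryCount` retries are used. */
    static long totalDelayMs(int retryCount, long stepMs) {
        long total = 0;
        for (int retry = 1; retry <= retryCount; retry++) {
            total += delayMs(retry, stepMs);
        }
        return total;
    }

    public static void main(String[] args) {
        // Values from the example above: 10 retries with a 5000 ms step.
        System.out.println(delayMs(1, 5000));       // 5000
        System.out.println(delayMs(2, 5000));       // 10000
        System.out.println(totalDelayMs(10, 5000)); // 275000
    }
}
```

Under this assumption, the settings in the example allow a job to wait up to 275 seconds in total before a binlog read error is treated as fatal.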

DataStream API

Important

To read from or write to Hologres using the DataStream API, set up the corresponding DataStream connector. For details, see How to use DataStream connectors. The Hologres DataStream connector is available in Maven Central. For local debugging, use the Uber JAR. For details, see Run and debug jobs that contain connectors locally.

Hologres source table

Binlog source table

VVR provides the HologresBinlogSource class to read Hologres binlog data. The following example shows how to build a Hologres binlog source.

VVR 11.3+

Important

Starting from VVR 11.1.2, the JDBCOptions and startTimeMs parameters are removed from the HologresBinlogSource constructor. Starting from VVR 11.3, a List<Subscribe.BinlogFilter> parameter is added. If you are on any VVR 11 release, we recommend that you upgrade to VVR 11.3 or later.

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();

        // The name of the table to read.
        String sourceTableName = "sourceTableName";

        // Parameters for the Hologres connection.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, sourceTableName);
        config.set(HologresConfigs.BINLOG, true);
        config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
        // Build the Hologres binlog source.
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                StartupMode.INITIAL,
                sourceTableName,
                "",
                Collections.emptyList(),
                -1,
                Collections.emptySet(),
                Collections.emptyList()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameters for the Hologres connection.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Build the Hologres binlog source.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameters for the Hologres connection.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Build the Hologres binlog source.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
        // Parameters for the Hologres connection.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Set or create the default slot name.
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // Build the JDBCBinlogRecordConverter.
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // Build the Hologres binlog source.
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

Important

If you use VVR earlier than 8.0.5 or Hologres earlier than V2.1, make sure that the user is a superuser or has the Replication Role. For details, see Hologres permission issues.

Non-binlog source table

VVR provides the HologresBulkreadInputFormat class, an implementation of RichInputFormat, to read data from Hologres tables. The following example shows how to build a Hologres source.

public class Sample {
    public static void main(String[] args) throws Exception {
        // Set up the DataStream execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameters for the Hologres connection.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Maven dependency

The Hologres DataStream connector is available in Maven Central.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Hologres sink table

VVR 11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema for the table to write to. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
        TableSchema tableSchema = TableSchema.builder()
                .field("a", DataTypes.INT().notNull())
                .field("b", DataTypes.STRING())
                .primaryKey("a")
                .build();
        // Parameters for the Hologres connection.
        Configuration config = new Configuration();
        config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.set(HologresConfigs.USERNAME, "yourUserName");
        config.set(HologresConfigs.PASSWORD, "yourPassword");
        config.set(HologresConfigs.DATABASE, "yourDatabaseName");
        config.set(HologresConfigs.TABLE, "yourTableName");
        HologresConnectionParam connectionParam = new HologresConnectionParam(config);
        HologresTableSchema hologresTableSchema =
                HologresTableSchema.get(connectionParam.getJDBCOptions());
        // The indexes of the columns to write to the sink.
        Integer[] targetColumnIndexes = {0, 1};
        // Build the Hologres sink.
        HologresSinkFunction sinkFunction =
                new HologresSinkFunction(
                        connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

VVR 8+

public class Sample {
    public static void main(String[] args) throws Exception {
        // Set up the DataStream execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema for the table to write to. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Parameters for the Hologres connection.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
        // Build a Hologres writer to write data as RowData.
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // Build the Hologres sink.
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

Metadata columns

VVR 8.0.11 and later support metadata columns in binlog source tables. Declare binlog fields such as hg_binlog_event_type as metadata columns to access the source database name, table name, change type, and event timestamp. You can then use these values in custom logic, for example to filter out DELETE events.

Field name

Type

Description

db_name

STRING NOT NULL

The name of the database that contains the row.

table_name

STRING NOT NULL

The name of the table that contains the row.

hg_binlog_lsn

BIGINT NOT NULL

A system field for the binlog sequence number. The value increases monotonically but not contiguously within a shard. Uniqueness and order are not guaranteed across shards.

hg_binlog_timestamp_us

BIGINT NOT NULL

The timestamp of the change event in the database, in microseconds (us).

hg_binlog_event_type

BIGINT NOT NULL

The change type of the row. Valid values are:

  • 5: An INSERT message.

  • 2: A DELETE message.

  • 3: The row image before an UPDATE operation.

  • 7: The row image after an UPDATE operation.

hg_shard_id

INT NOT NULL

The ID of the data shard that contains the row (table group and shard).
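Because hg_binlog_lsn is only ordered within a shard, any ordering or deduplication logic has to key on hg_shard_id as well. The following Java sketch tracks the highest LSN seen per shard and reports whether a new event advances that shard. The class ShardLsnTracker is a hypothetical helper for custom downstream logic and is not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: tracks the highest hg_binlog_lsn seen for each hg_shard_id. */
class ShardLsnTracker {

    private final Map<Integer, Long> lastLsnByShard = new HashMap<>();

    /**
     * Records the event if its LSN is greater than every LSN seen so far for the shard.
     *
     * @return true if the event advanced the shard, false if it is a repeat or out of order
     */
    boolean advance(int shardId, long lsn) {
        Long last = lastLsnByShard.get(shardId);
        if (last != null && lsn <= last) {
            return false;
        }
        lastLsnByShard.put(shardId, lsn);
        return true;
    }

    public static void main(String[] args) {
        ShardLsnTracker tracker = new ShardLsnTracker();
        System.out.println(tracker.advance(0, 10)); // true: first event of shard 0
        System.out.println(tracker.advance(0, 12)); // true: gaps in the LSN are allowed
        System.out.println(tracker.advance(0, 12)); // false: already seen on shard 0
        System.out.println(tracker.advance(1, 5));  // true: shard 1 is tracked independently
    }
}
```

Comparing LSNs from different shards directly would give misleading results, which is why the map is keyed by shard ID.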

In a DDL statement, declare a metadata column with the syntax <meta_column_name> <datatype> METADATA VIRTUAL. For example:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL,
  hg_binlog_event_type bigint METADATA VIRTUAL,
  hg_binlog_timestamp_us bigint METADATA VIRTUAL,
  hg_shard_id int METADATA VIRTUAL,
  db_name string METADATA VIRTUAL,
  table_name string METADATA VIRTUAL,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
);
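Once hg_binlog_event_type and hg_binlog_timestamp_us are available as columns, custom logic usually needs to decode them. The following Java sketch maps the numeric event type codes listed in the table above to readable names and converts the microsecond timestamp to a java.time.Instant. The class BinlogMetadataDecoder is hypothetical, and treating the timestamp as microseconds since the Unix epoch is an assumption of this example.

```java
import java.time.Instant;

/** Illustration only: decodes binlog metadata column values. */
class BinlogMetadataDecoder {

    /** Maps an hg_binlog_event_type code to a readable change type. */
    static String eventTypeName(long code) {
        if (code == 5L) {
            return "INSERT";
        }
        if (code == 2L) {
            return "DELETE";
        }
        if (code == 3L) {
            return "UPDATE_BEFORE"; // Row image before an UPDATE.
        }
        if (code == 7L) {
            return "UPDATE_AFTER";  // Row image after an UPDATE.
        }
        throw new IllegalArgumentException("Unknown hg_binlog_event_type: " + code);
    }

    /** Converts hg_binlog_timestamp_us (assumed to be epoch microseconds) to an Instant. */
    static Instant toInstant(long timestampUs) {
        long seconds = Math.floorDiv(timestampUs, 1_000_000L);
        long micros = Math.floorMod(timestampUs, 1_000_000L);
        return Instant.ofEpochSecond(seconds, micros * 1_000L);
    }

    public static void main(String[] args) {
        System.out.println(eventTypeName(2L));
        System.out.println(toInstant(1_700_000_000_123_456L));
    }
}
```

A filter that drops DELETE events, for instance, could compare the decoded name (or the raw code 2) before forwarding a row.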

FAQ

References