Use the Hologres connector to read from and write to Hologres tables in stream and batch modes, with support for CDC, binlog consumption, and partial updates.
Overview
Hologres is a unified real-time data warehouse that supports standard SQL (PostgreSQL-compatible), petabyte-scale OLAP, ad hoc analysis, and high-concurrency low-latency data serving. It integrates with MaxCompute, Realtime Compute for Apache Flink, and DataWorks. The following table describes the connector capabilities.
| Category | Description |
| --- | --- |
| Supported types | Source, dimension, and sink tables |
| Execution mode | Stream and batch modes |
| Data format | Not supported |
| Specific monitoring metrics | |
| API types | DataStream and SQL |
| Supports data updates or deletions in sink tables | Yes |
Features
| Feature | Description |
| --- | --- |
| | Supports reading Hologres data with or without binlog in both CDC and non-CDC modes. |
| Unified full and incremental ingestion | Supports full, incremental, and unified full and incremental consumption. |
| Primary key conflict resolution | Supports ignoring new data, replacing entire rows, or updating only specific fields. |
| Merge streams and partial updates | Updates only the modified columns instead of the entire row. |
| Consume binlog from partitioned tables | Supports consuming binlog from both physical and logical partitioned tables. For physical partitioned tables, a single job can monitor all partitions, including newly added ones. |
| Write to partitioned tables | Supports writing to a parent table to automatically create the corresponding child partitions. |
| Real-time synchronization for a single table or an entire database | Supports real-time synchronization of a single table or an entire database. For more information, see CREATE TABLE AS (CTAS) statement and Quick Start for real-time database synchronization. |
Limitations and recommendations
Limitations
- Foreign tables not supported: The Hologres connector does not support Hologres foreign tables, such as MaxCompute foreign tables.
- Time type restriction: The Hologres connector does not currently support real-time consumption of TIMESTAMP data. When you create tables, use the TIMESTAMPTZ type instead.
- Source table scan mode (Ververica Runtime (VVR) 8 and earlier): By default, the connector reads data in batch mode and scans the full table only once, so it does not consume data that is added after the scan.
- Watermark restriction (VVR 8 and earlier): CDC mode does not support watermark definitions. If you need windowed aggregation, use a non-windowed aggregation approach instead.
- Behavior of time functions in COPY mode: When you use the COPY_STREAM or COPY_BULK_LOAD write mode, columns whose default value is CURRENT_TIMESTAMP or NOW() are set to the connection start time and are not re-evaluated for each row. Use the hg_binlog_timestamp_us binlog metadata column to obtain the actual ingestion time.
Recommendations
- Storage format selection:
  - For dimension table point lookups: Use row-oriented storage. You must set a primary key and a clustering key.
  - For one-to-many queries on dimension tables: Use column-oriented storage. For optimal performance, configure the distribution key and segment key appropriately.
  - For tables requiring frequent updates and analytical queries: If a table must support both real-time binlog consumption and OLAP analysis, we strongly recommend row-column hybrid storage.
  Important: If you do not specify a storage format when you create a table in Hologres, column-oriented storage is used by default. You cannot change the storage format after the table is created. For more information, see Create a Table in Hologres and Table Storage Formats: Row-oriented, Column-oriented, and Row-column Hybrid.
- Job parallelism settings: Set the Flink job parallelism to match the number of shards in the Hologres table.
  -- In HoloWeb, run the following statement to find the number of shards in a table. Replace <tablename> with your table name.
  select tg.property_value from hologres.hg_table_properties tb
  join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name
  where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and tb.table_name = '<tablename>';
- Versions and features: Regularly check the Hologres connector release notes for known issues, feature updates, and version compatibility information.
Usage notes
- Compatibility and limitations for Hologres and VVR consumption modes
Source table
- For VVR 8 and earlier, select the consumption mode using the sdkMode parameter.
- For VVR 11 and later, select the consumption mode using the source.binlog.read-mode parameter.
| VVR version | Hologres version | Default/recommended value | Actual consumption mode | Notes |
| --- | --- | --- | --- | --- |
| ≥ 6.0.7 | < 2.0 | Custom | holohub (default) | We recommend setting the value to jdbc. |
| 6.0.7 to 8.0.4 | ≥ 2.0 | jdbc (automatic switchover, no configuration required) | jdbc (forced) | The holohub service is deprecated in Hologres 2.0 and later. The system automatically switches to the jdbc mode, which may cause permission issues. For permission configuration, see Permission issues. |
| ≥ 8.0.5 | ≥ 2.1 | jdbc (automatic switchover, no configuration required) | jdbc (forced) | No permission issues. For Hologres 2.1.27 and later, the mode switches to jdbc_fixed. |
| ≥ 11.1 | Any version | AUTO (default) | Automatically selected based on the Hologres version | Hologres 2.1.27 and later: jdbc, with lightweight connections enabled by default (the connection.fixed.enabled parameter is set to true). Hologres 2.1.0 to 2.1.26: jdbc. Hologres 2.0 and earlier: holohub. |
Important: In VVR 11.1 and later, the connector consumes binlog data by default. Make sure that binlog is enabled for the source table. Otherwise, errors may occur.
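The AUTO rule in the last row of the table is a version comparison. The following Python sketch only mirrors that documented rule so you can check which mode a given Hologres version should get; the function names and the tuple it returns are hypothetical and are not part of the connector.

```python
def parse_version(text):
    """Turn "2.1.27" into (2, 1, 27); missing components count as 0."""
    parts = [int(p) for p in text.split(".")]
    return tuple((parts + [0, 0, 0])[:3])


def auto_binlog_read_mode(hologres_version):
    """Return (read_mode, lightweight_connections) per the AUTO row (VVR 11.1+)."""
    version = parse_version(hologres_version)
    if version >= (2, 1, 27):
        # jdbc, with connection.fixed.enabled = true by default
        return ("jdbc", True)
    if version >= (2, 1, 0):
        # 2.1.0 to 2.1.26: jdbc
        return ("jdbc", False)
    # 2.0 and earlier: holohub
    return ("holohub", False)


for v in ["2.0", "2.1.10", "2.1.27", "3.1"]:
    print(v, auto_binlog_read_mode(v))
```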
Sink table
- For VVR 8 and earlier, select the data writing mode using the sdkMode parameter.
- For VVR 11 and later, select the data writing mode using the sink.write-mode parameter.
| VVR version | Hologres version | RPC mode affected | Actual write mode | Default/recommended value | Notes |
| --- | --- | --- | --- | --- | --- |
| 6.0.4 to 8.0.2 | < 2.0 | No | rpc | Custom | N/A |
| 6.0.4 to 8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | To prevent deduplication, set 'jdbcWriteBatchSize'='1'. |
| ≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you configure the mode as rpc, the system automatically switches the mode to jdbc_fixed and sets 'jdbcWriteBatchSize'='1' to prevent deduplication. |
| ≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you configure the mode as rpc, the system automatically switches the mode to jdbc_fixed and sets 'deduplication.enabled'='false' to prevent deduplication. |
Important:
- The rpc service is deprecated in Hologres 2.0 and later. If you set this parameter to rpc, Flink automatically switches the value to jdbc_fixed. If you set the parameter to a different value, Flink uses the specified value.
- The rpc mode is removed in VVR 11.1 and later. We recommend using the jdbc mode for connections.
- For write operations in high-concurrency scenarios, we recommend that you use the jdbc_copy or COPY_STREAM mode.
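To make the sink switchover rules easier to check, the following Python sketch models how a configured sdkMode value is resolved in VVR 8 and earlier. It illustrates the table and is not connector code; the function names are hypothetical. It assumes the ≥ 8.0.3 row applies only to 8.0.3 and 8.0.4, because a separate row covers 8.0.5 and later, and it deliberately rejects VVR 11, where sink.write-mode is used instead.

```python
def parse_version(text):
    """Turn "8.0.11" into (8, 0, 11); missing components count as 0."""
    parts = [int(p) for p in text.split(".")]
    return tuple((parts + [0, 0, 0])[:3])


def effective_sink_write_mode(vvr, hologres, configured):
    """Return (write_mode, options_set_automatically) for sdkMode in VVR 8 and earlier."""
    v, h = parse_version(vvr), parse_version(hologres)
    if v >= (11, 0, 0):
        raise ValueError("VVR 11 uses sink.write-mode; rpc is removed in VVR 11.1 and later")
    if configured != "rpc":
        return (configured, {})  # any other value is used as specified
    if v >= (8, 0, 5):
        return ("jdbc_fixed", {"deduplication.enabled": "false"})
    if v >= (8, 0, 3):  # assumed to cover 8.0.3 and 8.0.4 only
        return ("jdbc_fixed", {"jdbcWriteBatchSize": "1"})
    if h >= (2, 0, 0):
        # 6.0.4 to 8.0.2 on Hologres 2.0+: switched, but set jdbcWriteBatchSize=1 yourself
        return ("jdbc_fixed", {})
    return ("rpc", {})  # 6.0.4 to 8.0.2 on Hologres earlier than 2.0


print(effective_sink_write_mode("8.0.4", "2.1", "rpc"))
```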
Dimension table
| VVR version | Hologres version | RPC mode affected | Actual consumption mode | Default/recommended value | Notes |
| --- | --- | --- | --- | --- | --- |
| 6.0.4 to 8.0.2 | < 2.0 | No | rpc | Custom | N/A |
| 6.0.4 to 8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | The rpc service is deprecated for Hologres instances of version 2.0 or later. If you set this parameter to rpc, Flink automatically switches the value to jdbc_fixed. However, if you set the parameter to a different value, Flink uses the value that you specified. |
| ≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | |
| ≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | |
Important: The rpc mode is removed in VVR 11.1 and later. By default, the jdbc mode is used for connections. You can also enable the lightweight connection mode by setting the connection.fixed.enabled parameter.
- To read JSONB data from a binlog source table in JDBC mode, you must enable a GUC parameter at the database level.
  -- Enable the GUC parameter at the database level. Only a superuser can execute this command. You need to run this command only once for each database.
  alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
- An UPDATE operation generates two consecutive binlog records: an update_before record that contains the old data, followed by an update_after record that contains the new data.
- Avoid running TRUNCATE or other table rebuild operations on a binlog source table. For more information, see FAQ.
- Make sure that the precision of the DECIMAL type is consistent between Flink and Hologres to avoid errors. For more information, see FAQ.
- When you use the initial mode for unified full and incremental consumption from a source table, global ordering is not guaranteed. If downstream calculations depend on time order, use a binlog-only consumption mode instead.
Enable binlog
For new tables
Binlog, which real-time reads depend on, is disabled by default. When you create a table in HoloWeb by using a DDL statement, set the binlog.level and binlog.ttl properties, as shown in the following example.
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row');--Use row-oriented storage for test_table.
call set_table_property('test_table', 'clustering_key', 'id');--Create a clustering key on the id column.
call set_table_property('test_table', 'binlog.level', 'replica');--Enable the binlog feature.
call set_table_property('test_table', 'binlog.ttl', '86400');--Set the time-to-live (TTL) for the binlog in seconds.
commit;
For existing tables
In HoloWeb, run the following statements to enable binlog for an existing table and set its binlog TTL. Replace <table_name> with the name of the table for which you want to enable binlog.
-- Enable the binlog feature.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- Set the binlog time-to-live (TTL) in seconds.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;
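The binlog.ttl value is expressed in seconds, so 86400 in the new-table example is one day and 2592000 above is 30 days. The following minimal Python helper, with hypothetical function names, converts a retention period in days into seconds and builds the matching set_table_property call. It does no identifier escaping, so use it only with trusted table names.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86400


def binlog_ttl_seconds(days):
    """binlog.ttl is set in seconds; convert a retention period given in days."""
    return int(days * SECONDS_PER_DAY)


def set_binlog_ttl_sql(table_name, days):
    """Build the set_table_property call used above (no escaping of table_name)."""
    return f"call set_table_property('{table_name}', 'binlog.ttl', '{binlog_ttl_seconds(days)}');"


print(set_binlog_ttl_sql("test_table", 1))
print(set_binlog_ttl_sql("<table_name>", 30))
```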
WITH parameters
VVR 11 renamed or removed some Hologres connector options but remains backward compatible with VVR 8. Refer to the parameter documentation for your version.
Type mapping
For data type mapping between Flink and Hologres, see Data type mapping between Flink and Hologres.
Hologres supports the GENERATED ALWAYS AS syntax to define a generated column. For example:
ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STORED
When Flink maps a generated column, its NOT NULL constraint is intentionally dropped and the field becomes nullable. Because Hologres computes the value of a generated column, Flink does not write a value for this field. If the NOT NULL constraint were preserved, the write operation would fail validation in the Hologres client. The NOT NULL constraint on a regular column (for example, ds TIMESTAMP NOT NULL) is not affected.
Examples
Source table examples
Binlog source table
CDC mode
In this mode, the source consumes binlog data and automatically sets the appropriate Flink RowKind type for each row based on the hg_binlog_event_type, eliminating the need for explicit declarations. These types include INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. This enables table data mirroring, similar to the Change Data Capture (CDC) functionality in databases such as MySQL or PostgreSQL. The following is an example DDL for the source table.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='${secret_values.ak_id}', --Use variables for your AccessKey pair to prevent key leakage.
'password'='${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', --Reads all changelog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
'retry-count'='10', --The number of retries on a binlog read error.
'retry-sleep-step-ms'='5000', --The incremental backoff time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
'source.binlog.batch-size'='512' --The batch size for reading binlog data.
);
VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Use variables for your AccessKey pair to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', --The number of retries on a binlog read error.
'binlogRetryIntervalMs' = '500', --The retry interval in milliseconds after a binlog read error.
'binlogBatchReadSize' = '100' --The batch size for reading binlog data.
);
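With retry-sleep-step-ms, the wait grows linearly: the first retry waits one step, the second waits two steps, and so on. The following Python sketch computes that schedule for the VVR 11+ values shown above (retry-count 10, retry-sleep-step-ms 5000). It models only the documented VVR 11+ behavior and does not cover binlogRetryIntervalMs from the VVR 8+ example; the function name is hypothetical.

```python
def retry_sleep_schedule_ms(retry_count, step_ms):
    """Wait before retry n (1-based) under retry-sleep-step-ms: n * step_ms."""
    return [attempt * step_ms for attempt in range(1, retry_count + 1)]


schedule = retry_sleep_schedule_ms(retry_count=10, step_ms=5000)
print(schedule[:3])          # → [5000, 10000, 15000]
print(sum(schedule) / 1000)  # total sleep in seconds if all 10 retries are used → 275.0
```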
Non-CDC mode
In this mode, the source passes consumed binlog data to downstream nodes as regular Flink data, treating all records as the INSERT type. You can handle records with a specific hg_binlog_event_type based on your business requirements. The following is an example DDL for the source table.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Use variables for your AccessKey pair to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', --All changelog types are treated as INSERT operations.
'retry-count'='10', --The number of retries on a binlog read error.
'retry-sleep-step-ms'='5000', --The incremental backoff time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
'source.binlog.batch-size'='512' --The batch size for reading binlog data.
);
VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Use variables for your AccessKey pair to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', --The number of retries on a binlog read error.
'binlogRetryIntervalMs' = '500', --The retry interval in milliseconds after a binlog read error.
'binlogBatchReadSize' = '100' --The batch size for reading binlog data.
);
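In non-CDC mode every record arrives as INSERT, so downstream logic must interpret hg_binlog_event_type itself when the table declares it, as in the VVR 8+ example. The following Python sketch runs outside Flink and shows how the update_before/update_after pairs described in the usage notes can be folded back into the latest row per key. It assumes the event type codes given in the Hologres binlog documentation (5 INSERT, 2 DELETE, 3 UPDATE_BEFORE, 7 UPDATE_AFTER); verify them for your Hologres version. The same mapping is what CDC mode applies when it sets the Flink RowKind.

```python
# hg_binlog_event_type codes from the Hologres binlog documentation
# (verify them for your Hologres version).
EVENT_TYPE_TO_ROWKIND = {
    5: "INSERT",
    2: "DELETE",
    3: "UPDATE_BEFORE",
    7: "UPDATE_AFTER",
}


def replay(records, key="id"):
    """Fold append-only binlog records (non-CDC mode) into the latest row per key."""
    state = {}
    for rec in records:
        kind = EVENT_TYPE_TO_ROWKIND[rec["hg_binlog_event_type"]]
        # Drop binlog metadata columns such as hg_binlog_lsn and hg_binlog_timestamp_us.
        row = {k: v for k, v in rec.items() if not k.startswith("hg_binlog_")}
        if kind in ("INSERT", "UPDATE_AFTER"):
            state[row[key]] = row
        else:
            # DELETE, or the old image of an UPDATE; the UPDATE_AFTER record follows it.
            state.pop(row[key], None)
    return state


events = [
    {"hg_binlog_event_type": 5, "id": 1, "title": "a"},
    {"hg_binlog_event_type": 3, "id": 1, "title": "a"},
    {"hg_binlog_event_type": 7, "id": 1, "title": "b"},
    {"hg_binlog_event_type": 5, "id": 2, "title": "x"},
    {"hg_binlog_event_type": 2, "id": 2, "title": "x"},
]
print(replay(events))  # → {1: {'id': 1, 'title': 'b'}}
```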
Non-binlog source table
VVR 11+
Starting with VVR 11.1, the connector consumes binlog data by default. To read from a non-binlog source table, you must explicitly set 'source.binlog' to 'false'. For more information, see Binlog source table.
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Use variables for your AccessKey pair to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog'='false' --Specifies whether to consume binlog data.
);
VVR 8+
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Use variables for your AccessKey pair to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkMode' = 'jdbc'
);
Sink table
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Use variable management to prevent AK/SK key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
Dimension table example
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Use variables for your AccessKey pair to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
Advanced features
Unified full and incremental ingestion
Scenarios
- This feature applies only to source tables that have a primary key. It is recommended for Hologres source tables that use CDC mode.
- Hologres allows you to enable binlog on demand, so you can enable binlog for existing tables that already contain data.
Code example
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', --Reads all historical data, then incrementally consumes the Binlog.
'retry-count'='10', --Number of retries if an error occurs while reading Binlog data.
'retry-sleep-step-ms'='5000', --Incremental wait time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
'source.binlog.batch-size'='512' --Number of rows to read from the Binlog in a single batch.
);
- Set source.binlog.startup-mode to INITIAL to perform a full read of the table before switching to incremental binlog consumption.
- If the startTime parameter is set, either directly or by selecting a start time in the startup UI, it has a higher priority: the startup mode is switched to timestamp mode and other mode settings are overridden.
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --Reads all historical data, then incrementally consumes the Binlog.
'binlogMaxRetryTimes' = '10', --Number of retries if an error occurs while reading Binlog data.
'binlogRetryIntervalMs' = '500', --Retry interval in milliseconds after a Binlog read error.
'binlogBatchReadSize' = '100' --Number of rows to read from the Binlog in a single batch.
);
- Set binlogStartUpMode to initial to perform a full read of the table before switching to incremental binlog consumption.
- If the startTime parameter is set, either directly or by selecting a start time in the startup UI, it has a higher priority: binlogStartUpMode is set to timestamp mode and other mode settings are overridden.
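The precedence rule above reduces to a single check. The following Python sketch, with a hypothetical function name, models only that documented rule; it does not read any real job configuration.

```python
def effective_startup_mode(configured_mode, start_time=None):
    """startTime (set directly or via the startup UI) overrides the configured startup mode."""
    if start_time is not None:
        return "timestamp"
    return configured_mode


print(effective_startup_mode("initial"))
print(effective_startup_mode("initial", start_time="2025-05-12 00:00:00"))
```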
Primary key conflict resolution
The connector offers three strategies for handling duplicate primary keys on write.
VVR 11+
You can specify a strategy by setting the sink.on-conflict-action parameter.
| Value | Description |
| --- | --- |
| INSERT_OR_IGNORE | Keeps the first-arriving record and ignores subsequent duplicates. |
| INSERT_OR_REPLACE | Overwrites the existing record with the new one. |
| INSERT_OR_UPDATE (default) | Updates only the columns provided in the sink table, leaving other columns in the existing record unchanged. |
VVR 8+
You can specify a strategy by setting the mutatetype parameter.
| Value | Description |
| --- | --- |
| insertorignore (default) | Keeps the first-arriving record and ignores subsequent duplicates. |
| insertorreplace | Overwrites the existing record with the new one. |
| insertorupdate | Updates only the columns provided in the sink table, leaving other columns in the existing record unchanged. |
For example, assume a table has columns a, b, c, and d, and a is the primary key. If the sink table provides only columns a and b, setting the strategy to INSERT_OR_UPDATE (insertorupdate in VVR 8) updates only column b. Columns c and d remain unchanged.
However, any columns in the physical table that are omitted from the sink table must be nullable. Otherwise, the write operation will fail.
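The following Python sketch replays the example above against an in-memory table to contrast the three strategies. It is a model, not the connector's implementation, and the function name is hypothetical. It assumes that INSERT_OR_REPLACE leaves columns omitted from the sink table as NULL (None) in the replaced row, and that a brand-new key stores omitted columns as NULL, which is why those columns must be nullable.

```python
def apply_sink_row(table, row, action, columns, pk="a"):
    """Apply one sink row to an in-memory table (dict: primary key -> full row)."""
    key = row[pk]
    if key not in table:
        # New key: columns missing from the sink row are stored as None (NULL).
        table[key] = {c: row.get(c) for c in columns}
    elif action == "INSERT_OR_IGNORE":      # insertorignore in VVR 8
        pass                                # keep the first-arriving record
    elif action == "INSERT_OR_REPLACE":     # insertorreplace in VVR 8
        table[key] = {c: row.get(c) for c in columns}  # assumed: omitted columns -> None
    elif action == "INSERT_OR_UPDATE":      # insertorupdate in VVR 8
        table[key].update(row)              # only the provided columns change
    else:
        raise ValueError(f"unknown action: {action}")
    return table


columns = ["a", "b", "c", "d"]
for action in ["INSERT_OR_IGNORE", "INSERT_OR_REPLACE", "INSERT_OR_UPDATE"]:
    table = {1: {"a": 1, "b": "b0", "c": "c0", "d": "d0"}}
    apply_sink_row(table, {"a": 1, "b": "b1"}, action, columns)
    print(action, table[1])
```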
Write to partitioned tables
By default, the Hologres sink writes data to a single, non-partitioned table. To write to a partitioned table by targeting its parent table, you must enable the following options.
VVR 11+
To allow the connector to automatically create a child partition if it does not exist, set sink.create-missing-partition to true.
- VVR 11.1 and later versions support writing to partitioned tables by default and automatically route data to the correct child partitions.
- Set the tablename parameter to the name of the parent table.
- If a required child partition does not exist and sink.create-missing-partition=true is not set, the write operation will fail.
VVR 8+
- To automatically route data to the corresponding child partitions, set partitionRouter to true.
- To allow the connector to automatically create a child partition if it does not exist, set createparttable to true.
- Set the tablename parameter to the name of the parent table.
- If a required child partition does not exist and createparttable=true is not set, the write operation will fail.
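The routing behavior described above can be sketched as follows. In this Python model, create_missing stands for sink.create-missing-partition (VVR 11+) or createparttable (VVR 8+). The child name format {parent}_{value}, the table name orders, and the function name are hypothetical illustrations; the actual child partition names are determined by Hologres and the connector.

```python
def route_to_child(parent, partition_value, existing_children, create_missing):
    """Pick the child partition for a row written to the parent table."""
    child = f"{parent}_{partition_value}"  # hypothetical naming, see lead-in
    if child not in existing_children:
        if not create_missing:
            raise LookupError(f"child partition {child} does not exist")
        existing_children.add(child)  # stands in for the connector creating it
    return child


children = {"orders_20250512"}
print(route_to_child("orders", "20250513", children, create_missing=True))
print(sorted(children))
```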
Merge streams and partial updates
When writing multiple streams to a single Hologres wide table, the connector merges records with the same primary key. Partial updates write only modified columns instead of replacing the entire row, improving write performance and data consistency.
Limitations
- The wide table must have a primary key.
- Each data stream must include all columns that constitute the primary key.
- For wide tables that use column-oriented storage, merging streams at a high rate of requests per second (RPS) can cause high CPU usage. To mitigate this, consider disabling dictionary encoding for the table's columns.
Example
Assume you have two Flink data streams. The first stream contains columns a, b, and c. The second stream contains columns a, d, and e. The Hologres wide table, WIDE_TABLE, contains columns a, b, c, d, and e, where column a is the primary key.
VVR 11+
-- source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink ( -- Declare columns a, b, c, d, and e.
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- The Hologres wide table, which contains columns a, b, c, d, and e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- Update specific columns based on the primary key.
'sink.delete-strategy'='IGNORE_DELETE', -- Strategy for handling retraction messages. IGNORE_DELETE is suitable for append-only or upsert streams where delete operations are not required.
'sink.partial-insert.enabled'='true' -- Enables partial updates. Only the columns specified in the INSERT statement are sent to the connector.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Declare that only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Declare that only columns a, d, and e are inserted.
END;
VVR 8+
-- source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink ( -- Declare columns a, b, c, d, and e.
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- The Hologres wide table, which contains columns a, b, c, d, and e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- Update specific columns based on the primary key.
'ignoredelete'='true', -- Ignore DELETE requests generated by retraction messages.
'partial-insert.enabled'='true' -- Enable partial updates to update only the columns declared in the INSERT statement.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Declare that only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Declare that only columns a, d, and e are inserted.
END;
In VVR 8, setting ignoredelete to true ignores the DELETE requests generated by retraction messages. For VVR 8.0.8 and later, we recommend that you use sink.delete-strategy instead to configure how retraction messages are handled during partial updates.
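The merge described in this section can be modeled as a per-key dictionary update. The following Python sketch assumes a single-column primary key a, as in the WIDE_TABLE example, and shows that the two partial streams combine into one wide row regardless of arrival order when they write disjoint columns. It is an illustration, not the connector's merge logic, and the function name is hypothetical.

```python
def merge_partial_rows(wide, rows, pk="a"):
    """Merge partial rows from several streams into one wide row per primary key."""
    for row in rows:
        if pk not in row:
            raise ValueError("each stream must include all primary key columns")
        wide.setdefault(row[pk], {}).update(row)
    return wide


stream1 = [{"a": 1, "b": "b1", "c": "c1"}]  # INSERT INTO hologres_sink(a,b,c) ...
stream2 = [{"a": 1, "d": "d1", "e": "e1"}]  # INSERT INTO hologres_sink(a,d,e) ...
print(merge_partial_rows({}, stream1 + stream2))
# → {1: {'a': 1, 'b': 'b1', 'c': 'c1', 'd': 'd1', 'e': 'e1'}}
```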
Consume binlog from partitioned tables (Beta)
The Hologres connector supports consuming binlog from both physical and logical partitioned tables. For the differences between them, see CREATE LOGICAL PARTITION TABLE.
Consume binlog from physical partitioned tables
The Hologres connector can consume binlog from a partitioned table and dynamically monitor new partitions within a single job. This significantly improves real-time data processing efficiency and usability.
Usage notes
- This feature is available only for binlog source tables in JDBC mode. It requires VVR 8.0.11 or later and a Hologres instance of version 2.1.27 or later.
- The partition name must follow the dynamic partitioning naming convention {parent_table}_{partition_value}. Partitions that do not follow this convention might not be consumed.
  Important:
  - In DYNAMIC mode, VVR 8.0.11 does not support partition columns whose values use a - delimiter (such as YYYY-MM-DD).
  - Starting from VVR 11.1, you can consume data from partition columns that use a custom format.
  - This restriction does not apply when you write to partitioned tables.
- When you declare a Hologres source table in Flink, you must include its partition columns.
- In DYNAMIC mode, the partitioned table must have dynamic partitioning enabled, and the partition pre-creation parameter auto_partitioning.num_precreate must be greater than 1. Otherwise, the job throws an exception when it attempts to consume the latest partition.
- In DYNAMIC mode, after a new partition is added, the connector no longer consumes subsequent data changes from older partitions.
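Before starting a DYNAMIC-mode job, you can check a table against the usage notes above. The following Python sketch derives DAY-unit partition names in the format shown in the examples (such as test_message_src1_20250512) and lists violations of the naming convention, the num_precreate requirement, and the VVR 8.0.11 delimiter restriction. The function names are hypothetical, and the checks cover only what the notes state.

```python
from datetime import date


def daily_partition_name(parent, day):
    """Child name for a DAY-unit dynamic partition, e.g. test_message_src1_20250512."""
    return f"{parent}_{day:%Y%m%d}"


def dynamic_mode_problems(parent, child_tables, num_precreate, vvr_8_0_11=False):
    """Return the DYNAMIC-mode usage-note violations found for a partitioned table."""
    problems = []
    if num_precreate <= 1:
        problems.append("auto_partitioning.num_precreate must be greater than 1")
    prefix = parent + "_"
    for child in child_tables:
        value = child[len(prefix):] if child.startswith(prefix) else ""
        if not value:
            problems.append(f"{child}: name does not follow {{parent_table}}_{{partition_value}}")
        elif vvr_8_0_11 and "-" in value:
            problems.append(f"{child}: '-' in the partition value is not supported on VVR 8.0.11")
    return problems


children = [daily_partition_name("test_message_src1", date(2025, 5, 12)),
            daily_partition_name("test_message_src1", date(2025, 5, 13))]
print(children)  # → ['test_message_src1_20250512', 'test_message_src1_20250513']
print(dynamic_mode_problems("test_message_src1", children, num_precreate=2))  # → []
```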
Examples
| Mode | Feature | Description |
| --- | --- | --- |
| DYNAMIC | Dynamic partition consumption | Automatically monitors and consumes new partitions in chronological order. This mode is suitable for real-time data streaming scenarios. |
| STATIC | Static partition consumption | Consumes only existing partitions (or manually specified ones) and does not automatically discover new partitions. This mode is suitable for processing historical data within a fixed range. |
Dynamic mode
VVR 11+
Assume a Hologres partitioned table is created with the following DDL, and that binlog and dynamic partitioning are enabled.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Enables dynamic partitioning.
auto_partitioning_time_unit = 'DAY', -- Partitions are created daily. Example partition names: test_message_src1_20250512, test_message_src1_20250513.
auto_partitioning_num_precreate = '2' -- Pre-creates two partitions.
);
-- For an existing partitioned table, you can also enable dynamic partitioning using ALTER TABLE.
In Flink, use the following SQL statement to consume the partitioned table test_message_src1 in DYNAMIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- The parent table for which dynamic partitioning is enabled.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Dynamically monitors the latest partitions.
'source.binlog.startup-mode' = 'initial' -- Consumes all existing data, then consumes incremental data from the binlog.
);
VVR 8.0.11
Assume a Hologres partitioned table is created with the following DDL, and that binlog and dynamic partitioning are enabled.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Enables dynamic partitioning.
auto_partitioning_time_unit = 'DAY', -- Partitions are created daily. Example partition names: test_message_src1_20241027, test_message_src1_20241028.
auto_partitioning_num_precreate = '2' -- Pre-creates two partitions.
);
-- For an existing partitioned table, you can also enable dynamic partitioning using ALTER TABLE.
In Flink, use the following SQL statement to consume data from the partitioned table test_message_src1 in DYNAMIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- The parent table for which dynamic partitioning is enabled.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- Dynamically monitors the latest partitions.
'binlogStartUpMode' = 'initial', -- Consumes all existing data, then consumes incremental data from the binlog.
'sdkMode' = 'jdbc_fixed' -- Use this mode to avoid connection limit issues.
);
Static mode
VVR 11+
Assume a Hologres partitioned table is created with the following DDL, and binlog is enabled.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
In Flink, use the following SQL statement to consume the partitioned table test_message_src2 in STATIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- The partitioned table.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- Consumes a fixed set of partitions.
'source.binlog.partition-values-to-read' = 'red,blue,green', -- Consumes only the three specified partitions. The 'black' partition is not consumed. New partitions are also not consumed. If this option is not set, the job consumes all partitions of the parent table.
'source.binlog.startup-mode' = 'initial' -- Consumes all existing data, then consumes incremental data from the binlog.
);
VVR 8.0.11
Assume a Hologres partitioned table is created with the following DDL, and binlog is enabled.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
In Flink, use the following SQL statement to consume data from the partitioned table test_message_src2 in STATIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- The partitioned table.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- Consumes a fixed set of partitions.
'partition-values-to-read' = 'red,blue,green', -- Consumes only the three specified partitions. The 'black' partition is not consumed. New partitions are also not consumed. If this option is not set, the job consumes all partitions of the parent table.
'binlogstartUpMode' = 'initial', -- Consumes all existing data, then consumes incremental data from the binlog.
'sdkMode' = 'jdbc_fixed' -- Use this mode to avoid connection limit issues.
);
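The comments in both STATIC-mode examples describe how `partition-values-to-read` (VVR 8.0.11) or `source.binlog.partition-values-to-read` (VVR 11+) narrows the partitions a job consumes. The following Java sketch models only that selection rule against a list of existing partition values. It does not model how the connector discovers partitions or handles partitions added later. The class and method names are hypothetical and are not part of the connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the Hologres connector: mirrors the documented
// effect of the partition-values-to-read option in STATIC mode.
class StaticPartitionSelection {

    // existingPartitions: partition values of the parent table, e.g. red, blue, green, black.
    // valuesToRead: the option value, e.g. "red,blue,green", or null if the option is not set.
    static List<String> selectPartitions(List<String> existingPartitions, String valuesToRead) {
        if (valuesToRead == null || valuesToRead.isEmpty()) {
            // Option not set: every partition of the parent table is consumed.
            return new ArrayList<>(existingPartitions);
        }
        // Option set: only the comma-separated partition values are consumed.
        Set<String> wanted = new HashSet<>(Arrays.asList(valuesToRead.split(",")));
        List<String> selected = new ArrayList<>();
        for (String partition : existingPartitions) {
            if (wanted.contains(partition)) {
                selected.add(partition);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("red", "blue", "green", "black");
        System.out.println(selectPartitions(partitions, "red,blue,green")); // [red, blue, green]
        System.out.println(selectPartitions(partitions, null));             // [red, blue, green, black]
    }
}
```

With the option set to `red,blue,green`, the `black` partition is excluded, matching the comment in the DDL examples.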
Consume binlog from logical partitioned tables
The Hologres connector can consume binlog from logical partitioned tables. You can use options to specify which partitions to consume.
Usage notes
-
Consuming binlog from specific partitions of a logical partitioned table requires VVR 11.0.0 or later and a Hologres instance of version V3.1 or later.
-
Consuming binlog from all partitions of a logical partitioned table works the same way as consuming binlog from a non-partitioned table. For more information, see Source tables.
Examples
| Parameter | Description | Example |
|---|---|---|
| source.binlog.logical-partition-filter-column-names | The names of the partition columns that specify which partitions to consume. Enclose each column name in double quotation marks (") and separate multiple column names with commas (,). If a column name contains a double quotation mark, escape it with an additional double quotation mark. | 'source.binlog.logical-partition-filter-column-names'='"Pt","id"' uses two partition columns: Pt and id. |
| source.binlog.logical-partition-filter-column-values | The partition values that specify which partitions to consume. Each partition is specified by a set of values, one for each partition column. Enclose each value in double quotation marks ("), separate the values of the same partition with commas (,), and separate partitions with semicolons (;). If a value contains a double quotation mark, escape it with an additional double quotation mark. | 'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' specifies two partitions to consume: (20240910, 0) and (special"value, 9). |
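Escaping these option values by hand is error-prone when a value contains a double quotation mark. The following Java sketch is a hypothetical helper, not part of the connector. It applies the quoting rules from the table: quote each item, double any embedded quotation mark, separate items within a partition with commas, and separate partitions with semicolons. It reproduces both example values. It assumes the rules are exactly as stated in the table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Hologres connector: builds option values for
// 'source.binlog.logical-partition-filter-column-names' and
// 'source.binlog.logical-partition-filter-column-values' using the documented quoting rules.
class LogicalPartitionFilter {

    // Wraps one item in double quotation marks and doubles any embedded double quotation mark.
    static String quote(String raw) {
        return "\"" + raw.replace("\"", "\"\"") + "\"";
    }

    // Quotes each item and joins the results with commas.
    static String joinQuoted(List<String> items) {
        List<String> quoted = new ArrayList<>();
        for (String item : items) {
            quoted.add(quote(item));
        }
        return String.join(",", quoted);
    }

    // Value for the column-names option, for example "Pt","id".
    static String columnNames(List<String> names) {
        return joinQuoted(names);
    }

    // Value for the column-values option: one comma-separated group per partition,
    // with groups separated by semicolons.
    static String columnValues(List<List<String>> partitions) {
        List<String> groups = new ArrayList<>();
        for (List<String> partition : partitions) {
            groups.add(joinQuoted(partition));
        }
        return String.join(";", groups);
    }

    public static void main(String[] args) {
        // Prints "Pt","id"
        System.out.println(columnNames(List.of("Pt", "id")));
        // Prints "20240910","0";"special""value","9"
        System.out.println(columnValues(List.of(List.of("20240910", "0"), List.of("special\"value", "9"))));
    }
}
```

You can use the output of `columnValues` as the value of `source.binlog.logical-partition-filter-column-values`, for example in place of `<yourPartitionColumnValues>` in the Flink DDL for this table.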
Assume that you have created the following table in Hologres.
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);
In Flink, use the following SQL statement to consume the binlog of this table.
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', -- Reads all changelog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
'retry-count'='10', -- The number of retries for binlog read errors.
'retry-sleep-step-ms'='5000', -- The incremental backoff time between retries. The first retry waits 5 seconds, the second waits 10 seconds, and so on.
'source.binlog.batch-size'='512' -- The number of rows to read from the binlog in a single batch.
);
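The comments on `retry-count` and `retry-sleep-step-ms` describe a wait that grows by one step per retry. The following Java sketch computes each wait and the total waiting time if all 10 retries are used, which is 275 seconds with a 5000 ms step. It assumes a purely linear increase with no cap or jitter, based on the "5 seconds, 10 seconds, and so on" description. The class is illustrative and is not connector code.

```java
// Illustrative model of the documented linear backoff, not part of the Hologres connector:
// the n-th retry waits n * retry-sleep-step-ms milliseconds.
class BinlogRetryBackoff {

    // Wait before the given retry attempt (1-based).
    static long waitBeforeRetryMs(int attempt, long stepMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        return attempt * stepMs;
    }

    // Total waiting time if every retry up to retryCount is used.
    static long totalWaitMs(int retryCount, long stepMs) {
        long total = 0;
        for (int attempt = 1; attempt <= retryCount; attempt++) {
            total += waitBeforeRetryMs(attempt, stepMs);
        }
        return total;
    }

    public static void main(String[] args) {
        // Values from the DDL: 'retry-count'='10', 'retry-sleep-step-ms'='5000'.
        System.out.println(waitBeforeRetryMs(1, 5000)); // 5000
        System.out.println(waitBeforeRetryMs(2, 5000)); // 10000
        System.out.println(totalWaitMs(10, 5000));      // 275000
    }
}
```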
DataStream API
To read from or write to Hologres by using the DataStream API, you must use the corresponding DataStream connector. For more information, see How to use DataStream connectors. The Hologres DataStream connector is available in Maven Central. For local debugging, use the Uber JAR. For more information, see Run and debug jobs that contain connectors locally.
Hologres source table
Binlog source table
VVR provides the HologresBinlogSource class to read Hologres binlog data. The following example shows how to build a Hologres binlog source.
VVR 11.3+
In VVR 11.1.2 and later, the JDBCOptions and startTimeMs parameters are removed from the HologresBinlogSource constructor. In VVR 11.3 and later, the constructor takes an additional List<Subscribe.BinlogFilter> parameter. If you use VVR 11, we recommend that you use VVR 11.3 or later.
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// The name of the table to read.
String sourceTableName = "sourceTableName";
// Parameters for the Hologres connection.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, sourceTableName);
config.set(HologresConfigs.BINLOG, true);
config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
// Build the Hologres binlog source.
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
StartupMode.INITIAL,
sourceTableName,
"",
Collections.emptyList(),
-1,
Collections.emptySet(),
Collections.emptyList()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Parameters for the Hologres connection.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Build the Hologres binlog source.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Parameters for the Hologres connection.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Build the Hologres binlog source.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Parameters for the Hologres connection.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Set or create the default slot name.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Build the JDBCBinlogRecordConverter.
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// Build the Hologres binlog source.
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
If you use a VVR version earlier than 8.0.5 or a Hologres instance earlier than V2.1, make sure that the user is a superuser or has the Replication Role. For more information, see Hologres permission issues.
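The permission rule applies when either condition is true. The following Java sketch expresses it as a hypothetical pre-flight check that is not part of the connector. It assumes plain dotted version strings such as `8.0.5` or `V2.1`. Full VVR engine names that include a Flink suffix would need to be trimmed first.

```java
// Hypothetical pre-flight check, not part of the Hologres connector: a superuser or the
// Replication Role is needed when VVR is earlier than 8.0.5 OR Hologres is earlier than 2.1.
class BinlogPermissionCheck {

    // Compares dotted numeric versions such as "8.0.5" or "V2.1"; a leading "V" is ignored.
    static int compareVersions(String a, String b) {
        String[] x = a.replaceFirst("^[Vv]", "").split("\\.");
        String[] y = b.replaceFirst("^[Vv]", "").split("\\.");
        int length = Math.max(x.length, y.length);
        for (int i = 0; i < length; i++) {
            // Missing segments count as 0, so "2.1" equals "2.1.0".
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    static boolean requiresReplicationRole(String vvrVersion, String hologresVersion) {
        return compareVersions(vvrVersion, "8.0.5") < 0
                || compareVersions(hologresVersion, "2.1") < 0;
    }

    public static void main(String[] args) {
        System.out.println(requiresReplicationRole("8.0.4", "2.2"));  // true: VVR is too old
        System.out.println(requiresReplicationRole("8.0.5", "V2.1")); // false
        System.out.println(requiresReplicationRole("11.0.0", "2.0")); // true: Hologres is too old
    }
}
```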
Non-binlog source table
VVR provides the HologresBulkreadInputFormat class, an implementation of RichInputFormat, to read data from Hologres tables. The following example shows how to build a Hologres source.
public class Sample {
public static void main(String[] args) throws Exception {
// Set up the streaming execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema for the table to read. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Parameters for the Hologres connection.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}
Maven dependency
The Hologres DataStream connector is available in Maven Central.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>
Hologres sink table
VVR 11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema for the table to write to. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
TableSchema tableSchema = TableSchema.builder()
.field("a", DataTypes.INT().notNull())
.field("b", DataTypes.STRING())
.primaryKey("a")
.build();
// Parameters for the Hologres connection.
Configuration config = new Configuration();
config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
config.set(HologresConfigs.USERNAME, "yourUserName");
config.set(HologresConfigs.PASSWORD, "yourPassword");
config.set(HologresConfigs.DATABASE, "yourDatabaseName");
config.set(HologresConfigs.TABLE, "yourTableName");
HologresConnectionParam connectionParam = new HologresConnectionParam(config);
HologresTableSchema hologresTableSchema =
HologresTableSchema.get(connectionParam.getJDBCOptions());
// The indexes of the columns to write to the sink.
Integer[] targetColumnIndexes = {0, 1};
// Build the Hologres sink.
HologresSinkFunction sinkFunction =
new HologresSinkFunction(
connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}
VVR 8+
public class Sample {
public static void main(String[] args) throws Exception {
// Set up the streaming execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema for the table to write to. The schema must match the fields of the Hologres table schema. You can define a subset of the fields.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Parameters for the Hologres connection.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres writer to write data as RowData.
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// Build the Hologres sink.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}
Metadata columns
VVR 8.0.11 and later support metadata columns in binlog source tables. You can declare binlog fields such as hg_binlog_event_type as metadata columns to read the source database name, table name, change type, and event timestamp, and then use them in custom logic, such as filtering out DELETE events.
| Metadata column | Type | Description |
|---|---|---|
| db_name | STRING NOT NULL | The name of the database that contains the row. |
| table_name | STRING NOT NULL | The name of the table that contains the row. |
| hg_binlog_lsn | BIGINT NOT NULL | A system field for the binlog sequence number. The value increases monotonically but not contiguously within a shard. Uniqueness and order are not guaranteed across shards. |
| hg_binlog_timestamp_us | BIGINT NOT NULL | The timestamp of the change event in the database, in microseconds (us). |
| hg_binlog_event_type | BIGINT NOT NULL | The change type of the row. Valid values are: |
| hg_shard_id | INT NOT NULL | The ID of the data shard that contains the row (table group and shard). |
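hg_binlog_timestamp_us is expressed in microseconds, and hg_binlog_lsn is ordered only within a shard. For these reasons, downstream code that uses these columns usually needs a unit conversion and a comparison keyed by hg_shard_id. The following Java sketch shows both, based only on those two documented properties. The class and method names are hypothetical and independent of the connector API. Equal LSNs are tolerated because the sketch does not assume that LSNs are unique within a shard.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;

// Illustrative downstream helpers for binlog metadata columns, not part of the Hologres connector.
class BinlogMetadataHelpers {

    // hg_binlog_timestamp_us is microseconds since the epoch; convert it to an Instant.
    static Instant toInstant(long hgBinlogTimestampUs) {
        return Instant.EPOCH.plus(hgBinlogTimestampUs, ChronoUnit.MICROS);
    }

    // hg_binlog_lsn increases only within a shard, so ordering checks are keyed by hg_shard_id.
    // Gaps between LSNs are expected because the sequence is not contiguous.
    static final class PerShardLsnTracker {
        private final Map<Integer, Long> lastLsnByShard = new HashMap<>();

        // Returns false only if the LSN is lower than the last one accepted for the same shard.
        boolean accept(int hgShardId, long hgBinlogLsn) {
            Long last = lastLsnByShard.get(hgShardId);
            if (last != null && hgBinlogLsn < last) {
                return false; // older than what this shard already produced
            }
            lastLsnByShard.put(hgShardId, hgBinlogLsn);
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(toInstant(1_700_000_000_123_456L)); // 2023-11-14T22:13:20.123456Z
        PerShardLsnTracker tracker = new PerShardLsnTracker();
        System.out.println(tracker.accept(0, 10)); // true
        System.out.println(tracker.accept(1, 5));  // true: a different shard is compared separately
        System.out.println(tracker.accept(0, 12)); // true: gaps are allowed
        System.out.println(tracker.accept(0, 11)); // false: lower than 12 on shard 0
    }
}
```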
In a DDL statement, you can declare a metadata column by using <meta_column_name> <datatype> METADATA VIRTUAL. The following is an example:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL,
hg_binlog_event_type bigint METADATA VIRTUAL,
hg_binlog_timestamp_us bigint METADATA VIRTUAL,
hg_shard_id int METADATA VIRTUAL,
db_name string METADATA VIRTUAL,
table_name string METADATA VIRTUAL,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);
FAQ
References
-
To learn how to create and use Hologres catalogs, see Manage Hologres catalogs.
-
Hologres integrates with Flink to provide a unified solution for building a real-time data warehouse. For details, see Build a real-time data warehouse with Hologres.
-
Hologres supports efficient data updates and corrections, making it suitable for building wide tables in multi-stream write scenarios. For an example, see User behavior analysis with Flink, MongoDB, and Hologres.