You can use the MaxCompute Flink connector to write Flink data to standard and delta tables in MaxCompute, which simplifies data ingestion. This topic describes the connector's capabilities and outlines the procedures for writing data.
Background information
Supported write modes
The Flink connector supports two write modes: upsert and insert. In upsert mode, you can group data streams in one of the following ways:
Group by primary key
Group by partition field
Grouping by partition field is suitable when data is written to a large number of partitions, but it may cause data skew.
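To make the skew trade-off concrete, the following Java sketch routes rows to parallel writer subtasks by hashing either the primary key or the partition value. It is a hypothetical illustration, not the connector's actual routing logic: the hash-modulo assignment, the class and method names, and the sample data are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of two ways to route upsert rows to parallel writer subtasks. */
final class GroupingSketch {

    /** A row with a primary key and a partition value (for example, ds). */
    record Row(long id, String partition) {}

    /** Group by primary key: rows with the same key always go to the same subtask. */
    static int byPrimaryKey(Row row, int parallelism) {
        return Math.floorMod(Long.hashCode(row.id()), parallelism);
    }

    /** Group by partition field: every row of one partition goes to the same subtask. */
    static int byPartition(Row row, int parallelism) {
        return Math.floorMod(row.partition().hashCode(), parallelism);
    }

    /** Counts how many rows each subtask receives under the chosen grouping. */
    static int[] load(List<Row> rows, int parallelism, boolean groupByPartition) {
        int[] counts = new int[parallelism];
        for (Row row : rows) {
            int subtask = groupByPartition
                    ? byPartition(row, parallelism)
                    : byPrimaryKey(row, parallelism);
            counts[subtask]++;
        }
        return counts;
    }

    /** Builds n rows with keys 0..n-1 that all fall into a single partition. */
    static List<Row> singlePartition(int n, String partition) {
        List<Row> rows = new ArrayList<>();
        for (long id = 0; id < n; id++) {
            rows.add(new Row(id, partition));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Row> rows = singlePartition(8, "2023061517");
        System.out.println(Arrays.toString(load(rows, 4, false))); // [2, 2, 2, 2]
        System.out.println(Arrays.toString(load(rows, 4, true)));  // all 8 rows on one subtask
    }
}
```

When all rows land in one hot partition, grouping by primary key still spreads them evenly, while grouping by partition sends every row to a single subtask.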
For the upsert write procedure and recommended parameters, see Real-time data ingestion into a data warehouse.
Specify the write mode by using Flink connector parameters. For a complete list of connector parameters, see Appendix: Flink connector parameters.
Set the checkpoint interval for Flink upsert write jobs to at least 3 minutes. Shorter intervals can reduce write efficiency and create a large number of small files.
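Both settings above can be checked before a job is submitted. The following Java sketch is a hypothetical pre-flight check, not part of the connector: it accepts only insert or upsert as the sink.operation value (falling back to the documented default, insert) and, for upsert jobs, flags checkpoint intervals shorter than the recommended 3 minutes. The class and method names are illustrative.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/** Hypothetical pre-flight check for MaxCompute sink settings; not part of the connector. */
final class SinkPreflight {

    /** Recommended minimum checkpoint interval for upsert jobs. */
    static final Duration MIN_UPSERT_CHECKPOINT = Duration.ofMinutes(3);

    /** Returns a problem description, or an empty Optional if the settings look reasonable. */
    static Optional<String> validate(Map<String, String> options, Duration checkpointInterval) {
        // sink.operation defaults to insert when it is not set.
        String operation = options.getOrDefault("sink.operation", "insert");
        if (!operation.equals("insert") && !operation.equals("upsert")) {
            return Optional.of("sink.operation must be insert or upsert, got: " + operation);
        }
        if (operation.equals("upsert") && checkpointInterval.compareTo(MIN_UPSERT_CHECKPOINT) < 0) {
            return Optional.of("checkpoint interval of " + checkpointInterval.toSeconds()
                    + " s is shorter than the recommended 180 s for upsert jobs");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // 2 minutes: flagged for an upsert job.
        System.out.println(validate(Map.of("sink.operation", "upsert"), Duration.ofMinutes(2)).isPresent()); // true
        // 3 minutes: accepted.
        System.out.println(validate(Map.of("sink.operation", "upsert"), Duration.ofMinutes(3)).isPresent()); // false
    }
}
```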
The following table maps data types between Realtime Compute for Apache Flink and MaxCompute.
| Flink data type | MaxCompute data type |
|---|---|
| CHAR(p) | CHAR(p) |
| VARCHAR(p) | VARCHAR(p) |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) | DECIMAL(p, s) |
| DATE | DATE |
| TIMESTAMP(9) WITHOUT TIME ZONE, TIMESTAMP_LTZ(9) | TIMESTAMP |
| TIMESTAMP(3) WITHOUT TIME ZONE, TIMESTAMP_LTZ(3) | DATETIME |
| BYTES | BINARY |
| ARRAY<T> | ARRAY<T> |
| MAP<K, V> | MAP<K, V> |
| ROW | STRUCT |
Note: The Flink TIMESTAMP data type does not include time zone information, whereas the MaxCompute TIMESTAMP data type does. This difference can cause a time discrepancy, for example 8 hours in the UTC+8 time zone. To align the timestamps, use TIMESTAMP_LTZ(9), as shown in the following example.
-- Flink SQL
CREATE TEMPORARY TABLE odps_source (
  id BIGINT NOT NULL COMMENT 'ID',
  created_time TIMESTAMP NOT NULL COMMENT 'Creation time',
  updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT 'Update time',
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  ...
);
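The following Java sketch shows where an 8-hour gap can come from. It uses only java.time and assumes a UTC+8 zone (Asia/Shanghai): the same zone-less wall-clock value maps to two different instants depending on the zone it is interpreted in. It illustrates the general mechanism only, not the connector's internal conversion.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Illustration only: why a zone-less timestamp can drift by 8 hours (assumes UTC+8). */
final class TimestampZoneSketch {

    /** Interprets a zone-less value (like Flink TIMESTAMP) as UTC. */
    static Instant asUtc(LocalDateTime wallClock) {
        return wallClock.toInstant(ZoneOffset.UTC);
    }

    /** Interprets the same zone-less value in a local zone, such as a session time zone. */
    static Instant asLocal(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toInstant();
    }

    /** Hours between the two interpretations of the same wall-clock value. */
    static long gapHours(LocalDateTime wallClock, ZoneId zone) {
        return Duration.between(asLocal(wallClock, zone), asUtc(wallClock)).toHours();
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(2023, 6, 15, 17, 59, 41);
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        System.out.println(asUtc(value));              // 2023-06-15T17:59:41Z
        System.out.println(asLocal(value, shanghai));  // 2023-06-15T09:59:41Z
        System.out.println(gapHours(value, shanghai)); // 8
    }
}
```

TIMESTAMP_LTZ avoids this ambiguity because it represents an instant rather than a wall-clock value.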
Write data from a self-managed Flink cluster
Prerequisites: Create a MaxCompute table.
You must first create a MaxCompute table for Flink to write data to. The following example creates two tables: a non-partitioned delta table and a partitioned delta table. For information about table property settings, see delta table parameters.
-- Create a non-partitioned delta table.
CREATE TABLE mf_flink_tt (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id)
)
TBLPROPERTIES ("transactional"="true", "write.bucket.num"="64", "acid.data.retain.hours"="12");

-- Create a partitioned delta table.
CREATE TABLE mf_flink_tt_part (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id)
)
PARTITIONED BY (dd STRING, hh STRING)
TBLPROPERTIES ("transactional"="true", "write.bucket.num"="64", "acid.data.retain.hours"="12");
Set up an open source Flink cluster. The connector supports Flink 1.13, 1.15, 1.16, and 1.17. Select the Flink connector that matches your Flink version.
Note: The Flink connector for Flink 1.16 is compatible with Flink 1.17.
This topic uses Flink 1.13 and the Flink connector for Flink 1.13 as an example. Download the Flink 1.13 installation package and decompress it.
Download the Flink connector and add it to your Flink cluster package.
Download the Flink connector JAR package to your local environment.
Add the Flink connector JAR package to the lib directory of the decompressed Flink installation package.
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
Start the Flink instance service.
cd $FLINK_HOME/bin
./start-cluster.sh
Start the Flink client.
cd $FLINK_HOME/bin
./sql-client.sh
Create a Flink table and configure the Flink connector parameters.
You can create a Flink table and configure its parameters using either Flink SQL or the DataStream API. The following sections provide core examples for both approaches.
Flink SQL
In the Flink SQL editor, run the following commands to create a table and configure parameters.
-- Register a non-partitioned table in Flink SQL.
CREATE TABLE mf_flink (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt',
  'sink.operation' = 'upsert',
  'odps.access.id' = 'LTAI****************',
  'odps.access.key' = '********************',
  'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name' = 'mf_mc_bj'
);

-- Register a partitioned table in Flink SQL.
CREATE TABLE mf_flink_part (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  dd STRING,
  hh STRING,
  PRIMARY KEY(id) NOT ENFORCED
) PARTITIONED BY (`dd`, `hh`) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_tt_part',
  'sink.operation' = 'upsert',
  'odps.access.id' = 'LTAI****************',
  'odps.access.key' = '********************',
  'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name' = 'mf_mc_bj'
);
Write data to the Flink table and query the MaxCompute table to verify the result.
-- Insert data into the non-partitioned table in the Flink SQL client.
INSERT INTO mf_flink VALUES (1, 'Danny', 27, false);

-- Query the result in MaxCompute.
SELECT * FROM mf_flink_tt;
+----+-------+-----+--------+
| id | name  | age | status |
+----+-------+-----+--------+
| 1  | Danny | 27  | false  |
+----+-------+-----+--------+

-- Insert a row with the same primary key in the Flink SQL client to update the record.
INSERT INTO mf_flink VALUES (1, 'Danny', 28, false);

-- Query the result in MaxCompute.
SELECT * FROM mf_flink_tt;
+----+-------+-----+--------+
| id | name  | age | status |
+----+-------+-----+--------+
| 1  | Danny | 28  | false  |
+----+-------+-----+--------+

-- Insert data into the partitioned table in the Flink SQL client.
INSERT INTO mf_flink_part VALUES (1, 'Danny', 27, false, '01', '01');

-- Query the result in MaxCompute.
SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
+----+-------+-----+--------+----+----+
| id | name  | age | status | dd | hh |
+----+-------+-----+--------+----+----+
| 1  | Danny | 27  | false  | 01 | 01 |
+----+-------+-----+--------+----+----+

-- Insert a row with the same primary key into the partitioned table to update the record.
INSERT INTO mf_flink_part VALUES (1, 'Danny', 30, false, '01', '01');

-- Query the result in MaxCompute.
SELECT * FROM mf_flink_tt_part WHERE dd='01' AND hh='01';
+----+-------+-----+--------+----+----+
| id | name  | age | status | dd | hh |
+----+-------+-----+--------+----+----+
| 1  | Danny | 30  | false  | 01 | 01 |
+----+-------+-----+--------+----+----+
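These results follow upsert semantics: a row whose primary key already exists replaces the stored row instead of adding a second row. The following Java sketch models that behavior with an in-memory map keyed by id. It is a simplified, hypothetical model for illustration only and says nothing about how the connector or the delta table actually stores data.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory model of upsert-by-primary-key semantics (illustration only). */
final class UpsertModel {

    /** One row of mf_flink_tt; id is the primary key. */
    record Row(long id, String name, int age, boolean status) {}

    private final Map<Long, Row> rowsByKey = new LinkedHashMap<>();

    /** Inserts the row, or replaces the existing row that has the same primary key. */
    void upsert(Row row) {
        rowsByKey.put(row.id(), row);
    }

    Row get(long id) {
        return rowsByKey.get(id);
    }

    int size() {
        return rowsByKey.size();
    }

    public static void main(String[] args) {
        UpsertModel table = new UpsertModel();
        table.upsert(new Row(1, "Danny", 27, false));
        table.upsert(new Row(1, "Danny", 28, false)); // same key: replaces the first row
        System.out.println(table.size());       // 1
        System.out.println(table.get(1).age()); // 28
    }
}
```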
DataStream API
To use the DataStream API, add the following dependency.
<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>flink-connector-maxcompute</artifactId>
  <version>xxx</version>
  <scope>system</scope>
  <systemPath>${project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
</dependency>
Note: Replace xxx with the actual version number.
The following sample code shows how to create a table and configure parameters.
package com.aliyun.odps.flink.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsPipeline;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;

public class Examples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use a checkpoint interval of at least 3 minutes (180,000 ms) for upsert jobs.
        env.enableCheckpointing(180 * 1000);
        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);

        // Read the upstream data. source_table must already be registered in this environment.
        Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
        DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);

        // Connector options: write in upsert mode and tune commit behavior.
        Configuration config = new Configuration();
        config.set(OdpsOptions.SINK_OPERATION, "upsert");
        config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
        config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);

        // Replace the placeholders with your AccessKey pair, endpoints, and project.
        OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint");

        // Build the MaxCompute sink for the target project and table. No static partition is specified here.
        OdpsPipeline.Builder builder = OdpsPipeline.builder();
        builder.projectName("sql2_isolation_2a")
                .tableName("user_ledger_portfolio")
                .partition("")
                .configuration(config)
                .odpsConf(odpsConfig)
                .sink(input, false);
        env.execute();
    }
}
Write data from fully managed Flink
Prerequisites: Create a MaxCompute table.
You must create a target MaxCompute table for the Flink data. The following example shows how to create a delta table.
SET odps.sql.type.system.odps2=true;
DROP TABLE IF EXISTS mf_flink_upsert;
CREATE TABLE mf_flink_upsert (
  c1 INT NOT NULL,
  c2 STRING,
  gt TIMESTAMP,
  PRIMARY KEY (c1)
)
PARTITIONED BY (ds STRING)
TBLPROPERTIES ("transactional"="true", "write.bucket.num"="64", "acid.data.retain.hours"="12");
The Flink connector is pre-loaded on fully managed Flink, so no manual installation is required. You can view connector details in the Realtime Compute for Apache Flink console.
Create Flink tables, generate real-time data with a Flink SQL job, and deploy the job after development is complete.
On the Flink job development page, create and edit a SQL job. The following example defines a source table that generates random data, a result table that connects to MaxCompute, and an INSERT statement to transfer the data. For more information about how to develop a SQL job, see Job development map.
-- Create a Flink source table.
CREATE TEMPORARY TABLE fake_src_table (
  c1 INT,
  c2 VARCHAR,
  gt AS CURRENT_TIMESTAMP
) WITH (
  'connector' = 'faker',
  'fields.c2.expression' = '#{superhero.name}',
  'rows-per-second' = '100',
  'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
);

-- Create a temporary result table in Flink.
CREATE TEMPORARY TABLE test_c_d_g (
  c1 INT,
  c2 VARCHAR,
  gt TIMESTAMP,
  ds VARCHAR,
  PRIMARY KEY(c1) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector' = 'maxcompute',
  'table.name' = 'mf_flink_upsert',
  'sink.operation' = 'upsert',
  'odps.access.id' = 'LTAI****************',
  'odps.access.key' = '********************',
  'odps.end.point' = 'http://service.cn-beijing.maxcompute.aliyun.com/api',
  'odps.project.name' = 'mf_mc_bj',
  'upsert.write.bucket.num' = '64'
);

-- Flink computation logic.
INSERT INTO test_c_d_g
SELECT
  c1 AS c1,
  c2 AS c2,
  gt AS gt,
  date_format(gt, 'yyyyMMddHH') AS ds
FROM fake_src_table;
Parameters:
odps.end.point: Use the internal network endpoint of the corresponding region.
upsert.write.bucket.num: This value must be the same as the write.bucket.num property of the delta table created in MaxCompute.
Query the MaxCompute table to verify that the data was written.
SELECT * FROM mf_flink_upsert WHERE ds='2023061517';
-- Your results may differ because the source data is randomly generated.
+-----+--------------------+-------------------------+------------+
| c1  | c2                 | gt                      | ds         |
+-----+--------------------+-------------------------+------------+
| 0   | Skaar              | 2023-06-16 01:59:41.116 | 2023061517 |
| 21  | Supah Century      | 2023-06-16 01:59:59.117 | 2023061517 |
| 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
| 126 | Leader             | 2023-06-16 01:59:39.116 | 2023061517 |
+-----+--------------------+-------------------------+------------+
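The ds values in this example come from date_format(gt, 'yyyyMMddHH') in the INSERT statement, which yields one partition per hour. The following Java sketch reproduces that pattern with java.time.format.DateTimeFormatter, for example to work out which partition to query. It is an illustration only and assumes gt is treated as a zone-less local date-time; because of the time zone behavior described in the note on TIMESTAMP, the gt value that MaxCompute displays can be shifted relative to the hour encoded in ds, as the sample rows above suggest.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Illustration only: derives an hourly partition value like date_format(gt, 'yyyyMMddHH'). */
final class HourlyPartition {

    // The pattern letters yyyy, MM, dd, and HH mean year, month, day, and hour of day (00-23).
    private static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyyMMddHH");

    /** Assumes gt is a zone-less local date-time, as Flink TIMESTAMP is. */
    static String ds(LocalDateTime gt) {
        return HOURLY.format(gt);
    }

    public static void main(String[] args) {
        LocalDateTime gt = LocalDateTime.of(2023, 6, 15, 17, 59, 41, 116_000_000);
        System.out.println(ds(gt)); // 2023061517
    }
}
```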
Appendix: Flink connector parameters
Basic parameters
| Parameter | Required | Default value | Description |
|---|---|---|---|
| connector | Yes | None | The connector type. Set the value to maxcompute. |
| odps.project.name | Yes | None | The name of the MaxCompute project. |
| odps.access.id | Yes | None | The AccessKey ID of your account. You can view it on the AccessKey Pair page. |
| odps.access.key | Yes | None | The AccessKey secret of your account. You can view it on the AccessKey Pair page. |
| odps.end.point | Yes | None | The MaxCompute endpoint. For the endpoints of each region, see Endpoints. |
| odps.tunnel.end.point | No | None | The public endpoint of the Tunnel service. By default, requests are automatically routed to the appropriate Tunnel endpoint. If you set this parameter, the specified endpoint is used and automatic routing is disabled. For the Tunnel endpoints of different regions and networks, see Endpoints. |
| odps.tunnel.quota.name | No | None | The name of the Tunnel quota that is used to access MaxCompute. |
| table.name | Yes | None | The name of the MaxCompute table, in the format [project.][schema.]table. |
| odps.namespace.schema | No | false | Specifies whether to use the three-layer model. For more information about the three-layer model, see Schema operations. |
| sink.operation | Yes | insert | The write type. Valid values: insert and upsert. Note: The upsert mode is supported only for MaxCompute delta tables. |
| sink.parallelism | No | None | The write parallelism. If this parameter is not set, the parallelism of the upstream source is used. Note: For optimal write performance and minimal memory usage on the sink node, make sure that the table property write.bucket.num is an integer multiple of this value. |
| sink.meta.cache.time | No | 400 | The metadata cache size. |
| sink.meta.cache.expire.time | No | 1200 | The cache expiration time for metadata. Unit: seconds (s). |
| sink.coordinator.enable | No | true | Specifies whether to enable coordinator mode. |
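The sink.parallelism note asks for write.bucket.num to be an integer multiple of the sink parallelism. The following Java sketch checks that condition and lists the parallelism values that satisfy it for a given bucket count. It is a hypothetical helper, and the "buckets per subtask" figure in its comments assumes buckets are spread evenly across sink subtasks, which this topic does not state.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper relating write.bucket.num to sink.parallelism. */
final class BucketParallelism {

    /** True if the bucket count is an integer multiple of the sink parallelism. */
    static boolean isMultiple(int writeBucketNum, int sinkParallelism) {
        if (writeBucketNum <= 0 || sinkParallelism <= 0) {
            throw new IllegalArgumentException("both values must be positive");
        }
        return writeBucketNum % sinkParallelism == 0;
    }

    /** All sink.parallelism values that divide the bucket count evenly. */
    static List<Integer> recommendedParallelisms(int writeBucketNum) {
        List<Integer> values = new ArrayList<>();
        for (int p = 1; p <= writeBucketNum; p++) {
            if (isMultiple(writeBucketNum, p)) {
                values.add(p);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(isMultiple(64, 16)); // true (4 buckets per subtask if spread evenly)
        System.out.println(isMultiple(64, 12)); // false
        System.out.println(recommendedParallelisms(64)); // [1, 2, 4, 8, 16, 32, 64]
    }
}
```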
Partition parameters
| Parameter | Required | Default value | Description |
|---|---|---|---|
| sink.partition | No | None | The name of the partition to which data is written. If you use dynamic partitioning, this parameter specifies the name of the parent partition of the dynamic partitions. |
| sink.partition.default-value | No | __DEFAULT_PARTITION__ | The default partition name when dynamic partitioning is used. |
| sink.dynamic-partition.limit | No | 100 | The maximum number of partitions that can be written to concurrently within a single checkpoint when dynamic partitioning is used. Note: If the number of concurrently written partitions exceeds this limit, the job fails. Increasing this value significantly can cause out-of-memory (OOM) errors on the sink node. |
| sink.group-partition.enable | No | false | Specifies whether to group data by partition when dynamic partitioning is used. |
| sink.partition.assigner.class | No | None | The PartitionAssigner implementation class. |

FileCached mode write parameters
Use file cache mode for jobs with a large number of dynamic partitions. The following parameters configure this mode.
| Parameter | Required | Default value | Description |
|---|---|---|---|
| sink.file-cached.enable | No | false | Specifies whether to enable FileCached mode, which is recommended for jobs with a large number of dynamic partitions. Valid values: false (disabled) and true (enabled). |
| sink.file-cached.tmp.dirs | No | ./local | The default file cache directory in FileCached mode. |
| sink.file-cached.writer.num | No | 16 | The number of concurrent data upload threads for a single task in FileCached mode. Note: Do not significantly increase this value. Writing to too many partitions concurrently is likely to cause OOM errors. |
| sink.bucket.check-interval | No | 60000 | The interval at which file sizes are checked in FileCached mode. Unit: milliseconds (ms). |
| sink.file-cached.rolling.max-size | No | 16 MB | The maximum size of a single cache file. When a file exceeds this size, it is uploaded. |
| sink.file-cached.memory | No | 64 MB | The maximum size of off-heap memory used for file writes in FileCached mode. |
| sink.file-cached.memory.segment-size | No | 128 KB | The buffer size used for file writes in FileCached mode. |
| sink.file-cached.flush.always | No | true | Specifies whether to use the cache for file writes in FileCached mode. |
| sink.file-cached.write.max-retries | No | 3 | The number of retries for uploading data in FileCached mode. |
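In FileCached mode, a cache file is uploaded once it exceeds sink.file-cached.rolling.max-size, and file sizes are checked every sink.bucket.check-interval. The following Java sketch models only that size-based rolling decision with the default 16 MB limit. It is hypothetical: the class, the upload counter, and the chunk sizes are illustrative, the size check runs after every append rather than on a timer, and off-heap buffers and upload retries are not modeled.

```java
/** Hypothetical model of size-based rolling for one FileCached cache file (illustration only). */
final class RollingFileModel {

    /** Default of sink.file-cached.rolling.max-size: 16 MB. */
    static final long DEFAULT_MAX_SIZE = 16L * 1024 * 1024;

    private final long maxSize;
    private long currentSize;
    private int uploadedFiles;

    RollingFileModel(long maxSize) {
        this.maxSize = maxSize;
    }

    /** Appends data of the given size to the current cache file. */
    void append(long bytes) {
        currentSize += bytes;
    }

    /** Size check: once the file exceeds maxSize, "upload" it (here: count it) and start a new file. */
    boolean checkAndRoll() {
        if (currentSize > maxSize) {
            uploadedFiles++;
            currentSize = 0;
            return true;
        }
        return false;
    }

    int uploadedFiles() {
        return uploadedFiles;
    }

    long currentSize() {
        return currentSize;
    }

    /** Appends the given number of chunks, running a size check after each one for simplicity. */
    static RollingFileModel simulate(int chunks, long chunkBytes, long maxSize) {
        RollingFileModel file = new RollingFileModel(maxSize);
        for (int i = 0; i < chunks; i++) {
            file.append(chunkBytes);
            file.checkAndRoll();
        }
        return file;
    }

    public static void main(String[] args) {
        RollingFileModel file = simulate(20, 1024 * 1024, DEFAULT_MAX_SIZE);
        System.out.println(file.uploadedFiles()); // 1 (rolled once the file reached 17 MB)
        System.out.println(file.currentSize());   // 3145728 (3 MB in the new file)
    }
}
```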
Insert or upsert write parameters
Upsert write parameters
| Parameter | Required | Default value | Description |
|---|---|---|---|
| upsert.writer.max-retries | No | 3 | The number of retries after an upsert writer fails to write data to a bucket. |
| upsert.writer.buffer-size | No | 64 MB | The cache size of a single upsert writer in Flink. Note: When the total buffer size of all buckets reaches this threshold, the system automatically triggers a flush to send the data to the server. An upsert writer writes data to multiple buckets at the same time, so increasing this value can improve write efficiency. If data is written to a large number of partitions, OOM errors may occur; in this case, reduce this value. |
| upsert.writer.bucket.buffer-size | No | 1 MB | The cache size of a single bucket in Flink. If memory on the Flink server is insufficient, reduce this value. |
| upsert.write.bucket.num | Yes | None | The number of buckets of the destination table. The value must be the same as the write.bucket.num property of the table. |
| upsert.write.slot-num | No | 1 | The number of Tunnel slots used by a single session. |
| upsert.commit.max-retries | No | 3 | The number of retries for an upsert session commit. |
| upsert.commit.thread-num | No | 16 | The parallelism of upsert session commits. Do not set this value too high: more concurrent commits consume more resources and can degrade performance. |
| upsert.major-compact.min-commits | No | 100 | The minimum number of commits required to trigger a major compaction. |
| upsert.commit.timeout | No | 600 | The timeout period for an upsert session commit. Unit: seconds (s). |
| upsert.major-compact.enable | No | false | Specifies whether to enable major compaction. |
| upsert.flush.concurrent | No | 2 | The maximum number of buckets in a single partition to which data can be written concurrently. Note: Each bucket occupies a Tunnel slot while its data is being flushed. |
Note: For more information about recommended parameter configurations for upsert writes, see Recommended parameter configurations for upsert writes.
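The upsert.writer.buffer-size description says a flush is triggered once the buffers of all buckets together reach the threshold. The following Java sketch models that rule. It is hypothetical: rows are assigned to buckets by hashing the primary key modulo upsert.write.bucket.num (an assumption; the connector's bucketing function is not documented here), per-bucket byte counts accumulate, and all buckets are cleared when the total reaches the writer buffer size. The per-bucket limit (upsert.writer.bucket.buffer-size) is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of an upsert writer that flushes when total buffered bytes reach a threshold. */
final class UpsertBufferModel {

    private final int bucketNum;          // upsert.write.bucket.num
    private final long writerBufferSize;  // upsert.writer.buffer-size
    private final Map<Integer, Long> bufferedBytesByBucket = new HashMap<>();
    private long totalBuffered;
    private int flushes;

    UpsertBufferModel(int bucketNum, long writerBufferSize) {
        this.bucketNum = bucketNum;
        this.writerBufferSize = writerBufferSize;
    }

    /** Assumed bucket assignment: hash of the primary key modulo the bucket count. */
    int bucketOf(long primaryKey) {
        return Math.floorMod(Long.hashCode(primaryKey), bucketNum);
    }

    /** Buffers a row and flushes every bucket once the total reaches the writer buffer size. */
    void write(long primaryKey, long rowBytes) {
        bufferedBytesByBucket.merge(bucketOf(primaryKey), rowBytes, Long::sum);
        totalBuffered += rowBytes;
        if (totalBuffered >= writerBufferSize) {
            flushAll();
        }
    }

    /** Stands in for sending every bucket's buffered data to the server. */
    private void flushAll() {
        bufferedBytesByBucket.clear();
        totalBuffered = 0;
        flushes++;
    }

    int flushes() { return flushes; }
    long totalBuffered() { return totalBuffered; }
    int bucketsWithData() { return bufferedBytesByBucket.size(); }

    /** Writes rows with keys 0..rows-1, each of the given size. */
    static UpsertBufferModel simulate(int bucketNum, long bufferSize, int rows, long rowBytes) {
        UpsertBufferModel writer = new UpsertBufferModel(bucketNum, bufferSize);
        for (long id = 0; id < rows; id++) {
            writer.write(id, rowBytes);
        }
        return writer;
    }

    public static void main(String[] args) {
        // 64 buckets, 64 MB writer buffer, 100 rows of 1 MB each.
        UpsertBufferModel writer = simulate(64, 64L * 1024 * 1024, 100, 1024 * 1024);
        System.out.println(writer.flushes());         // 1 (after the 64th row)
        System.out.println(writer.bucketsWithData()); // 36
    }
}
```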
Insert write parameters
| Parameter | Required | Default value | Description |
|---|---|---|---|
| insert.commit.thread-num | No | 16 | The parallelism of commit sessions. |
| insert.arrow-writer.enable | No | false | Specifies whether to use the Arrow format. |
| insert.arrow-writer.batch-size | No | 512 | The maximum number of rows in an Arrow batch. |
| insert.arrow-writer.flush-interval | No | 100000 | The writer flush interval. Unit: milliseconds (ms). |
| insert.writer.buffer-size | No | 64 MB | The cache size of the buffered writer. |
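When insert.arrow-writer.enable is true, rows are grouped into Arrow batches of at most insert.arrow-writer.batch-size rows, and insert.arrow-writer.flush-interval sets the writer flush interval. The following Java sketch models a size-or-time flush rule with the defaults (512 rows, 100,000 ms). It is hypothetical: it does not use Apache Arrow, it assumes a flush happens when either limit is reached (this topic does not spell out the exact rule), and it checks the interval only when a row arrives.

```java
/** Hypothetical size-or-time batching rule modeled on the Arrow writer defaults. */
final class BatchFlushModel {

    private final int maxRows;           // insert.arrow-writer.batch-size
    private final long flushIntervalMs;  // insert.arrow-writer.flush-interval
    private int pendingRows;
    private long lastFlushMs;
    private int flushes;

    BatchFlushModel(int maxRows, long flushIntervalMs, long startMs) {
        this.maxRows = maxRows;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    /** Adds one row at the given time; flushes if the batch is full or the interval has elapsed. */
    void add(long nowMs) {
        pendingRows++;
        if (pendingRows >= maxRows || nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    /** Stands in for writing the current batch. */
    private void flush(long nowMs) {
        pendingRows = 0;
        lastFlushMs = nowMs;
        flushes++;
    }

    int flushes() { return flushes; }
    int pendingRows() { return pendingRows; }

    public static void main(String[] args) {
        BatchFlushModel writer = new BatchFlushModel(512, 100_000, 0);
        for (int i = 0; i < 1000; i++) {
            writer.add(i); // 1,000 rows within one second: only the size limit triggers
        }
        System.out.println(writer.flushes());     // 1
        System.out.println(writer.pendingRows()); // 488
        writer.add(200_000); // a late row: the interval has elapsed, so the batch is flushed
        System.out.println(writer.flushes());     // 2
    }
}
```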