Flink SQL lets you develop ETL (extract, transform, load) tasks in standard SQL syntax, which simplifies development and lowers the skill barrier. Compared with the visual DAG mode, Flink SQL is more powerful and supports advanced syntax that DAG mode does not. This topic describes how to configure an ETL task by using Flink SQL.
Background
This feature is being deprecated. It remains available free of charge only to users who have used it before, and new users can no longer access it. We recommend that you configure ETL tasks within a data synchronization or data migration instance. For more information, see Configure an ETL task in a DTS data migration or synchronization task. If you encounter any issues, join the DingTalk group (ID: 32326646) for support.
- Before you configure an ETL task, understand the following concepts:
   - Input/Dimension Table: the source database of the ETL task.
   - Output: the destination database to which ETL writes the processed data.
- DTS provides a streaming ETL feature within the data synchronization process, allowing you to add transformation components between the source and destination databases. These components perform a rich set of data transformations and write the processed data to the destination in real time. For example, you can join two stream tables to create a wide table and write it to the destination database. You can also add a new field to a source table, assign values to it by using a function, and then write the transformed field to the destination database.
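The two examples above can be illustrated with a small Python model of what a transformation component does to incoming records. This is a minimal sketch under stated assumptions, not how DTS implements streaming ETL. `stream_join` is a hypothetical symmetric hash join. It keeps every row of both stream tables in memory and emits a wide row whenever a new row matches rows already seen on the other side. `add_field` is a hypothetical helper that assigns a computed value to a new field. All table and column names are illustrative.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]


def stream_join(events: Iterable[Tuple[str, Row]], key: str) -> Iterator[Row]:
    """Hypothetical symmetric hash join of two stream tables (inner join, no retractions).

    events is an interleaved sequence of ("left" | "right", row). Every row is kept
    in state for its side and joined with all rows already seen on the other side.
    """
    state: Dict[str, DefaultDict[Any, List[Row]]] = {
        "left": defaultdict(list),
        "right": defaultdict(list),
    }
    for side, row in events:
        other = "right" if side == "left" else "left"
        state[side][row[key]].append(row)
        for match in state[other][row[key]]:
            left, right = (row, match) if side == "left" else (match, row)
            yield {**left, **right}  # wide row: columns from both tables


def add_field(rows: Iterable[Row], name: str, fn: Callable[[Row], Any]) -> Iterator[Row]:
    """Hypothetical helper: add a new field whose value is computed by fn."""
    for row in rows:
        yield {**row, name: fn(row)}


events = [
    ("left", {"order_id": 1, "user_id": 7}),
    ("right", {"user_id": 7, "user_name": "alice"}),
    ("left", {"order_id": 2, "user_id": 7}),
]
wide = add_field(stream_join(events, key="user_id"), "order_tag",
                 lambda r: f"{r['user_name']}-{r['order_id']}")
for row in wide:
    print(row)
```

The first order produces no output until the matching user row arrives. After that, each new order is joined immediately. A production engine must also limit how long join state is kept. This sketch keeps it forever.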
Prerequisites
- You can create ETL tasks only in the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Zhangjiakou), China (Shenzhen), China (Guangzhou), and China (Hong Kong).
- The supported source databases are MySQL, PolarDB for MySQL, Oracle, PostgreSQL, DB2 for iSeries (AS/400), DB2 for LUW, PolarDB-X (formerly DRDS), PolarDB for PostgreSQL, MariaDB, PolarDB for Oracle, SQL Server, and PolarDB-X 2.0.
- The supported destination databases are MySQL, PolarDB for MySQL, Oracle, AnalyticDB for MySQL V3.0, PolarDB for PostgreSQL, PostgreSQL, DB2 for LUW, DB2 for iSeries (AS/400), AnalyticDB for PostgreSQL, SQL Server, MariaDB, PolarDB-X (formerly DRDS), PolarDB for Oracle, and Tablestore.
- The ETL feature does not support schema migration. You must manually create the table schema in the destination database based on your transformation logic. For example, if you join Table A (fields 1, 2, and 3) with Table B (fields 2, 3, and 4) and output only fields 2 and 3, you must first create a destination Table C that contains only fields 2 and 3.
- The ETL feature does not support full data synchronization. It performs real-time transformations on incremental data only.
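The destination table must be created by hand, so it can help to derive its DDL from the columns that the transformation outputs. The following sketch reproduces the Table A/Table B example. It assumes a hypothetical helper `destination_ddl`, illustrative column names, and MySQL-style column types. Each column's type is copied from the first source table that defines it. Primary keys, indexes, and type mapping between different database engines are not handled and must be added for a real destination.

```python
from typing import Dict, List


def destination_ddl(table: str, sources: List[Dict[str, str]], output: List[str]) -> str:
    """Hypothetical helper: build CREATE TABLE DDL that contains only the output columns.

    sources are schemas mapping column name -> SQL type; the first schema that
    defines a column supplies its type.
    """
    columns = []
    for name in output:
        col_type = next((schema[name] for schema in sources if name in schema), None)
        if col_type is None:
            raise KeyError(f"column {name!r} is not produced by any source table")
        columns.append(f"  `{name}` {col_type}")
    return f"CREATE TABLE `{table}` (\n" + ",\n".join(columns) + "\n);"


# Table A has fields 1, 2, 3; Table B has fields 2, 3, 4; the task outputs fields 2 and 3.
table_a = {"field_1": "BIGINT", "field_2": "VARCHAR(64)", "field_3": "DATETIME"}
table_b = {"field_2": "VARCHAR(64)", "field_3": "DATETIME", "field_4": "INT"}
print(destination_ddl("table_c", [table_a, table_b], ["field_2", "field_3"]))
```

The printed statement defines `table_c` with only `field_2` and `field_3`, which matches the rule that Table C contains only the output fields.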
Usage notes
- All source and destination databases must be in the same region.
- All stream tables must come from the same instance.
- Database and table names must be unique.
- Cross-account tasks are not supported.
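You can check these usage notes against a task plan before you start configuring. The following is a hypothetical pre-flight check. `TableConfig` and `check_usage_notes` are not part of any DTS API, and the instance IDs are made up. The uniqueness rule is read here as "no two configured tables share the same database and table name", which is an assumption.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class TableConfig:
    """Hypothetical description of one table in an ETL task (not a DTS API)."""
    account: str      # account that owns the instance
    region: str       # for example "cn-hangzhou"
    instance: str     # source or destination instance ID
    database: str
    table: str
    vertex_type: str  # "stream", "lookup", or "sink"


def check_usage_notes(tables: List[TableConfig]) -> List[str]:
    """Return one message per violated usage note; an empty list means none were found."""
    problems: List[str] = []
    if len({t.region for t in tables}) > 1:
        problems.append("all source and destination databases must be in the same region")
    if len({t.instance for t in tables if t.vertex_type == "stream"}) > 1:
        problems.append("all stream tables must come from the same instance")
    names = [(t.database, t.table) for t in tables]
    if len(set(names)) != len(names):  # assumption: database.table pairs must be unique
        problems.append("database and table names must be unique")
    if len({t.account for t in tables}) > 1:
        problems.append("cross-account tasks are not supported")
    return problems


plan = [
    TableConfig("acct-1", "cn-hangzhou", "rm-src", "etltest", "test_orders", "stream"),
    TableConfig("acct-1", "cn-hangzhou", "rm-src", "etltest", "product", "lookup"),
    TableConfig("acct-1", "cn-hangzhou", "rm-dst", "etltest", "test_orders_new", "sink"),
]
print(check_usage_notes(plan))  # []
```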
Procedure
1. Go to the ETL task list page.
   - Log on to the DTS console.
   - In the left-side navigation pane, click ETL.
2. In the upper-left corner, click the icon for creating a data flow. In the Create Data Flow dialog box, enter a name for the ETL task in the Data Flow Name field and set Development Method to FlinkSQL.
3. Click Confirm.
4. In the Data Flow Information section of the Streaming ETL page, add the source and destination databases. Configure the following parameters:
   - Region: Select the region of the data source.
   - Type: Select the table type.
      - When you configure a source table, select Stream Table if the source is a stream table. A stream table changes in real time and can be joined with a dimension table for data enrichment. Select Dimension Tables if the source is a dimension table. A dimension table is updated infrequently and is typically joined with real-time data to create wide tables.
      - When you configure a destination table, select Output.
   - Database type: Select the type of the source or destination database.
   - Instance: Enter a name or ID to find and select the source or destination instance.
     Important: You must first add the source and destination instances in Data Management (DMS). For instructions, see Instance management.
   - Database: Select the source or destination database that contains the data to be transformed.
   - Physical table: Select the source or destination table that contains the data to be transformed.
   - Alias of physical table: Set a concise and readable alias for the source or destination table. ETL uses this alias to locate the correct table when it runs the SQL statements.
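The difference between a stream table and a dimension table shows most clearly in a processing-time lookup join. The SQL example in the next step uses this kind of join through `FOR SYSTEM_TIME AS OF`. The following Python sketch models its semantics under simplifying assumptions. Events arrive in a single ordered list. The dimension table is an in-memory dict keyed by `product_id`. Each order is enriched with whatever the dimension table holds when the order is processed. Because it is a LEFT JOIN, an order with no matching product still appears, with `None` in the product columns. This illustrates the behavior only. It is not DTS code.

```python
from decimal import Decimal
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]


def lookup_left_join(events: Iterable[Tuple[str, Row]]) -> List[Row]:
    """Model a processing-time LEFT JOIN of an orders stream against a product dimension table.

    Events are handled in arrival order. ("product", row) upserts a row into the
    dimension table; ("order", row) is a stream record that is enriched with the
    product row that exists at the moment the order is processed.
    """
    products: Dict[Any, Row] = {}  # current contents of the dimension table
    joined: List[Row] = []
    for kind, row in events:
        if kind == "product":
            products[row["product_id"]] = row
        elif kind == "order":
            match = products.get(row["product_id"])  # None when no product matches
            joined.append({
                **row,
                "product_name": match["product_name"] if match else None,
                "product_price": match["product_price"] if match else None,
            })
        else:
            raise ValueError(f"unknown event kind: {kind!r}")
    return joined


events = [
    ("product", {"product_id": 10, "product_name": "pen", "product_price": Decimal("1.50")}),
    ("order", {"order_id": 1, "product_id": 10}),
    ("product", {"product_id": 10, "product_name": "pen", "product_price": Decimal("2.00")}),
    ("order", {"order_id": 2, "product_id": 10}),
    ("order", {"order_id": 3, "product_id": 99}),
]
for row in lookup_left_join(events):
    print(row)
```

Order 2 sees the updated price, while order 1 keeps the price that was current when it was processed. A processing-time lookup does not revise rows that were already emitted.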
5. In the SQL command window on the Streaming ETL page, enter the SQL statements that configure the ETL task.
   The following example joins the test_orders stream table with the product dimension table and inserts the result into the test_orders_new destination table.
   Important: SQL statements must be separated by semicolons (;).

```sql
-- Source stream table (vertexType 'stream' requires streamType, alias, and vertexType).
CREATE TABLE `etltest_test_orders` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `dts_etl_schema_db_table` STRING,
  `dts_etl_db_log_time` BIGINT,
  `pt` AS PROCTIME(),  -- processing-time attribute used by the lookup join
  WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
) WITH (
  'streamType'= 'append',
  'alias'= 'test_orders',
  'vertexType'= 'stream'
);

-- Dimension table.
CREATE TABLE `etltest_product` (
  `product_id` BIGINT,
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'product',
  'vertexType'= 'lookup'
);

-- Transformation logic: enrich each order with the current product row.
CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
SELECT
  `etltest_test_orders`.`order_id` AS `order_id`,
  `etltest_test_orders`.`user_id` AS `user_id`,
  `etltest_test_orders`.`product_id` AS `product_id`,
  `etltest_test_orders`.`total_price` AS `total_price`,
  `etltest_test_orders`.`order_date` AS `order_date`,
  `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
  `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
  `etltest_product`.`product_id` AS `product_id_0001011101`,
  `etltest_product`.`product_name` AS `product_name`,
  `etltest_product`.`product_price` AS `product_price`
FROM `etltest_test_orders`
LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt`
  ON etltest_test_orders.product_id = etltest_product.product_id;

-- Destination table.
CREATE TABLE `test_orders_new` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'test_orders_new',
  'vertexType'= 'sink'
);

-- Write the transformed rows into the destination table.
INSERT INTO `test_orders_new` (
  `order_id`,
  `user_id`,
  `product_id`,
  `total_price`,
  `order_date`,
  `product_name`,
  `product_price`
)
SELECT
  `etltest_test_orders_JOIN_etltest_product`.`order_id`,
  `etltest_test_orders_JOIN_etltest_product`.`user_id`,
  `etltest_test_orders_JOIN_etltest_product`.`product_id`,
  `etltest_test_orders_JOIN_etltest_product`.`total_price`,
  `etltest_test_orders_JOIN_etltest_product`.`order_date`,
  `etltest_test_orders_JOIN_etltest_product`.`product_name`,
  `etltest_test_orders_JOIN_etltest_product`.`product_price`
FROM `etltest_test_orders_JOIN_etltest_product`;
```

   The statements fall into three types:
   - Source and destination table information:
      - Use the CREATE TABLE statement to define the source and destination tables.
      - You can set three parameters in the WITH clause of a CREATE TABLE statement: streamType, alias, and vertexType. For a stream table, all three parameters are required. For dimension tables and output tables, only alias and vertexType are required.
         - streamType: the stream type. During processing, ETL converts a stream into a dynamic table. Continuous queries run on this dynamic table, which is constantly modified by INSERT, UPDATE, and DELETE operations, and produce a new dynamic table. When the new dynamic table is written to the destination database, DTS converts it back into a stream. This parameter specifies how the changes are encoded during that conversion. Valid values:
            - Upsert: an upsert stream. Data in the dynamic table can be modified by INSERT, UPDATE, and DELETE operations. When the table is converted to a stream, INSERT and UPDATE operations are encoded as upsert messages, and DELETE operations are encoded as delete messages.
              Note: This encoding requires the dynamic table to have a unique key, which can be composite.
            - append: an append-only stream. Data in the dynamic table can only be modified by INSERT operations. When the table is converted to a stream, only the inserted rows are emitted.
         - alias: the Alias of physical table that you set for the source or destination table in step 4.
         - vertexType: the table type. Valid values:
            - stream: a stream table.
            - lookup: a dimension table.
            - sink: a destination table.
   - Data transformation logic: Use the CREATE VIEW statement to define the data transformation logic.
   - Transformed destination table information: Use the INSERT INTO statement to write data into the transformed destination table.
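The two streamType encodings can be modeled in a few lines of Python. This sketch is illustrative only. A change is a hypothetical `(operation, row)` tuple, and the emitted messages are plain tuples, not the format DTS actually uses. `encode_upsert` keys each message by the table's unique key, which is a tuple, so composite keys work. `apply_upsert` shows how a destination would replay those messages into its final state. `encode_append` rejects anything other than INSERT, which mirrors the append-only restriction.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]  # ("INSERT" | "UPDATE" | "DELETE", row)
Message = Tuple[str, Tuple[Any, ...], Optional[Row]]


def encode_upsert(changes: Iterable[Change], key: Tuple[str, ...]) -> List[Message]:
    """Upsert encoding: INSERT and UPDATE become upsert messages, DELETE becomes a delete message."""
    messages: List[Message] = []
    for op, row in changes:
        k = tuple(row[col] for col in key)  # unique key; may span several columns
        if op in ("INSERT", "UPDATE"):
            messages.append(("upsert", k, row))
        elif op == "DELETE":
            messages.append(("delete", k, None))
        else:
            raise ValueError(f"unknown operation: {op!r}")
    return messages


def encode_append(changes: Iterable[Change]) -> List[Tuple[str, Row]]:
    """Append-only encoding: only inserted rows can be emitted."""
    messages = []
    for op, row in changes:
        if op != "INSERT":
            raise ValueError(f"an append-only stream cannot encode {op}")
        messages.append(("insert", row))
    return messages


def apply_upsert(messages: Iterable[Message]) -> Dict[Tuple[Any, ...], Row]:
    """Replay upsert/delete messages into the final state a destination table would hold."""
    table: Dict[Tuple[Any, ...], Row] = {}
    for kind, k, row in messages:
        if kind == "upsert":
            table[k] = row
        else:
            table.pop(k, None)
    return table


changes = [
    ("INSERT", {"order_id": 1, "total_price": 10}),
    ("UPDATE", {"order_id": 1, "total_price": 12}),
    ("INSERT", {"order_id": 2, "total_price": 5}),
    ("DELETE", {"order_id": 2, "total_price": 5}),
]
print(apply_upsert(encode_upsert(changes, key=("order_id",))))
# {(1,): {'order_id': 1, 'total_price': 12}}
```

The unique key is what lets an upsert message overwrite the earlier row for order 1 and lets a delete message remove order 2. Without a key, the destination could not tell which row a change applies to.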
6. After you configure the source database, destination database, and SQL statements, click Generate Flink SQL Validation.
   Note:
   - You can also click Publish to run the validation and the precheck directly.
   - If the Flink SQL validation succeeds, click the corresponding icon to view the validation details.
   - If the Flink SQL validation fails, click the corresponding icon, fix the SQL statements based on the error message, and run the Flink SQL validation again.
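Before you click Generate Flink SQL Validation, you could catch missing WITH parameters locally. The following hypothetical helper is a rough lint that relies on several simplifying assumptions. It splits the script naively on semicolons, so a semicolon inside a string literal would break it. It reads `'key'= 'value'` pairs after the last uppercase `WITH` keyword of each CREATE TABLE statement. It then applies the required-parameter rules from step 5. It is not the DTS validator and checks nothing else.

```python
import re
from typing import Dict, List

# Required WITH parameters per vertexType, as listed in step 5.
REQUIRED = {
    "stream": {"streamType", "alias", "vertexType"},
    "lookup": {"alias", "vertexType"},
    "sink": {"alias", "vertexType"},
}
OPTION = re.compile(r"'(\w+)'\s*=\s*'([^']*)'")  # matches 'key'= 'value'


def with_options(create_table: str) -> Dict[str, str]:
    """Collect the 'key'= 'value' pairs that follow the last WITH keyword."""
    _, _, clause = create_table.rpartition("WITH")
    return dict(OPTION.findall(clause))


def check_create_table(create_table: str) -> List[str]:
    """Report an invalid vertexType or any required WITH parameter that is missing."""
    options = with_options(create_table)
    vertex = options.get("vertexType")
    if vertex not in REQUIRED:
        return [f"vertexType must be one of {sorted(REQUIRED)}, got {vertex!r}"]
    missing = sorted(REQUIRED[vertex] - set(options))
    return [f"missing WITH parameter: {name}" for name in missing]


def check_script(script: str) -> List[str]:
    """Lint every CREATE TABLE statement in a semicolon-separated script."""
    problems: List[str] = []
    for statement in script.split(";"):  # naive: breaks on ';' inside string literals
        if statement.strip().upper().startswith("CREATE TABLE"):
            problems.extend(check_create_table(statement))
    return problems


script = """
CREATE TABLE `etltest_product` (`product_id` BIGINT) WITH ('alias'= 'product', 'vertexType'= 'lookup');
CREATE TABLE `etltest_test_orders` (`order_id` BIGINT) WITH ('alias'= 'test_orders', 'vertexType'= 'stream');
"""
print(check_script(script))  # ['missing WITH parameter: streamType']
```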
7. After the Flink SQL validation succeeds, click Publish to start the precheck.
8. After the precheck passes, click Next: Purchase (Free).
   Note: If the precheck fails, click View Details next to the failed item, fix the issue based on the details provided, and run the precheck again.
9. On the Purchase page, select an Instance Class. During the public preview, the number of compute units (CUs) is fixed at 2. Read and select the check boxes for Data Transmission Service (Pay-as-you-go) Service Terms and Service Terms for Public Preview.
   Note: During the public preview, each user can create and use two ETL instances free of charge.
10. Click Buy and Start to start the ETL task.