Write data

更新时间:
复制 MD 格式

This topic describes how to write, update, and delete data in a Fluss table.

Write data

  • Primary key tables can process all types of messages, such as INSERT, UPDATE, and DELETE.

  • Log tables accept only INSERT messages. You cannot update or delete records.

  1. Log in to the Real-time Compute for Apache Flink console.

  2. In the Actions column of the desired Streaming Compute Flink workspace, click Console.

  3. In the navigation pane on the left, choose Data Development > ETL > New Streaming Job.

Primary table creation

CREATE TEMPORARY TABLE source
(
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (shop_id, user_id) NOT ENFORCED
)
WITH ('connector' = 'datagen');

# Write data to specific columns.
INSERT INTO `fluss-catalog`.`my_db`.`my_pk_tbl` (shop_id, user_id, num_orders)
SELECT shop_id, user_id, num_orders FROM source;

# Write data to all columns.
INSERT INTO `fluss-catalog`.`my_db`.`my_pk_tbl`
SELECT * FROM source;

Log table

CREATE TEMPORARY TABLE source (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH ('connector' = 'datagen');

INSERT INTO `fluss-catalog`.`my_db`.`my_log_tbl` 
SELECT * FROM source;

Update data

  • You can update data only in primary key tables.

  • You can update only a single record at a time based on an equivalent condition for the primary key.

  1. In the navigation pane on the left, choose Data Development > Data Query > New Temporary Query Script.

  2. Write and run the SQL statement.

    -- Updates are supported only based on an equivalent condition for the primary key.
    UPDATE `my-catalog`.`my_db`.`my_pk_tbl` SET total_amount = 2 WHERE shop_id = 10000 and user_id = 123456;

Delete data

  • You can delete data only from primary key tables.

  • You can delete only a single record at a time based on an equivalent condition for the primary key.

  1. In the navigation pane on the left, choose Data Development > Data Query > New Temporary Query Script.

  2. Write and run the SQL statement.

    -- Deletions are supported only based on an equivalent condition for the primary key.
    DELETE FROM `my-catalog`.`my_db`.`my_pk_tbl` WHERE shop_id = 10000 and user_id = 123456;