Create FJOB
FJOB(Flink Job,简称FJOB),用户可以在一个FJOB中通过Flink SQL语法实现业务的流计算处理逻辑(过滤、转换、增强、聚合),并将计算结果写入到Lindorm中。本文介绍通过Create FJOB语法在Lindorm流引擎中提交任务。
语法
CREATE FJOB fjob_name (
flink_sqls
);
参数说明
参数 | 是否必选 | 说明 |
fjob_name | 是 | 创建的FJOB名字。 |
flink_sqls | 是 | Flink SQL用来描述计算逻辑,详细语法,请参见Flink社区文档。 |
示例
CREATE FJOB order_compute(
SET 'parallelism.default' = '1'; --任务并发度
SET 'execution.checkpointing.interval' = '60000'; --checkpoint周期
CREATE TABLE order_detail(
`biz` VARCHAR,
`order_id` VARCHAR,
`price` DOUBLE,
`detail` VARCHAR,
`timestamp` BIGINT,
`time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
WATERMARK FOR `time_ltz` AS `time_ltz` - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'topic'='order_topic',
'properties.group.id' = 'order_group',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers'='Lindorm Stream Kafka地址(xxx:30080)',
'format'='json');
CREATE TABLE order_stat(
`biz` VARCHAR,
`window_start` TIMESTAMP(3),
`window_end` TIMESTAMP(3),
`total_order_price` DOUBLE,
`count` BIGINT,
PRIMARY KEY (biz, window_start) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tableName'='order_stat',
'seedServer'='Lindorm 宽表地址(xxx:30020)',
'namespace'='default',
'userName' = 'root',
'password' = 'root');
INSERT INTO order_stat
SELECT biz, window_start, window_end, SUM(price) AS total_order_price, COUNT(*) AS `count`
FROM TABLE(TUMBLE(TABLE order_detail, DESCRIPTOR(`time_ltz`), INTERVAL '1' HOUR))
GROUP BY biz, window_start, window_end;
);