文档

Create FJOB

更新时间:

FJOB(Flink Job,简称FJOB),用户可以在一个FJOB中通过Flink SQL语法实现业务的流计算处理逻辑(过滤、转换、增强、聚合),并将计算结果写入到Lindorm中。本文介绍通过Create FJOB语法在Lindorm流引擎中提交任务。

语法

CREATE FJOB fjob_name (
	flink_sqls
);  

参数说明

参数

是否必选

说明

fjob_name

创建的FJOB名字。

flink_sqls

Flink SQL用来描述计算逻辑,详细语法,请参见Flink社区文档

示例

CREATE FJOB order_compute(
   SET 'parallelism.default' = '1'; --任务并发度
   SET 'execution.checkpointing.interval' = '60000'; --checkpoint周期
  
   CREATE TABLE order_detail( 
      `biz` VARCHAR,
      `order_id` VARCHAR,
      `price` DOUBLE,
      `detail` VARCHAR,
      `timestamp` BIGINT,
      `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
      WATERMARK FOR `time_ltz` AS `time_ltz` - INTERVAL '5' SECOND
    ) WITH (  
      'connector'='kafka',
      'topic'='order_topic',
      'properties.group.id' = 'order_group',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers'='Lindorm Stream Kafka地址(xxx:30080)',
      'format'='json');    

  CREATE TABLE order_stat(
      `biz` VARCHAR,
      `window_start` TIMESTAMP(3),
      `window_end` TIMESTAMP(3),
      `total_order_price` DOUBLE,
      `count` BIGINT,
      PRIMARY KEY (biz, window_start) NOT ENFORCED
    ) WITH (  
      'connector'='lindorm',
      'tableName'='order_stat',
      'seedServer'='Lindorm 宽表地址(xxx:30020)',
      'namespace'='default',
      'userName' = 'root',
      'password' = 'root');

  INSERT INTO order_stat
  SELECT biz, window_start, window_end, SUM(price) AS total_order_price, COUNT(*) AS `count`
  FROM TABLE(TUMBLE(TABLE order_detail, DESCRIPTOR(`time_ltz`), INTERVAL '1' HOUR))
  GROUP BY biz, window_start, window_end;
);

  • 本页导读 (1)
文档反馈