CREATE JOB

您可以在一个作业(JOB)中通过Flink SQL开源语法实现业务的流计算处理逻辑,包括过滤、转换、增强和聚合,并将计算结果写入Lindorm中。

引擎与版本

CREATE JOB仅适用于流引擎。要求3.1.8及以上版本。

说明

您可以通过控制台查看并升级小版本

语法

delimiter $$
create_job_statement ::= CREATE JOB job_name
                          '('
                              flink_sqls
                          ')'
$$
delimiter ;

使用说明

Flink任务名称(job_name

必填参数。Flink任务名称的设置需遵循以下规则:

  • 可包含数字、大写英文字符、小写英文字符、半角句号(.)、中划线(-)和下划线(_)。

  • 不能以半角句号(.)或中划线(-)开头。

  • 长度为1~255字符。

Flink SQL语句(flink_sqls

必填参数。用于描述计算逻辑。详细语法,请参见Flink社区文档

示例

打印随机生成的数据。

delimiter $$
CREATE JOB datagen_job (
  SET 'parallelism.default' = '6';
  CREATE TABLE datagen (
    f_sequence INT,
    f_random INT,
    f_random_str STRING,
    ts AS localtimestamp,
    WATERMARK FOR ts AS ts
  ) WITH (
    'connector' = 'datagen',
    -- optional options --
    'rows-per-second'='5',
    'fields.f_sequence.kind'='sequence',
    'fields.f_sequence.start'='1',
    'fields.f_sequence.end'='50000000',
    'fields.f_random.min'='1',
    'fields.f_random.max'='500',
    'fields.f_random_str.length'='10'
  );

  CREATE TABLE print_table (
    f_sequence INT,
    f_random INT,
    f_random_str STRING
    ) WITH (
    'connector' = 'print'
  );

  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
)
$$
delimiter ;

结果验证

您可以执行SHOW JOBS;语句来查看是否创建成功。