您可以在一个作业(JOB)中通过Flink SQL开源语法实现业务的流计算处理逻辑,包括过滤、转换、增强和聚合,并将计算结果写入Lindorm中。
引擎与版本
CREATE JOB仅适用于流引擎。要求3.1.8及以上版本。
说明
您可以通过控制台查看并升级小版本。
语法
delimiter $$
create_job_statement ::= CREATE JOB job_name
'('
flink_sqls
')'
$$
delimiter ;
使用说明
Flink任务名称(job_name)
必填参数。Flink任务名称的设置需遵循以下规则:
可包含数字、大写英文字符、小写英文字符、半角句号(.)、中划线(-)和下划线(_)。
不能以半角句号(.)或中划线(-)开头。
长度为1~255字符。
Flink SQL语句(flink_sqls)
必填参数。用于描述计算逻辑。详细语法,请参见Flink社区文档。
示例
打印随机生成的数据。
delimiter $$
CREATE JOB datagen_job (
SET 'parallelism.default' = '6';
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='50000000',
'fields.f_random.min'='1',
'fields.f_random.max'='500',
'fields.f_random_str.length'='10'
);
CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
);
INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
)
$$
delimiter ;
结果验证
您可以执行SHOW JOBS;
语句来查看是否创建成功。
该文章对您有帮助吗?