在DataStream作业中,您可以根据实际需求在作业开发页面配置自定义参数后,再从Main函数中获取该自定义参数。

配置方法

在DataStream作业中配置自定义参数,自定义参数格式为paramName=paramValue,其中paramName为参数名,paramValue为参数值。如下为DataStream作业开发页面默认的注释信息,您可以按照注释信息提示配置自定义参数。
--完整主类名,必填,例如com.alibaba.realtimecompute.DatastreamExample。
blink.main.class=${完整主类名}

--包含完整主类名的JAR包资源名称,多个JAR包时必填,例如blink_datastream.jar。
--blink.main.jar=${完整主类名jar包的资源名称}

--默认state backend配置,当作业代码没有显式声明时生效。
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000

--默认Checkpoint配置,当作业代码没有显式声明时生效。
blink.checkpoint.interval.ms=180000

--默认启用项目参数。
--enable.project.config=true

--设置自定义参数,代码中获取自定义参数的方法请参考如下链接。
--https://help.aliyun.com/document_detail/127758.html?spm=a2c4g.11174283.6.677.61fb1e49NJoWTR
说明 一个DataStream作业中可定义多个自定义参数。

获取方法

在Datastream作业的Main函数中获取自定义参数。如果您已在作业开发页面配置了自定义参数,例如blink.job.name=jobnametest,则可以通过如下代码将字符串jobnametest赋值于blink.job.name变量。

import org.apache.flink.api.java.utils.ParameterTool;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class getParameterExample {
    public static void main(String[] args)throws Exception {
        final String jobName;
        final ParameterTool params = ParameterTool.fromArgs(args);

        /*此处必须写configFile,读取configFile中的参数。*/
        String configFilePath = params.get("configFile");

        /*创建一个Properties对象,用于保存在平台中设置的相关参数值。*/
        Properties properties = new Properties();

        /*将平台页面中设置的参数值加载到Properties对象中。*/
        properties.load(new StringReader(new String(Files.readAllBytes(Paths.get(configFilePath)), StandardCharsets.UTF_8)));

        /*获取参数。*/
        jobName = (String) properties.get("blink.job.name");
    }
}
说明 该示例仅适用于在Main函数中获取并使用自定义参数。如果您需要在Flink算子中使用自定义参数,则需要在上述代码的基础上增加如下代码,将自定义参数转化为全局作业参数(GlobalJobParameters)后,在Flink算子中通过Configuration类的getproperty方法获取对应的参数。
env.getConfig().setGlobalJobParameters(ParameterTool.fromPropertiesFile(configFilePath));