本文为您介绍Datastream作业开发POM依赖包、Datastream作业开发示例和Datastream Connector。

说明
  • 仅独享模式Blink3.2.2及以上版本支持Datastream功能。
  • 建议使用IntelliJ IDEA中的maven工程进行Datastream作业开发。
  • 实时计算独享模式不支持归档保存已停止(含暂停)的作业的运行日志。若需要查询已停止(含暂停)的作业运行日志,请将日志输出至用户自定义的SLS或OSS中。日志输出步骤参见日志下载

POM依赖包

请根据需求自行添加开源版本所支持的POM依赖包。Blink3.2.2版本的POM文件,示例如下。

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <blink.version>blink-3.2.2</blink.version>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${blink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${blink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${blink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>${blink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
</dependencies>
下载查看完整依赖包示例
说明 如果您需要依赖Snapshot版本,可以自行添加Snapshot版本所支持的POM依赖包

作业示例

public class Datastreamtest {
    public static void main(String[] args) throws Exception {

        // 创建Execution Environment。
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 通过从本地TXT文件读取数据。
        //DataStream<String> text = env.readTextFile("./test.txt");
        DataStream<String> text = env.fromElements("hello world", "hello jason", "wonderful world");
        // 解析数据,先按word进行分组,然后进行开窗和聚合操作。
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // 将结果打印到控制台。请使用单线程打印,而非多线程打印。
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

}

Connector列表

Blink 3.2版本新增如下Datastream Connector:
  • Kafka
  • Kafka(开源版本)
  • Hbase(开源版本)
  • JDBC
  • RDS SINK
  • Elesticsearch
  • MongoDB
  • Redis
说明 Datastream支持的部分Connector已完成开源,开源信息请参见alibaba-flink-connectors