Flink是流式计算引擎。本文为您介绍如何在Zeppelin中使用Flink。

背景信息

Zeppelin支持Flink的3种主流语言,包括Scala、PyFlink和SQL。Zeppelin中所有语言共用一个Flink Application,即共享一个ExecutionEnvironment和StreamExecutionEnvironment。例如,您在Scala里注册的table和UDF是可以被其他语言使用的。

Flink解释器的基本架构如下图所示:Flink

Zeppelin支持Flink,您可以在Zeppelin里使用Flink的所有功能。

Zeppelin的SQL开发环境和Flink自带的SQL-Client类似,但提供了更多的特性,具体内容如下:
  • 同时支持Batch SQL和Streaming SQL

    Zeppelin中同时支持Batch SQL和Streaming SQL,%flink.ssql用来执行Streaming SQL,%flink.bsql用来执行Batch SQL。

  • 支持多条语句

    您可以在Zeppelin的每一个Paragraph中写多条SQL语句,每条SQL语句以分号(;)间隔。

  • 支持Comment

    您可以在SQL中添加Comment,-- 开头的表示单行Comment,/* */包围的表示多行Comment。

  • 支持Job并行度配置
    您可以设置每个Paragraph的parallelism来控制Flink SQL Job的并行度。Flink-Job
  • 支持Multiple insert
    例如,当您有多条INSERT语句读同一个Source时,结果会写到不同的Sink,默认情况下每条SQL语句都会独立运行一个Flink Job,如果您想合并多条语句到同一个Flink Job的话,您就需要设置runAsOne为true。Multiple insert
  • JobName的设置
    对于INSERT语句的Flink Job,您可以通过设置jobName的方式来指定Job名称。
    注意 只有INSERT语句才支持设置jobName,SELECT语句不支持。此方式只适用于单条INSERT语句,不支持Multiple insert。
    JobName

注意事项

Zeppelin的Flink解释器已经为您创建好了所有Environment变量,您无需创建。

Scala(%flink)

Flink on Zeppelin支持的默认语言是Scala(%flink),也是整个Flink Interpreter内部实现的入口。Flink Interpreter内部会创建Flink Scala Shell, 在Flink Scala里会创建Flink的各种Environment。您在Zeppelin上写的Scala代码就会提交到这个Flink Scala Shell里去执行。

Zeppelin已经为您创建了以下7个变量:
  • senv(StreamExecutionEnvironment)
  • benv(ExecutionEnvironment)
  • stenv(StreamTableEnvironment for blink planner)
  • btenv(BatchTableEnvironment for blink planner)
  • stenv_2(StreamTableEnvironment for flink planner)
  • btenv_2(BatchTableEnvironment for flink planner)
  • z(ZeppelinContext)
示例1:使用Scala写的Batch WordCount。Batch wordcount
示例2:使用Scala写的Streaming WordCount。Streaming wordcount

PyFlink (%flink.pyflink)

PyFlink是Flink on Zeppelin上Python语言的入口,Flink Interpreter内部会创建Python Shell。Python Shell内部会创建Flink的各种Environment,但是PyFlink里的各种Environment变量对应的Java变量都是Scala Shell创建的。您在Zeppelin上写的Python代码会提交到这个Python Shell里去执行。

Zeppelin已经为您创建了以下7个变量:
  • s_env(StreamExecutionEnvironment)
  • b_env(ExecutionEnvironment)
  • st_env(StreamTableEnvironment for blink planner)
  • bt_env(BatchTableEnvironment for blink planner)
  • st_env_2(StreamTableEnvironment for flink planner)
  • bt_env_2(BatchTableEnvironment for flink planner)
  • z(ZeppelinContext)

SQL (%flink.ssql和%flink.bsql)

Zeppelin支持以下两种不用场景的SQL:
  • %flink.ssql:Streaming SQL,使用StreamTableEnvironment来执行SQL。
  • %flink.bsql:Batch SQL, 使用BatchTableEnvironment来执行SQL。
Flink支持的SQL语句类型如下:
  • DDL(Data Definition Language)
  • DML(Data Manipulation Language)
  • DQL(Data Query Language)
  • Flink定制语句

    例如,SET和HELP语句。

Streaming SQL结果可视化

Zeppelin对于Select语句的结果以流式的方式可视化。

Zeppelin可视化类型包含Single模式、Update模式和Append模式。
  • Single模式

    Single模式适合当输出结果是一行的情况,不适合用图形化的方式展现,使用文本的方式更适合。

    例如,下面的Select语句,这条SQL语句只有一行数据,但这行数据会持续不断的更新。
    %flink.ssql(type=single, parallelism=1, refreshInterval=3000, template=<h1>{1}</h1> until <h2>{0}</h2>)
    
    select max(event_ts), count(1) from sink_kafka

    此模式的数据输出格式是HTML形式,您可以用template来指定输出模板,{i}是第i列的placeholder。

    结果如下图所示。Single
  • Update模式
    Update模式适合多行输出的情况,此模式会定期更新数据,输出格式是Zeppelin的table格式。例如,SELECT GROUP BY语句。
    %flink.ssql(type=update, refreshInterval=2000, parallelism=1)
    
    select status, count(1) as pv from sink_kafka group by status
    结果如下图所示。Update
  • Append模式

    Append模式适合时间序列数据,不断有新数据输出,但不会覆盖原有数据,只会不断Append的情况。

    例如,基于窗口的GROUP BY语句。Append模式要求第一列数据类型是TIMESTAMP,代码中的start_time就是TIMESTAMP类型。
    %flink.ssql(type=append, parallelism=1, refreshInterval=2000, threshold=60000)
    
    select TUMBLE_START(event_ts, INTERVAL '5' SECOND) as start_time, status, count(1) from sink_kafka
    group by TUMBLE(event_ts, INTERVAL '5' SECOND), status
    结果如下图所示。Append

UDF

在Zeppelin您可以通过以下4种方式使用或者定义UDF:
  • 在Zeppelin中直接写Scala UDF。
    此方式适合写比较简单的UDF。您只需定义UDF的Class,然后在btenv或stenv中注册定义的UDF。完成注册后,您可以在%flink.bsql%flink.ssql中使用,因为%flink.bsql%flink.ssql共享同一个Catalog。
    %flink
    
    class ScalaUpper extends ScalarFunction {
      def eval(str: String) = str.toUpperCase
    }
    
    btenv.registerFunction("scala_upper", new ScalaUpper())
  • 在Zeppelin中直接写PyFlink UDF。
    此方式适合写比较简单的UDF。在Zeppelin中定义PyFlink UDF与Scala UDF类似。您只需定义UDF的Class,然后在btenv或stenv中注册定义的UDF。完成注册后,您可以在%flink.bsql%flink.ssql中使用,因为%flink.bsql%flink.ssql共享同一个Catalog。
    %flink.pyflink
    
    class PythonUpper(ScalarFunction):
      def eval(self, s):
        return s.upper()
    
    bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
  • 使用SQL创建UDF。
    部分简单的UDF可以在Zeppelin里直接写,但是如果是一些复杂的UDF,建议在IDE里写,然后在Zeppelin中创建注册。例如:
    %flink.ssql
    
    CREATE FUNCTION myupper AS 'org.apache.zeppelin.flink.udf.JavaUpper';
    因为使用此方式前提是这个UDF对应的JAR包必须要在CLASSPATH里,所以您需要先配置flink.execution.jars,把这个UDF的JAR包放到CLASSPATH里。
    flink.execution.jars /Users/jzhang/github/flink-udf/target/flink-udf-1.0-SNAPSHOT.jar
  • 使用flink.udf.jars来指定含有UDF的JAR包。
    如果您有多个UDF,并且UDF之间的逻辑比较复杂,您也不想注册UDF,此时您可以使用flink.udf.jars
    1. 在IDE里创建Flink UDF项目,编写UDF。

      示例详情,请参见Flink UDF

    2. 使用flink.udf.jars指定Flink UDF项目的JAR包。
      flink.udf.jars /Users/jzhang/github/flink-udf/target/flink-udf-1.0-SNAPSHOT.jar

      Zeppelin会扫描JAR包,然后检测出所有的UDF,并且自动注册UDF,UDF的名字就是Class名字。

      例如,show functions的结果如下。show-functions
      说明 默认情况下Zeppelin会扫描JAR包里的所有的Class,如果JAR包过大可能会导致性能问题。此时您可以设置flink.udf.jars.packages来指定扫描的Package,以减少扫描的Class数目。

第三方依赖

Flink作业经常会有第三方依赖。Flink通常使用以下2种配置方式添加依赖:
  • flink.excuetion.packages

    此方式和在pom.xml里添加依赖是类似的,会下载相应的Package和所有Transitive依赖到CLASSPATH里。

    例如,添加Kafka Connector依赖。
    flink.execution.packages    org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0
    Package的格式为artifactGroup:artifactId:version,当有多个Package时,每个Package用逗号(,)分隔。
    说明 此方式需要Zeppelin机器可以访问外网,如果不能访问外网的话,建议您使用flink.execution.jars方式。
  • flink.execution.jars

    如果您的Zeppelin机器不能访问外网或者您的依赖没有发布在Maven Repository里,则您可以将依赖的JAR包放到Zeppelin机器上,然后通过flink.execution.jars来指定这些JAR包,多个JAR包时用逗号(,)分隔。

    例如,添加Kafka Connector依赖。
    flink.execution.jars /Users/jzhang/github/flink-kafka/target/flink-kafka-1.0-SNAPSHOT.jar
    说明 示例中的flink-kafka-1.0-SNAPSHOT.jar是通过pom.xml文件构建的。

内置教程

EMR数据开发集群自带了很多Flink教程,详细信息请在下图页面查看。Flink-Demo

问题反馈

如果您在使用阿里云E-MapReduce过程中有任何疑问,欢迎您扫描下面的二维码加入钉钉群进行反馈。

emr_dingding