文档

Zeppelin概述

更新时间:

本文介绍阿里云E-MapReduce如何访问Zeppelin。您可以通过访问Zeppelin,进行大数据可视化分析。

前提条件

  • 已创建集群,并选择了Zeppelin服务。

  • 在集群安全组中打开8080端口。

  • 已添加本文示例所需的服务,例如,Presto、Flink和Impala。

访问Zeppelin

  1. 进入集群详情页面。

    1. 登录阿里云E-MapReduce控制台

    2. 在顶部菜单栏处,根据实际情况选择地域和资源组

    3. 单击上方的集群管理页签。

    4. 集群管理页面,单击相应集群所在行的详情

  2. 在左侧导航栏,单击访问链接与端口

  3. 单击Zeppelin所在行的链接。

    您可以直接访问Web UI页面。

示例

以下内容只适用于EMR-3.33.0及之后版本和EMR-4.6.0及之后版本:

如何使用Spark

  1. 在Zeppelin页面,单击Create new note

  2. Create new note对话框,输入Note Name,选择Default Interpreterspark

  3. 单击create

  4. 在Zeppelin的Notebook页面,您可以执行以下命令。

    您无需配置,EMR里的Zeppelin中已经配置了Spark Interpreter。Spark默认执行模式是Yarn-cluster。支持以下三种代码方式:

    • Spark Scala

      %spark表示执行Spark Scala代码。

      %spark
      
      val df = spark.read.options(Map("inferSchema"->"true","delimiter"->";","header"->"true"))
      .csv("file:///usr/lib/spark-current/examples/src/main/resources/people.csv")
      z.show(df)
      df.registerTempTable("people")

      返回信息如下所示。Spark Scala

    • PySpark

      %spark.pyspark表示执行PySpark代码。

      %spark.pyspark
      
      df = spark.read.csv('file:///usr/lib/spark-current/examples/src/main/resources/people.csv',header=True,sep=';')
      df.show()

      返回信息如下所示。PySpark

    • Spark SQL

      %spark.sql表示执行Spark SQL代码。

      %spark.sql
      
      show tables;
      select * from people;

      返回信息如下所示。Spark Sql

如何使用Flink

  1. 在Zeppelin页面,单击Create new note

  2. Create new note对话框,输入Note Name,选择Default Interpreterflink

  3. 单击create

  4. 在Zeppelin的Notebook页面,您可以执行以下命令。

    您无需配置,EMR里的Zeppelin已经为您配置了Flink Interpreter。支持以下三种代码方式:

    • Flink Scala

      %flink表示执行Flink Scala代码。

      %flink
      
      val data = benv.fromElements("hello world","hello flink","hello hadoop")
      data.flatMap(line => line.split("\\s"))
                           .map(w => (w,1))
                           .groupBy(0)
                           .sum(1)
                           .print()

      返回信息如下所示。Flink Scala

    • PyFlink

      %flink.pyflink表示执行PyFlink代码。PyFlink

    • Flink SQL

      %flink.ssql表示执行Flink SQL代码。

      在运行下面的示例前,需要先运行下面的代码以构建一个模拟用户日志的数据。

      %flink
      
      import org.apache.flink.streaming.api.functions.source.SourceFunction
      import org.apache.flink.table.api.TableEnvironment
      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
      import org.apache.flink.runtime.state.filesystem.FsStateBackend
      
      import java.util.Collections
      import scala.collection.JavaConversions._
      
      
      senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      senv.enableCheckpointing(5000)
      senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
      
      
      val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
      
      
        val pages = Seq("home", "search", "search", "product", "product", "product")
        var count: Long = 0
        var running : Boolean = true
        // startTime is 2020/1/1
        var startTime: Long = new java.util.Date(2020 - 1900,0,1).getTime
        var sleepInterval = 100
      
        override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
          val lock = ctx.getCheckpointLock
      
          while (count < 1000000 && running) {
            lock.synchronized({
              ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size)))
              count += 1
              Thread.sleep(sleepInterval)
            })
          }
        }
      
        override def cancel(): Unit = {
          running = false
        }
      
        override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = {
          Collections.singletonList(count)
        }
      
        override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
          state.foreach(s => count = s)
        }
      
      }).assignAscendingTimestamps(_._1)
      
      stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)

      Zeppelin支持所有类型的Flink SQL语句,包括DDL和DML等。您还可以在Zeppelin可视化流式数据,Flink支持以下三种流式数据的可视化:

      • Single模式

        Single模式适合当输出结果是一行的情况,不适用图形化的方式展现。例如下面的Select语句。这条SQL语句只有一行数据,但这行数据会持续不断的更新。这种模式的数据输出格式是HTML形式,您可以用template来指定输出模板,{i}是第i列的placeholder。

        %flink.ssql(type=single,parallelism=1,refreshInterval=1000,template=<h1>{1}</h1> until <h2>{0}</h2>)
        
        select max(rowtime),count(1) from log

        返回信息如下所示。single

      • Update模式

        Update模式适合多行输出的情况。例如下面的select group by语句。此模式会定期更新这多行数据,输出是Zeppelin的table格式,因此可以用Zeppelin自带的可视化控件。

        %flink.ssql(type=update,parallelism=1,refreshInterval=2000)
        
        select url,count(1) as pv from log group by url

        返回信息如下所示。update

      • Append模式

        Append模式适合不断有新数据输出,但不会覆盖原有数据,只会不断append的情况。例如下面的基于窗口的group by语句。Append模式要求第一列数据类型是TIMESTAMP,本示例的start_time是TIMESTAMP类型。

        %flink.ssql(type=append,parallelism=1,refreshInterval=2000,threshold=60000)
        
        select TUMBLE_START(rowtime,INTERVAL '5' SECOND) start_time,url,count(1) as pv from log
        group by TUMBLE(rowtime,INTERVAL '5' SECOND),url

        返回信息如下所示。append

        如果没有呈现如上所示的图表,可能是您的图表配置不对,请按照如下图所示配置图表,然后运行段落。append_set

如何使用Presto

  1. 在Zeppelin页面,单击Create new note

  2. Create new note对话框,输入Note Name,选择Default Interpreterpresto

  3. 单击create

  4. 在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。

    %presto表示执行Presto SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Presto服务。

    %presto
    
    show tables;
    select * from test_1;

    返回信息如下所示。Presto

如何使用Impala

  1. 在Zeppelin页面,单击Create new note

  2. Create new note对话框,输入Note Name,选择Default Interpreterimpala

  3. 单击create

  4. 在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。

    %impala表示执行Impala SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Impala服务。

    %impala
    
    drop table if exists test_1;
    create table test_1(id int,name string);
    insert into  test_1 values(1,'test1');
    insert into  test_1 values(2,'test2');
    select * from test_1;

    返回信息如下所示。impala

如何使用Hive

  1. 在Zeppelin页面,单击Create new note

  2. Create new note对话框,输入Note Name,选择Default Interpreterhive

  3. 单击create

  4. 在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。

    %hive表示执行Hive SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Hive Thrift Server服务。

    %hive
    
    show tables;
    select * from test_1;

    返回信息如下所示。hive