本文介绍阿里云E-MapReduce如何访问Zeppelin。您可以通过访问Zeppelin,进行大数据可视化分析。

前提条件

  • 已创建集群,并选择了Zeppelin服务,详情请参见创建集群
  • 在集群安全组中打开8080端口,详情请参见访问链接与端口
  • 已添加本文示例所需的服务,例如,Presto、Flink和Impala。

    添加服务详情请参见添加服务

访问Zeppelin

  1. 已通过阿里云账号登录阿里云E-MapReduce控制台
  2. 在顶部菜单栏处,根据实际情况选择地域和资源组
  3. 单击上方的集群管理页签。
  4. 集群管理页面,单击相应集群所在行的详情
  5. 在左侧导航栏,单击访问链接与端口
  6. 单击Zeppelin所在行的链接。
    您可以直接访问Web UI页面。

示例

以下内容只适用于EMR-3.33.0及之后版本和EMR-4.6.0及之后版本:

如何使用Spark

  1. 在Zeppelin页面,单击Create new note
  2. Create new note对话框,输入Note Name,选择Default Interpreterspark
  3. 单击create
  4. 在Zeppelin的Notebook页面,您可以执行以下命令。
    您无需配置,EMR里的Zeppelin中已经配置了Spark Interpreter。Spark默认执行模式是Yarn-cluster。支持以下三种代码方式:
    • Spark Scala
      %spark表示执行Spark Scala代码。
      %spark
      
      val df = spark.read.options(Map("inferSchema"->"true","delimiter"->";","header"->"true"))
      .csv("file:///usr/lib/spark-current/examples/src/main/resources/people.csv")
      z.show(df)
      df.registerTempTable("people")
      返回信息如下所示。Spark Scala
    • PySpark
      %spark.pyspark表示执行PySpark代码。
      %spark.pyspark
      
      df = spark.read.csv('file:///usr/lib/spark-current/examples/src/main/resources/people.csv',header=True,sep=';')
      df.show()
      返回信息如下所示。PySpark
    • Spark SQL
      %spark.sql表示执行Spark SQL代码。
      %spark.sql
      
      show tables;
      select * from people;
      返回信息如下所示。Spark Sql

如何使用Flink

  1. 在Zeppelin页面,单击Create new note
  2. Create new note对话框,输入Note Name,选择Default Interpreterflink
  3. 单击create
  4. 在Zeppelin的Notebook页面,您可以执行以下命令。
    您无需配置,EMR里的Zeppelin已经为您配置了Flink Interpreter。支持以下三种代码方式:
    • Flink Scala
      %flink表示执行Flink Scala代码。
      %flink
      
      val data = benv.fromElements("hello world","hello flink","hello hadoop")
      data.flatMap(line => line.split("\\s"))
                           .map(w => (w,1))
                           .groupBy(0)
                           .sum(1)
                           .print()
      返回信息如下所示。Flink Scala
    • PyFlink
      %flink.pyflink表示执行PyFlink代码。PyFlink
    • Flink SQL

      %flink.ssql表示执行Flink SQL代码。

      在运行下面的示例前,需要先运行下面的代码以构建一个模拟用户日志的数据。
      %flink
      
      import org.apache.flink.streaming.api.functions.source.SourceFunction
      import org.apache.flink.table.api.TableEnvironment
      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
      import org.apache.flink.runtime.state.filesystem.FsStateBackend
      
      import java.util.Collections
      import scala.collection.JavaConversions._
      
      
      senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      senv.enableCheckpointing(5000)
      senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
      
      
      val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
      
      
        val pages = Seq("home", "search", "search", "product", "product", "product")
        var count: Long = 0
        var running : Boolean = true
        // startTime is 2020/1/1
        var startTime: Long = new java.util.Date(2020 - 1900,0,1).getTime
        var sleepInterval = 100
      
        override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
          val lock = ctx.getCheckpointLock
      
          while (count < 1000000 && running) {
            lock.synchronized({
              ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size)))
              count += 1
              Thread.sleep(sleepInterval)
            })
          }
        }
      
        override def cancel(): Unit = {
          running = false
        }
      
        override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = {
          Collections.singletonList(count)
        }
      
        override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
          state.foreach(s => count = s)
        }
      
      }).assignAscendingTimestamps(_._1)
      
      stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)
      Zeppelin支持所有类型的Flink SQL语句,包括DDL和DML等。您还可以在Zeppelin可视化流式数据,Flink支持以下三种流式数据的可视化:
      • Single模式

        Single模式适合当输出结果是一行的情况,不适用图形化的方式展现。例如下面的Select语句。这条SQL语句只有一行数据,但这行数据会持续不断的更新。这种模式的数据输出格式是HTML形式,您可以用template来指定输出模板,{i}是第i列的placeholder。

        %flink.ssql(type=single,parallelism=1,refreshInterval=1000,template=<h1>{1}</h1> until <h2>{0}</h2>)
        
        select max(rowtime),count(1) from log
        返回信息如下所示。single
      • Update模式
        Update模式适合多行输出的情况。例如下面的select group by语句。此模式会定期更新这多行数据,输出是Zeppelin的table格式,因此可以用Zeppelin自带的可视化控件。
        %flink.ssql(type=update,parallelism=1,refreshInterval=2000)
        
        select url,count(1) as pv from log group by url
        返回信息如下所示。update
      • Append模式
        Append模式适合不断有新数据输出,但不会覆盖原有数据,只会不断append的情况。例如下面的基于窗口的group by语句。Append模式要求第一列数据类型是TIMESTAMP,本示例的start_time是TIMESTAMP类型。
        %flink.ssql(type=append,parallelism=1,refreshInterval=2000,threshold=60000)
        
        select TUMBLE_START(rowtime,INTERVAL '5' SECOND) start_time,url,count(1) as pv from log
        group by TUMBLE(rowtime,INTERVAL '5' SECOND),url
        返回信息如下所示。append
        如果没有呈现如上所示的图表,可能是您的图表配置不对,请按照如下图所示配置图表,然后运行段落。append_set

如何使用Presto

  1. 在Zeppelin页面,单击Create new note
  2. Create new note对话框,输入Note Name,选择Default Interpreterpresto
  3. 单击create
  4. 在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。
    %presto表示执行Presto SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Presto服务。
    %presto
    
    show tables;
    select * from test_1;
    返回信息如下所示。Presto

如何使用Impala

  1. 在Zeppelin页面,单击Create new note
  2. Create new note对话框,输入Note Name,选择Default Interpreterimpala
  3. 单击create
  4. 在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。
    %impala表示执行Impala SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Impala服务。
    %impala
    
    drop table if exists test_1;
    create table test_1(id int,name string);
    insert into  test_1 values(1,'test1');
    insert into  test_1 values(2,'test2');
    select * from test_1;
    返回信息如下所示。impala

如何使用Hive

  1. 在Zeppelin页面,单击Create new note
  2. Create new note对话框,输入Note Name,选择Default Interpreterhive
  3. 单击create
  4. 在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。
    %hive表示执行Hive SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Hive Thrift Server服务。
    %hive
    
    show tables;
    select * from test_1;
    返回信息如下所示。hive

问题反馈

如果您在使用阿里云E-MapReduce过程中有任何疑问,欢迎您扫描下面的二维码加入钉钉群进行反馈。

emr_dingding