本文介绍阿里云E-MapReduce如何访问Zeppelin。您可以通过访问Zeppelin,进行大数据可视化分析。
前提条件
已创建集群,并选择了Zeppelin服务。
在集群安全组中打开8080端口。
已添加本文示例所需的服务,例如,Presto、Flink和Impala。
访问Zeppelin
进入集群详情页面。
在顶部菜单栏处,根据实际情况选择地域和资源组。
单击上方的集群管理页签。
在集群管理页面,单击相应集群所在行的详情。
在左侧导航栏,单击访问链接与端口。
单击Zeppelin所在行的链接。
您可以直接访问Web UI页面。
示例
以下内容只适用于EMR-3.33.0及之后版本和EMR-4.6.0及之后版本:
如何使用Spark
在Zeppelin页面,单击Create new note。
在Create new note对话框,输入Note Name,选择Default Interpreter为spark。
单击create。
在Zeppelin的Notebook页面,您可以执行以下命令。
您无需配置,EMR里的Zeppelin中已经配置了Spark Interpreter。Spark默认执行模式是Yarn-cluster。支持以下三种代码方式:
Spark Scala
%spark
表示执行Spark Scala代码。%spark val df = spark.read.options(Map("inferSchema"->"true","delimiter"->";","header"->"true")) .csv("file:///usr/lib/spark-current/examples/src/main/resources/people.csv") z.show(df) df.registerTempTable("people")
返回信息如下所示。
PySpark
%spark.pyspark
表示执行PySpark代码。%spark.pyspark df = spark.read.csv('file:///usr/lib/spark-current/examples/src/main/resources/people.csv',header=True,sep=';') df.show()
返回信息如下所示。
Spark SQL
%spark.sql
表示执行Spark SQL代码。%spark.sql show tables; select * from people;
返回信息如下所示。
如何使用Flink
在Zeppelin页面,单击Create new note。
在Create new note对话框,输入Note Name,选择Default Interpreter为flink。
单击create。
在Zeppelin的Notebook页面,您可以执行以下命令。
您无需配置,EMR里的Zeppelin已经为您配置了Flink Interpreter。支持以下三种代码方式:
Flink Scala
%flink
表示执行Flink Scala代码。%flink val data = benv.fromElements("hello world","hello flink","hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w,1)) .groupBy(0) .sum(1) .print()
返回信息如下所示。
PyFlink
%flink.pyflink
表示执行PyFlink代码。Flink SQL
%flink.ssql
表示执行Flink SQL代码。在运行下面的示例前,需要先运行下面的代码以构建一个模拟用户日志的数据。
%flink import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.table.api.TableEnvironment import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.runtime.state.filesystem.FsStateBackend import java.util.Collections import scala.collection.JavaConversions._ senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) senv.enableCheckpointing(5000) senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints")); val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] { val pages = Seq("home", "search", "search", "product", "product", "product") var count: Long = 0 var running : Boolean = true // startTime is 2020/1/1 var startTime: Long = new java.util.Date(2020 - 1900,0,1).getTime var sleepInterval = 100 override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = { val lock = ctx.getCheckpointLock while (count < 1000000 && running) { lock.synchronized({ ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size))) count += 1 Thread.sleep(sleepInterval) }) } } override def cancel(): Unit = { running = false } override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = { Collections.singletonList(count) } override def restoreState(state: java.util.List[java.lang.Long]): Unit = { state.foreach(s => count = s) } }).assignAscendingTimestamps(_._1) stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)
Zeppelin支持所有类型的Flink SQL语句,包括DDL和DML等。您还可以在Zeppelin可视化流式数据,Flink支持以下三种流式数据的可视化:
Single模式
Single模式适合当输出结果是一行的情况,不适用图形化的方式展现。例如下面的
Select
语句。这条SQL语句只有一行数据,但这行数据会持续不断的更新。这种模式的数据输出格式是HTML形式,您可以用template
来指定输出模板,{i}
是第i列的placeholder。%flink.ssql(type=single,parallelism=1,refreshInterval=1000,template=<h1>{1}</h1> until <h2>{0}</h2>) select max(rowtime),count(1) from log
返回信息如下所示。
Update模式
Update模式适合多行输出的情况。例如下面的
select group by
语句。此模式会定期更新这多行数据,输出是Zeppelin的table格式,因此可以用Zeppelin自带的可视化控件。%flink.ssql(type=update,parallelism=1,refreshInterval=2000) select url,count(1) as pv from log group by url
返回信息如下所示。
Append模式
Append模式适合不断有新数据输出,但不会覆盖原有数据,只会不断append的情况。例如下面的基于窗口的
group by
语句。Append模式要求第一列数据类型是TIMESTAMP,本示例的start_time
是TIMESTAMP类型。%flink.ssql(type=append,parallelism=1,refreshInterval=2000,threshold=60000) select TUMBLE_START(rowtime,INTERVAL '5' SECOND) start_time,url,count(1) as pv from log group by TUMBLE(rowtime,INTERVAL '5' SECOND),url
返回信息如下所示。
如果没有呈现如上所示的图表,可能是您的图表配置不对,请按照如下图所示配置图表,然后运行段落。
如何使用Presto
在Zeppelin页面,单击Create new note。
在Create new note对话框,输入Note Name,选择Default Interpreter为presto。
单击create。
在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。
%presto
表示执行Presto SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Presto服务。%presto show tables; select * from test_1;
返回信息如下所示。
如何使用Impala
在Zeppelin页面,单击Create new note。
在Create new note对话框,输入Note Name,选择Default Interpreter为impala。
单击create。
在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。
%impala
表示执行Impala SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Impala服务。%impala drop table if exists test_1; create table test_1(id int,name string); insert into test_1 values(1,'test1'); insert into test_1 values(2,'test2'); select * from test_1;
返回信息如下所示。
如何使用Hive
在Zeppelin页面,单击Create new note。
在Create new note对话框,输入Note Name,选择Default Interpreter为hive。
单击create。
在Zeppelin的Notebook页面,您可以执行以下命令查看表信息。
%hive
表示执行Hive SQL代码,您无需配置,Zeppelin会自动连接到EMR集群的Hive Thrift Server服务。%hive show tables; select * from test_1;
返回信息如下所示。