文档

Confluent+数据洞察Databricks最佳实践

更新时间:
一键部署

本文将向您介绍如何使用流数据服务Confluent和数据洞察Databricks搭建自己的离线大数据计算平台。

前提条件

创建集群

Databricks Worker节点公网访问

Databricks的worker节点暂时不支持公网访问,为了能访问Confluent的公网地址,请通过工单联系Databricks的开发人员添加NAT网关。

实践案例一、出租车数据入湖及分析

出租车和网约车在每天的运行中持续产生行驶轨迹和交易数据,这些数据对于车辆调度,流量预测,安全监控等场景有着极大的价值。

本案例中我们使用纽约市的出租车数据来模拟网约车数据从产生并发布到流数据服务Confluent,然后使用Databricks Structured Streaming处理发布的实时数据,最终将数据存储到LakeHouse中。数据存储到LakeHouse后,我们使用Spark和Spark SQL对数据进行分析,并使用Spark的MLlib进行机器学习训练。

步骤一、创建topic

登录Confluent的control center,在左侧选中Topics,点击Add a topic按钮,创建一个名为nyc_taxi_data的topic,将partition设置为3,其他配置保持默认。 topic

步骤二、创建OSS Bucket

在和Databricks同一Region的OSS中,进入到Bucket列表页,点击创建Bucket按钮,新建Bucket,用户可根据需要命名Bucket。

创建好Bucket之后,在新建Bucket下点击新建目录,创建checkpoint_dir和data/nyc_taxi_data两个目录。

步骤三、收集基本信息

  1. Confluent集群ID:在csp的管控界面,集群详情页获取。

  2. 在创建Confluent集群时设置的用户名和密码。

  3. 存储路径:

      • Databricks Structured Streaming的checkpoint存储目录

      • 采集的数据的存储目录

将收集到的基本信息填入如下的变量定义中。

# 集群管控界面获取
confluent_cluster_id = "your_confluent_cluster_id"    
# 使用confluent集群ID拼接得到
confluent_server = "rb-{confluent_cluster_id}.csp.aliyuncs.com:9092" 
control_center_username = "your_confluent_control_center_username"
control_center_password = "your_confluent_control_center_password"

topic = "nyc_taxi_data"

checkpoint_location = "oss://[your_bucket_name]/checkpoint_dir"
taxi_data_delta_lake = "oss://[your_bucket_name]/data/nyc_taxi_data"

步骤四、数据的产生

本案例使用Kaggle上的NYC出租车数据集来模拟数据产生。

我们先安装confluent的python客户端,其他语言的客户端参考Confluent官网

pip install confluent_kafka

构造用于创建Kafka Producer的基础信息:bootstrap.servers为上一步骤中定义的变量confluent_server,sasl.username为上一步骤中定义的变量control_center_username,sasl.password为上一步骤中定义的变量control_center_password

conf = {
    'bootstrap.servers': confluent_server,
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': control_center_username,
    'sasl.password': control_center_password
}

创建Producer:

producer = Producer(conf)

向Kafka中发送消息(模拟数据的产生):

with open("/Path/To/train.csv", "rt") as f:
    float_field = ['fare_amount', 'pickup_longitude', 'pickup_latitude', 
                   'dropoff_longitude', 'dropoff_latitude']
    for row in reader:
        i += 1
        try:
            for field in float_field:
                row[field] = float(row[field])
            row['passenger_count'] = int(row['passenger_count'])
            producer.produce(topic=topic, value=json.dumps(row))
            if i % 1000 == 0:
                producer.flush()
                if i == 200000:
                    break
        except ValueError: # discard null/NAN data
            continue

步骤五、Spark集成Confluent

对spark session的readStream参数进行简单设置即可将Kafka中的实时流数据转换为Spark中的Dataframe:

lines = (spark.readStream
         # 指定数据源kafka
         .format("kafka")
         # 指定kafka bootstrap server的URL
         .option("kafka.bootstrap.servers", confluent_server)
         # 指定订阅的topic
         .option("subscribe", topic)
         # 指定想要读取的数据的offset,earliest表示从每个partition的起始点开始读取
         .option("startingOffsets", "earliest")
         # 指定认证协议
         .option("kafka.security.protocol", "SASL_SSL")
         .option("kafka.sasl.mechanism", "PLAIN")
         # 指定confluent的用户名和密码
         .option("kafka.sasl.jaas.config",
                 f"""org.apache.kafka.common.security.plain.PlainLoginModule 
                 required username="{control_center_username}" password="{control_center_password}";""")
         .load())

从kafka中读取的数据格式如下:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

由于key和value都是binary格式的,我们需要将value(json)由binary转换为string格式,并定义schema,提取出JSON中的数据,并转换为对应的格式:

schema = (StructType().add('key', TimestampType())
          .add('fare_amount', FloatType())
          .add('pickup_datetime', TimestampType())
          .add('pickup_longitude', FloatType())
          .add('pickup_latitude', FloatType())
          .add('dropoff_longitude', FloatType())
          .add('dropoff_latitude', FloatType())
          .add('passenger_count', IntegerType())
          )

# 将JSON中的列提取出来
lines = (lines.withColumn('data', 
                          from_json(
                              col('value').cast('string'), # binary 转 string
                              schema))                     # 解析为schema
         .select(col('data.*')))                           # select value中的所有列

过滤掉错误,为空,NaN的数据:

lines = (lines.filter(col('pickup_longitude') != 0)
         .filter(col('pickup_latitude') != 0)
         .filter(col('dropoff_longitude') != 0)
         .filter(col('dropoff_latitude') != 0)
         .filter(col('fare_amount') != 0)
         .filter(col('passenger_count') != 0))

最后,我们将解析出来的数据输出到LakeHouse中,以进行后续的分析和机器学习模型训练:

# lakehouse 的存储格式为 delta
query = (lines.writeStream.format('delta')
         .option('checkpointLocation', checkpoint_location)
         .option('path', taxi_data_delta_lake).start())
# 执行job,直到出现异常(如果只想执行该Job一段时间,可以指定timeout参数)
query.awaitTermination()

数据分析

我们先将LakeHouse中的数据使用Spark加载进来:

%pyspark
taxi_data_delta_lake = "oss://[your_bucket_name]/data/nyc_taxi_data/"
df = spark.read.format("delta").load(taxi_data_delta_lake)
df.show(10)
image

然后,我们对该Dataframe创建一个Table View,并探索fare_amount的分布:

%pyspark
df.createOrReplaceTempView("nyc_taxi_data")
%sql
SELECT MIN(`fare_amount`), MAX(`fare_amount`), MEAN(`fare_amount`), STDDEV(`fare_amount`)
FROM `nyc_taxi_data`;
image

可以看到fare_amount的最小值是负数,这显然是一条错误的数据,我们将这些错误的数据过滤,并探索fare_amount的分布:

%pyspark
from pyspark.sql.functions import from_json, col, decode, explode
df = (df.filter(col('fare_amount') > 0)
        .withColumn('int_fare_amount', col('fare_amount').cast('int')))
df.createOrReplaceTempView("nyc_taxi_data")
%sql
SELECT `int_fare_amount`, COUNT(`int_fare_amount`)
FROM nyc_taxi_data
GROUP BY `int_fare_amount`;
image

然后我们探索价格和年份,月份,星期,打车时间的关系:

%sql
SELECT YEAR(`pickup_datetime`), AVG(`fare_amount`)
FROM nyc_taxi_data
GROUP BY YEAR(`pickup_datetime`)
image
%sql
SELECT HOUR(`pickup_datetime`), AVG(`fare_amount`)
FROM nyc_taxi_data
GROUP BY HOUR(`pickup_datetime`)
image

从上面可以看出两点:

  • 出租车的价格和年份有很大关系,从09年到15年呈不断增长的态势

  • 在中午和凌晨打车比上午和下午打车更贵一些。

我们再进一步探索价格和乘客数量的关系:

%sql
SELECT `passenger_count`, AVG(`fare_amount`) 
FROM `nyc_taxi_data`
WHERE `passenger_count` < 7 AND `passenger_count` > 0 
GROUP BY `passenger_count` ORDER BY `passenger_count`;
image

此外,出租车价格的另一个影响因素就是距离,这里我们借助python的geopy包和Spark的UDF来计算给定两个位置的距离,然后再分析费用和距离的关系。

经纬度的范围为[-90, 90],因此,我们第一步是清除错误的数据:

%pyspark
# 过滤掉没有位置信息的数据
df = (df.filter(df.pickup_longitude > -90).filter(df.pickup_longitude < 90).filter(df.pickup_longitude != 0)
      .filter(df.pickup_latitude > -90).filter(df.pickup_latitude < 90).filter(df.pickup_latitude != 0)
      .filter(df.dropoff_longitude > -90).filter(df.dropoff_longitude < 90).filter(df.dropoff_longitude != 0)
      .filter(df.dropoff_latitude > -90).filter(df.dropoff_latitude < 90).filter(df.dropoff_latitude != 0)
      )

然后,我们增加一列数据:出租车行驶的距离,并将距离进行离散化,进行后续的分析:

%pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from geopy.distance import geodesic


# 定义udf并使用
cal_dis = udf(lambda x1, y1, x2, y2: geodesic((x1, y1), (x2, y2)).miles, FloatType())

# 增加一列:打车的距离
df = df.withColumn('distance', cal_dis(col('pickup_longitude'), col('pickup_latitude'),
                                       col('dropoff_longitude'), col('dropoff_latitude')))

# 将浮点数的距离转化为整数
df = df.withColumn('int_dis', (col('distance') * 10).cast('int'))
df.createOrReplaceTempView("nyc_taxi_data")

统计打车距离的分布:

%sql
SELECT `int_dis`, count(`int_dis`) 
FROM `nyc_taxi_data` 
WHERE `int_dis` < 500
GROUP BY `int_dis` 
ORDER BY `int_dis`;
image

从上图可以看出:打车距离分布在区间[0, 15]miles内,我们继续统计在该区间内,打车价格和打车距离的关系:

%sql
SELECT `int_dis`, AVG(`fare_amount`) 
FROM `nyc_taxi_data` 
WHERE `int_dis` < 150 
GROUP BY `int_dis` 
ORDER BY `int_dis`;
image

如上图所示:打车价格和打车距离呈现出线性增长的趋势。

机器学习建模

在上一小节的数据分析中,我们已经提取了和出租车相关联的一些特征,根据这些特征,我们建立一个简单的线性回归模型:

打车费用~(年份,打车时间,乘客数,距离)

先将特征和目标值提取出来:

%pyspark
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=['year', 'hour', 'passenger_count', 'distance'], 
                                  outputCol='features')
feature_df = vectorAssembler.transform(df)
linear_reg_df = feature_df.select(['features', 'fare_amount'])
linear_reg_df.show(3)
image

对特征做归一化:

%pyspark
from pyspark.ml.feature import Normalizer
# normalization
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(linear_reg_df)
l1NormData.show(10)
image

分割训练集和测试集:

%pyspark
splits = l1NormData.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

建立线性回归模型进行训练:

%pyspark
# 开始训练
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='normFeatures', labelCol='fare_amount')
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

训练结果统计:

%pyspark
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
image

使用Evaluator对模型进行评价:

%pyspark
from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="fare_amount", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

总结

本文介绍了如何使用阿里云的Confluent Cloud和Databricks来构建您的数据流和LakeHouse,并介绍了如何使用Databricks提供的能力来挖掘数据价值,使用Spark MLlib构建您的机器学习模型。有了Confluent Cloud和Databricks,您可以轻松实现数据入湖,及时在最新版本的数据上进行探索,挖掘您的数据价值。欢迎您试用阿里云ConfluentDatabricks