流式入库

支持流式入库的系统都基本遵循了一个思路,流式数据按照小批量数据写小文件到存储系统,然后定时合并这些文件。例如,Hive和Delta Lake。Kudu也支持流式入库,但是Kudu的存储是自己设计的,不属于基于大数据存储系统之上的解决方案。本文以Kafka数据源为例介绍流式入库操作。

前提条件

  • 已在E-MapReduce控制台上,创建选择了DeltaLake服务的DataLake集群或Custom集群,详情请参见创建集群

  • 已在E-MapReduce控制台上,创建选择了Kafka服务的DataFlow集群,详情请参见创建集群

使用限制

创建的DataLake集群或Custom集群需要与DataFlow集群在同一VPC和交换机下,不支持跨VPC。

流式入库演变

阶段

详细情况

以前

以前针对流式入库的需求,通常都是自己动手,事实表按照时间划分Partition,粒度比较细。例如,五分钟一个Partition,每当一个Partition运行完成,触发一个INSERT OVERWRITE动作,合并该Partition内的文件重新写入分区。但是这么做有以下几个问题:

  • 缺少读写隔离,易造成读端失败或者产生数据准确性问题。

  • 流式作业没有Exactly-Once保证,入库作业失败后需要人工介入,确保数据不会写重或者写漏(如果是SparkStreaming,有At-Least-Once保证)。

Hive从0.13版本提供了事务支持,并且从2.0版本开始提供了Hive Streaming功能来实现流式入库的支持。但是在实际使用Hive Streaming功能的案例并不多见。其主要原因如下:

  • Hive事务的实现修改了底层文件,导致公共的存储格式等仅能够被Hive读取,导致很多使用SparkSQL、Presto等进行数据分析的用户无法使用该功能。

  • Hive事务目前仅支持ORC。

  • Hive的模式为Merge-on-read,需要对小文件进行Sort-Merge。小文件数量增多之后读性能急剧下降,所以用户需要及时进行小文件的合并。而小文件的合并作业经常失败,影响用户业务效率。

  • Hive这种模式无法拓展到Data Lake场景,仅仅停留在Data Warehouse场景。在Data Lake场景中,数据来源以及数据需求都是多样性的。

现在

有了Delta,可以很方便地应对流式入库的场景。只需要以下四个动作:

  1. 建表。

  2. 启动Spark Streaming任务写入数据。

  3. 定时Optimize(例如:每个Partition写入完成)。

  4. 定时Vacuum(例如:每天)。

Delta实例展示

从上游Kafka中读取数据,写入Delta表。

  1. 使用SSH方式登录DataFlow集群,详情请参见登录集群

  2. 执行以下命令,创建Kafka Topic。

    sudo su - kafka
    kafka-topics.sh --partitions 3 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic delta_stream_sample --create
    说明

    core-1-1为DataFlow集群中Broker节点的内网IP地址或主机名。

  3. 准备一个Python脚本,不断向Kafka内发送数据。

    #! /usr/bin/env python3
    
    import json
    import time
    
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    bootstrap = ['core-1-1:9092']
    topic = 'delta_stream_sample'
    
    def gnerator():
        id = 0
        line = {}
        while True:
            line['id'] = id
            line['date'] = '2019-11-11'
            line['name'] = 'Robert'
            line['sales'] = 123
            yield line
            id = id + 1  
    
    def sendToKafka():
        producer = KafkaProducer(bootstrap_servers=bootstrap)
    
        for line in gnerator():
            data = json.dumps(line).encode('utf-8')
    
            # Asynchronous by default
            future = producer.send(topic, data)
    
            # Block for 'synchronous' sends
            try:
                record_metadata = future.get(timeout=10)
            except KafkaError as e:
                # Decide what to do if produce request failed
                pass
            time.sleep(0.1)
    
    sendToKafka()

    为了方便,数据只有id不一样。

    {"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
  4. 启动一个Spark Streaming作业,从Kafka读数据,写入Delta表。

    1. 编写Spark代码。

      以Scala版代码为例,代码示例如下。

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{SparkSession, functions}
      import org.apache.spark.sql.types.{DataTypes, StructField}
      
      object StreamToDelta {
        def main(args: Array[String]): Unit = {
          val targetDir = "/tmp/delta_table"
          val checkpointLocation = "/tmp/delta_table_checkpoint"
          // 192.168.XX.XX 为kafka内网IP地址
          val bootstrapServers = "192.168.XX.XX:9092"
          val topic = "delta_stream_sample"
      
          val schema = DataTypes.createStructType(Array[StructField](
            DataTypes.createStructField("id", DataTypes.LongType, false),
            DataTypes.createStructField("date", DataTypes.DateType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false),
            DataTypes.createStructField("sales", DataTypes.StringType, false)))
      
          val sparkConf = new SparkConf()
      
          //StreamToDelta为scala的类名
          val spark = SparkSession
            .builder()
            .config(sparkConf)
            .appName("StreamToDelta")
            .getOrCreate()
      
          val lines = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option("subscribe", topic)
            .option("maxOffsetsPerTrigger", 1000)
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", value = false)
            .load()
            .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
            .select("json.*")
      
          val query = lines.writeStream
            .outputMode("append")
            .format("delta")
            .option("checkpointLocation", checkpointLocation)
            .start(targetDir)
      
          query.awaitTermination()
        }
      }
    2. 打包程序并部署到DataLake集群。

      1. 本地调试完成后,通过以下命令打包。

        mvn clean install
      2. 使用SSH方式登录DataLake集群,详情信息请参见登录集群

      3. 上传JAR包至DataLake集群。

        本示例是上传到DataLake集群的根目录下。

    3. 提交运行Spark作业。

      执行以下命令,通过spark-submit提交Spark作业。

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
       --class com.aliyun.delta.StreamToDelta \
       delta-demo-1.0.jar
      说明

      delta-demo-1.0.jar为您打包好的JAR包。--class和JAR包请根据您实际信息修改。

  5. 另外新建一个spark-shell,确认已经读到数据。

    • Scala

      1. 执行以下命令,进入spark-shell客户端。

        spark-shell --master local 
      2. 执行以下Scala语句,查询数据。

        val df = spark.read.format("delta").load("/tmp/delta_table")
        df.select("*").orderBy("id").show(10000)
    • SQL

      1. 执行以下命令,进入streaming-sql客户端。

        streaming-sql --master local
      2. 执行以下SQL语句,查询数据。

        SELECT * FROM delta_table ORDER BY id LIMIT 10000;

        现在已经写入了2285条数据。

        |2295|2019-11-11|Robert|  123|
        |2296|2019-11-11|Robert|  123|
        |2297|2019-11-11|Robert|  123|
        |2275|2019-11-11|Robert|  123|
        |2276|2019-11-11|Robert|  123|
        |2277|2019-11-11|Robert|  123|
        |2278|2019-11-11|Robert|  123|
        |2279|2019-11-11|Robert|  123|
        |2280|2019-11-11|Robert|  123|
        |2281|2019-11-11|Robert|  123|
        |2282|2019-11-11|Robert|  123|
        |2283|2019-11-11|Robert|  123|
        |2284|2019-11-11|Robert|  123|
        |2285|2019-11-11|Robert|  123|
        +----+----------+------+-----+

Exactly-Once测试

停掉Spark Streaming作业,再重新启动。重新读一下表,读数据正常的话,数据能够从上次断掉的地方衔接上。

  • Scala

    df.select("*").orderBy("id").show(10000)
  • SQL

    SELECT * FROM delta_table ORDER BY id LIMIT 10000;
    |2878|2019-11-11|Robert|  123|
    |2879|2019-11-11|Robert|  123|
    |2880|2019-11-11|Robert|  123|
    |2881|2019-11-11|Robert|  123|
    |2882|2019-11-11|Robert|  123|
    |2883|2019-11-11|Robert|  123|
    |2884|2019-11-11|Robert|  123|
    |2885|2019-11-11|Robert|  123|
    |2886|2019-11-11|Robert|  123|
    |2887|2019-11-11|Robert|  123|
    |2888|2019-11-11|Robert|  123|
    |2889|2019-11-11|Robert|  123|
    |2890|2019-11-11|Robert|  123|
    |2891|2019-11-11|Robert|  123|