Flink Table Store与Hive集成

E-MapReduce的Flink Table Store服务支持在Hive中查询数据。本文通过示例为您介绍如何在Hive中查询Flink Table Store中的数据。

使用限制

仅EMR-3.45.0版本、EMR-5.11.0版本的集群,支持在Hive中查询Flink Table Store中的数据。

操作步骤

  1. 查询Hive Catalog与DLF Catalog中的表。

    通过Hive Catalog同步元数据到Hive MetaStore后,可以在Hive中直接查询Hive Catalog中的表。如果创建集群时,元数据选择为DLF统一元数据,则其它服务也可以通过DLF Catalog同步元数据到DLF,并在Hive中查询。

    下面以Spark写入Hive Catalog,在Hive查询为例。

    1. 执行以下命令,启动Spark SQL。

      spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=hive --conf spark.sql.catalog.tablestore.uri=thrift://master-1-1:9083 --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
    2. 执行以下Spark SQL语句,在Catalog中创建一张表,并写入数据。

      -- 在之前创建的Catalog中,创建并使用一个测试database。
      CREATE DATABASE tablestore.test_db;
      USE tablestore.test_db;
      
      -- 创建Flink Table Store表。
      CREATE TABLE test_tbl (
          uuid int,
          name string,
          price double
      ) TBLPROPERTIES (
          'primary-key' = 'uuid'
      );
      
      -- 向Flink Table Store表中写入数据。
      INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
    3. 执行以下命令,启动Hive CLI。

      hive
    4. 执行以下Hive SQL,查询刚刚写入的数据。

      select * from test_db.test_tbl;
  2. 添加并查询外表。

    Hive也可以将指定路径下的Flink Table Store表添加为外表并查询。

    CREATE EXTERNAL TABLE test_ext_tbl
    STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
    LOCATION 'oss://oss-bucket/warehouse/test_db.db/test_tbl';
    
    SELECT * FROM test_ext_tbl;