Flink Table Store与Trino集成

E-MapReduce的Flink Table Store服务支持在Trino中查询数据。本文通过示例为您介绍如何在Trino中查询Flink Table Store中的数据。

使用限制

仅EMR-3.45.0版本、EMR-5.11.0版本的集群,支持在Trino中查询Flink Table Store中的数据。

操作步骤

  1. 指定warehouse路径。

    Flink Table Store将数据和元数据都保存在文件系统或对象存储中,存储的根路径由warehouse参数指定,您可以根据实际情况修改。

    1. 进入Trino服务的配置页面。

      1. 登录EMR on ECS控制台

      2. 在顶部菜单栏处,根据实际情况选择地域和资源组

      3. EMR on ECS页面,单击目标集群操作列的集群服务

      4. 集群服务页面,单击Trino服务区域的配置

    2. 配置页面,单击tablestore.properties页签。

    3. 修改warehouse的参数值。

    4. 保存配置。

      1. 单击保存

      2. 在弹出的对话框中,输入执行原因,单击保存

  2. 可选:指定MetaStore类型。

    Trino使用的MetaStore类型根据您创建集群时选择的服务自动设置。如果您想更改MetaStore类型,可以在Trino服务配置页面的tablestore.properties页签,修改metastore的参数值。

    Flink Table Store共有以下三种MetaStore类型:

    • filesystem:元数据仅保存在文件系统或对象存储中。

    • hive:元数据同步到指定的Hive MetaStore中。

    • dlf:元数据同步到DLF中。

  3. 重启Trino服务。

    1. 在Trino服务配置页面,选择右上角的更多操作 > 重启

    2. 在弹出的对话框中,输入执行原因,单击确定

    3. 确认对话框中,单击确定

  4. 查询Flink Table Store数据。

    下面以Spark写入Filesystem Catalog,Trino查询为例。

    1. 执行以下命令,启动Spark SQL。

      spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=filesystem --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
    2. 执行以下Spark SQL语句,在Catalog中创建一张表,并写入数据。

      -- 在之前创建的Catalog中,创建并使用一个测试database。
      CREATE DATABASE tablestore.test_db;
      USE tablestore.test_db;
      
      -- 创建Flink Table Store表。
      CREATE TABLE test_tbl (
          uuid int,
          name string,
          price double
      ) TBLPROPERTIES (
          'primary-key' = 'uuid'
      );
      
      -- 向Flink Table Store表中写入数据。
      INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
    3. 执行以下命令,启动Trino。

      trino --server master-1-1:9090 --catalog tablestore --schema default --user hadoop
    4. 执行以下命令,查询刚刚写入的数据。

      USE test_db;
      
      SELECT * FROM test_tbl;