本文介绍如何使用Flink SQL方式将Flink中的数据写入云数据库ClickHouse

使用限制

只有Flink计算引擎VVR 3.0.2及以上版本支持使用Flink SQL写入云数据库ClickHouse

前提条件

  • 已在云数据库ClickHouse中创建表。更多信息,请参见创建表
  • 已在云数据库ClickHouse中设置白名单。更多信息,请参见设置白名单
  • 已开通Flink全托管。更多信息,请参见开通Flink全托管

操作步骤

  1. 登录Flink全托管开发控制台,新建作业。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      作业参数 说明
      文件名称 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      文件类型 文件类型需要选择为流作业/SQL。
      流作业和批作业均支持以下类型:
      • SQL
      • JAR
      • PYTHON
      说明 实时计算引擎VVR 3.0.1及以上版本支持批作业。
      部署目标 选择作业需要部署的集群,支持以下两种集群模式:
      • Per-Job集群(默认):适用于占用资源比较大或持续稳定运行的作业。因为作业之间资源隔离,每个作业都需要一个独立的JM,小任务JM的资源利用率较低。
      • Session集群:适用于占用资源比较小或任务启停比较频繁的作业。因为多个作业可以复用相同的JM,可以提高JM资源利用率。
      说明 如果您需要开启SQL Preview功能,必须选择Session集群,且已将其设置为SQL Previews集群,详情请参见作业调试配置开发测试环境(Session集群)
      存储位置 指定该作业的代码文件所属的文件夹。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    6. 单击确认
  2. 作业开发页面,编写并执行代码。
    1. 创建源表、结果表,并将源表数据插入到结果表中。
      --创建源表sls_test_single_local。
      CREATE TEMPORARY TABLE sls_test_single_local (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      --创建结果表clickhouse_output。
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://demo.aliyuncs.com:8123',
        'userName' = 'test',
        'password' = '280226Ck',
        'tableName' = 'sls_test_single_local',
      );
      --将源表数据插入到结果表。
      INSERT INTO clickhouse_output
      SELECT 
        id,
        name,
        age,
        rate
      FROM sls_test_single_local;
      说明 创建结果表的语法说明,请参见ClickHouse结果表
    2. 单击保存
    3. 单击验证
    4. 单击上线
    5. 在弹出的上线确认窗口中单击确认,将作业发布至生产环境。
    6. 上线成功后,在弹出的上线成功,请前往运维查看详情中单击运维
  3. 作业运维页面,启动作业。
    1. 单击右上方启动
    2. 在弹出的作业启动配置页面,单击确认启动
  4. 云数据库ClickHouse中查询表。
    1. 使用阿里云账号登录云数据库ClickHouse控制台
    2. 在页面左上角,选择目标集群所在的地域。
    3. 集群列表页面,选择默认实例列表云原生版本实例列表,单击目标集群ID。
    4. 如果您的集群是社区兼容版,请单击右上方导航栏的登录数据库
      如果您的集群是云原生版,请单击集群计算组操作列的登录
    5. 登录实例页面,输入数据库账号和密码,单击登录
    6. 输入查询语句并单击执行(F8)。本文使用下面的示例语句。
      select * from db01.sls_test_single_local;
      查询结果如下。flink数据导入ClickHouse表