使用Kettle调度MaxCompute

MaxCompute支持您通过ETL工具Kettle实现MaxCompute作业调度。您可以通过拖拽控件的方式,方便地定义数据传输的拓扑结构。本文为您介绍如何通过MaxCompute JDBC驱动,连接KettleMaxCompute项目并调度作业。

背景信息

Kettle是一款开源的ETL工具,纯Java实现,可以运行于Windows、Unix、Linux操作系统,为您提供图形化的操作界面。Kettle支持丰富的输入输出数据源,数据库支持Oracle、MySQL、DB2等,也支持各种开源的大数据系统,例如HDFS、HBase、Cassandra、MongoDB等。

您可以在Kettle中通过创建Job的方式连接MaxCompute项目,并按照ETL流程调度作业。

前提条件

在执行操作前,请确认您已满足如下条件:

  • 已创建MaxCompute项目。

    更多创建MaxCompute项目操作,请参见创建MaxCompute项目

  • 已获取可访问MaxCompute项目的AccessKey IDAccessKey Secret。

    您可以进入AccessKey管理页面获取AccessKey IDAccessKey Secret。

  • 已下载包含完整依赖JARjar-with-dependenciesMaxCompute JDBC驱动(v3.2.8及以上版本)

    本文中的MaxCompute JDBC驱动示例版本为v3.2.9。

  • 已下载Kettle安装包并解压至本地路径。

    本文中的Kettle示例版本为8.2.0.0-342。

操作流程

  1. 步骤一:放置MaxCompute JDBC驱动

    MaxCompute JDBC驱动放置于Kettle的驱动目录下,后续Kettle可通过该驱动访问MaxCompute项目。

  2. 步骤二:Kettle连接MaxCompute项目

    通过配置连接参数,连接KettleMaxCompute项目。

  3. 步骤三:创建作业调度流程

    Spoon界面创建作业调度流程并配置作业信息。

  4. 步骤四:运行作业调度流程

    基于创建好的作业调度流程运行作业。

  5. 步骤五:查看作业调度结果

    通过SQL编译器查看作业调度结果。

步骤一:放置MaxCompute JDBC驱动

MaxCompute JDBC驱动JAR包(例如odps-jdbc-3.2.9-jar-with-dependencies.jar)放置于Kettle的安装目录data-integration/lib下。

放置驱动

步骤二:Kettle连接MaxCompute项目

  1. Kettle的安装目录data-integration下,双击Spoon.bat(Windows系统)或者双击Spoon(macOS系统),即可启动Spoon,进入Spoon界面。

  2. 在顶部菜单栏,选择文件 > 新建 > 作业,创建Kettle作业,用于后续创建作业调度流程。

    新建作业

  3. 主对象树页签的DB连接处单击右键选择新建

    新建数据连接

  4. 数据连接对话框单击一般,并配置下表所列参数信息。

    配置连接信息

    参数

    说明

    连接名称

    新建数据连接的名称,用于在系统中区分不同数据库的连接。例如MaxCompute。

    连接类型

    固定选择Generic database

    连接方式

    固定选择Native (JDBC)

    Dialect

    固定选择Hadoop Hive 2

    自定义连接URL

    连接MaxCompute项目的URL。格式为jdbc:odps:<MaxCompute_endpoint>?project=<MaxCompute_project_name>。配置时删除<>符号。参数说明如下:

    • <MaxCompute_endpoint>:必填。MaxCompute项目所属区域的Endpoint。

      各地域的Endpoint信息,请参见Endpoint

    • <MaxCompute_project_name>:必填。待连接的目标MaxCompute项目名称。

      此处为MaxCompute项目名称,非工作空间名称。您可以登录MaxCompute控制台,左上角切换地域后,即可在项目管理页面查看到具体的MaxCompute项目名称。

    自定义驱动类名称

    用于连接MaxCompute项目的驱动程序。固定取值为com.aliyun.odps.jdbc.OdpsDriver

    用户名

    具备目标MaxCompute项目访问权限的AccessKey ID。

    您可以进入AccessKey管理页面获取AccessKey ID。

    密码

    AccessKey ID对应的AccessKey Secret。

  5. 单击测试,连接成功后依次单击确定确认,完成KettleMaxCompute连接。

    测试连接

步骤三:创建作业调度流程

您可以在Spoon界面的核心对象页签通过创建、关联核心对象(作业)的方式构造作业调度流程。

此处以通过LOAD命令从OSS加载数据,并写入MaxCompute内部表的ETL过程为例为您介绍操作流程,对应示例数据请参见通过内置Extractor(StorageHandler)导入数据。该ETL过程涉及的作业可根据核心对象类型拆解如下。

调度流程

  1. Spoon界面,单击核心对象页签。

  2. 基于上图拆解的核心对象,从左侧导航栏中依次拖拽核心对象组件至右侧作业区域中,并按照下图所示结构连接各核心对象。

    连接核心对象的方式为:选中核心对象后,按住Shift同时单击核心对象,即可出现连接线,连接至目标核心对象即可。创建调度流程

  3. 在脚本类型的核心对象上单击右键,选择编辑作业入口,在SQL对话框配置下表所列参数信息后,单击确定。依次完成所有脚本类型核心对象配置。

    编辑核心对象

    参数

    说明

    作业项名称

    调度作业的名称。例如Create table、Load from OSS、Processing。

    数据库连接

    访问的数据连接名称。即步骤二中创建的数据连接。例如MaxCompute。

    SQL脚本作为一条语句发送

    不选中。

    SQL脚本

    调度作业对应的SQL脚本。示例中脚本类型核心对象对应的SQL脚本如下:

    • Create table

      CREATE TABLE ambulance_data_csv_load (
      vehicleId INT,
      recordId INT,
      patientId INT,
      calls INT,
      locationLatitute DOUBLE,
      locationLongtitue DOUBLE,
      recordTime STRING,
      direction STRING);
    • Load from OSS

      LOAD OVERWRITE TABLE ambulance_data_csv_load 
      FROM 
      LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/mc-test/data_location/' 
      STORED BY 'com.aliyun.odps.CsvStorageHandler' 
      WITH serdeproperties (
      'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole',   --AliyunODPSDefaultRoleARN信息,可通过RAM角色管理页面获取。
      'odps.text.option.delimiter'=','
      );
    • Processing

      INSERT OVERWRITE TABLE ambulance_data_csv SELECT * FROM ambulance_data_csv_load;

步骤四:运行作业调度流程

  1. 在创建的作业调度流程界面,单击左上角的运行图标后,在执行作业对话框右下角单击执行

    执行

  2. 可选:如果弹出如下对话框,单击,保存创建的作业调度流程,并按照提示指引命名。例如mc。

    保存作业

  3. 通过调度流程界面的DAG图或执行结果区域查看运行状态,当呈现下图所示状态时,表明作业调度流程运行结束。

    运行结束

步骤五:查看作业调度结果

作业调度流程运行完成后,通过简单SQL脚本查看数据是否成功写入目标表中。

  1. Spoon界面单击主对象树页签,在创建的Kettle作业(例如mc)下单击DB连接

  2. 在创建的数据连接(例如MaxCompute)上单击右键,选择SQL编辑器

    SQL编辑器

  3. 简单SQL编辑器对话框,输入SQL脚本并单击执行,即可在预览数据对话框查看到查询结果。

    执行SQL脚本SQL脚本如下:

    SELECT * FROM ambulance_data_csv;