任务编排中的跨库Spark SQL节点,主要针对各类跨库数据同步和数据加工场景,您可以通过编写Spark SQL,完成各种复杂的数据同步或数据加工的任务开发。

前提条件

支持的数据库类型:
  • MySQL:RDS MySQL、PolarDB MySQL引擎、AnalyticDB MySQL、PolarDB-X、其他来源MySQL。
  • SQL Server:RDS SQL Server、其他来源SQL Server。
  • PostgreSQL:RDS PostgreSQL、PolarDB PostgreSQL引擎、AnalyticDB PostgreSQL。
  • MaxCompute、交互式分析Hologres
  • Oracle
  • DB2
  • OSS存储
说明 支持各种来源的数据库:阿里云数据库、具有公网IP的机器、IDC自建数据库、其他云厂商数据库。

使用限制

Spark SQL任务节点目前处于公测期,公测期间单个任务单次处理数据请不要超过200万条,否则可能影响任务运行效率。

应用场景

跨库Spark SQL任务主要应用于跨库数据同步和跨库数据加工。

  • 跨库数据同步:
    • 在线库同步到数据仓库,用于数据加工。在线业务会产生大量的数据,当需要对这些数据进行加工分析时,一般需要将在线业务的数据同步到专门用于数据加工和分析的数据仓库,再进行数据分析。
    • 数据仓库的数据回流到在线库,用于数据查询。在数据仓库中对数据进行加工分析后,往往需要将数据同步回在线库中,以便在线应用提供相关的数据分析和统计服务。

    示例:某消费服务平台使用的是MySQL数据库,需要在数据仓库AnalyticDB MySQL中对消费数据进行消费金额、消费笔数等的统计分析,将统计分析后的数据回传到消费服务平台上,供用户进行在线查询。

    • 将MySQL中的增量消费数据同步到数据仓库AnalyticDB MySQL中,Spark SQL语句如下:
      INSERT INTO adb_dw.orders
      SELECT * FROM mysql_db.orders
      WHERE dt>${bizdate} AND dt<=${tomorrow}
    • 将数据仓库AnalyticDB MySQL中加工后的数据同步到MySQL中,Spark SQL语句如下:
      INSERT INTO mysql_db.orders_month 
      SELECT * FROM adb_dw.orders_month 
      WHERE dt=${bizdate}
  • 跨库数据加工:跨多个库的数据写入到在线库中,实现在线应用中直接查询数据的功能。

    示例:某电商企业的交易数据保存在MySQL在线库中,而员工数据保存在HR系统中,系统使用的数据库为Oracle,当企业需要按部门统计销售额时,需要对部门、员工和交易数据进行关联查询。以下Spark SQL语句可以实现将mysql_db在线库的交易流水表sales与oracle_db库的用户表users进行关联,并将关联后的数据按部门名称分组,统计交易笔数和金额,加工后的数据写入到mysql_db在线库中。

    INSERT INTO mysql_db.dept_sales 
    SELECT dept_name,trade_date, COUNT(*) cnt, SUM(amt) amt FROM mysql_db.sales t1 JOIN oracle_db.users t2 ON t1.emp_id=t2.id
    WHERE t1.trade_date=${bizdate} GROUP BY t2.dept_name

主要特性

  • 跨库数据处理:支持使用SQL语句操作不同数据库中的数据,数据生态较全面,可通过扩展支持各种数据源。
  • 大数据量处理:对较大规模的数据(十万条以上数据)处理速度快。
  • Spark SQL语法:基于Spark 3.0.3版本部署,提供该版本所有语法特性和原生函数。原生函数包括聚合函数、窗口函数、数组函数、Map函数、日期和时间处理函数、JSON处理函数等。更多信息,请参见Spark官方文档
  • 兼容标准SQL:通过标准的SQL语句,也可实现跨库数据同步和数据加工。
  • Serverless:Spark SQL任务是基于Spark引擎进行数据处理的无服务器化计算服务,用户无需预购计算资源和维护资源,没有运维和升级成本。

支持功能

  • 支持百万级以上大规模数据同步和加工。
  • 支持Spark3.0.3语法和所有函数,详见Spark官方文档
  • 支持SQL语句包括:CREATE TABLE, CREATE SELECT, DROP TABLE, INSERT, INSERT SELECT, ALTER TABLE, TRUNCATE, SET, ANALYZE, MSCK REPAIR。
  • 支持OSS文件存储数据源,支持CSV、JSON、PARQUET、ORC多种文件格式。
说明 使用限制:
  • 不支持DELETE和UPDATE语句。
  • 不支持CREATE DATABASE和DROP DATABASE语句。

操作步骤

  1. 登录数据管理DMS 5.0
    说明 如果您需要切换到旧版数据管理DMS,单击页面右下角返回旧版,进入数据管理DMS平台。具体操作,请参见数据管理DMS 5.0切换至旧版
  2. 在顶部菜单栏中,选择集成与开发(DTS) > 数据开发 > 任务编排
    说明 如果您使用的是旧版数据管理DMS,在顶部菜单栏中,选择全部功能 > 数据工厂 > 任务编排(新)
  3. 单击目标任务流名称,进入任务流详情页面。
    说明 关于新增任务流,请参见新增任务流
  4. 在画布左侧任务类型列表中,拖拽跨库Spark SQL节点到画布空白区域。
  5. 选中跨库Spark SQL
  6. 可选:单击节点信息页签,配置SQL语句中需要引用的变量。关于变量的详细介绍,请参见变量概述
    • 配置节点变量。单击节点变量页签,配置节点变量。具体配置,请参见配置时间变量
      说明 单击变量设置区域右上角的提示,查看变量配置的提示信息。
    • 配置任务流变量。单击任务流变量页签,配置任务流变量。具体配置,请参见配置时间变量
      说明 单击变量设置区域右上角的提示,查看变量配置的提示信息。
    • 查看输入变量。单击输入变量页签,可查看上游变量、运行状态变量和系统变量。
  7. 可选:OSS引用区域,单击添加OSS引用,添加SQL语句中需要引用的OSS,配置完成后单击保存
    配置项 说明
    OSS Bucket 在OSS Bucket下拉框中,搜索并选择目标OSS实例。
    OSS路径 指定数据保存在OSS Bucket上的路径。
    说明
    • 如果路径不存在,将自动创建。
    • 路径中支持使用变量,例如/path/${foldername}
    Spark SQL引用别名 输入该OSS在Spark SQL语句中的别名,默认值为oss。
    说明 引用别名由数字、大小写字母及下划线(_)组成,且长度不超过32个字符。
  8. 可选:数据库引用区域,单击添加数据库引用,添加SQL语句中需要引用的数据库。配置完成后单击保存
    配置项 说明
    数据库类型 选择目标数据库的类型。
    说明 您可以进入SQL窗口,在页面左侧的实例列表中,将鼠标移动到目标数据库所属实例上,查看数据库类型。
    数据库 在数据库下拉框中,搜索并选择目标数据库。
    说明 非安全协同模式的数据库,如果数据库的账号发生变更(如密码、权限变更),可能导致任务运行失败,需要重新选择并登录数据库。
    Spark SQL引用别名 (alias) 输入该数据库在Spark SQL语句中的别名,默认值为数据库名。
    说明 数据库别名由数字、大小写字母及下划线(_)组成,且长度不超过32个字符。
    如需添加多个目标数据库,单击数据库右侧的5加62 数据库增加
  9. 在SQL区域,编写Spark SQL语句,并单击保存
    例如,从qntext数据库的ex_customer表读取数据,并写入到qn_rds数据库的test_table表中。qntext数据库的别名设置为qn,qn_rds数据库的别名设置为rds。输入的SQL语句如下:
    INSERT INTO rds.test_table SELECT * FROM qn.ex_customer;
    说明
    • SQL语句中可以引用变量,引用变量格式为${var_name}。
    • 您可以在编写SQL代码的过程中,随时单击SQL预览,预览SQL代码。
    • 单击有效性检查,可以检查SQL代码的有效性。