配置跨库Spark SQL节点

任务编排中的跨库Spark SQL节点,主要针对各类跨库数据同步和数据加工场景,您可以通过编写Spark SQL,完成各种复杂的数据同步或数据加工的任务开发。

前提条件

  • 支持的数据库类型:

    • MySQL:RDS MySQLPolarDB MySQL版MyBase MySQLPolarDB分布式版AnalyticDB for MySQL、其他来源MySQL

    • SQL Server:RDS SQL ServerMyBase SQL Server、其他来源SQL Server

    • PostgreSQL:RDS PostgreSQLPolarDB PostgreSQL版MyBase PostgreSQLAnalyticDB for PostgreSQL、其他来源PostgreSQL

    • Oracle

    • DB2

    • MaxCompute

    • Hologres

    • OSS

  • 若需要添加OSS引用,建议您提前获取OSS文件路径或录入OSS。详情请参见录入对象存储OSS

使用限制

  • Spark SQL任务节点基于Spark计算引擎运行,单个任务单次处理数据时不要超过200万条,否则可能影响任务运行效率。

  • 因计算资源有限,任务运行高峰期无法保证计算时效。

  • 系统在计算数据量过大且缺失主键的表时,会导致内存溢出或内存耗尽(OOM)。

  • 周期调度节点最近一次运行成功后,若连续运行失败10次及以上,离线集成任务直接执行失败,且不会再提交Spark任务。此时,您需要手动运行成功该任务节点。

应用场景

跨库Spark SQL任务主要应用于跨库数据同步和跨库数据加工:

  • 跨库数据同步:

    • 在线库同步到数据仓库,用于数据加工。在线业务会产生大量的数据,当需要对这些数据进行加工分析时,一般需要将在线业务的数据同步到专门用于数据加工和分析的数据仓库,再进行数据分析。

    • 数据仓库的数据回流到在线库,用于数据查询。在数据仓库中对数据进行加工分析后,往往需要将数据同步回在线库中,以便在线应用提供相关的数据分析和统计服务。

    示例:某消费服务平台使用的是MySQL数据库,需要在数据仓库AnalyticDB for PostgreSQL中对消费数据进行消费金额、消费笔数等的统计分析,将统计分析后的数据回传到消费服务平台上,供用户进行在线查询。

    • 将MySQL中的增量消费数据同步到AnalyticDB for PostgreSQL中,Spark SQL语句如下:

      INSERT INTO adb_dw.orders
      SELECT * FROM mysql_db.orders
      WHERE dt>${bizdate} AND dt<=${tomorrow}
    • AnalyticDB for PostgreSQL中加工后的数据同步到MySQL中,Spark SQL语句如下:

      INSERT INTO mysql_db.orders_month 
      SELECT * FROM adb_dw.orders_month 
      WHERE dt=${bizdate}
  • 跨库数据加工:

    跨多个库的数据写入到在线库中,实现在线应用中直接查询数据的功能。

    示例:某电商企业的交易数据保存在MySQL在线库中,而员工数据保存在HR系统中,系统使用的数据库为Oracle,当企业需要按部门统计销售额时,需要对部门、员工和交易数据进行关联查询。以下Spark SQL语句可以实现将mysql_db在线库的交易流水表sales与oracle_db库的用户表users进行关联,并将关联后的数据按部门名称分组,统计交易笔数和金额,加工后的数据写入到mysql_db在线库中。

    INSERT INTO mysql_db.dept_sales 
    SELECT dept_name,trade_date, COUNT(*) cnt, SUM(amt) amt FROM mysql_db.sales t1 JOIN oracle_db.users t2 ON t1.emp_id=t2.id
    WHERE t1.trade_date=${bizdate} GROUP BY t2.dept_name

功能特性

  • 跨库数据处理:支持使用SQL语句操作不同数据库中的数据,数据生态较全面,可通过扩展支持各种数据源。

  • 大数据量处理:支持快速处理较大规模的数据(十万条以上数据)。

  • Spark SQL语法:基于Spark 3.1.2版本部署,提供该版本所有语法特性和原生函数。原生函数包括聚合函数、窗口函数、数组函数、Map函数、日期和时间处理函数、JSON处理函数等。

  • 兼容标准SQL:通过标准的SQL语句,也可实现跨库数据同步和数据加工。

  • Serverless:Spark SQL任务是基于Spark引擎进行数据处理的无服务器化计算服务,用户无需预购计算资源和维护资源,没有运维和升级成本。

  • 支持的SQL语句包括:CREATE TABLE, CREATE SELECT, DROP TABLE, INSERT, INSERT SELECT, ALTER TABLE, TRUNCATE, SET, ANALYZE, MSCK REPAIR。

    说明
    • TRUNCATE语句仅支持OSS类型的表,其他类型执行无效果。

    • 不支持的SQL语句包括:SELECT、DELETE、UPDATE、CREATE DATABASE、DROP DATABASE。

  • 支持OSS文件存储数据源,包括CSV、JSON、PARQUET、ORC多种文件格式。

操作步骤

  1. 登录数据管理DMS 5.0
  2. 在顶部菜单栏中,选择集成与开发 > 数据开发 > 任务编排

    说明

    若您使用的是极简模式的控制台,请单击控制台左上角的2023-01-28_15-57-17.png图标,选择全部功能 > 集成与开发 > 数据开发 > 任务编排

  3. 单击目标任务流名称,进入任务流详情页面。

    说明

    如果您需要新增任务流,请参见新增任务流

  4. 在画布左侧任务类型列表中,拖拽跨库Spark SQL节点到画布空白区域。

  5. 双击跨库Spark SQL

  6. 可选:在跨库Spark SQL配置页面,单击变量设置,配置SQL语句中需要引用的变量。您可以单击变量设置区域右上角的xiangq,查看配置变量的提示信息。

    • 单击节点变量页签,配置节点变量。具体配置,请参见配置时间变量

    • 单击任务流变量页签,配置任务流变量。具体配置,请参见配置时间变量

    • 单击输入变量页签,可查看上游变量、运行状态变量和系统变量。

  7. 可选:OSS引用区域,单击添加OSS引用,添加SQL语句中需要引用的OSS,配置完成后单击保存

    配置项

    是否必填

    说明

    数据库

    在下拉框中,搜索并选择目标数据库。

    重要

    非安全协同模式的数据库,如果数据库的账号发生变更(如密码、权限变更),可能导致任务运行失败,需要重新登录数据库并保存节点配置。

    OSS路径

    指定数据保存在OSS Bucket上的路径。

    说明
    • 如果路径不存在,将自动创建。

    • 路径中支持使用变量,例如/path/${foldername}

    Spark SQL引用别名

    输入该OSS在Spark SQL语句中的别名,默认值为oss。

    说明

    引用别名由数字、大小写字母及下划线(_)组成,且长度不超过32个字符。

  8. 可选:数据库引用区域,单击添加数据库引用,添加SQL语句中需要引用的数据库。配置完成后单击保存

    配置项

    是否必填

    说明

    数据库类型

    选择目标数据库的类型。

    说明

    您可以进入SQL窗口,在页面左侧的实例列表中,将鼠标移动到目标数据库所属实例上,查看数据库类型。

    数据库

    在数据库下拉框中,搜索并选择目标数据库。

    重要

    非安全协同模式的数据库,如果数据库的账号发生变更(如密码、权限变更),可能导致任务运行失败,需要重新登录数据库并保存节点配置。

    Spark SQL引用别名

    输入该数据库在Spark SQL语句中的别名,默认值为数据库名。

    说明

    数据库别名由数字、大小写字母及下划线(_)组成,且长度不超过32个字符。

    如需添加多个目标数据库,单击数据库右侧的5加6

  9. 在SQL区域,编写Spark SQL语句,并进行试运行。

    1. 编写Spark SQL语句,并单击保存

      例如,从qntext数据库的ex_customer表读取数据,并写入到qn_rds数据库的test_table表中。qntext数据库的别名设置为qn,qn_rds数据库的别名设置为rds。输入的SQL语句如下:

      INSERT INTO rds.test_table SELECT * FROM qn.ex_customer;
      说明
      • SQL语句中可以引用变量,引用变量格式为${var_name}。

      • 您可以在编写SQL代码的过程中,随时单击SQL预览,预览SQL代码。

      • 单击有效性检查,可以检查SQL代码的有效性。

    2. 单击试运行

      • 如果执行日志的最后一行出现status SUCCEEDED,表明任务试运行成功。

      • 如果执行日志的最后一行出现status FAILED,表明任务试运行失败,在执行日志中查看执行失败的节点和原因,修改配置后重新尝试。

相关文档