DataWorks提供开放消息能力,您可以在DataWorks开放平台开启消息订阅功能,通过订阅DataWorks的事件消息,应用系统对接DataWorks,实时获取相关内容的状态变化。例如,您可以通过开放消息获取DataWorks中数据库表和调度任务的状态变更信息,实现个性化响应行为。

背景信息

DataWorks的事件变更消息首先发布至Kafka的Topic中,订阅此类消息的Consumer再通过Kafka的Topic订阅消费,具体逻辑请参考概述

在DataWorks上使用开放消息时,您需要关注以下几个重要的功能:
  • 通过Topic订阅设置管理topic订阅的工作空间及事件类型,即指定工作空间下的哪些开发行为事件会触发检查,将事件消息通过该Topic发送到用户端服务。您可以使用消息测试功能测试消息发送是否正常、使用消息查询功能查询该Topic发送过的消息记录
  • 通过Group管理对Kafka的Comsumer Group进行管理。
  • 通过用户及权限管理页面控制哪些用户有权限消费Topic的数据。如果您需要在本地消费Kafka某个Topic的数据,您可以通过用户及权限管理页面为指定用户授权该Topic、group,以便在本地服务有权限访问该Kafka并正常消费数据。
说明
  • 阿里云DataWorks为您供公共Kafka,因此您仅需创建用于订阅消息的Topic即可,无需进行其他Kafka的配置。
  • 关于Topic、Consumer Group及用户的详细介绍请参见名词解释

使用限制

  • 仅在华北2(北京)、华东1(杭州)、华南1(深圳)和华东2(上海)地域启动公测。
  • 仅支持DataWorks企业版用户参与公测活动,正式商业化方案及时间另行通知。

    企业版用户可以直接参与公测并试用开放消息能力,无需申请和支付相关费用。

  • 仅支持阿里云主账号或者有AliyunDataWorksFullAccess权限的RAM用户进行界面相关配置。
  • 公测期间,仅支持创建2个Topic、2个Consumer Group和2个用户。
    说明 关于Topic、Consumer Group及用户的详细介绍请参见名词解释

事件列表及相关概念

DataWorks开放消息页面为您展示当前已存在的事件列表。事件列表
界面参数 含义
事件名称 操作触发的事件名称。
事件描述 事件相关定义。
是否扩展点 当前事件是否需要回调OpenAPI,即是否需要将逻辑处理结果返回DataWorks。
回调OpenAPI 回调的OpenApi名称。
其中:
  • 扩展点(Extension Points)是一种“Hook”,当事件发生时,将对事件进行“中断”。
  • 扩展程序(Extension),接收事件消息并对该事件进行处理,最后回调OpenAPI返回事件处理结果,系统将根据返回的结果继续执行下一步操作。

消息订阅配置

使用开放消息前,需要按照如下指导在DataWorks上先开启消息订阅功能并配置好消息队列Kafka版的相关配置信息,方便后续使用消息队列Kafka版开发消息订阅。

  1. 登录DataWorks控制台,在左侧导航栏,单击开放平台
  2. 查看接入点信息。
    在消息订阅配置处,查看并记录接入点信息。接入点信息DataWorks和Consumer可通过Kafka的SSL接入点接入并收发消息,DataWorks提供的公共Kafka的SSL接入点信息可在此页面查看。不同语言SDK的消息订阅开发指导请参考文档开发消息订阅
  3. 选择开放消息,打开启动消息订阅开关,即可开启消息订阅服务。
    说明 开启消息订阅服务后,DataWorks将会把指定事件类型的消息推送到您创建的Topic中,若关闭启动消息订阅开关,则会停止所有Topic的消息推送。
  4. 单击新建Topic,填写Topic名称和描述,单击确定
    说明 该Topic主要描述消息的主题,用于消息分类。Topic名称一旦创建,无法修改。
  5. 在所创建的Topic右侧,单击订阅设置,在Topic订阅内容设置窗口,为当前Topic选择需要订阅的工作空间和事件类型。
    Topic订阅内容设置
    说明
    • 一个Topic最多可以订阅5个工作空间。
    • 目前,订阅事件类型支持:
      • 任务变更事件
      • 文件发布前置事件(扩展点)
      • 文件提交前置事件(扩展点)
      • 代码运行前置事件(扩展点)
      • 文件删除前置事件(扩展点)
      • 表提交前置事件(扩展点)
      • 表发布前置事件(扩展点)
      其中,出任务变更事件外,其他扩展点类的事件支持关联扩展程序,通过扩展程序来对此类的事件结果做审批,扩展程序相关的介绍与配置指导请参见注册并管理扩展程序
  6. 单击新建用户,在Topic下拉列表,选择需要订阅的Topic,填写Consumer Group名称、用户名、密码、描述信息,单击确定,即可创建一个用户。
    说明
    • Consumer表示从消息队列Kafka版接收消息的应用。
    • Consumer Group代表一组具有相同Group ID的Consumer。
    • 此处创建的用户及密码用于消息队列Kafka版的PLAIN机制的安全校验。
    • Consumer Group用户名一旦创建,无法修改。
    • 如果需要修改Consumer Group用户名,则需要删除该用户,重新创建即可。
    • 目前密码不支持修改,如需更改,可在用户列表区域进行重置即可。

消息测试与查询

在完成订阅对象Topic在Comsumer的相关配置,和在Consumer上配置好消息队列Kafka版的接入信息,用于接收订阅的消息后,您可以通过消息测试功能测试Comsumer是否可以正常消费数据。消息测试与查询
  • 消息测试
    单击消息测试后,您可以进行消息发送与接收测试。消息测试选择待测试的工作空间订阅事件类型,DataWorks会根据您选择的工作空间和事件类型,自动生成一段消息体内容。单击发送测试消息后,DataWorks即会发送一条测试消息,您可在接收消息的应用侧查收消息。
  • 消息查询

    单击消息查询后,即可在弹窗中查看历史测试情况。

Group管理

DataWorks的事件变更消息首先发布至Kafka的Topic中,订阅此类消息的Consumer再通过Kafka的Topic订阅消费DataWorks的变更消息。您可以在此界面对Kafka Consumer group进行管理。group其中核心概念包括:
  • Consumer表示从消息队列Kafka版接收消息的应用。
  • Group代表一组具有相同Group ID的Consumer。
  • Group一旦创建,无法修改,只能删除重新新建。

用户及权限管理

用于给Kafka鉴权使用,如果您需要在本地消费Kafka数据时,您需要为用户授权该Kafka的topic、group权限,以便在用户服务调用时有权限访问该Kafka消息。新建用户单击新建用户,即可在弹窗中新建用户,为用户授权访问的Topic、所属的Group。其中:
  • 此处创建的用户及密码用于消息队列Kafka版的PLAIN机制的安全校验。
  • 如果需要修改Consumer Group和用户名,则需要删除该用户,重新创建即可。
  • 目前密码不支持修改,如需更改,可在用户列表区域进行重置即可。

后续步骤

现在您已经学习了如何开启消息订阅功能以及如何创建消息队列Kafka版的Topic、Consumer Group和用户等信息,请务必妥善保存这些信息,方便后续开发消息订阅。接下来,您将学习如何开发消息订阅,实现自由订阅DataWorks的事件消息。