您可以使用DBMS_AQ包中的存储过程添加消息到队列、从队列中删除消息、注册或注销PL/SQL回调存储过程。

PolarDB通过以下SQL命令为DBMS_AQ包提供扩展功能:
  • ALTER QUEUE
  • ALTER QUEUE TABLE
  • CREATE QUEUE
  • CREATE QUEUE TABLE
  • DROP QUEUE
  • DROP QUEUE TABLE
表 1. DBMS_AQ 函数/存储过程
函数/存储过程 返回类型 说明
ENQUEUE N/A 发布消息到队列。
DEQUEUE N/A 如果有消息可用或者在消息可用时,从队列检索消息。
REGISTER N/A 注册回调过程。
UNREGISTER N/A 注销回调过程。
PolarDB支持使用下表列出的常量:
常量 说明 适用参数
DBMS_AQ.BROWSE (0) 读取消息而不锁定。 dequeue_options_t.dequeue_mode
DBMS_AQ.LOCKED (1) 常量,必须指定在PL/SQL常量包的范围内的常量。 dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE (2) 读取之后删除消息,该参数为默认值。 dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE_NODATA (3) 常量,必须指定在PL/SQL常量包的范围内的常量。 dequeue_options_t.dequeue_mode
DBMS_AQ.FIRST_MESSAGE (0) 返回与搜索词匹配的第一个可用消息。 dequeue_options_t.navigation
DBMS_AQ.NEXT_MESSAGE (1) 返回与搜索词匹配的下一个可用消息。 dequeue_options_t.navigation
DBMS_AQ.NEXT_TRANSACTION (2) 常量,必须指定在PL/SQL常量包的范围内的常量。 dequeue_options_t.navigation
DBMS_AQ.FOREVER (0) 如果找不到与搜索词匹配的消息,则持续等待,该参数为默认值。 dequeue_options_t.wait
DBMS_AQ.NO_WAIT (1) 如果找不到与搜索词匹配的消息,则不等待。 dequeue_options_t.wait
DBMS_AQ.ON_COMMIT (0) 常量,必须指定在PL/SQL常量包的范围内的常量。 enqueue_options_t.visibility,dequeue_options_t.visibility
DBMS_AQ.IMMEDIATE (1) 常量,必须指定在PL/SQL常量包的范围内的常量。 enqueue_options_t.visibility,dequeue_options_t.visibility
DBMS_AQ.PERSISTENT (0) 此消息应存储在表中。 enqueue_options_t.delivery_mode
DBMS_AQ.BUFFERED (1) 常量,必须指定在PL/SQL常量包的范围内的常量。 enqueue_options_t.delivery_mode
DBMS_AQ.READY (0) 指定消息已经准备好进行处理。 message_properties_t.state
DBMS_AQ.WAITING (1) 指定消息正在等待处理。 message_properties_t.state
DBMS_AQ.PROCESSED (2) 指定消息已处理。 message_properties_t.state
DBMS_AQ.EXPIRED (3) 指定消息处于异常队列中。 message_properties_t.state
DBMS_AQ.NO_DELAY (0) 常量,必须指定在PL/SQL常量包的范围内的常量。 message_properties_t.delay
DBMS_AQ.NEVER (NULL) 常量,必须指定在PL/SQL常量包的范围内的常量。 message_properties_t.expiration
DBMS_AQ.NAMESPACE_AQ (0) 接收来自DBMS_AQ队列的通知。 sys.aq$_reg_info.namespace
DBMS_AQ.NAMESPACE_ANONYMOUS (1) 常量,必须指定在PL/SQL常量包的范围内的常量。 sys.aq$_reg_info.namespace

ENQUEUE

ENQUEUE存储过程将一个条目添加到队列。语法如下:
ENQUEUE(
  queue_name IN VARCHAR2,
  enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
  message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload IN <type_name>,
  msgid OUT RAW)
参数
  • queue_name
    现有队列的名称(可能是schema限定的)。
    说明
    • 如果您省略schema名称,服务器将使用在SEARCH_PATH中指定的schema。
    • 使用特殊字符或者区分大小写的名称时,请添加双引号。
  • enqueue_options
    enqueue_options是类型为enqueue_options_t的值:
    DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD(
      visibility BINARY_INTEGER DEFAULT ON_COMMIT,
      relative_msgid RAW(16) DEFAULT NULL,
      sequence_deviation BINARY INTEGER DEFAULT NULL,
      transformation VARCHAR2(61) DEFAULT NULL,
      delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);
    目前,enqueue_options_t仅支持下表中的参数值:
    参数名称 缺省值
    visibility ON_COMMIT
    delivery_mode PERSISTENT
    sequence_deviation NULL
    transformation NULL
    relative_msgid NULL
  • message_properties
    message_properties 是类型为message_properties_t的值:
          message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
          state INTEGER,
         original_msgid BYTEA,
          transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
          delivery_mode INTEGER
        DBMS_AQ.PERSISTENT);
    message_properties_t支持的参数值如下:
    参数 说明
    priority 如果队列表定义包括sort_list并引用了priority,则此参数影响消息出队的顺序。较低的值表示较高的出队优先级。
    delay 指定消息可用于出队之前将经过的秒数,或者NO_DELAY。
    expiration 使用expiration参数指定消息过期的秒数。
    correlation 使用 correlation参数指定与条目关联的消息,默认值为NULL。
    attempts 系统维护的值,指定消息出队的尝试次数。
    exception_queue 使用exception_queue参数指定异常队列的名称,如果有消息过期或者回退太多次数的事务出队,则将消息移动到该队列中。
    enqueue_time 记录添加到队列的时间。
    state 此参数由DBMS_AQ指定,状态取值如下:
    • DBMS_AQ.READY : 未达到延迟。
    • DBMS_AQ.WAITING : 队列条目已准备好处理。
    • DBMS_AQ.PROCESSED :队列条目已处理。
    • DBMS_AQ.EXPIRED : 队列条目已移动到异常队列。
    original_msgid 为了实现兼容性而支持此参数。
    transaction_group 为了实现兼容性而支持此参数。
  • payload

    使用 payload参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配(详情请参见CREATE_QUEUE_TABLE)。

  • msgid

    使用msgid 参数检索唯一(系统生成的)消息标识符。

示例

以下匿名块通过调用DBMS_AQ.ENQUEUE,将消息添加到名为work_order的队列:
DECLARE

  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     raw(16);
  payload            work_order;

BEGIN

  payload := work_order('Smith', 'system upgrade');

DBMS_AQ.ENQUEUE(
  queue_name         => 'work_order',
  enqueue_options    => enqueue_options,
  message_properties => message_properties,
  payload            => payload,
  msgid              => message_handle
    );
 END;

DEQUEUE

DEQUEUE存储过程让消息出队。语法如下:
DEQUEUE(
  queue_name IN VARCHAR2,
  dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
  message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload OUT type_name,
  msgid OUT RAW)
参数
  • queue_name
    现有队列的名称(可能是schema限定的)。
    说明
    • 如果您省略schema名称,则服务器将使用在SEARCH_PATH中指定的schema。
    • 如果要使用特殊字符或者区分大小写的名称,请添加双引号。
  • dequeue_options
    dequeue _options是类型为dequeue_options_t的值:
    DEQUEUE_OPTIONS_T IS RECORD(
      consumer_name CHARACTER VARYING(30),
      dequeue_mode INTEGER,
      navigation INTEGER,
      visibility INTEGER,
      wait INTEGER,
      msgid BYTEA,
      correlation CHARACTER VARYING(128),
      deq_condition CHARACTER VARYING(4000),
      transformation CHARACTER VARYING(61),
      delivery_mode INTEGER);
    目前,dequeue_options_t支持的参数值为:
    参数 说明
    consumer_name 必须为NULL。
    dequeue_mode 出队操作的锁定行为。取值如下:
    • DBMS_AQ.BROWSE:读取消息而不获取锁定。
    • DBMS_AQ.LOCKED: 获取锁定之后读取消息。
    • DBMS_AQ.REMOVE:删除消息之前读取消息。
    • DBMS_AQ.REMOVE_NODATA:读取消息,但不删除消息。
    navigation 标识将检索的消息。取值如下:
    • FIRST_MESSAGE:队列中与搜索词匹配的第一条消息。
    • NEXT_MESSAGE:与第一个词语匹配的下一条可用消息。
    visibility 必须为ON_COMMIT, 如果您回退当前事务,出队项目将保持在队列中。
    wait 必须为大于0的数字,或者设置为以下参数:
    • DBMS_AQ.FOREVER:无限期等待。
    • DBMS_AQ.NO_WAIT:不等待。
    msgid 出队消息的ID。
    correlation 为了实现兼容性而提供的参数。
    deq_condition 一个VARCHAR2表达式,取值为BOOLEAN值,表示消息是否应出队。
    transformation 为了实现兼容性而提供的参数。
    delivery_mode 必须为PERSISTENT,此模式下不支持缓冲的消息。
  • message_properties
    message_properties是类型为message_properties_t的值:
        message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
        state INTEGER,
        original_msgid BYTEA,
        transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
        delivery_mode INTEGER
      DBMS_AQ.PERSISTENT);
    message_properties_t支持的参数值为:
    参数 说明
    priority 如果队列表定义包括sort_list并引用了priority,则此参数影响消息出队的顺序。较低的值表示较高的出队优先级。
    delay 指定消息出队或NO_DELAY之前经过的秒数。
    expiration 指定消息过期的秒数。
    correlation 指定与条目关联的消息,默认值为NULL。
    attempts 系统维护的值,指定消息出队的尝试次数。
    exception_queue 指定异常队列的名称,如果有消息过期或者回退太多次数的事务出队,则将消息移动到该队列中。
    enqueue_time 记录添加到队列的时间。
    state 此参数由DBMS_AQ指定,状态取值如下:
    • DBMS_AQ.WAITING:未达到延迟。
    • DBMS_AQ.READY:队列条目已准备好处理。
    • DBMS_AQ.PROCESSED:队列条目已处理。
    • DBMS_AQ.EXPIRED:队列条目已移动到异常队列。
    original_msgid 为了实现兼容性而提供的参数。
    transaction_group 为了实现兼容性而提供的参数。
    delivery_mode 不支持此参数,指定DBMS_AQ.PERSISTENT的值。
  • payload

    使用payload参数检索具有出队操作的消息的有效负载。有效负载类型必须与创建队列表时指定的类型匹配。

  • msgid

    使用msgid参数检索唯一消息标识符。

示例

以下匿名块通过调用DBMS_AQ.DEQUEUE,从队列和有效负载中检索消息:
DECLARE

  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     raw(16);
  payload            work_order;

BEGIN
  dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'work_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => payload,
    msgid              => message_handle
  );

  DBMS_OUTPUT.PUT_LINE(
  'The next work order is [' || payload.subject || '].'
  );
END;
有效负载由DBMS_OUTPUT.PUT_LINE显示。

REGISTER

使用REGISTER存储过程用于在消息入队或出队时接收通知。语法如下:
REGISTER(
  reg_list IN SYS.AQ$_REG_INFO_LIST,
  count IN NUMBER)
参数
  • reg_list
    reg_list是类型为AQ$_REG_INFO_LIST的列表,提供有关您要注册的各种订阅信息。列表中每个条目的类型都是AQ$_REG_INFO,包含的属性有:
    属性 类型 说明
    name VARCHAR2 (128) 订阅的名称(可能是 schema 限定的)。
    namespace NUMERIC 唯一支持的值为DBMS_AQ.NAMESPACE_AQ (0)。
    callback VARCHAR2 (4000) 描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:plsql://schema.procedure其中:
    • schema:指定存储过程所在的schema。
    • procedure:指定待通知的存储过程的名称。
    context RAW (16) 回调存储过程需要的用户定义的值。
  • count

    countreg_list中的条目数。

示例

以下匿名块通过调用DBMS_AQ.REGISTER注册存储过程,用于在队列中添加或删除项目时接收通知。为在DECLARE部分标识的每个订阅信息提供一组属性(类型为 sys.aq$_reg_info):
DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.register(subscriptionlist, 3);
   commit;
  END;
   /
subscriptionlist的类型为sys.aq$_reg_info_list,包含以前描述的sys.aq$_reg_info对象。列表名称和对象计数传递到dbms_aq.register

UNREGISTER

使用UNREGISTER存储过程关闭与入队和出队相关的通知。语法如下:
UNREGISTER(
  reg_list IN SYS.AQ$_REG_INFO_LIST,
  count
IN NUMBER)
参数
  • reg_list

    reg_list是类型为AQ$_REG_INFO_LIST的列表,提供有关您要注册的各个订阅信息。列表中每个条目的类型都是AQ$_REG_INFO,包含的属性有:

    属性 类型 说明
    name VARCHAR2 (128) 订阅的名称(可能是schema限定的)。
    namespace NUMERIC 唯一支持的值为DBMS_AQ.NAMESPACE_AQ (0)。
    callback VARCHAR2 (4000) 描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式:plsql://schema.procedure其中:
    • schema:指定存储过程所在的schema。
    • procedure:指定将通知的存储过程的名称。
    context RAW (16) 该存储过程需要的任何用户定义的值。
  • count

    countreg_list中的条目数。

示例

以下匿名块通过调用DBMS_AQ.UNREGISTER关闭示例中DBMS_AQ.REGISTER指定的通知:

subscriptionlist的类型为sys.aq$_reg_info_list,包含之前描述的sys.aq$_reg_info对象、列表名称和对象数量将传递到dbms_aq.unregister
DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.unregister(subscriptionlist, 3);
   commit;
  END;
   /