您可以使用DBMS_AQADM包中的存储过程创建并管理消息队列和队列表。

表 1. DBMS_AQADM函数/存储过程
函数/存储过程 返回类型 说明
ALTER_QUEUE N/A 修改现有的队列。
ALTER_QUEUE_TABLE N/A 修改现有的队列表。
CREATE_QUEUE N/A 创建队列。
CREATE_QUEUE_TABLE N/A 创建队列表。
DROP_QUEUE N/A 删除现有队列。
DROP_QUEUE_TABLE N/A 删除现有队列表。
PURGE_QUEUE_TABLE N/A 从队列表中删除一个或多个消息。
START_QUEUE N/A 使队列对于入队和出队过程可用。
STOP_QUEUE N/A 使队列对于入队和出队过程不可用。

PolarDB O引擎支持使用下面列出的参数:

常量 说明 适用参数
DBMS_AQADM.TRANSACTIONAL(1) 此常量已定义,但是,如果使用会返回错误。 message_grouping
DBMS_AQADM.NONE(0) 用于为队列表指定消息分组。 message_grouping
DBMS_AQADM.NORMAL_QUEUE(0) create_queue一起使用来指定queue_type queue_type
DBMS_AQADM.NORMAL_QUEUE(0) create_queue一起使用来指定queue_type queue_type
DBMS_AQADM.INFINITE(-1) create_queue 一起使用来指定retention_time retention_time
DBMS_AQADM.PERSISTENT (0) 此消息应存储在表中。 enqueue_options_t.delivery_mode
DBMS_AQADM.BUFFERED (1) 此常量已定义,但是,如果使用会返回错误。 enqueue_options_t.delivery_mode
DBMS_AQADM.PERSISTENT_OR_BUFFERED (2) 此常量已定义,但是,如果使用会返回错误。 enqueue_options_t.delivery_mode

ALTER_QUEUE

使用ALTER_QUEUE存储过程修改现有队列。语法如下:

ALTER_QUEUE(
  max_retries IN NUMBER DEFAULT NULL,
  retry_delay IN NUMBER DEFAULT 0
  retention_time IN NUMBER DEFAULT 0,
  auto_commit IN BOOLEAN DEFAULT TRUE)
  comment IN VARCHAR2 DEFAULT NULL,

参数

参数 描述
queue_name 新队列的名称。
max_retries 指定使用dequeue语句删除消息的最大尝试次数。

max_retries的值会随着ROLLBACK语句的数量而增加。当失败的尝试次数达到max_retries指定的值之后,消息将移动到异常队列。当指定为0时,表示不允许重试。

retry_delay 指定在ROLLBACK之后计划重新处理消息等待的秒数。默认值为0,表示应立即重试消息。
retention_time 指定消息在出队之后进行存储所经过的时间长度。单位:秒。
说明
  • 指定为默认值0时,表示消息在出队之后不应保留。
  • 指定为INFINITE时,表示永久保留消息。
comment 指定与队列关联的注释。

示例

以下代码块,将 work_order的队列中的retry_delay参数设置为5秒:

EXEC DBMS_AQADM.ALTER_QUEUE(queue_name => 'work_order', retry_delay => 5);

ALTER_QUEUE_TABLE

使用ALTER_QUEUE_TABLE 存储过程修改现有队列表。语法如下:

ALTER_QUEUE_TABLE (
  queue_table IN VARCHAR2,
  comment IN VARCHAR2 DEFAULT NULL,
  primary_instance IN BINARY_INTEGER DEFAULT 0,
  secondary_instance IN BINARY_INTEGER DEFAULT 0,

参数

参数 描述
queue_table 队列表的名称。
comment 使用comment参数可以提供有关队列表的注释。
primary_instance 为了实现兼容性而支持此参数,但被忽略。
secondary_instance 为了实现兼容性而支持此参数,但被忽略。

示例

以下命令修改名为work_order_table 的队列表:

EXEC DBMS_AQADM.ALTER_QUEUE_TABLE
      (queue_table => 'work_order_table', comment => 'This queue table contains work orders for the shipping department.');

CREATE_QUEUE

使用CREATE_QUEUE 存储过程在现有队列表中创建队列。语法如下:

CREATE_QUEUE(
  queue_name IN VARCHAR2
  queue_table IN VARCHAR2,
  queue_type IN BINARY_INTEGER DEFAULT NORMAL_QUEUE,
  max_retries IN NUMBER DEFAULT 5,
  retry_delay IN NUMBER DEFAULT 0
  retention_time IN NUMBER DEFAULT 0,
  dependency_tracking IN BOOLEAN DEFAULT FALSE,
  comment IN VARCHAR2 DEFAULT NULL,
  auto_commit IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name 新队列的名称。
queue_table 新队列所在的表的名称。
queue_type 新队列的类型。queue_type取值如下:
  • DBMS_AQADM.NORMAL_QUEUE:此值指定普通队列(默认值)。
  • DBMS_AQADM.EXCEPTION_QUEUE:此值指定新队列是异常队列。异常队列仅支持出队操作。
max_retries 指定使用dequeue 语句删除消息的最大尝试次数。dequeue的值随着ROLLBACK语句数量的增加而增加。当失败的尝试次数达到max_retries指定的值之后,消息移动到异常队列。系统表的默认值为0;用户创建的表的默认值为5。
retry_delay 指定在ROLLBACK之后计划重新处理消息等待的秒数。当指定为默认值0时,表示应立即重试消息。
retention_time 指定消息在出队之后进行存储所经过的时间长度。单位:秒。
说明
  • 指定为默认值0时,表示消息在出队之后不应保留。
  • 指定为INFINITE时,表示永久保留消息。
dependency_tracking 为了实现兼容性而支持此参数,但被忽略。
comment 指定与队列关联的注释。
auto_commit 为了实现兼容性而支持此参数,但被忽略。

示例

以下匿名块在work_order_table表中创建名为work_order 的队列:

BEGIN
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'work_order', queue_table => 'work_order_table', comment => 'This queue contains pending work orders.');
END;

CREATE_QUEUE_TABLE

使用CREATE_QUEUE_TABLE 存储过程创建队列表。语法如下:

CREATE_QUEUE_TABLE (
  queue_table IN VARCHAR2,
  queue_payload_type IN VARCHAR2,
  storage_clause IN VARCHAR2 DEFAULT NULL,
  sort_list IN VARCHAR2 DEFAULT NULL,
  multiple_consumers IN BOOLEAN DEFAULT FALSE,
  message_grouping IN BINARY_INTEGER DEFAULT NONE,
  comment IN VARCHAR2 DEFAULT NULL,
  auto_commit IN BOOLEAN DEFAULT TRUE,
  primary_instance IN BINARY_INTEGER DEFAULT 0,
  secondary_instance IN BINARY_INTEGER DEFAULT 0,
  compatible IN VARCHAR2 DEFAULT NULL,
  secure IN BOOLEAN DEFAULT FALSE)

参数

参数 描述
queue_table 队列表的名称。
queue_payload_type 存储在队列表中的用户自定义的数据类型。
说明 如果要指定RAW数据类型,您必须在用户定义类型中创建RAW数据类型。
storage_clause 指定队列表的属性。
  • storage_clause可以是以下一项或多项:
    • TABLESPACE tablespace_name
    • PCTFREE integer
    • PCTUSED integer
    • INITRANS integer
    • MAXTRANS integer
    • STORAGE storage_option
  • storage_option 可以是以下一项或多项:
    • MINEXTENTS integer
    • MAXEXTENTS integer
    • PCTINCREASE integer
    • INITIAL size_clause
    • NEXT
    • FREELISTS integer
    • OPTIMAL size_clause
    • BUFFER_POOL {KEEP|RECYCLE|DEFAULT}
sort_list 控制队列的出队顺序。指定将用于对队列进行排序(升序)的列名称。当前取值如下:
  • enq_time, priority
  • priority, enq_time
  • priority
  • enq_time
multiple_consumers 如果指定,multiple_consumers必须为FALSE。
message_grouping 如果指定,message_grouping必须为 NONE。
comment 提供有关队列表的注释。
auto_commit 为了实现兼容性而支持此参数,但被忽略。
primary_instance 为了实现兼容性而支持此参数,但被忽略。
secondary_instance 为了实现兼容性而支持此参数,但被忽略。
compatible 为了实现兼容性而支持此参数,但被忽略。
secure 为了实现兼容性而支持此参数,但被忽略。

示例

以下匿名块首先创建work_order类型,该类型具有保存名称VARCHAR2的属性,以及项目说明。然后,使用该类型创建队列表:

BEGIN

CREATE TYPE work_order AS (name VARCHAR2, project TEXT, completed BOOLEAN);

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'work_order_table',
       queue_payload_type => 'work_order',
       comment => 'Work order message queue table');
END;

队列表名为work_order_table,包含类型为work_order的有效负载。注释说明为Work order message queue table

DROP_QUEUE

使用DROP_QUEUE 存储过程可以删除队列。语法如下:

DROP_QUEUE(
  queue_name  IN VARCHAR2,
  auto_commit IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name 要删除的队列的名称。
auto_commit 为了实现兼容性而支持此参数,但被忽略。

示例

以下匿名块删除名为work_order的队列:

BEGIN
DBMS_AQADM.DROP_QUEUE(queue_name => 'work_order');
END;

DROP_QUEUE_TABLE

使用DROP_QUEUE_TABLE存储过程可以删除队列表。语法如下:

DROP_QUEUE_TABLE(
  queue_table IN VARCHAR2,
  force IN BOOLEAN default FALSE,
  auto_commit IN BOOLEAN default TRUE)

参数

参数 描述
queue_table 队列表的名称。
force 指定定DROP_QUEUE_TABLE命令在删除包含项目的表时的行为:
  • 如果目标表包含项目且force 为FALSE时,则该命令将失败,并且服务器将发出错误。
  • 如果目标表包含项目且force 为 TRUE时,则该命令将删除表以及任何从属对象。
auto_commit 为了实现兼容性而支持此参数,但被忽略。

示例

以下匿名块删除名为work_order_table 的表:

BEGIN
   DBMS_AQADM.DROP_QUEUE_TABLE ('work_order_table', force => TRUE);
END;

PURGE_QUEUE_TABLE

使用PURGE_QUEUE_TABLE存储过程可以从队列表中删除消息。语法如下:

PURGE_QUEUE_TABLE(
  queue_table IN VARCHAR2,
  purge_condition IN VARCHAR2,
  purge_options IN aq$_purge_options_t)

参数

参数 描述
queue_table 指定从中删除消息的队列表的名称。
purge_condition 指定服务器在决定要清除哪些消息时将评估的条件(WHERE子句)。
purge_options purge_options是类型为aq$_purge_options_t的对象。aq$_purge_options_t对象包含内容请参见表 2
表 2. aq$_purge_options_t
属性 类型 说明
Block Boolean 在表中所有队列上是否保有排他锁。
  • 取值为TRUE时,表示表中所有队列上存在排他锁。
  • 取值为FALSE时,表示表中所有队列上不存在排他锁。
delivery_mode INTEGER 指定将清除的消息类型。唯一可接受的值为DBMS_AQ.PERSISTENT

示例

以下匿名块从work_order_table中删除completed 列值为YES的任何消息:

DECLARE
   purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
   dbms_aqadm.purge_queue_table('work_order_table', 'completed = YES', purge_options);
  END;

START_QUEUE

使用START_QUEUE存储过程使队列可用于排队和取消排队。语法如下:

START_QUEUE(
  queue_name IN VARCHAR2,
  enqueue IN BOOLEAN DEFAULT TRUE,
  dequeue IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name 指定要启动的队列的名称。
enqueue
  • 指定TRUE(默认)时,启用排队。
  • 指定FALSE时,保持当前设置不变
dequeue
  • 指定TRUE(默认)时,取消排队。
  • 指定FALSE时,保持当前设置不变

示例

以下匿名块使名为work_order的队列可用于排队:

BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'work_order);
END;

STOP_QUEUE

使用STOP_QUEUE存储过程在指定队列中禁用排队或取消排队。语法如下:

STOP_QUEUE(
  queue_name IN VARCHAR2,
  enqueue IN BOOLEAN DEFAULT TRUE,
  dequeue IN BOOLEAN DEFAULT TRUE,
  wait IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name 指定要停止的队列的名称。
enqueue
  • 指定TRUE(默认)时,禁用排队。
  • 指定FALSE时,保持当前设置不变
dequeue
  • 指定TRUE(默认)时,禁用出队。
  • 指定FALSE时,保持当前设置不变
wait
  • 指定为TRUE时,表示服务器等待任何未完成的事务完成,然后再应用指定的更改。在等待停止队列时,在指定队列中不允许任何事务排队或取消排队。
  • 指定为FALSE时,立即停止队列。

示例

以下匿名块在名为work_order的队列中禁用排队和取消排队:

BEGIN
DBMS_AQADM.STOP_QUEUE(queue_name =>'work_order', enqueue=>TRUE, dequeue=>TRUE, wait=>TRUE);
END;

排队和取消排队将在任何未完成的事务完成之后停止。