您可以使用DBMS_AQADM包中的存储过程创建并管理消息队列和队列表。

说明

使用DBMS_AQADM包中的存储过程创建和管理消息队列和队列表,需要使用超级用户操作,如您有相应需求,请联系我们处理。

表 1. DBMS_AQADM函数/存储过程

函数/存储过程

返回类型

说明

ALTER_QUEUE

N/A

修改现有的队列。

ALTER_QUEUE_TABLE

N/A

修改现有的队列表。

CREATE_QUEUE

N/A

创建队列。

CREATE_QUEUE_TABLE

N/A

创建队列表。

DROP_QUEUE

N/A

删除现有队列。

DROP_QUEUE_TABLE

N/A

删除现有队列表。

PURGE_QUEUE_TABLE

N/A

从队列表中删除一个或多个消息。

START_QUEUE

N/A

使队列对于入队和出队过程可用。

STOP_QUEUE

N/A

使队列对于入队和出队过程不可用。

PolarDB PostgreSQL版(兼容Oracle)支持使用下面列出的参数:

常量

说明

适用参数

DBMS_AQADM.TRANSACTIONAL(1)

此常量已定义,但是,如果使用会返回错误。

message_grouping

DBMS_AQADM.NONE(0)

用于为队列表指定消息分组。

message_grouping

DBMS_AQADM.NORMAL_QUEUE(0)

create_queue一起使用来指定queue_type

queue_type

DBMS_AQADM.NORMAL_QUEUE(0)

create_queue一起使用来指定queue_type

queue_type

DBMS_AQADM.INFINITE(-1)

create_queue 一起使用来指定retention_time

retention_time

DBMS_AQADM.PERSISTENT (0)

此消息应存储在表中。

enqueue_options_t.delivery_mode

DBMS_AQADM.BUFFERED (1)

此常量已定义,但是,如果使用会返回错误。

enqueue_options_t.delivery_mode

DBMS_AQADM.PERSISTENT_OR_BUFFERED (2)

此常量已定义,但是,如果使用会返回错误。

enqueue_options_t.delivery_mode

使用说明

ALTER_QUEUE

使用ALTER_QUEUE存储过程修改现有队列。

语法

ALTER_QUEUE(
  max_retries IN NUMBER DEFAULT NULL,
  retry_delay IN NUMBER DEFAULT 0
  retention_time IN NUMBER DEFAULT 0,
  auto_commit IN BOOLEAN DEFAULT TRUE)
  comment IN VARCHAR2 DEFAULT NULL,

参数

参数

描述

queue_name

新队列的名称。

max_retries

指定使用dequeue语句删除消息的最大尝试次数。

max_retries的值会随着ROLLBACK语句的数量而增加。当失败的尝试次数达到max_retries指定的值之后,消息将移动到异常队列。当指定为0时,表示不允许重试。

retry_delay

指定在ROLLBACK之后计划重新处理消息等待的秒数。默认值为0,表示应立即重试消息。

retention_time

指定消息在出队之后进行存储所经过的时间长度。单位:秒。

说明
  • 指定为默认值0时,表示消息在出队之后不应保留。

  • 指定为INFINITE时,表示长期保留消息。

comment

指定与队列关联的注释。

示例

以下代码块,将 work_order的队列中的retry_delay参数设置为5秒:

EXEC DBMS_AQADM.ALTER_QUEUE(queue_name => 'work_order', retry_delay => 5);

ALTER_QUEUE_TABLE

使用ALTER_QUEUE_TABLE 存储过程修改现有队列表。

语法

ALTER_QUEUE_TABLE (
  queue_table IN VARCHAR2,
  comment IN VARCHAR2 DEFAULT NULL,
  primary_instance IN BINARY_INTEGER DEFAULT 0,
  secondary_instance IN BINARY_INTEGER DEFAULT 0,

参数

参数

描述

queue_table

队列表的名称。

comment

使用comment参数可以提供有关队列表的注释。

primary_instance

为了实现兼容性而支持此参数,可忽略。

secondary_instance

为了实现兼容性而支持此参数,可忽略。

示例

以下命令修改名为work_order_table 的队列表:

EXEC DBMS_AQADM.ALTER_QUEUE_TABLE
      (queue_table => 'work_order_table', comment => 'This queue table contains work orders for the shipping department.');

CREATE_QUEUE

使用CREATE_QUEUE 存储过程在现有队列表中创建队列。

语法

CREATE_QUEUE(
  queue_name IN VARCHAR2
  queue_table IN VARCHAR2,
  queue_type IN BINARY_INTEGER DEFAULT NORMAL_QUEUE,
  max_retries IN NUMBER DEFAULT 5,
  retry_delay IN NUMBER DEFAULT 0
  retention_time IN NUMBER DEFAULT 0,
  dependency_tracking IN BOOLEAN DEFAULT FALSE,
  comment IN VARCHAR2 DEFAULT NULL,
  auto_commit IN BOOLEAN DEFAULT TRUE)

参数

参数

描述

queue_name

新队列的名称。

queue_table

新队列所在的表的名称。

queue_type

新队列的类型。queue_type取值如下:

  • DBMS_AQADM.NORMAL_QUEUE:此值指定普通队列(默认值)。

  • DBMS_AQADM.EXCEPTION_QUEUE:此值指定新队列是异常队列。异常队列仅支持出队操作。

max_retries

指定使用dequeue 语句删除消息的最大尝试次数。dequeue的值随着ROLLBACK语句数量的增加而增加。当失败的尝试次数达到max_retries指定的值之后,消息移动到异常队列。系统表的默认值为0;用户创建的表的默认值为5。

retry_delay

指定在ROLLBACK之后计划重新处理消息等待的秒数。当指定为默认值0时,表示应立即重试消息。

retention_time

指定消息在出队之后进行存储所经过的时间长度。单位:秒。

说明
  • 指定为默认值0时,表示消息在出队之后不应保留。

  • 指定为INFINITE时,表示长期保留消息。

dependency_tracking

为了实现兼容性而支持此参数,但被忽略。

comment

指定与队列关联的注释。

auto_commit

为了实现兼容性而支持此参数,可忽略。

示例

以下匿名块在work_order_table表中创建名为work_order 的队列:

BEGIN
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'work_order', queue_table => 'work_order_table', comment => 'This queue contains pending work orders.');
END;

CREATE_QUEUE_TABLE

使用CREATE_QUEUE_TABLE 存储过程创建队列表。

语法

CREATE_QUEUE_TABLE (
  queue_table IN VARCHAR2,
  queue_payload_type IN VARCHAR2,
  storage_clause IN VARCHAR2 DEFAULT NULL,
  sort_list IN VARCHAR2 DEFAULT NULL,
  multiple_consumers IN BOOLEAN DEFAULT FALSE,
  message_grouping IN BINARY_INTEGER DEFAULT NONE,
  comment IN VARCHAR2 DEFAULT NULL,
  auto_commit IN BOOLEAN DEFAULT TRUE,
  primary_instance IN BINARY_INTEGER DEFAULT 0,
  secondary_instance IN BINARY_INTEGER DEFAULT 0,
  compatible IN VARCHAR2 DEFAULT NULL,
  secure IN BOOLEAN DEFAULT FALSE)

参数

参数

描述

queue_table

队列表的名称。

queue_payload_type

存储在队列表中的用户自定义的数据类型。

说明

如果要指定RAW数据类型,您必须在用户定义类型中创建RAW数据类型。

storage_clause

指定队列表的属性。

  • storage_clause可以是以下一项或多项:

    • TABLESPACE tablespace_name

    • PCTFREE integer

    • PCTUSED integer

    • INITRANS integer

    • MAXTRANS integer

    • STORAGE storage_option

  • storage_option 可以是以下一项或多项:

    • MINEXTENTS integer

    • MAXEXTENTS integer

    • PCTINCREASE integer

    • INITIAL size_clause

    • NEXT

    • FREELISTS integer

    • OPTIMAL size_clause

    • BUFFER_POOL {KEEP|RECYCLE|DEFAULT}

sort_list

控制队列的出队顺序。指定将用于对队列进行排序(升序)的列名称。当前取值如下:

  • enq_time, priority

  • priority, enq_time

  • priority

  • enq_time

multiple_consumers

如果指定,multiple_consumers必须为FALSE。

message_grouping

如果指定,message_grouping必须为 NONE。

comment

提供有关队列表的注释。

auto_commit

为了实现兼容性而支持此参数,可忽略。

primary_instance

为了实现兼容性而支持此参数,可忽略。

secondary_instance

为了实现兼容性而支持此参数,可忽略。

compatible

为了实现兼容性而支持此参数,可忽略。

secure

为了实现兼容性而支持此参数,可忽略。

示例

以下匿名块首先创建work_order类型,该类型具有保存名称VARCHAR2的属性,以及项目说明。然后,使用该类型创建队列表:

BEGIN

CREATE TYPE work_order AS (name VARCHAR2, project TEXT, completed BOOLEAN);

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'work_order_table',
       queue_payload_type => 'work_order',
       comment => 'Work order message queue table');
END;

队列表名为work_order_table,包含类型为work_order的有效负载。注释说明为Work order message queue table

DROP_QUEUE

使用DROP_QUEUE 存储过程可以删除队列。

语法

DROP_QUEUE(
  queue_name  IN VARCHAR2,
  auto_commit IN BOOLEAN DEFAULT TRUE)

参数

参数

描述

queue_name

要删除的队列的名称。

auto_commit

为了实现兼容性而支持此参数,但被忽略。

示例

以下匿名块删除名为work_order的队列:

BEGIN
DBMS_AQADM.DROP_QUEUE(queue_name => 'work_order');
END;

DROP_QUEUE_TABLE

使用DROP_QUEUE_TABLE存储过程可以删除队列表。

语法

DROP_QUEUE_TABLE(
  queue_table IN VARCHAR2,
  force IN BOOLEAN default FALSE,
  auto_commit IN BOOLEAN default TRUE)

参数

参数

描述

queue_table

队列表的名称。

force

指定DROP_QUEUE_TABLE命令在删除包含项目的表时的行为:

  • 如果目标表包含项目且force 为FALSE时,则该命令将失败,并且服务器将发出错误。

  • 如果目标表包含项目且force 为 TRUE时,则该命令将删除表以及任何从属对象。

auto_commit

为了实现兼容性而支持此参数,但被忽略。

示例

以下匿名块删除名为work_order_table 的表:

BEGIN
   DBMS_AQADM.DROP_QUEUE_TABLE ('work_order_table', force => TRUE);
END;

PURGE_QUEUE_TABLE

使用PURGE_QUEUE_TABLE存储过程可以从队列表中删除消息。

语法

PURGE_QUEUE_TABLE(
  queue_table IN VARCHAR2,
  purge_condition IN VARCHAR2,
  purge_options IN aq$_purge_options_t)

参数

参数

描述

queue_table

指定从中删除消息的队列表的名称。

purge_condition

指定服务器在决定要清除哪些消息时将评估的条件(WHERE子句)。

purge_options

purge_options是类型为aq$_purge_options_t的对象。aq$_purge_options_t对象包含内容请参见aq$_purge_options_t

表 2. aq$_purge_options_t

属性

类型

说明

Block

Boolean

在表中所有队列上是否保有排他锁。

  • 取值为TRUE时,表示表中所有队列上存在排他锁。

  • 取值为FALSE时,表示表中所有队列上不存在排他锁。

delivery_mode

INTEGER

指定将清除的消息类型。唯一可接受的值为DBMS_AQ.PERSISTENT

示例

以下匿名块从work_order_table中删除completed 列值为YES的任何消息:

DECLARE
   purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
   dbms_aqadm.purge_queue_table('work_order_table', 'completed = YES', purge_options);
  END;

START_QUEUE

使用START_QUEUE存储过程使队列可用于排队和取消排队。

语法

START_QUEUE(
  queue_name IN VARCHAR2,
  enqueue IN BOOLEAN DEFAULT TRUE,
  dequeue IN BOOLEAN DEFAULT TRUE)

参数

参数

描述

queue_name

指定要启动的队列的名称。

enqueue

  • 指定TRUE(默认)时,启用排队。

  • 指定FALSE时,保持当前设置不变。

dequeue

  • 指定TRUE(默认)时,取消排队。

  • 指定FALSE时,保持当前设置不变。

示例

以下匿名块使名为work_order的队列可用于排队:

BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'work_order);
END;

STOP_QUEUE

使用STOP_QUEUE存储过程在指定队列中禁用排队或取消排队。

语法

STOP_QUEUE(
  queue_name IN VARCHAR2,
  enqueue IN BOOLEAN DEFAULT TRUE,
  dequeue IN BOOLEAN DEFAULT TRUE,
  wait IN BOOLEAN DEFAULT TRUE)

参数

参数

描述

queue_name

指定要停止的队列的名称。

enqueue

  • 指定TRUE(默认)时,禁用排队。

  • 指定FALSE时,保持当前设置不变。

dequeue

  • 指定TRUE(默认)时,禁用出队。

  • 指定FALSE时,保持当前设置不变。

wait

  • 指定为TRUE时,表示服务器等待任何未完成的事务完成,然后再应用指定的更改。在等待停止队列时,在指定队列中不允许任何事务排队或取消排队。

  • 指定为FALSE时,立即停止队列。

示例

以下匿名块在名为work_order的队列中禁用排队和取消排队:

BEGIN
DBMS_AQADM.STOP_QUEUE(queue_name =>'work_order', enqueue=>TRUE, dequeue=>TRUE, wait=>TRUE);
END;

排队和取消排队将在任何未完成的事务完成之后停止。