您可以使用DBMS_AQ
包中的存储过程添加消息到队列、从队列中删除消息、注册或注销PL/SQL回调存储过程。
PolarDB通过以下SQL命令为
DBMS_AQ
包提供扩展功能:
ALTER QUEUE
ALTER QUEUE TABLE
CREATE QUEUE
CREATE QUEUE TABLE
DROP QUEUE
DROP QUEUE TABLE
函数/存储过程 | 返回类型 | 说明 |
---|---|---|
ENQUEUE | N/A | 发布消息到队列。 |
DEQUEUE | N/A | 如果有消息可用或者在消息可用时,从队列检索消息。 |
REGISTER | N/A | 注册回调过程。 |
UNREGISTER | N/A | 注销回调过程。 |
PolarDB支持使用下表列出的常量:
常量 | 说明 | 适用参数 |
---|---|---|
DBMS_AQ.BROWSE (0) | 读取消息而不锁定。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.LOCKED (1) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.REMOVE (2) | 读取之后删除消息,该参数为默认值。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.REMOVE_NODATA (3) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.FIRST_MESSAGE (0) | 返回与搜索词匹配的第一个可用消息。 | dequeue_options_t.navigation |
DBMS_AQ.NEXT_MESSAGE (1) | 返回与搜索词匹配的下一个可用消息。 | dequeue_options_t.navigation |
DBMS_AQ.NEXT_TRANSACTION (2) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | dequeue_options_t.navigation |
DBMS_AQ.FOREVER (0) | 如果找不到与搜索词匹配的消息,则持续等待,该参数为默认值。 | dequeue_options_t.wait |
DBMS_AQ.NO_WAIT (1) | 如果找不到与搜索词匹配的消息,则不等待。 | dequeue_options_t.wait |
DBMS_AQ.ON_COMMIT (0) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | enqueue_options_t.visibility,dequeue_options_t.visibility |
DBMS_AQ.IMMEDIATE (1) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | enqueue_options_t.visibility,dequeue_options_t.visibility |
DBMS_AQ.PERSISTENT (0) | 此消息应存储在表中。 | enqueue_options_t.delivery_mode |
DBMS_AQ.BUFFERED (1) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | enqueue_options_t.delivery_mode |
DBMS_AQ.READY (0) | 指定消息已经准备好进行处理。 | message_properties_t.state |
DBMS_AQ.WAITING (1) | 指定消息正在等待处理。 | message_properties_t.state |
DBMS_AQ.PROCESSED (2) | 指定消息已处理。 | message_properties_t.state |
DBMS_AQ.EXPIRED (3) | 指定消息处于异常队列中。 | message_properties_t.state |
DBMS_AQ.NO_DELAY (0) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | message_properties_t.delay |
DBMS_AQ.NEVER (NULL) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | message_properties_t.expiration |
DBMS_AQ.NAMESPACE_AQ (0) | 接收来自DBMS_AQ队列的通知。 | sys.aq$_reg_info.namespace |
DBMS_AQ.NAMESPACE_ANONYMOUS (1) | 常量,必须指定在PL/SQL常量包的范围内的常量。 | sys.aq$_reg_info.namespace |
ENQUEUE
ENQUEUE
存储过程将一个条目添加到队列。语法如下:ENQUEUE(
queue_name IN VARCHAR2,
enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
payload IN <type_name>,
msgid OUT RAW)
参数
queue_name
现有队列的名称(可能是schema限定的)。说明- 如果您省略schema名称,服务器将使用在SEARCH_PATH中指定的schema。
- 使用特殊字符或者区分大小写的名称时,请添加双引号。
enqueue_options
enqueue_options
是类型为enqueue_options_t
的值:
目前,DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD( visibility BINARY_INTEGER DEFAULT ON_COMMIT, relative_msgid RAW(16) DEFAULT NULL, sequence_deviation BINARY INTEGER DEFAULT NULL, transformation VARCHAR2(61) DEFAULT NULL, delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);
enqueue_options_t
仅支持下表中的参数值:参数名称 缺省值 visibility ON_COMMIT delivery_mode PERSISTENT sequence_deviation NULL transformation NULL relative_msgid NULL message_properties
message_properties
是类型为message_properties_t
的值:message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”, attempts INTEGER, recipient_list“AQ$_RECIPIENT_LIST_T”, exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”, enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”, delivery_mode INTEGER DBMS_AQ.PERSISTENT);
message_properties_t
支持的参数值如下:参数 说明 priority 如果队列表定义包括sort_list并引用了priority,则此参数影响消息出队的顺序。较低的值表示较高的出队优先级。 delay 指定消息可用于出队之前将经过的秒数,或者NO_DELAY。 expiration 使用expiration参数指定消息过期的秒数。 correlation 使用 correlation参数指定与条目关联的消息,默认值为NULL。 attempts 系统维护的值,指定消息出队的尝试次数。 exception_queue 使用exception_queue参数指定异常队列的名称,如果有消息过期或者回退太多次数的事务出队,则将消息移动到该队列中。 enqueue_time 记录添加到队列的时间。 state 此参数由DBMS_AQ指定,状态取值如下: - DBMS_AQ.READY : 未达到延迟。
- DBMS_AQ.WAITING : 队列条目已准备好处理。
- DBMS_AQ.PROCESSED :队列条目已处理。
- DBMS_AQ.EXPIRED : 队列条目已移动到异常队列。
original_msgid 为了实现兼容性而支持此参数。 transaction_group 为了实现兼容性而支持此参数。 payload
使用
payload
参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配(详情请参见CREATE_QUEUE_TABLE)。msgid
使用
msgid
参数检索唯一(系统生成的)消息标识符。
示例
以下匿名块通过调用
DBMS_AQ.ENQUEUE
,将消息添加到名为work_order的队列:DECLARE
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
payload := work_order('Smith', 'system upgrade');
DBMS_AQ.ENQUEUE(
queue_name => 'work_order',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
END;
DEQUEUE
DEQUEUE
存储过程让消息出队。语法如下:DEQUEUE(
queue_name IN VARCHAR2,
dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
payload OUT type_name,
msgid OUT RAW)
参数
queue_name
现有队列的名称(可能是schema限定的)。说明- 如果您省略schema名称,则服务器将使用在SEARCH_PATH中指定的schema。
- 如果要使用特殊字符或者区分大小写的名称,请添加双引号。
dequeue_options
dequeue _options
是类型为dequeue_options_t
的值:
目前,DEQUEUE_OPTIONS_T IS RECORD( consumer_name CHARACTER VARYING(30), dequeue_mode INTEGER, navigation INTEGER, visibility INTEGER, wait INTEGER, msgid BYTEA, correlation CHARACTER VARYING(128), deq_condition CHARACTER VARYING(4000), transformation CHARACTER VARYING(61), delivery_mode INTEGER);
dequeue_options_t
支持的参数值为:参数 说明 consumer_name 必须为NULL。 dequeue_mode 出队操作的锁定行为。取值如下: - DBMS_AQ.BROWSE:读取消息而不获取锁定。
- DBMS_AQ.LOCKED: 获取锁定之后读取消息。
- DBMS_AQ.REMOVE:删除消息之前读取消息。
- DBMS_AQ.REMOVE_NODATA:读取消息,但不删除消息。
navigation 标识将检索的消息。取值如下: - FIRST_MESSAGE:队列中与搜索词匹配的第一条消息。
- NEXT_MESSAGE:与第一个词语匹配的下一条可用消息。
visibility 必须为ON_COMMIT, 如果您回退当前事务,出队项目将保持在队列中。 wait 必须为大于0的数字,或者设置为以下参数: - DBMS_AQ.FOREVER:无限期等待。
- DBMS_AQ.NO_WAIT:不等待。
msgid 出队消息的ID。 correlation 为了实现兼容性而提供的参数。 deq_condition 一个VARCHAR2表达式,取值为BOOLEAN值,表示消息是否应出队。 transformation 为了实现兼容性而提供的参数。 delivery_mode 必须为PERSISTENT,此模式下不支持缓冲的消息。 message_properties
message_properties
是类型为message_properties_t
的值:message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”, attempts INTEGER, recipient_list“AQ$_RECIPIENT_LIST_T”, exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”, enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”, delivery_mode INTEGER DBMS_AQ.PERSISTENT);
message_properties_t
支持的参数值为:参数 说明 priority 如果队列表定义包括sort_list并引用了priority,则此参数影响消息出队的顺序。较低的值表示较高的出队优先级。 delay 指定消息出队或NO_DELAY之前经过的秒数。 expiration 指定消息过期的秒数。 correlation 指定与条目关联的消息,默认值为NULL。 attempts 系统维护的值,指定消息出队的尝试次数。 exception_queue 指定异常队列的名称,如果有消息过期或者回退太多次数的事务出队,则将消息移动到该队列中。 enqueue_time 记录添加到队列的时间。 state 此参数由DBMS_AQ指定,状态取值如下: - DBMS_AQ.WAITING:未达到延迟。
- DBMS_AQ.READY:队列条目已准备好处理。
- DBMS_AQ.PROCESSED:队列条目已处理。
- DBMS_AQ.EXPIRED:队列条目已移动到异常队列。
original_msgid 为了实现兼容性而提供的参数。 transaction_group 为了实现兼容性而提供的参数。 delivery_mode 不支持此参数,指定DBMS_AQ.PERSISTENT的值。 payload
使用
payload
参数检索具有出队操作的消息的有效负载。有效负载类型必须与创建队列表时指定的类型匹配。msgid
使用
msgid
参数检索唯一消息标识符。
示例
以下匿名块通过调用
DBMS_AQ.DEQUEUE
,从队列和有效负载中检索消息:DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
DBMS_AQ.DEQUEUE(
queue_name => 'work_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
DBMS_OUTPUT.PUT_LINE(
'The next work order is [' || payload.subject || '].'
);
END;
有效负载由DBMS_OUTPUT.PUT_LINE
显示。
REGISTER
使用REGISTER存储过程用于在消息入队或出队时接收通知。语法如下:
REGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count IN NUMBER)
参数
reg_list
reg_list
是类型为AQ$_REG_INFO_LIST
的列表,提供有关您要注册的各种订阅信息。列表中每个条目的类型都是AQ$_REG_INFO
,包含的属性有:属性 类型 说明 name VARCHAR2 (128) 订阅的名称(可能是 schema 限定的)。 namespace NUMERIC 唯一支持的值为DBMS_AQ.NAMESPACE_AQ (0)。 callback VARCHAR2 (4000) 描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式: plsql://schema.procedure
其中:- schema:指定存储过程所在的schema。
- procedure:指定待通知的存储过程的名称。
context RAW (16) 回调存储过程需要的用户定义的值。 count
count
是reg_list
中的条目数。
示例
以下匿名块通过调用
DBMS_AQ.REGISTER
注册存储过程,用于在队列中添加或删除项目时接收通知。为在DECLARE部分标识的每个订阅信息提供一组属性(类型为 sys.aq$_reg_info
):DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.register(subscriptionlist, 3);
commit;
END;
/
subscriptionlist
的类型为sys.aq$_reg_info_list
,包含以前描述的sys.aq$_reg_info
对象。列表名称和对象计数传递到dbms_aq.register
。
UNREGISTER
使用
UNREGISTER
存储过程关闭与入队和出队相关的通知。语法如下:UNREGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count
IN NUMBER)
参数
reg_list
reg_list
是类型为AQ$_REG_INFO_LIST
的列表,提供有关您要注册的各个订阅信息。列表中每个条目的类型都是AQ$_REG_INFO
,包含的属性有:属性 类型 说明 name VARCHAR2 (128) 订阅的名称(可能是schema限定的)。 namespace NUMERIC 唯一支持的值为DBMS_AQ.NAMESPACE_AQ (0)。 callback VARCHAR2 (4000) 描述对通知执行的操作。目前,仅支持调用PL/SQL存储过程。调用应采取以下形式: plsql://schema.procedure
其中:- schema:指定存储过程所在的schema。
- procedure:指定将通知的存储过程的名称。
context RAW (16) 该存储过程需要的任何用户定义的值。 count
count
是reg_list
中的条目数。
示例
以下匿名块通过调用DBMS_AQ.UNREGISTER
关闭示例中DBMS_AQ.REGISTER
指定的通知:
subscriptionlist
的类型为sys.aq$_reg_info_list
,包含之前描述的sys.aq$_reg_info
对象、列表名称和对象数量将传递到dbms_aq.unregister
。DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.unregister(subscriptionlist, 3);
commit;
END;
/