Oracle Advanced Queuing 之01(异步通知)
2020-11-07 本文已影响0人
轻飘飘D
1.環境準備(sys login)
-- Queue administrator account used for the payload type, queue tables and queues.
create user aq_admin
  identified by aq_admin
  default tablespace test_ud
  quota unlimited on test_ud;

-- System privileges: log in and create types, tables and stored procedures.
grant create session,
      create type,
      create table,
      create procedure
   to aq_admin;

-- AQ operational (dbms_aq) and administrative (dbms_aqadm) packages.
grant execute on dbms_aq to aq_admin;
grant execute on dbms_aqadm to aq_admin;
-------------------------------------------------------------
-- Application account used for the callback package and the received-messages table.
create user aq_user
  identified by aq_user
  default tablespace test_ud
  quota unlimited on test_ud;

-- System privileges: log in and create sequences, tables and stored procedures.
grant create session,
      create sequence,
      create table,
      create procedure
   to aq_user;

-- Enqueue/dequeue API.
grant execute on dbms_aq to aq_user;
-- Needed only by the test case, which pauses with dbms_lock.sleep.
grant execute on dbms_lock to aq_user;
2.定義發送的對象類型(messages_t) aq_admin login
-- Payload type for the messages: one text attribute of at most 100 characters.
create or replace type aq_admin.messages_t as object (
  message varchar2(100 char)
);
/
-- The slash above is required: SQL*Plus treats CREATE TYPE like a PL/SQL block
-- and would otherwise read the following GRANT as part of the type definition.

-- Let the application user construct and read payloads of this type.
grant execute on aq_admin.messages_t to aq_user;
3.1創建測試隊列1(test_queue) aq_admin login
declare
  -- Names of the objects created by this block.
  c_queue_table constant varchar2(30) := 'queues_qt';
  c_queue_name  constant varchar2(30) := 'test_queue';

  -- Holds the single notification registration passed to dbms_aq.register.
  l_registrations sys.aq$_reg_info_list;
begin
  -- Queue table whose messages carry an AQ_ADMIN.MESSAGES_T payload.
  -- multiple_consumers is not specified here, unlike for the topic table.
  dbms_aqadm.create_queue_table(
    queue_table        => c_queue_table,
    queue_payload_type => 'AQ_ADMIN.MESSAGES_T');

  -- The test queue lives in that queue table.
  dbms_aqadm.create_queue(
    queue_name  => c_queue_name,
    queue_table => c_queue_table);

  -- Open the queue for enqueuing and dequeuing.
  dbms_aqadm.start_queue(queue_name => c_queue_name);

  -- Ask AQ to call the PL/SQL procedure named in the plsql:// URL when a
  -- message arrives on the queue. No subscriber is needed for this queue.
  -- hextoraw('FF') is the user context handed back to the callback.
  l_registrations := sys.aq$_reg_info_list(
    sys.aq$_reg_info(
      'AQ_ADMIN.TEST_QUEUE',
      dbms_aq.namespace_aq,
      'plsql://AQ_USER.TEST_P.RECEIVE_MESSAGE_CALLBACK',
      hextoraw('FF')));

  dbms_aq.register(l_registrations, 1);
end;
/
-- Give aq_user every queue privilege ('ALL') on test_queue, without the right
-- to grant it on to others (grant_option => false).
begin
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ALL',
    queue_name   => 'AQ_ADMIN.TEST_QUEUE',
    grantee      => 'AQ_USER',
    grant_option => false);
end;
/
3.2 創建測試隊列2(test_topic)aq_admin login
declare
  -- Names of the objects created by this block.
  c_queue_table constant varchar2(30) := 'topics_qt';
  c_topic_name  constant varchar2(30) := 'test_topic';

  -- The one consumer that will receive messages published to the topic.
  l_subscriber sys.aq$_agent :=
    sys.aq$_agent(name     => 'demo_subscriber',
                  address  => null,
                  protocol => 0);

  -- Holds the single notification registration passed to dbms_aq.register.
  l_registrations sys.aq$_reg_info_list;
begin
  -- Queue table for the topic; multiple_consumers => true is what makes it a
  -- topic rather than a plain queue.
  dbms_aqadm.create_queue_table(
    queue_table        => c_queue_table,
    queue_payload_type => 'AQ_ADMIN.MESSAGES_T',
    multiple_consumers => true);

  -- The test topic lives in that queue table.
  dbms_aqadm.create_queue(
    queue_name  => c_topic_name,
    queue_table => c_queue_table);

  -- Open the topic for enqueuing and dequeuing.
  dbms_aqadm.start_queue(queue_name => c_topic_name);

  -- Attach the demo subscriber to the topic.
  dbms_aqadm.add_subscriber(
    queue_name => c_topic_name,
    subscriber => l_subscriber);

  -- Register the callback for that subscriber; the registration name has the
  -- form <queue>:<subscriber>.
  l_registrations := sys.aq$_reg_info_list(
    sys.aq$_reg_info(
      'AQ_ADMIN.TEST_TOPIC:DEMO_SUBSCRIBER',
      dbms_aq.namespace_aq,
      'plsql://AQ_USER.TEST_P.RECEIVE_MESSAGE_CALLBACK',
      hextoraw('FF')));

  dbms_aq.register(l_registrations, 1);
end;
/
-- Give aq_user every queue privilege ('ALL') on test_topic, without the right
-- to grant it on to others (grant_option => false).
begin
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ALL',
    queue_name   => 'AQ_ADMIN.TEST_TOPIC',
    grantee      => 'AQ_USER',
    grant_option => false);
end;
/
4.创建应用环境下的信息接收表及入队、回调过程 aq_user login
-- Log table: one row per message stored by the notification callback.
create table aq_user.received_messages (
  received_message_id  number,
  received_timestamp   timestamp default systimestamp,
  content              varchar2(100 char),
  consumer_name        varchar2(512),
  primary key (received_message_id)
);

-- Supplies the values for received_messages.received_message_id.
create sequence aq_user.received_messages_id_s;
-- Package specification: the AQ notification callback plus an enqueue helper.
-- "create or replace" keeps this script re-runnable, matching the package body.
create or replace package aq_user.test_p
as
  -- Callback named in the plsql:// URLs registered with dbms_aq.register.
  --   context  : raw user context supplied at registration
  --   reginfo  : the registration that triggered this call
  --   descr    : descriptor of the message (queue name, consumer name, message id)
  --   payload  : raw payload (not used by the implementation)
  --   payloadl : payload length (not used by the implementation)
  procedure receive_message_callback (
    context  raw,
    reginfo  sys.aq$_reg_info,
    descr    sys.aq$_descriptor,
    payload  raw,
    payloadl number);

  -- Enqueue one message on the given queue.
  --   queue           : name of the target queue, e.g. 'aq_admin.test_queue'
  --   message_content : text to send, wrapped in an aq_admin.messages_t payload
  procedure send_message (
    queue           in varchar,
    message_content in clob);
end test_p;
/
-- Package body: implements the notification callback and the enqueue helper.
create or replace package body aq_user.test_p
as
  -- Notification callback: dequeues the message identified by the descriptor
  -- and stores its text in received_messages.
  --   context, reginfo, payload, payloadl are part of the callback signature
  --   but are not used here; everything needed comes from descr.
  -- Raises: any error from the dequeue or the insert, after rolling back.
  procedure receive_message_callback (
    context  raw,
    reginfo  sys.aq$_reg_info,
    descr    sys.aq$_descriptor,
    payload  raw,
    payloadl number)
  is
    r_dequeue_options    dbms_aq.dequeue_options_t;
    r_message_properties dbms_aq.message_properties_t;
    v_message_handle     raw(26);
    o_payload            aq_admin.messages_t;
  begin
    -- Dequeue exactly the message this notification is about, on behalf of
    -- the consumer named in the descriptor.
    r_dequeue_options.msgid         := descr.msg_id;
    r_dequeue_options.consumer_name := descr.consumer_name;

    dbms_aq.dequeue(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle);

    insert into received_messages (received_message_id, content, consumer_name)
    values (received_messages_id_s.nextval, o_payload.message, descr.consumer_name);

    -- Make the dequeue and the insert permanent together.
    commit;
  exception
    when others then
      -- Undo the dequeue and the insert together, then re-raise so the
      -- failure is reported instead of being silently discarded.
      rollback;
      raise;
  end receive_message_callback;

  -- Enqueue one message with default enqueue options and message properties.
  --   queue           : name of the target queue
  --   message_content : text to send; it is placed in messages_t.message
  --                     (declared varchar2(100 char)), so longer text is
  --                     presumably rejected -- TODO confirm
  -- Does not commit; the caller decides when the enqueue becomes visible.
  procedure send_message (
    queue           in varchar,
    message_content in clob)
  is
    enq_msgid raw(16);
    eopt      dbms_aq.enqueue_options_t;
    mprop     dbms_aq.message_properties_t;
  begin
    dbms_aq.enqueue(
      queue_name         => queue,
      enqueue_options    => eopt,
      message_properties => mprop,
      payload            => aq_admin.messages_t(message_content),
      msgid              => enq_msgid);
  end send_message;
end test_p;
/
-- Let aq_admin run the package; presumably needed because the callback is
-- registered from the aq_admin session -- TODO confirm.
grant execute on aq_user.test_p to aq_admin;
5.用上面3.1,3.2創建的兩個隊列分別做信息入隊及異步通知後出隊測試 aq_user login
-- End-to-end test: enqueue messages on the queue and on the topic, wait for
-- the notification callback to store them, then report pass/fail through
-- dbms_output (the client must have output enabled, e.g. SET SERVEROUTPUT ON
-- in SQL*Plus).
declare
  QUEUE_MESSAGE     constant varchar2(20) := 'Queue test message';
  TOPIC_MESSAGE     constant varchar2(20) := 'Topic test message';
  NUM_TEST_MESSAGES constant number(2)    := 3;

  -- Rows already stored for each content before this run. Comparing against
  -- them keeps the test valid when it is run more than once.
  l_queue_baseline pls_integer;
  l_topic_baseline pls_integer;

  -- Number of rows in received_messages whose content equals p_content.
  function count_messages (p_content in varchar2) return pls_integer
  as
    l_count pls_integer;
  begin
    -- An aggregate without GROUP BY always returns one row, so no
    -- no_data_found handler is needed.
    select count(*)
      into l_count
      from received_messages
     where content = p_content;
    return l_count;
  end count_messages;

  -- Print whether the number of rows received since the baseline matches the
  -- expected number.
  --   p_queue             : label used in the output line
  --   p_content           : message text to look for
  --   p_expected_messages : number of new rows expected
  --   p_baseline          : rows that existed before the test enqueued anything
  procedure check_num_messages (p_queue             in varchar2,
                                p_content           in varchar2,
                                p_expected_messages in number,
                                p_baseline          in pls_integer default 0)
  as
    l_received_messages pls_integer;
  begin
    l_received_messages := count_messages(p_content) - p_baseline;

    if l_received_messages = p_expected_messages then
      dbms_output.put_line(p_queue || ': test passed');
    else
      dbms_output.put_line(p_queue || ': test failed. Received: ' ||
                           l_received_messages ||
                           ', expected: ' ||
                           p_expected_messages);
    end if;
  end check_num_messages;
begin
  -- Remember what is already in the table.
  l_queue_baseline := count_messages(QUEUE_MESSAGE);
  l_topic_baseline := count_messages(TOPIC_MESSAGE);

  -- Enqueue the test messages; send_message does not commit, so commit here.
  for i in 1..NUM_TEST_MESSAGES
  loop
    test_p.send_message('aq_admin.test_queue', QUEUE_MESSAGE);
  end loop;
  commit;

  for i in 1..NUM_TEST_MESSAGES
  loop
    test_p.send_message('aq_admin.test_topic', TOPIC_MESSAGE);
  end loop;
  commit;

  -- Give the system time to deliver the notifications.
  dbms_lock.sleep(5);

  -- Verify the results.
  check_num_messages('aq_admin.test_queue', QUEUE_MESSAGE, NUM_TEST_MESSAGES, l_queue_baseline);
  check_num_messages('aq_admin.test_topic', TOPIC_MESSAGE, NUM_TEST_MESSAGES, l_topic_baseline);
end;
/
aq_admin.test_queue: test passed
aq_admin.test_topic: test passed
6.再次驗證出隊的數據 aq_user login
-- List every stored message in the order its id was assigned.
select to_char(t.received_timestamp, 'yyyy-mm-dd hh24:mi:ss') as received_timestamp,
       t.content,
       t.consumer_name
  from received_messages t
 order by t.received_message_id;
-----------------------------------------------------------------
RECEIVED_TIMESTAMP CONTENT CONSUMER_NAME
1 2020-11-08 23:45:25 Queue test message
2 2020-11-08 23:45:25 Queue test message
3 2020-11-08 23:45:25 Queue test message
4 2020-11-08 23:45:25 Topic test message DEMO_SUBSCRIBER
5 2020-11-08 23:45:25 Topic test message DEMO_SUBSCRIBER
6 2020-11-08 23:45:25 Topic test message DEMO_SUBSCRIBER