Oracle Advanced Queuing 之01(异步通知

2020-11-07  本文已影响0人  轻飘飘D

1.環境準備(sys login)

create user aq_admin identified by aq_admin default tablespace test_ud quota unlimited on test_ud;

grant create type to aq_admin;
grant create session   to aq_admin;
grant create table     to aq_admin;
grant create procedure to aq_admin;

grant execute on dbms_aqadm to aq_admin;
grant execute on dbms_aq    to aq_admin;

-------------------------------------------------------------
create user aq_user identified by aq_user default tablespace test_ud quota unlimited on test_ud;

grant create sequence to aq_user;
grant create session to aq_user;
grant create table to aq_user;
grant create procedure to aq_user;
grant execute on dbms_aq to aq_user;
-- This permission is needed for the test case
grant execute on dbms_lock to aq_user;

2.定義發送的對象類型(messages_t ) aq_admin login

-- Create the type for the messages payload.
create or replace type aq_admin.messages_t as object (message varchar2(100 char));

GRANT EXECUTE ON aq_admin.messages_t TO aq_user;

3.1創建測試隊列1(test_queue) aq_admin login

begin
  -- Create a table for queues 
  dbms_aqadm.create_queue_table (queue_table        => 'queues_qt',
                                 queue_payload_type => 'AQ_ADMIN.MESSAGES_T');

  -- Create a test queue
  dbms_aqadm.create_queue (queue_name  => 'test_queue',
                           queue_table => 'queues_qt');
                           
  -- Start the queue for enqueuing and dequeuing messages.                           
  dbms_aqadm.start_queue (queue_name => 'test_queue');    
  
  -- Register the procedure for dequeuing the messages received.
  -- No subscriber is needed
  dbms_aq.register(
    sys.aq$_reg_info_list(
      sys.aq$_reg_info('AQ_ADMIN.TEST_QUEUE',
                       dbms_aq.namespace_aq, 
                       'plsql://AQ_USER.TEST_P.RECEIVE_MESSAGE_CALLBACK',
                       hextoraw('FF'))
                      ),
      1);
end;
/

begin
 DBMS_AQADM.grant_queue_privilege (privilege => 'ALL',queue_name => 'AQ_ADMIN.TEST_QUEUE',grantee => 'AQ_USER',grant_option => FALSE);
end;

3.2 創建測試隊列2(test_topic)aq_admin login

begin
  -- Create a table for queues 
  -- It's a topic, so multiple_consumers parameter is specified.
  dbms_aqadm.create_queue_table (queue_table        => 'topics_qt',
                                 queue_payload_type => 'AQ_ADMIN.MESSAGES_T',
                                 multiple_consumers => true);

  -- Create a test topic
  dbms_aqadm.create_queue (queue_name  => 'test_topic',
                           queue_table => 'topics_qt');
                           
  -- Start the topic for enqueuing and dequeuing messages.                           
  dbms_aqadm.start_queue (queue_name => 'test_topic');    
  
  -- Configure the demo subscriber.
  dbms_aqadm.add_subscriber (queue_name => 'test_topic',
                             subscriber => sys.aq$_agent(name     => 'demo_subscriber',
                                                         address  => null,
                                                         protocol => 0));
                                                         
  -- Register the procedure for dequeuing the messages received.
  dbms_aq.register(
    sys.aq$_reg_info_list(
      sys.aq$_reg_info('AQ_ADMIN.TEST_TOPIC:DEMO_SUBSCRIBER',
                       dbms_aq.namespace_aq, 
                       'plsql://AQ_USER.TEST_P.RECEIVE_MESSAGE_CALLBACK',
                       hextoraw('FF'))
                      ),
      1);
end;
/

begin
 DBMS_AQADM.grant_queue_privilege (privilege => 'ALL',queue_name => 'AQ_ADMIN.TEST_TOPIC',grantee => 'AQ_USER',grant_option => FALSE);
end;

4.创建应用环境下的信息接收表及入队、回调过程 aq_user login

-- Create a table to store the received messages.
create table aq_user.received_messages
(
  received_message_id number              primary key,
  received_timestamp  timestamp           default systimestamp,
  content             varchar2(100 char),
  consumer_name       varchar2(512)
);

-- Create a sequence for id generation.
create sequence aq_user.received_messages_id_s;

-- Create a package with procedures to enqueue and dequeue messages.
create package aq_user.test_p
as
  procedure receive_message_callback (
    context   raw,
    reginfo   sys.aq$_reg_info,
    descr     sys.aq$_descriptor,
    payload   raw,
    payloadl  number);
  
  procedure send_message (
    queue           in varchar,
    message_content in clob);
end;
/

-- Create a package with procedures to enqueue and dequeue messages.
create or replace package body test_p
as
  procedure receive_message_callback (
    context   raw,
    reginfo   sys.aq$_reg_info,
    descr     sys.aq$_descriptor,
    payload   raw,
    payloadl  number)
  is
    r_dequeue_options    dbms_aq.dequeue_options_t;
    r_message_properties dbms_aq.message_properties_t;
    v_message_handle     raw(26);
    o_payload            aq_admin.messages_t;
  begin
    r_dequeue_options.msgid         := descr.msg_id;
    r_dequeue_options.consumer_name := descr.consumer_name;
    dbms_aq.dequeue(queue_name         => descr.queue_name,
                    dequeue_options    => r_dequeue_options,
                    message_properties => r_message_properties,
                    payload            => o_payload,
                    msgid              => v_message_handle);

    insert into received_messages(received_message_id, content,consumer_name)
    values (received_messages_id_s.nextval, o_payload.message,descr.consumer_name);
    commit;
  exception
    when others then
      rollback;
  end receive_message_callback;

  procedure send_message (
    queue           in varchar,
    message_content in clob)
  is
    enq_msgid raw(16);
    eopt      dbms_aq.enqueue_options_t;
    mprop     dbms_aq.message_properties_t;
  begin
    dbms_aq.enqueue(queue_name         => queue,
                    enqueue_options    => eopt,
                    message_properties => mprop,
                    payload            => aq_admin.messages_t(message_content),
                    msgid              => enq_msgid);
  end send_message;
end;
/

grant execute on test_p to aq_admin;

5.用上面3.1,3.2創建的 兩個隊列分別做信息入隊及異步通知後出隊測試 aq_user login

declare
  QUEUE_MESSAGE       constant varchar2(20) := 'Queue test message';
  TOPIC_MESSAGE       constant varchar2(20) := 'Topic test message';
  NUM_TEST_MESSAGES   constant number(2) := 3;
    
  procedure check_num_messages (p_queue             in varchar2, 
                                p_content           in varchar2,
                                p_expected_messages in number)
  as
    l_received_messages number(2) := 0;
  begin
    select count(1) into l_received_messages from received_messages where content = p_content;
    
    if l_received_messages = p_expected_messages then
      dbms_output.put_line (p_queue || ': test passed');      
    else
      dbms_output.put_line (p_queue || ': test failed. Received: ' || 
                            l_received_messages ||
                            ', expected: ' ||
                            p_expected_messages);
    end if;
  exception
    when no_data_found then
      dbms_output.put_line (p_queue || ': test failed. Received: 0' || 
                            l_received_messages ||
                            ', expected: ' ||
                            p_expected_messages);
  end check_num_messages;
begin
  -- Enqueue some test messages
  for i in 1..NUM_TEST_MESSAGES
  loop
    test_p.send_message ('aq_admin.test_queue', QUEUE_MESSAGE);
  end loop;  
  commit;
  
  
  for i in 1..NUM_TEST_MESSAGES
  loop
    test_p.send_message ('aq_admin.test_topic', TOPIC_MESSAGE);
  end loop;  
  commit;
    
  -- Give the system time to process  
  dbms_lock.sleep(5);
  
  -- Verify the results
  check_num_messages ('aq_admin.test_queue', QUEUE_MESSAGE, NUM_TEST_MESSAGES);
  check_num_messages ('aq_admin.test_topic', TOPIC_MESSAGE, NUM_TEST_MESSAGES);
end;
/
aq_admin.test_queue: test passed
aq_admin.test_topic: test passed

6.再次驗證出隊的數據 aq_user login

select to_char(t.received_timestamp,'yyyy-mm-dd hh24:mi:ss') as received_timestamp,t.content,t.consumer_name  
from received_messages t
-----------------------------------------------------------------
    RECEIVED_TIMESTAMP  CONTENT             CONSUMER_NAME
1   2020-11-08 23:45:25 Queue test message  
2   2020-11-08 23:45:25 Queue test message  
3   2020-11-08 23:45:25 Queue test message  
4   2020-11-08 23:45:25 Topic test message  DEMO_SUBSCRIBER
5   2020-11-08 23:45:25 Topic test message  DEMO_SUBSCRIBER
6   2020-11-08 23:45:25 Topic test message  DEMO_SUBSCRIBER



上一篇 下一篇

猜你喜欢

热点阅读