Oracle Advanced Queuing 之05(多订阅人)

2020-11-14  本文已影响0人  轻飘飘D

背景
这里假设一个简单的需求: 某单位有三种角色 level1、level2、level3, 消息也分三种类型 p1(正常)、p2(紧急)、pn(机密, 下文 demo 代码中取值为 'p3')。不同角色只关心其中某些类型的消息, 关系矩阵如下(✓ 表示关心, - 表示不关心):

-----  p1 p2 pn
level1 ✓  ✓  -
level2 -  ✓  ✓
level3 -  -  ✓

下面写了几个demo来实现这个需求, 流程如下, 很简单
建立消息载体, 这里使用object(aq_admin.aq_msg_type)
建立queue table和queue
建立 subscriber (即consumer)
Enqueue, dequeue

1.clean up queue if exists(可选) aq_admin login

-- Drop the demo queue and its queue table, if they exist.
-- The step is optional clean-up, so a missing queue or queue table is
-- tolerated instead of aborting the script; any other error still propagates.
declare
  l_qt varchar2(30) := 'aq_msg_qtab4';
  l_q  varchar2(30) := 'aq_msg_queue4';

  -- ORA-24010: QUEUE does not exist
  e_no_queue exception;
  pragma exception_init(e_no_queue, -24010);

  -- ORA-24002: QUEUE_TABLE does not exist
  e_no_queue_table exception;
  pragma exception_init(e_no_queue_table, -24002);
begin
  -- A queue must be stopped before it can be dropped.
  begin
    dbms_aqadm.stop_queue(queue_name => l_q);
    dbms_aqadm.drop_queue(queue_name => l_q);
  exception
    when e_no_queue then
      null; -- nothing to drop
  end;

  -- The queue table can only go once its queue is gone.
  begin
    dbms_aqadm.drop_queue_table(queue_table => l_qt);
  exception
    when e_no_queue_table then
      null; -- nothing to drop
  end;
end;
/

2.创建队列表及队列并启动 aq_admin login

-- Create the multi-consumer queue table, create the queue on top of it,
-- then start the queue.
declare
  c_queue_table constant varchar2(30) := 'aq_msg_qtab4';
  c_queue       constant varchar2(30) := 'aq_msg_queue4';
begin
  -- Queue table holding AQ_ADMIN.AQ_MSG_TYPE payloads
  dbms_aqadm.create_queue_table(
    queue_table        => c_queue_table,
    queue_payload_type => 'AQ_ADMIN.AQ_MSG_TYPE',
    multiple_consumers => true,  -- multiple consumers per message
    comment            => 'queue for aq_msg_queue4'
  );

  -- The demo queue itself
  dbms_aqadm.create_queue(
    queue_name     => c_queue,
    queue_table    => c_queue_table,
    queue_type     => sys.dbms_aqadm.normal_queue,
    max_retries    => 3,  -- retry count after a failed dequeue
    retry_delay    => 1,  -- wait before a retry
    retention_time => 0   -- do not retain messages after dequeue
  );

  -- Start the queue
  dbms_aqadm.start_queue(queue_name => c_queue);
end;
/

-- Grant AQ_USER the ALL privilege on the demo queue, without grant option.
begin
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ALL',
    queue_name   => 'AQ_ADMIN.AQ_MSG_QUEUE4',
    grantee      => 'AQ_USER',
    grant_option => false
  );
end;

3.根据不同需求建立订阅关系, 实现了背景需求中的矩阵 aq_admin login

-- (Re)create the three subscribers of aq_msg_queue4:
--   level1  no rule: receives every message that carries no explicit
--           recipient list (demo only; a real setup should use a rule too)
--   level2  rule-based: msg_level p2 or p3
--   level3  rule-based: msg_level p3 only
declare
  l_q      varchar2(30) := 'aq_msg_queue4';
  l_level1 sys.aq$_agent := sys.aq$_agent(name => 'level1', address => null, protocol => 0);
  l_level2 sys.aq$_agent := sys.aq$_agent(name => 'level2', address => null, protocol => 0);
  l_level3 sys.aq$_agent := sys.aq$_agent(name => 'level3', address => null, protocol => 0);

  -- ORA-24035: AQ agent is not a subscriber for the queue
  e_not_subscriber exception;
  pragma exception_init(e_not_subscriber, -24035);

  -- Remove one subscriber. Each agent is handled on its own so that a
  -- missing subscription for one agent does not skip the removal of the
  -- others; only "not a subscriber" is tolerated, other errors propagate.
  procedure remove_if_subscribed(p_agent in sys.aq$_agent) is
  begin
    dbms_aqadm.remove_subscriber(queue_name => l_q, subscriber => p_agent);
  exception
    when e_not_subscriber then
      null; -- nothing to remove
  end remove_if_subscribed;
begin
  -- Start from a clean slate so the block can be re-run.
  remove_if_subscribed(l_level1);
  remove_if_subscribed(l_level2);
  remove_if_subscribed(l_level3);

  -- level1 receives all messages (not including those enqueued with
  -- specified recipients)
  dbms_aqadm.add_subscriber(queue_name => l_q, subscriber => l_level1);

  -- level2 is a rule-based subscriber that cares about p2 and p3
  dbms_aqadm.add_subscriber(queue_name => l_q,
                            subscriber => l_level2,
                            rule       => 'tab.user_data.msg_level in (''p2'',''p3'')');

  -- level3 is a rule-based subscriber that only cares about p3
  dbms_aqadm.add_subscriber(queue_name => l_q,
                            subscriber => l_level3,
                            rule       => 'tab.user_data.msg_level in (''p3'')');
end;
/

4.生成消息, 并且入队 aq_user login

-- Enqueue three demo messages (p1, p2, p3) one second apart.
-- The p3 message is addressed to level3 only through an explicit recipient list.
declare
  c_queue      constant varchar2(50) := 'aq_admin.aq_msg_queue4';
  l_props      dbms_aq.message_properties_t;
  l_recipients dbms_aq.aq$_recipient_list_t;

  -- Enqueue one payload with immediate visibility using the current
  -- message properties, print the message id, then commit.
  procedure send(p_msg in aq_admin.aq_msg_type) is
    l_opts  dbms_aq.enqueue_options_t;
    l_msgid raw(16);
  begin
    l_opts.visibility := dbms_aq.immediate;
    dbms_aq.enqueue(
      queue_name         => c_queue,
      enqueue_options    => l_opts,
      message_properties => l_props,
      payload            => p_msg,
      msgid              => l_msgid
    );
    dbms_output.put_line('encode success,msgid is ' || l_msgid);
    commit;
  end send;
begin
  -- NOTE(review): the constructor arguments are positional; the fourth is
  -- presumably msg_level and the fifth msg_content - confirm against the
  -- aq_msg_type definition.
  send(aq_admin.aq_msg_type(11, '-', '-', 'p1', 'content priority 1', sysdate));

  dbms_lock.sleep(1);
  send(aq_admin.aq_msg_type(12, '-', '-', 'p2', 'content priority 2', sysdate));

  dbms_lock.sleep(1);
  -- Special case: level1 has no rule and level2's rule includes p3, so both
  -- could receive this message. Only level3 should see it, so the message is
  -- enqueued with an explicit recipient list.
  l_recipients(0) := sys.aq$_agent(name => 'level3', address => null, protocol => 0);
  l_props.recipient_list := l_recipients;
  send(aq_admin.aq_msg_type(13, '-', '-', 'p3', 'content priority 3', sysdate));
end;

5.收取一个指定Consumer的所有消息. 这是一个主动获取的过程(Pull), 订阅者并没有得到通知 (aq_user login)

-- Pull every message currently available to consumer level3 and print it.
-- Nothing notifies the consumer here: the block dequeues without waiting
-- until ORA-25228 is raised.
declare
  c_queue    constant varchar2(50) := 'aq_admin.aq_msg_queue4';
  c_consumer constant varchar2(30) := 'level3';
  l_opts     dbms_aq.dequeue_options_t;
  l_props    dbms_aq.message_properties_t;
  l_msg      aq_admin.aq_msg_type;
  l_msgid    raw(16);

  -- ORA-25228 ends the fetch loop below
  e_queue_drained exception;
  pragma exception_init(e_queue_drained, -25228);
begin
  l_opts.consumer_name := c_consumer;
  l_opts.navigation    := dbms_aq.first_message;
  l_opts.wait          := dbms_aq.no_wait;

  -- Runs until the dequeue raises e_queue_drained.
  loop
    dbms_aq.dequeue(
      queue_name         => c_queue,
      dequeue_options    => l_opts,
      message_properties => l_props,
      payload            => l_msg,
      msgid              => l_msgid
    );
    commit;

    dbms_output.put_line('--------------------------------------- ');
    dbms_output.put_line('msg_seq :'         || l_msg.msg_seq);
    dbms_output.put_line('msg_sender :'      || l_msg.msg_sender);
    dbms_output.put_line('msg_receiver :'    || l_msg.msg_receiver);
    dbms_output.put_line('msg_level :'       || l_msg.msg_level);

    -- After the first fetch, continue from the current position.
    l_opts.navigation := dbms_aq.next_message;
  end loop;
exception
  when e_queue_drained then
    null; -- no more messages for this consumer
end;
----------------输出如下-------------------
msg_seq :13
msg_sender :-
msg_receiver :-
msg_level :p3

-- Number of rows in the queue table at this point of the walkthrough
select count(*)
from aq_msg_qtab4
-------------------------------------------------
2

6.清空消息队列(aq_admin login)

-- Purge all messages from the queue table while the queue is stopped,
-- then start the queue again.
declare
  l_qt      varchar2(30) := 'aq_msg_qtab4';
  l_q       varchar2(30) := 'aq_msg_queue4';
  l_options sys.dbms_aqadm.aq$_purge_options_t; -- left at its defaults
begin
  dbms_aqadm.stop_queue(queue_name => l_q);

  begin
    -- A null purge condition selects every message in the table.
    dbms_aqadm.purge_queue_table(
      queue_table     => l_qt,
      purge_condition => null,
      purge_options   => l_options
    );
  exception
    when others then
      -- Do not leave the queue stopped because the purge failed:
      -- restart it, then let the original error propagate.
      dbms_aqadm.start_queue(queue_name => l_q);
      raise;
  end;

  dbms_aqadm.start_queue(queue_name => l_q);
end;
/

7.修改回调过程(aq_user login)

    增加如下 3 个 过程
    AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE41
    AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE42
    AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE43

8.注册回调 aq_admin login

-- Register one PL/SQL callback per subscriber of AQ_ADMIN.AQ_MSG_QUEUE4:
--   LEVEL1 -> AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE41
--   LEVEL2 -> AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE42
--   LEVEL3 -> AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE43
begin
  for i in 1 .. 3 loop
    -- One registration per call, as a single-element list.
    dbms_aq.register(
      sys.aq$_reg_info_list(
        sys.aq$_reg_info(
          'AQ_ADMIN.AQ_MSG_QUEUE4:LEVEL' || to_char(i),
          dbms_aq.namespace_aq,
          'plsql://AQ_USER.CALL_BACK_PCK.AQ_MSG_QUEUE4' || to_char(i),
          hextoraw('FF')
        )
      ),
      1
    );
  end loop;
end;

9.入队测试(aq_user)

重复上面的步骤4

10.验证数据

-- Verification query: list the received messages per consumer, in sequence order.
-- NOTE(review): aq_msg_received is presumably filled by the CALL_BACK_PCK
-- callbacks - confirm against the package body.
select
    t.aq_msg_seq,
    t.queue_name,
    t.consumer_name,
    t.content.msg_content
from aq_msg_received t
order by t.consumer_name, t.aq_msg_seq
---------------------------------------------------------------------------------------------------
AQ_MSG_SEQ  QUEUE_NAME                  CONSUMER_NAME           CONTENT.MSG_CONTENT
8           "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue41->LEVEL1  content priority 1
10          "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue41->LEVEL1  content priority 2
9           "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue42->LEVEL2  content priority 2
11          "AQ_ADMIN"."AQ_MSG_QUEUE4"  aq_msg_queue43->LEVEL3  content priority 3

上一篇下一篇

猜你喜欢

热点阅读