Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Oracle Новый топик    Ответить
 Прослушивание очереди сообщений DBMS_AQ  [new]
BaurzhanSH
Member

Откуда:
Сообщений: 3
Всем привет, есть небольшая проблема с включением прослушивателя. Никак не могу понять в чем причиина. Может кто то подскажет. Выполняю следющие шаги

1. Создаю тип который буду использовать в queue

CREATE OR REPLACE
TYPE XG_USER.AQ_SMS_TYPE AS OBJECT
( login VARCHAR2(30),
password VARCHAR2(80),
msg_body VARCHAR2(100),
destinationadress VARCHAR2(30),
datetosend VARCHAR2(100),
msgid number(32),
fromdate varchar2(30),
todate varchar2(30),
date_ins date
);
/

2. Создаю queue table, queue, have started queue,создаю agent,добавляю subscriber

exec dbms_aqadm.CREATE_QUEUE_TABLE(queue_table=>'aq_test_sms', queue_payload_type=>'aq_sms_type', multiple_consumers=>TRUE, comment=>'Для тестирования очереди смс');
/
exec DBMS_AQADM.CREATE_QUEUE(queue_name=>'test_queue_sms', queue_table=>'aq_test_sms')
/
exec dbms_aqadm.START_QUEUE('test_queue_sms', true, true)
/
exec dbms_aqadm.create_aq_agent('test_sms')
/
exec dbms_aqadm.ADD_SUBSCRIBER('test_queue_sms', sys.aq$_agent( 'recipient', null, null ))
/

3.Создаю временную таблицу в которую буду считывать queue

CREATE SEQUENCE XG_USER.AQ_SEC
START WITH 1
MAXVALUE 999999999999999999999999999
MINVALUE 1
NOCYCLE
CACHE 20
NOORDER
/

CREATE TABLE AQ_TEST_SMS_READ
(
ID NUMBER(32),
LOGIN VARCHAR2(30 BYTE),
PASSWORD VARCHAR2(80 BYTE),
MSG_BODY VARCHAR2(100 BYTE),
DESTINATIONADRESS VARCHAR2(30 BYTE),
DATETOSEND VARCHAR2(100 BYTE),
MSGID NUMBER(32),
FROMDATE VARCHAR2(30 BYTE),
TODATE VARCHAR2(30 BYTE),
DATE_INS DATE
)

CREATE OR REPLACE TRIGGER tr_aq_test_sms_read
before insert on aq_test_sms_read
for each row
declare
n number;
begin
select AQ_SEC_sms.nextval
into n
from dual;

:new.date_ins := sysdate;
:new.id := n;
end;
/

4. Создаю процедуру для добавления сообщения в очередь

CREATE OR REPLACE PROCEDURE add_message(p_login VARCHAR2,
p_password VARCHAR2,
p_msg_body VARCHAR2,
p_destinationadress VARCHAR2,
p_datetosend VARCHAR2,
p_msgid NUMBER,
p_fromdate VARCHAR2,
p_todate VARCHAR2,
p_queue_name VARCHAR2 DEFAULT 'scott.test_queue_sms')
IS
msg aq_sms_type;
msg_id RAW(16);
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
BEGIN
msg := aq_sms_type( login => p_login,
password => p_password,
msg_body => p_msg_body,
destinationadress => p_destinationadress,
datetosend => p_datetosend,
msgid => p_msgid,
fromdate => p_fromdate,
todate => p_todate,
date_ins => sysdate);


message_properties.delay := dbms_aq.no_delay;
enqueue_options.visibility := dbms_aq.on_commit;
enqueue_options.visibility := dbms_aq.immediate;

dbms_aq.enqueue(queue_name => p_queue_name,
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => msg,
msgid => msg_id);

END add_message;

5. Создаю процедуру прослушивания и для считывания из очереди во временную таблицу

--Прослушивание сообщений
CREATE OR REPLACE PROCEDURE SCOTT.listen_for_messages_sms
IS
--для прослушивания
agent sys.aq$_agent;
test_agent_list DBMS_AQ.aq$_agent_list_t;

--для выборки сообщений
res aq_sms_type;
msid RAW(16);
deq_options dbms_aq.DEQUEUE_OPTIONS_T;
mes_props dbms_aq.message_properties_t;



BEGIN
test_agent_list(1) := sys.aq$_agent('recipient', 'scott.test_queue_sms', NULL);

deq_options.dequeue_mode := dbms_aq.remove;
deq_options.visibility := dbms_aq.on_commit;
deq_options.consumer_name := 'recipient';
deq_options.wait := dbms_aq.forever;

DBMS_AQ.LISTEN(
agent_list => test_agent_list,
wait => dbms_aq.forever,
agent => agent);


dbms_aq.dequeue(queue_name=>'scott.test_queue_sms', dequeue_options=>deq_options, message_properties=>mes_props, payload=>res, msgid=>msid);

INSERT INTO aq_test_sms_read (LOGIN, PASSWORD, MSG_BODY, DESTINATIONADRESS, DATETOSEND, MSGID, FROMDATE, TODATE )
VALUES (res.login, res.password, res.msg_body, res.destinationadress, res.datetosend, res.msgid, res.fromdate, res.todate);

commit;



END listen_for_messages_sms;
/


6. Проверяю работу

begin

for i in 1..2 loop
add_message(p_login =>'operator',
p_password =>'telebankoper',
p_msg_body =>'VityaBauka',
p_destinationadress =>'77051853874',
p_datetosend =>to_char(sysdate,'yyyy-mm-dd hh:mi:ss'),
p_msgid =>'',
p_fromdate =>'',
p_todate =>'');









commit;
end loop;
end;


exec listen_for_messages_sms;
commit;


После того как я стартую процедуру считывания валится следующая ошибка


ORA-00600: [kwqidrdq: loop], [0], [0], [0], [0],
[], [], []
ORA-06512: ia "SYS.DBMS_AQ", line 675
ORA-06512: ia "SYS.DBMS_AQ", line 702
ORA-06512: ia "SCOTT.LISTEN_FOR_MESSAGES_SMS", line 23
ORA-06512: ia line 1

Прошу помочь в этой проблеме. Задача изначально состоит чтоб происходило добавление сообщений в очередь, а прослушиватель должен быть постоянно быть включен и считывать из очереди сообщения во временную таблицу, при чем удалять сообщения считанные из очереди. Возможно в моей коде есть какие то ошибки, прошу отметить.
16 фев 09, 14:07    [6824378]     Ответить | Цитировать Сообщить модератору
 Re: Прослушивание очереди сообщений DBMS_AQ  [new]
SQL*Plus
Member

Откуда: Россия, Москва
Сообщений: 8131
BaurzhanSH
Всем привет, есть небольшая проблема с включением прослушивателя. Никак не могу понять в чем причиина. Может кто то подскажет. Выполняю следющие шаги

1. Создаю тип который буду использовать в queue
CREATE OR REPLACE
TYPE XG_USER.AQ_SMS_TYPE AS OBJECT
( login VARCHAR2(30), 
  password VARCHAR2(80),
  msg_body VARCHAR2(100),
  destinationadress VARCHAR2(30), 
  datetosend VARCHAR2(100),
  msgid number(32),
  fromdate varchar2(30),
  todate varchar2(30),
  date_ins date
  );
/
2. Создаю queue table, queue, have started queue,создаю agent,добавляю subscriber

exec dbms_aqadm.CREATE_QUEUE_TABLE(queue_table=>'aq_test_sms', queue_payload_type=>'aq_sms_type', multiple_consumers=>TRUE, comment=>'Для тестирования очереди смс');
/
exec DBMS_AQADM.CREATE_QUEUE(queue_name=>'test_queue_sms', queue_table=>'aq_test_sms')
/
exec dbms_aqadm.START_QUEUE('test_queue_sms', true, true) 
/
exec dbms_aqadm.create_aq_agent('test_sms')
/
exec dbms_aqadm.ADD_SUBSCRIBER('test_queue_sms',  sys.aq$_agent( 'recipient', null, null ))
/
3.Создаю временную таблицу в которую буду считывать queue
CREATE SEQUENCE XG_USER.AQ_SEC
  START WITH 1
  MAXVALUE 999999999999999999999999999
  MINVALUE 1
  NOCYCLE
  CACHE 20
  NOORDER
/  

CREATE TABLE AQ_TEST_SMS_READ
(
  ID                 NUMBER(32),
  LOGIN              VARCHAR2(30 BYTE),
  PASSWORD           VARCHAR2(80 BYTE),
  MSG_BODY           VARCHAR2(100 BYTE),
  DESTINATIONADRESS  VARCHAR2(30 BYTE),
  DATETOSEND         VARCHAR2(100 BYTE),
  MSGID              NUMBER(32),
  FROMDATE           VARCHAR2(30 BYTE),
  TODATE             VARCHAR2(30 BYTE),
  DATE_INS           DATE
)

CREATE OR REPLACE TRIGGER tr_aq_test_sms_read
before insert on aq_test_sms_read
for each row
declare
    n number;
begin
    select AQ_SEC_sms.nextval
      into n
     from dual;
    
    :new.date_ins := sysdate; 
    :new.id := n;
end;
/
4. Создаю процедуру для добавления сообщения в очередь
CREATE OR REPLACE PROCEDURE add_message(p_login              VARCHAR2,
                        p_password           VARCHAR2,
                        p_msg_body           VARCHAR2,
                        p_destinationadress  VARCHAR2,
                        p_datetosend         VARCHAR2,
                        p_msgid              NUMBER,
                        p_fromdate           VARCHAR2,
                        p_todate             VARCHAR2,    
                        p_queue_name       VARCHAR2 DEFAULT 'scott.test_queue_sms')
  IS
       msg aq_sms_type;
       msg_id RAW(16);
       enqueue_options dbms_aq.enqueue_options_t;
       message_properties dbms_aq.message_properties_t;
  BEGIN
    msg := aq_sms_type( login              => p_login,                      
                        password           => p_password,            
                        msg_body           => p_msg_body,            
                        destinationadress  => p_destinationadress,
                        datetosend         => p_datetosend,          
                        msgid              => p_msgid,                   
                        fromdate           => p_fromdate,             
                        todate             => p_todate,                 
                        date_ins           => sysdate);
    
    
    message_properties.delay := dbms_aq.no_delay;
    enqueue_options.visibility := dbms_aq.on_commit;
    enqueue_options.visibility := dbms_aq.immediate;
    
    dbms_aq.enqueue(queue_name => p_queue_name,
                    enqueue_options => enqueue_options,
                    message_properties => message_properties,
                    payload => msg,
                    msgid => msg_id);
                    
  END add_message;
 

5. Создаю процедуру прослушивания и для считывания из очереди во временную таблицу
--Прослушивание сообщений
CREATE OR REPLACE PROCEDURE SCOTT.listen_for_messages_sms
IS
    --для прослушивания
    agent            sys.aq$_agent;
    test_agent_list  DBMS_AQ.aq$_agent_list_t;
  
    --для выборки сообщений
    res aq_sms_type;
    msid RAW(16);
    deq_options dbms_aq.DEQUEUE_OPTIONS_T;
    mes_props dbms_aq.message_properties_t;
    
  
  
BEGIN
   test_agent_list(1) := sys.aq$_agent('recipient', 'scott.test_queue_sms',  NULL);

   deq_options.dequeue_mode := dbms_aq.remove;
   deq_options.visibility := dbms_aq.on_commit;
   deq_options.consumer_name := 'recipient';
   deq_options.wait := dbms_aq.forever;
   
       DBMS_AQ.LISTEN(
          agent_list   =>   test_agent_list, 
          wait         =>   dbms_aq.forever, 
          agent        =>   agent);
      
   
        dbms_aq.dequeue(queue_name=>'scott.test_queue_sms', dequeue_options=>deq_options, message_properties=>mes_props, payload=>res, msgid=>msid);
        
         INSERT INTO aq_test_sms_read (LOGIN, PASSWORD, MSG_BODY, DESTINATIONADRESS, DATETOSEND, MSGID, FROMDATE, TODATE ) 
                          VALUES (res.login, res.password, res.msg_body, res.destinationadress, res.datetosend, res.msgid, res.fromdate, res.todate);

    commit;   
  
       
    
END listen_for_messages_sms;
/

6. Проверяю работу

begin

    for i in 1..2 loop
        add_message(p_login              =>'operator',
                                  p_password           =>'telebankoper',
                                  p_msg_body           =>'VityaBauka',
                                  p_destinationadress  =>'77051853874',
                                  p_datetosend         =>to_char(sysdate,'yyyy-mm-dd hh:mi:ss'),
                                  p_msgid              =>'',
                                  p_fromdate           =>'',
                                  p_todate             =>'');    
        
        commit;
    end loop;
end;    

exec listen_for_messages_sms;
commit;
После того как я стартую процедуру считывания валится следующая ошибка


ORA-00600:  [kwqidrdq: loop], [0], [0], [0], [0],
[], [], []
ORA-06512: ia  "SYS.DBMS_AQ", line 675
ORA-06512: ia  "SYS.DBMS_AQ", line 702
ORA-06512: ia  "SCOTT.LISTEN_FOR_MESSAGES_SMS", line 23
ORA-06512: ia  line 1
Прошу помочь в этой проблеме. Задача изначально состоит чтоб происходило добавление сообщений в очередь, а прослушиватель должен быть постоянно быть включен и считывать из очереди сообщения во временную таблицу, при чем удалять сообщения считанные из очереди. Возможно в моей коде есть какие то ошибки, прошу отметить.

Посмотрите на MetaLink'е и/или обратитесь в техподдержку.

P.S. Огласите версию и платформу Oracle Server.

P.P.S. При оформлении кода используйте, пожалуйста, тег SRC данного форума.
Этим вы повысите свои шансы на получение ответа.
16 фев 09, 14:19    [6824455]     Ответить | Цитировать Сообщить модератору
 Re: Прослушивание очереди сообщений DBMS_AQ  [new]
expal
Guest
Нафига тебе listen для одной очереди? Делай сразу dbms_aq.dequeue. Он тоже может ждать сообщения из очереди.

Это
exec dbms_aqadm.create_aq_agent('test_sms')
/
вроде как не нужно.

Ты создаёшь очередь 'test_queue_sms' в своей схеме, а пользуешь очередь 'scott.test_queue_sms', возможно в чужой схеме. Приди к консенсусу в именовании очереди.

И контрольный пример неплохо покороче сделать, а то букф много...
16 фев 09, 15:13    [6824899]     Ответить | Цитировать Сообщить модератору
Все форумы / Oracle Ответить