среда, 7 апреля 2010 г.

Oracle AQ - Listener

Сегодня решал очередную проблему. Проблему подкинули наши деловые партнёры. Подкинуть-подкинули, а решать пришлось мне.
Словом, в очередь Oracle AQ пишется сообщение в XML-формате. Сообщение необходимо передато в другой програмный модуль, но при этом добавив к нему Namespace. То есть, необходимо каким-то образом слушать очередь входящих сообщений, трансформировать их, и передавать дальше. Вот как я это решил:

-- Создал очередь для выходящих сообщений
BEGIN
DBMS_AQADM.create_queue_table (
queue_table => 'jmsuser.JMS_QUEUE_OUT_TABLE', queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');

DBMS_AQADM.create_queue (
queue_name => 'jmsuser.JMS_QUEUE_OUT',
queue_table => 'jmsuser.JMS_QUEUE_OUT_TABLE');

DBMS_AQADM.start_queue (
queue_name => 'jmsuser.JMS_QUEUE_OUT',
enqueue => TRUE);
END;
/

--
-- Процедура - обработчик, читает из входю очереди, пишет в выход. очередь
--
-------------------------------------------------------
CREATE OR REPLACE procedure notifyAbout(
context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number)
AS
dequeue_opts DBMS_AQ.dequeue_options_t;
enqueue_opts DBMS_AQ.enqueue_options_t;
message_props DBMS_AQ.message_properties_t;
message_handle RAW(16);
msg SYS.AQ$_JMS_TEXT_MESSAGE;
msqOut SYS.AQ$_JMS_TEXT_MESSAGE;
xml VARCHAR2(32000 CHAR);
BEGIN
-- get the consumer name and msg_id from the descriptor
dequeue_opts.msgid := descr.msg_id;
dequeue_opts.consumer_name := descr.consumer_name;

-- Dequeue the message
DBMS_AQ.DEQUEUE(queue_name => descr.queue_name,
dequeue_options => dequeue_opts,
message_properties => message_props,
payload => msg,
msgid => message_handle);

dbms_output.put_line('Dequeued '||message_handle) ;

-- Change the payload
msg.get_text(xml);
xml := replace( xml, '<:RESPONSE', '<:RESPONSE xmlns="http://www.lodestarcorp.com" ');

-- Enqueue the new message
msqOut := sys.aq$_jms_text_message.construct;
msqOut.set_text(xml);

DBMS_AQ.enqueue(queue_name => 'jmsuser.JMS_QUEUE_OUT',
enqueue_options => enqueue_opts,
message_properties => message_props,
payload => msqOut,
msgid => message_handle);
DBMS_OUTPUT.put_line (message_handle);

commit;
END;
/

--
-- Регистрация листенера
--
DECLARE
reginfo1 sys.aq$_reg_info;
reginfolist sys.aq$_reg_info_list;

BEGIN
-- register for the pl/sql procedure notifyCB to be called on notification
reginfo1 := sys.aq$_reg_info('jmsuser.JMS_QUEUE',
DBMS_AQ.NAMESPACE_AQ,
'plsql://jmsuser.notifyAbout',
HEXTORAW('FF'));

-- Create the registration info list
reginfolist := sys.aq$_reg_info_list(reginfo1);

-- do the registration
sys.dbms_aq.register(reginfolist, 1);

END;

Комментариев нет:

Отправить комментарий