最近在EmbedIT工作中,我需要评估Oracle AQ是否是一个替代旧的异步任务管理系统的不错选择。所以,让我分享一下我的经验。首先,有关Oracle AQ的文档非常庞大,因此我将指出您想知道的最重要的内容。
何时考虑Oracle AQ?
如果出于某种原因你不想搞像Hazelcast或Terracotta这样的大数据框架,或者 通过JMS或AMPQ当前的消息传递解决方案对你来说还不够,那么一定要检查:Oracle Advance Queuing
Oracle带来了自己的消息传递,它通过数据库工作。什么时候适合你?
- 你每天都有数百万封邮件。
- 你收到了很多消息。
- 您希望以消息方式与其他数据库/应用程序通信。
- Oracle AQ对分布式事务来说非常棒。甲骨文明确指出:
Oracle AQ的工作方式是不会将消息视为dequeued 已出列(并因此被删除,假设您在默认的破坏模式下出列) ,直到保留到所有消费者都已将该消息取出为止。如果处理某些事件涉及多个系统,并且主应用程序需要某种机制来知道其中一个处理失败,则Oracle AQ是一个完美的候选者。
Oracle AQ的技术观点
Oracle AQ允许将消息排入队列并从数据库管理的队列中出列,这里每个队列与一个队列表相关联。
每个队列都有一个有效载荷,可以是:
- RAW
- OBJECT:指定类型的消息。
- ANYDATA:具有任何对象类型的消息。
Oracle AQ中的队列可以是:
- 单个消费者队列(只有一个消费者能够在一瞬间出队)
- 多个消费者队列。可以通过以下方式实现:
1)多个收件人 - 在排队之前设置邮件的收件人。2)多个订户 - 队列具有默认的订户集。
如何使用Oracle AQ队列
重要说明:
Oracle AQ在订阅者之间没有任何类型的自动负载平衡,就像WebLogic JMS服务器中存在循环一样。但是你可以实现它。这并不难。Oracle AQ演示
在演示的所有部分中,我将使用JDBC API和附加类AQTestObjectStruct和AQTestClient。
如何创建Oracle AQ表(多个消费者)
假设我们将在数据库模式ho_kloucek_in中排队“ Message_typ ” 类型的消息。所以在数据库中运行:
create or replace type ho_kloucek_in.Message_typ as object ( subject VARCHAR2(30), text VARCHAR2(80))
|
好的,我们有一个对象类型。现在我们可以为它创建带队列的队列表。
public void createQueue() throws SQLException, AQException, ClassNotFoundException { AQQueueTableProperty qtable_prop; AQQueueProperty queue_prop; AQQueueTable q_table; AQQueue queue;
java.sql.Connection aqconn = getOracleDataSource().getConnection(); aqconn.setAutoCommit(false);
AQSession aqsession = null;
// Register the Oracle AQ Driver Class.forName("oracle.AQ.AQOracleDriver"); try { AQEnqueueOption enqueueOption = new AQEnqueueOption();
aqsession = AQDriverManager.createAQSession(aqconn);
qtable_prop = new AQQueueTableProperty("ho_kloucek_in.Message_typ"); qtable_prop.setMultiConsumer(true);
/* Creating a queue table called aq_table1 in aqjava schema: */ q_table = aqsession.createQueueTable(queueOwner, queueTable, qtable_prop); System.out.println("Successfully created "+queueTable+" in "+queueOwner+" schema");
/* Creating a new AQQueueProperty object */ queue_prop = new AQQueueProperty();
/* Creating a queue called aq_queue1 in aq_table1: */ queue = aqsession.createQueue(q_table, queueName, queue_prop);
queue.start(true, true); System.out.println("Successfully created "+queueName+" in "+queueOwner+""); } catch (Exception ex) { ex.printStackTrace(); } finally { aqsession.close(); aqconn.close(); } }
|
请注意我是如何调用qtable_prop.setMultiConsumer(true)的通过设置收件人列表来定位多个消费者
现在,在队列表和队列设置之后,让我们用subsName变量设置名称,将消费者作为收件人:
public void dequeueMessage(final String subsName) throws AQException, SQLException, ClassNotFoundException { java.sql.Connection aqconn = getOracleDataSource().getConnection(); aqconn.setAutoCommit(false);
AQSession aq_sess = null;
Class.forName("oracle.AQ.AQOracleDriver");
try { aq_sess = AQDriverManager.createAQSession(aqconn);
AQQueue queue; AQMessage message; AQDequeueOption deq_option;
queue = aq_sess.getQueue(queueOwner, queueName);
AQDequeueOption opt = new AQDequeueOption(); opt.setConsumerName(subsName);
while (true) { System.out.println("Waiting on subscription:"+subsName); message = queue.dequeue(opt, oracle.sql.STRUCT.class);
if (message == null) { System.out.println("no messages"); } else { System.out.println("Successful dequeue");
if (message.getObjectPayload().getPayloadData() instanceof STRUCT) { STRUCT popedStruct = (STRUCT) message.getObjectPayload().getPayloadData(); System.out.println("subject: " + popedStruct.getAttributes()[0]); System.out.println("text: " + popedStruct.getAttributes()[1]); }
//Commit aqconn.commit(); } } } finally { aq_sess.close(); aqconn.close(); } }
|
在设置件收件人列表时,我们在排队前设置收件人:public void enqueueMessage(String xmlMessage) throws SQLException, AQException, ClassNotFoundException { java.sql.Connection aqconn = getOracleDataSource().getConnection(); aqconn.setAutoCommit(false);
AQSession aqsession = null;
// Register the Oracle AQ Driver Class.forName("oracle.AQ.AQOracleDriver"); try { AQEnqueueOption enqueueOption = new AQEnqueueOption();
aqsession = AQDriverManager.createAQSession(aqconn); AQQueue queue = aqsession.getQueue(queueOwner, queueName); AQMessage msg = queue.createMessage();
AQMessageProperty msgProps = new AQMessageProperty(); msgProps.setPriority(1); Vector recipientList = new Vector(); AQAgent subs1 = new AQAgent("Sub2", null, 0); recipientList.add(subs1); msgProps.setRecipientList(recipientList); msg.setMessageProperty(msgProps);
AQObjectPayload payload = msg.getObjectPayload();
Object [] test_attributes = new Object[2]; test_attributes [0] = "AsyncTask"; test_attributes [1] = "121212666";
StructDescriptor personDesc = StructDescriptor.createDescriptor("HO_KLOUCEK_IN.MESSAGE_TYP", aqconn);
STRUCT new_async = new STRUCT(personDesc, aqconn, test_attributes);
payload.setPayloadData(new_async);
queue.enqueue(enqueueOption, msg); aqconn.commit(); System.out.println("Message succesfully enqueued.."); } catch (Exception ex) { ex.printStackTrace(); } finally { aqsession.close(); aqconn.close(); } }
|
此方法将消息发送到我的队列,并且仅为名为“Sub2”的消费者者使用!我们来试试吧。
启动两个JVM,使用参数“Sub1”和“Sub2”启动前面提到的dequeueMessage方法... 使用包含dequeueMessage方法的类AQTestObjectStruct。以下内容应出现在两个JVM中:
JVM1:
Waiting on subscription: Sub1
JVM2:
Waiting on subscription: Sub2
如您所见,默认情况下AQQueue.dequeue方法是阻塞的。无论如何,您也可以指定阻塞一段时间,请参阅文档和AQDequeueOption
现在启动类AQTestClient和先前修改过的方法enqueueMessage发送消息
Object [] test_attributes = new Object[2]; test_attributes [0] = "AsyncTask"; test_attributes [1] = "11111";
|
对于订户“Sub2”:
AQMessage msg = queue.createMessage();
AQMessageProperty msgProps = new AQMessageProperty(); msgProps.setPriority(1); Vector recipientList = new Vector(); AQAgent subs2 = new AQAgent("Sub2", null, 0); recipientList.add(subs2); msgProps.setRecipientList(recipientList); msg.setMessageProperty(msgProps);
|
consumer由AQAgent的名称设置。现在,在启动AQTestClient之后,应该在JVM2中出现:
Successful dequeue subject: AsyncTask text: 11111 Waiting on subscription: Sub2
|
订阅的多个消费者
使用AQMessage的收件人列表参数,您将在获取之前设置收件人。Oracle AQ文档明确指出:
如果在入队期间指定了收件人列表,则它将覆盖订阅列表。
那么让我们看看如何创建队列订阅 ...让我们更改AQTestObjectStruct中的dequeue方法并将其作为订阅者启动:
public void dequeueMessage(final String subsName) throws AQException, SQLException, ClassNotFoundException { java.sql.Connection aqconn = getOracleDataSource().getConnection(); aqconn.setAutoCommit(false);
AQSession aq_sess = null;
Class.forName("oracle.AQ.AQOracleDriver");
try { aq_sess = AQDriverManager.createAQSession(aqconn);
AQQueue queue; AQMessage message; AQDequeueOption deq_option;
queue = aq_sess.getQueue(queueOwner, queueName);
// add subscription AQAgent subs = new AQAgent(subsName, null, 0); queue.removeSubscriber(subs); queue.addSubscriber(subs,null);
AQDequeueOption opt = new AQDequeueOption(); opt.setConsumerName(subsName);
while (true) { System.out.println("Waiting on subscription:"+subsName); message = queue.dequeue(opt, oracle.sql.STRUCT.class);
if (message == null) { System.out.println("no messages"); } else { System.out.println("Successful dequeue");
if (message.getObjectPayload().getPayloadData() instanceof STRUCT) { STRUCT popedStruct = (STRUCT) message.getObjectPayload().getPayloadData(); System.out.println("subject: " + popedStruct.getAttributes()[0]); System.out.println("text: " + popedStruct.getAttributes()[1]); }
//Commit aqconn.commit(); } } } finally { aq_sess.close(); aqconn.close(); } }
|
(注意:如果要添加新订阅者,请注释掉“queue.removeSubscriber(subs)”行)
现在,您可以在排队之前省略消息中的收件人列表,因为订阅会设置一组消息目标。让我再引用Oracle AQ DOC:
如果enqueue的消息生成者提供消费者的收件人列表,则无需为多消费者队列指定订阅。 在某些情况下,可能需要将针对特定消费者集的消息排队,而不是默认的订户列表。
这就是它!如果未在消息中指定收件人列表,系统将向所有订户发送消息。如果指定收件人列表,则系统会将邮件传递给指定的收件人。如果你不指定收件人列表,队列将没有任何订阅者(AQQueue.addSubscriber方法),那么你最终会得到错误:
oracle.AQ.AQOracleSQLException: ORA-24033: no recipients for message ORA-06512: at "SYS.DBMS_AQIN", line 345 ORA-06512: at line 1
at oracle.AQ.AQOracleQueue.enqueue(AQOracleQueue.java:1267) at com.sachinhandiekar.oracle.aq.AQTestClient.enqueueMessage(AQTestClient.java:55) at com.sachinhandiekar.oracle.aq.AQTestClient.main(AQTestClient.java:84)
|
测试多个订阅者
首先改变的方法入队中AQTestClient,就像我说的,我们可以努力忽略任何消费者的设置:
public void enqueueMessage(String xmlMessage) throws SQLException, AQException, ClassNotFoundException { java.sql.Connection aqconn = getOracleDataSource().getConnection(); aqconn.setAutoCommit(false);
AQSession aqsession = null;
// Register the Oracle AQ Driver Class.forName("oracle.AQ.AQOracleDriver"); try { AQEnqueueOption enqueueOption = new AQEnqueueOption();
aqsession = AQDriverManager.createAQSession(aqconn); AQQueue queue = aqsession.getQueue(queueOwner, queueName); AQMessage msg = queue.createMessage();
AQMessageProperty msgProps = new AQMessageProperty(); msgProps.setPriority(1);
AQObjectPayload payload = msg.getObjectPayload();
Object [] test_attributes = new Object[2]; test_attributes [0] = "AsyncTask"; test_attributes [1] = "5555";
StructDescriptor personDesc = StructDescriptor.createDescriptor("HO_KLOUCEK_IN.MESSAGE_TYP", aqconn);
STRUCT new_async = new STRUCT(personDesc, aqconn, test_attributes);
payload.setPayloadData(new_async);
queue.enqueue(enqueueOption, msg); aqconn.commit(); System.out.println("Message succesfully enqueued.."); } catch (Exception ex) { ex.printStackTrace(); } finally { aqsession.close(); aqconn.close(); } }
|
现在通过两个JVM中的AQTestObjectStruct类启动前面提到的dequeueMessage方法和订阅:JVM1:Waiting on subscription: Sub1
JVM2:Waiting on subscription: Sub2
现在运行修改后的没有收件人的AQTestClient.enqueueMessage方法和两个JVM中的输出将是:
**JVM1**: Waiting on subscription: Sub1 Successful dequeue subject: AsyncTask text: 5555 Waiting on subscription: Sub1
**JVM2**: Waiting on subscription: Sub2 Successful dequeue subject: AsyncTask text: 5555 Waiting on subscription: Sub2
|
如您所见,消息已发送给所有订阅者,因为我们未在消息中指定收件人。
总结
我希望我能很好地解释所有细节。我真的很想念Oracle AQ的一些消息默认负载均衡。这也许是我不会将它用于我们的应用程序节点之间的异步任务分配的原因,因为我需要循环,WebLogic JMS免费提供给我。但是如果你想在某种分布式事务中与多个应用程序通信,那么肯定会使用Oracle AQ。
点击标题见原文,源码