我正在启动一个需要高度可扩展的Java EE项目 . 到目前为止,这个概念是:
-
几个Message Driven Beans,负责架构的不同部分
-
每个MDB都注入了会话Bean,处理业务逻辑
-
几个实体Bean,提供对持久层的访问
通过JMS消息通过请求/回复概念在架构的不同部分之间进行 -
通信:
-
MDB接收包含活动请求的消息
-
使用其会话bean来执行必要的业务逻辑
-
将msg中的响应对象返回给原始请求者
这个想法是通过消息总线将部分架构彼此分离,对可扩展性没有限制 . 只需启动更多组件 - 只要它们连接到同一总线,我们就可以成长和发展 .
不幸的是,我们遇到了请求 - 回复概念的大量问题 . 交易管理似乎在我们的方式充足 . 它接缝会话bean不应该使用消息?!
阅读http://blogs.oracle.com/fkieviet/entry/request_reply_from_an_ejb和http://forums.sun.com/message.jspa?messageID=10338789,我觉得人们实际上建议不要使用EJB的请求/回复概念 .
如果是这种情况,您如何在EJB之间进行通信? (记住,可扩展性是我追求的)
我当前设置的详细信息:
-
MDB 1 'TestController',使用(本地)SLSB 1 'TestService'作为业务逻辑
-
TestController.onMessage()使TestService向队列XYZ发送消息并请求回复
-
TestService使用Bean管理事务
-
TestService在初始化时通过联合连接工厂 Build 与JMS代理的连接和会话(@PostConstruct)
-
TestService在发送后提交事务,然后开始另一个事务并等待10秒的响应
-
消息到达MDB 2 'LocationController',它使用(本地)SLSB 2 'LocationService'作为业务逻辑
-
LocationController.onMessage()使LocationService将消息发送回请求的JMSReplyTo队列
-
相同的BMT概念,相同的@PostConstruct概念
-
都使用相同的连接工厂来访问代理
问题:第一条消息被发送(通过SLSB 1)并被接收(通过MDB 2)确定 . 发送返回消息(通过SLSB 2)也很好 . 然而,SLSB 1从未收到任何东西 - 它只是超时 .
我试过没有messageSelector,没有改变,仍然没有接收消息 .
会话bean消费消息是不是可以的?
SLSB 1 - TestService.java
@Resource(name = "jms/mvs.MVSControllerFactory")
private javax.jms.ConnectionFactory connectionFactory;
@PostConstruct
public void initialize() {
try {
jmsConnection = connectionFactory.createConnection();
session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("Connection to JMS Provider established");
} catch (Exception e) { }
}
public Serializable sendMessageWithResponse(Destination reqDest, Destination respDest, Serializable request) {
Serializable response = null;
try {
utx.begin();
Random rand = new Random();
String correlationId = rand.nextLong() + "-" + (new Date()).getTime();
// prepare the sending message object
ObjectMessage reqMsg = session.createObjectMessage();
reqMsg.setObject(request);
reqMsg.setJMSReplyTo(respDest);
reqMsg.setJMSCorrelationID(correlationId);
// prepare the publishers and subscribers
MessageProducer producer = session.createProducer(reqDest);
// send the message
producer.send(reqMsg);
System.out.println("Request Message has been sent!");
utx.commit();
// need to start second transaction, otherwise the first msg never gets sent
utx.begin();
MessageConsumer consumer = session.createConsumer(respDest, "JMSCorrelationID = '" + correlationId + "'");
jmsConnection.start();
ObjectMessage respMsg = (ObjectMessage) consumer.receive(10000L);
utx.commit();
if (respMsg != null) {
response = respMsg.getObject();
System.out.println("Response Message has been received!");
} else {
// timeout waiting for response
System.out.println("Timeout waiting for response!");
}
} catch (Exception e) { }
return response;
}
SLSB 2 - LocationService.Java(只有回复方法,其余与上面相同)
public boolean reply(Message origMsg, Serializable o) {
boolean rc = false;
try {
// check if we have necessary correlationID and replyTo destination
if (!origMsg.getJMSCorrelationID().equals("") && (origMsg.getJMSReplyTo() != null)) {
// prepare the payload
utx.begin();
ObjectMessage msg = session.createObjectMessage();
msg.setObject(o);
// make it a response
msg.setJMSCorrelationID(origMsg.getJMSCorrelationID());
Destination dest = origMsg.getJMSReplyTo();
// send it
MessageProducer producer = session.createProducer(dest);
producer.send(msg);
producer.close();
System.out.println("Reply Message has been sent");
utx.commit();
rc = true;
}
} catch (Exception e) {}
return rc;
}
太阳resources.xml中
<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerRequest" res-type="javax.jms.Queue" res-adapter="jmsra">
<property name="Name" value="mvs.LocationControllerRequestQueue"/>
</admin-object-resource>
<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerResponse" res-type="javax.jms.Queue" res-adapter="jmsra">
<property name="Name" value="mvs.LocationControllerResponseQueue"/>
</admin-object-resource>
<connector-connection-pool name="jms/mvs.MVSControllerFactoryPool" connection-definition-name="javax.jms.QueueConnectionFactory" resource-adapter-name="jmsra"/>
<connector-resource enabled="true" jndi-name="jms/mvs.MVSControllerFactory" pool-name="jms/mvs.MVSControllerFactoryPool" />
1 回答
即使使用JMS,请求/回复模式实质上仍然是 synchronous . 呼叫者发送消息,然后等待回复 . 由于分布式事务,这不仅复杂,而且还意味着在等待回复时,分配和浪费一个或多个资源(至少在这种情况下为线程) . 您无法以这种方式扩展:您本身就受到线程数量的限制 .
要拥有真正可扩展的JMS架构,一切都必须是 asynchronous . 换句话说:你永远不要等待 . 发送和接收的消息应传递必要的信息以触发下一个活动 .
如果消息的大小太大,则只能存储标识符并将相应的数据存储在数据库中 . 但随后数据库再次成为争论的焦点 .
如果不同消息需要知道他们参与的长时间运行过程,您也可以使用 correlation identifiers . 收到消息后,接收可以使用相关标识符"resume"长时间运行的活动 . 这是BPEL的传统模式 . 同步请求/回复与具有相关标识符的异步消息之间的主要区别在于可以在每个步骤之间释放资源 . 您可以使用后者进行缩放,但不能使用第一个进行缩放 .
说实话,我对你的长篇文章感到困惑,并且不理解你的设计是异步(和正确),还是与请求/回复同步(并且有问题) . 但我希望我提供了一些答案 .
无论如何,请访问网站Enterprise Integration Patterns,这是一个宝贵的信息来源 .