据我所知,ActiveMQ有一个名为AUTO Acknowledge的功能,它实际上通知代理已收到消息(不确认 生产环境 者) .
我想知道是否有可能在ActiveMQ或RabbitMQ中向消费者发送确认 . 然后我想在 生产环境 者中处理确认消息,如果它不接收确认,则再次将消息发送给消费者 .
您希望在异步介质上执行同步用例 .
在RabbitMQ的情况下,您可以使用RPC,如此处所述 - https://www.rabbitmq.com/tutorials/tutorial-six-python.html和https://www.rabbitmq.com/direct-reply-to.html
请注意,即使作者建议避免它:
如有疑问,请避免使用RPC . 如果可以,您应该使用异步管道 - 而不是类似RPC的阻塞,将结果异步推送到下一个计算阶段 .
RabbitMQ Java客户端通过 com.rabbitmq.client.Channel.basicConsume 提供自动执行 .
com.rabbitmq.client.Channel.basicConsume
至少对于ActiveMQ - 这是内置的 . 你必须在activemq.xml中打开它
<policyEntry queue=">" advisoryForConsumed="true"/>
只需听取您要监视消费消息的队列的咨询主题 . 然后你可以提取消息id:s以及什么不“勾选”未完成的请求 .
对于完整的端到端确认,我建议更多自定义 . 即您的producer-app应该监听一些“响应”队列,该队列接收有关生成的消息状态的响应 . 即如果处理失败 - 您可能想知道为什么等...
无论如何,这里有一些代码与 生产环境 者也听取ActiveMQ的确认 .
public void run() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); conn = cf.createConnection(); sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination dest = sess.createQueue("duck"); MessageConsumer mc = sess.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(dest)); mc.setMessageListener(this); conn.start(); MessageProducer mp = sess.createProducer(sess.createQueue("duck")); mp.send(sess.createTextMessage("quack")); } public void onMessage(Message msg) { try { String msgId = msg.getStringProperty("orignalMessageId"); System.out.println("Msg: " + msgId + " consumed"); } catch ( Exception e) { e.printStackTrace(); } }
2 回答
您希望在异步介质上执行同步用例 .
在RabbitMQ的情况下,您可以使用RPC,如此处所述 - https://www.rabbitmq.com/tutorials/tutorial-six-python.html和https://www.rabbitmq.com/direct-reply-to.html
请注意,即使作者建议避免它:
RabbitMQ Java客户端通过
com.rabbitmq.client.Channel.basicConsume
提供自动执行 .至少对于ActiveMQ - 这是内置的 . 你必须在activemq.xml中打开它
只需听取您要监视消费消息的队列的咨询主题 . 然后你可以提取消息id:s以及什么不“勾选”未完成的请求 .
对于完整的端到端确认,我建议更多自定义 . 即您的producer-app应该监听一些“响应”队列,该队列接收有关生成的消息状态的响应 . 即如果处理失败 - 您可能想知道为什么等...
无论如何,这里有一些代码与 生产环境 者也听取ActiveMQ的确认 .