需要帮忙
我需要创建多个并行执行的sqs队列使用者,但我不知道如何使用Sprint Integration实现这一点
我有以下架构
具有200k消息的Amazon SQS队列
具有5个EC2实例的Amazon堆栈,每个实例都使用tomcat服务器运行带有Spring Integration流的Spring启动应用程序,该流使用来自spring-integration-aws的sqs-message-driven-channel-adapter消耗SQS的消息(https://github.com/spring-projects/spring-integration-aws)
并将该消息发布到具有1秒平均响应的REST服务(我无法修改REST服务是一个约束,但我可以并行发送消息)
SQS队列 - >堆栈(5个tomcat实例) - > Rest服务
Constraints Amazon SQS允许客户端按批次读取最多10条消息的消息,但我可以让多个客户端并行消耗更多消息 .
在Amazon SQS中,需要手动删除消息,这是使用spring集成完成的,只有在REST服务返回OK时才删除消息 .
我没有可能的重复问题(SQS向两个不同的客户端发送相同的消息)
我无法以任何方式存储Spring Boot应用程序中的消息
My Spring Integration flow
<aws-messaging:sqs-async-client id="clientWithCredentials"/>
<int-aws:sqs-message-driven-channel-adapter
sqs="clientWithCredentials"
channel="channel_1"
queues="https://sqs.us-east-1.amazonaws.com/123456789000/SomeAmazonSQSName"
max-number-of-messages="10"/>
<int:channel id="channel_1" />
<int:outbound-channel-adapter ref="restService" method="publish" channel="channel_1" />
我如何在多个线程中并行执行此流程以并行消耗更多消息?
我尝试将 <int:poller fixed-rate="1" task-executor="executor" />
放在sqs-message-driven-channel-adapter中但不允许 .
1 回答
要实现这样的要求,您可以使用
ExecutorChannel
而不是默认DirectChannel
.这样,所有SQS消息都将分发到
ExecutorChannel
提供的线程,因此并行执行 .有关
ExecutorChannel
的更多信息,请参见Reference Manual .UPDATE
所以,我建议的内容应该反映在你当前的配置中:
UPDATE
如果您仍然坚持使用多个SQS适配器,则简化版本如下:
另外为避免重复,您可以考虑切换到Java DSL并开始使用
ItengrationFlowContext
进行动态IntegrationFlow
注册:https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/java-dsl.html#java-dsl-runtime-flows