首页 文章

如何在多个线程中执行Spring集成流以并行消耗更多Amazon SQS队列消息?

提问于
浏览
2

需要帮忙

我需要创建多个并行执行的sqs队列使用者,但我不知道如何使用Sprint Integration实现这一点

我有以下架构

具有200k消息的Amazon SQS队列

具有5个EC2实例的Amazon堆栈,每个实例都使用tomcat服务器运行带有Spring Integration流的Spring启动应用程序,该流使用来自spring-integration-aws的sqs-message-driven-channel-adapter消耗SQS的消息(https://github.com/spring-projects/spring-integration-aws

并将该消息发布到具有1秒平均响应的REST服务(我无法修改REST服务是一个约束,但我可以并行发送消息)

SQS队列 - >堆栈(5个tomcat实例) - > Rest服务

Constraints Amazon SQS允许客户端按批次读取最多10条消息的消息,但我可以让多个客户端并行消耗更多消息 .

在Amazon SQS中,需要手动删除消息,这是使用spring集成完成的,只有在REST服务返回OK时才删除消息 .

我没有可能的重复问题(SQS向两个不同的客户端发送相同的消息)

我无法以任何方式存储Spring Boot应用程序中的消息

My Spring Integration flow

<aws-messaging:sqs-async-client id="clientWithCredentials"/>
<int-aws:sqs-message-driven-channel-adapter
  sqs="clientWithCredentials" 
  channel="channel_1"
  queues="https://sqs.us-east-1.amazonaws.com/123456789000/SomeAmazonSQSName"
  max-number-of-messages="10"/>

<int:channel id="channel_1" />
<int:outbound-channel-adapter ref="restService" method="publish" channel="channel_1" />

我如何在多个线程中并行执行此流程以并行消耗更多消息?

我尝试将 <int:poller fixed-rate="1" task-executor="executor" /> 放在sqs-message-driven-channel-adapter中但不允许 .

1 回答

  • 1

    要实现这样的要求,您可以使用 ExecutorChannel 而不是默认 DirectChannel .

    这样,所有SQS消息都将分发到 ExecutorChannel 提供的线程,因此并行执行 .

    有关 ExecutorChannel 的更多信息,请参见Reference Manual .

    UPDATE

    所以,我建议的内容应该反映在你当前的配置中:

    <int:channel id="channel_1">
       <int:dispatcher task-executor="someExecutor"/>
    </int:channel>
    

    UPDATE

    如果您仍然坚持使用多个SQS适配器,则简化版本如下:

    <int-aws:sqs-message-driven-channel-adapter
        sqs="sqsAsyncClient" 
        channel="sqs-to-metricator"
        queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
        max-number-of-messages="10"
        />
    
    
    <int-aws:sqs-message-driven-channel-adapter
        sqs="sqsAsyncClient" 
        channel="sqs-to-metricator"
        queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
        max-number-of-messages="10"
        />
    
    <int-aws:sqs-message-driven-channel-adapter
        sqs="sqsAsyncClient" 
        channel="sqs-to-metricator"
        queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
        max-number-of-messages="10"
        />
    
    <int:channel id="sqs-to-metricator" />
    
    <int:outbound-channel-adapter ref="restService"
        method="publish" channel="sqs-to-metricator" />
    

    另外为避免重复,您可以考虑切换到Java DSL并开始使用 ItengrationFlowContext 进行动态 IntegrationFlow 注册:https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/java-dsl.html#java-dsl-runtime-flows

相关问题