首页 文章

Spring Integration:如何增加传入消息的处理

提问于
浏览
1

我正在开发一个Spring应用程序,它每分钟将接收大约500 xml消息 . 下面的xml配置仅允许每分钟处理大约60条消息,其余消息存储在队列中(保存在DB中),并且以每分钟60条消息的速率检索它们 .

尝试从多个来源阅读文档,但仍然不清楚Poller与任务执行者的角色 . 我对当前处理每分钟60条消息的理解是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次)和“每次轮询最多消息”设置为10,因此每分钟处理6x10 = 60条消息 .

请告知我的理解是否正确,并帮助修改xml配置以更高速率处理传入消息 .

任务执行者的角色也不清楚 - 这是否意味着pool-size =“50”将允许50个线程并行运行以处理轮询器轮询的消息?

我想要的是:

  • JdbcChannelMessageStore用于将传入的xml消息存储在数据库(INT_CHANNEL_MESSAGE)表中 . 这是必需的,因此在服务器重启的情况下,消息仍然存储在表中而不会丢失 .

  • 要以并行或受控/限制的数量并行执行的传入消息 . 根据系统处理这些消息的能力,我想限制系统应该并行处理多少消息 .

  • 由于此配置将在群集中的多个服务器上使用,因此任何服务器都可以拾取任何消息,因此不应导致两个服务器处理相同消息的任何冲突 . 希望这是由Spring Integration处理的 .

如果这已在其他地方得到解答但在阅读了大量帖子之后我仍然不明白这是如何工作的 .

提前致谢 .

<!-- Message Store configuration start -->              

    <!-- JDBC message store configuration -->
    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
        <property name="dataSource" ref="dataSource"/>
        <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
        <property name="region" value="TX_TIMEOUT"/>
        <property name="usingIdCache" value="true"/>
    </bean>

    <bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider" />        

<int:transaction-synchronization-factory
    id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" />  

<int:poller id="messageStorePoller" fixed-delay="10"
    receive-timeout="500" max-messages-per-poll="10" task-executor="pool"
    default="true" time-unit="SECONDS">
    <int:transactional propagation="REQUIRED"
        synchronization-factory="syncFactory" isolation="READ_COMMITTED"
        transaction-manager="transactionManager" /> 
</int:poller>

<bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!--  1)        Store the message in  persistent message store -->
    <int:channel id="incomingXmlProcessingChannel">
         <int:queue message-store= "store" />
    </int:channel> 

    <!-- 2) Check in, Enrich the headers, Check out -->
    <!-- (This is the entry point for WebService requests) -->
    <int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel">
        <int:claim-check-in message-store="simpleMessageStore" />
        <int:header-enricher >
            <int:header name="CLAIM_CHECK_ID" expression="payload"/>
            <int:header name="MESSAGE_ID" expression="headers.id" />
            <int:header name="IMPORT_ID" value="XML_IMPORT"/>
        </int:header-enricher>
        <int:claim-check-out message-store="simpleMessageStore" />          
    </int:chain>

在Artem的回复后添加:

谢谢阿尔乔姆 . 因此,在固定延迟10秒后发生的每次轮询(根据上面的配置),任务执行程序将检查任务队列,如果可能(并且需要)启动新任务?并且每个pollingTask(线程)将根据“maxMessagesPerPoll”配置从消息存储(队列)接收“10”消息 .

为了实现传入消息的更高处理时间,我应该减少poller上的fixedDelay,以便任务执行者可以启动更多线程吗?如果我将fixedDelay设置为2秒,将启动一个新线程来执行10个消息,大约30个这样的线程将在一分钟内启动,在一分钟内处理“大约”300个传入消息 .

很抱歉在一个问题上问了太多 - 只想解释完整的问题 .

1 回答

  • 0

    这个类背后的主要逻辑是:

    private final class Poller implements Runnable {
    
        private final Callable<Boolean> pollingTask;
    
        Poller(Callable<Boolean> pollingTask) {
            this.pollingTask = pollingTask;
        }
    
        @Override
        public void run() {
            AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
                int count = 0;
                while (AbstractPollingEndpoint.this.initialized
                        && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
                        || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
                    try {
                        if (!Poller.this.pollingTask.call()) {
                            break;
                        }
                        count++;
                    }
                    catch (Exception e) {
                        if (e instanceof MessagingException) {
                            throw (MessagingException) e;
                        }
                        else {
                            Message<?> failedMessage = null;
                            if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
                                Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
                                if (resource instanceof IntegrationResourceHolder) {
                                    failedMessage = ((IntegrationResourceHolder) resource).getMessage();
                                }
                            }
                            throw new MessagingException(failedMessage, e);
                        }
                    }
                    finally {
                        if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
                            Object resource = getResourceToBind();
                            if (TransactionSynchronizationManager.hasResource(resource)) {
                                TransactionSynchronizationManager.unbindResource(resource);
                            }
                        }
                    }
                }
            });
        }
    
    }
    

    如你所见, taskExecutor 负责在一个线程中旋转 pollingTask 直到 maxMessagesPerPoll . 如果当前轮询任务对于新计划来说太长,则池中的其他线程将被涉及 . 但是,一次轮询中的所有消息都在同一个线程中处理,而不是并行处理 .

    这就是它的工作原理 . 由于你在一个问题中提出太多要求,我希望这些信息足以让你知道下一步的步骤 .

相关问题