首页 文章

使用任务执行程序的事务轮询器是否将数据库资源用于排队的消息

提问于
浏览
1

我有一个服务激活器,它使用轮询器从通道中提取消息 . 该通道具有一个队列,该队列由持久存储支持到数据库 . 轮询器还配置了任务执行程序,以便从通道向消息处理添加一些并发性 .

任务执行程序配置有队列容量 .

由于轮询器从数据库中检索来自通道的消息,并且这被配置为事务性的,因此如果任务执行程序没有可用的线程,那么在任务 Actuator 中排队的消息的事务会发生什么 . 对线程的任务执行程序的请求是排队的,因为这些消息没有自己的线程,所以事务会发生什么?我假设将提交由任务执行程序排队的轮询器从持久性通道存储中删除消息 . 因此,如果服务器在排队的任务 Actuator 中排队的runnables失败,它们将丢失?

由于事务持久性通道队列的想法是确保在服务器发生故障时没有消息丢失,所以排队的消息(在任务 Actuator 中)如何处理它们在通道数据库支持的队列/存储上活动的事务?

<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="channelServerDataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="${user.name}_${channel.queue.region:default}"/>
    <property name="usingIdCache" value="false"/>
</bean> 

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> 
</int:transaction-synchronization-factory>

<int:channel id="transacitonAsyncServiceQueue">
    <int:queue message-store="store"/> 
    <!--  <int:queue/>  --> 
</int:channel>

<bean id="rxPollingTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="500"/>
    <constructor-arg value="MILLISECONDS"/>
    <property name = "initialDelay" value = "30000"/> 
    <!-- initialDelay important to ensure channel doesnt start processing before the datasources have been initialised becuase we
         now persist transactions in the queue, at startup (restart) there maybe some ready to go which get processed before the
         connection pools have been created which happens when the servlet is first hit -->
</bean> 

<int:service-activator ref="asyncChannelReceiver" method="processMessage" input-channel="transacitonAsyncServiceQueue">
    <int:poller trigger="rxPollingTrigger" max-messages-per-poll="20"  task-executor="taskExecutor" receive-timeout="400">
        <int:transactional transaction-manager="transactionManagerAsyncChannel" /> 
    </int:poller>
    <int:request-handler-advice-chain>
        <ref bean="databaseSessionContext" />
    </int:request-handler-advice-chain>
</int:service-activator>

<task:executor id="taskExecutor" pool-size="100-200" queue-capacity="200" keep-alive="1" rejection-policy="CALLER_RUNS" />

1 回答

  • 0

    ...然后,如果任务执行程序没有可用的线程,那么在任务执行程序中排队的消息的事务会发生什么 . 对线程的任务执行程序的请求是排队的,因为这些消息没有自己的线程,所以事务会发生什么?

    它不会在任务执行之前启动(包括来自通道的 receive() ) .

    轮询器调度任务,任务启动,事务启动,工作继续,事务被提交/回滚 .

    请参阅AbstractPollingEndpoint - pollingTask.call() 是事务开始的位置 .

相关问题