
Can we run multiple instances of the same job with different parameters at the same time, using Spring Batch partitioning and RabbitMQ?


I have implemented my batch jobs using Spring Batch partitioning, with RabbitMQ as the middleware.

I have studied the documentation and referred to these unit tests:

https://github.com/sshcherbakov/spring-batch-talk/blob/master/src/main/resources/META-INF/master.xml

https://github.com/sshcherbakov/spring-batch-talk/blob/master/src/main/resources/META-INF/slave.xml

I can run my job steps concurrently, but I am a bit worried about how this will behave if I launch multiple instances of the same job at the same time with different parameters.

For example, I import exchange data with the importExchange job, but I may launch importExchange job instances for different markets (e.g. the US market and the European market) at the same time.

The partitioner partitions the input exchange names into separate partition step execution contexts, and MessageChannelPartitionHandler sends the stepExecutionRequests as messages over a RabbitMQ queue to different servers, where the steps are executed concurrently.

The confusion now is about the replies sent on the reply queue (which is the same for all job instances): the listeners of all instances listen on the same reply queue. For example, job1 and job2 both reply to the same reply queue.

How do we make sure that job1's replies are received by job1's outbound gateway and not by job2's, and vice versa? Does the outbound gateway only accept replies to the requests it sent itself (by checking the correlation ID) and ignore the others?

Since we use direct channels and exchanges, a reply is delivered to only one listener, so could job1's reply end up being received by job2's listener?

Or is there a router or filter for selective replies?

Do I need to worry about this at all, or does MessageChannelPartitionHandler take care of it? Or should I prefix the reply queue name with the job ID?

Below is my configuration:

<task:executor id="taskExecutor" pool-size="20" />

<int:channel id="importExchangesOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="importExchangesInboundStagingChannel" />

<amqp:outbound-gateway request-channel="importExchangesOutboundChannel"
    reply-channel="importExchangesInboundStagingChannel" amqp-template="importExchangesAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesOutboundChannel"
    p:receiveTimeout="150000" />


<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    scope="step" />


<beans:bean id="importExchangesPartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="importExchangesStep" p:gridSize="6"
    p:messagingOperations-ref="importExchangesMessagingTemplate" />

<int:aggregator ref="importExchangesPartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="300000"
    input-channel="importExchangesInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="6"
    request-channel="importExchangesInboundChannel" receive-timeout="300000"
    reply-channel="importExchangesOutboundStagingChannel" queue-names="importExchangesQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />

<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
    routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>

<int:channel id="importExchangesInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="importExchangesInboundChannel" output-channel="importExchangesOutboundStagingChannel" />

<int:channel id="importExchangesOutboundStagingChannel" />



<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<rabbit:direct-exchange name="${import.exchanges.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${import.exchanges.queue}"
            key="${import.exchanges.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>


<beans:bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler"
    p:jobExplorer-ref="jobExplorer" p:stepLocator-ref="stepLocator" />


<beans:bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />


<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
    p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />

<step id="importExchangesStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
            commit-interval="${import.exchanges.commit.interval}" />
    </tasklet>
</step>

<job id="importExchangesJob" restartable="true">

    <step id="importExchangesStep.master" next="importEclsStep.master">
        <partition partitioner="importExchangesPartitioner"
            handler="importExchangesPartitionHandler" />
    </step>

</job>

Edit:

I tried removing the reply queue name from the amqpTemplate so that the default temporary reply queue is used, and tested this use case. Even before looking at the replies, the problem already shows up on the slave side.

<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
    routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>

I created two input files with dummy data, for example:

My batch IDs are 2015-06-08 and 2015-06-09. I created an exchanges.txt under folders named 2015-06-08 and 2015-06-09:

/home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt
/home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt

The data in /home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt is:

1
2
3
up to 30

And in /home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt:

31
32
33
up to 60

I read the items with this item reader and pass them to the writer.

Reader:

<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
    p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />

Writer:

<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.ecls.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />

Inside the writer, I call an external command that imports the data for each exchange item:

@Override
public void write(List<? extends T> exchanges) throws Exception {

    commandRunner.setLogFilePath(this.logFilePath);

    for (T exchange : exchanges) {

        String command = commandRunner.getConsolePath() + " "
                + "st:import exchange" + " " + exchange.toString();

        commandRunner.run(command, this.replyTimeout);  
    }

}
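One thing worth checking in this writer: if `commandRunner` is a shared singleton bean, the `setLogFilePath(...)` call followed by `run(...)` is not atomic, so two concurrently running job instances can overwrite each other's path between the two calls. A minimal sketch of avoiding the shared mutable state by passing the path with each call (the `CommandRunner` interface here is a hypothetical stand-in, not the project's real class):

```java
import java.nio.file.Path;
import java.util.List;

// Hypothetical stand-in for the project's command runner.
interface CommandRunner {
    String getConsolePath();
    void run(String command, Path logFile, long timeoutMs) throws Exception;
}

class ImportExchangesItemWriter<T> {

    private final CommandRunner commandRunner;
    private final Path logFilePath;   // step-scoped, unique per job instance (batch_id)
    private final long replyTimeout;

    ImportExchangesItemWriter(CommandRunner runner, Path logFilePath, long replyTimeout) {
        this.commandRunner = runner;
        this.logFilePath = logFilePath;
        this.replyTimeout = replyTimeout;
    }

    public void write(List<? extends T> exchanges) throws Exception {
        for (T exchange : exchanges) {
            String command = commandRunner.getConsolePath()
                    + " st:import exchange " + exchange;
            // The log path travels with the call; no shared field is mutated,
            // so concurrent job instances cannot swap each other's log files.
            commandRunner.run(command, logFilePath, replyTimeout);
        }
    }
}
```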

Inside commandRunner:

public void run(String command, long replyTimeout)
        throws Exception {

    String[] commands = command.split("\\s+");

    ProcessBuilder pb = new ProcessBuilder(commands);
    File log = new File(this.logFilePath);
    pb.redirectErrorStream(true);
    pb.redirectOutput(Redirect.appendTo(log));
    Process p = pb.start();
    .......
}
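For completeness, a self-contained sketch of what the elided part of `run` might look like, enforcing the reply timeout with `Process.waitFor(long, TimeUnit)`; the destroy-on-timeout behaviour and the returned exit code are assumptions, not the project's actual code:

```java
import java.io.File;
import java.io.IOException;
import java.lang.ProcessBuilder.Redirect;
import java.util.concurrent.TimeUnit;

public class SimpleCommandRunner {

    // Runs the command, appending merged stdout+stderr to logFile,
    // and kills the process if it exceeds the timeout (assumption).
    public int run(String command, String logFilePath, long replyTimeoutMs)
            throws IOException, InterruptedException {
        String[] parts = command.split("\\s+");
        ProcessBuilder pb = new ProcessBuilder(parts);
        pb.redirectErrorStream(true);                        // merge stderr into stdout
        pb.redirectOutput(Redirect.appendTo(new File(logFilePath)));
        Process p = pb.start();
        if (!p.waitFor(replyTimeoutMs, TimeUnit.MILLISECONDS)) {
            p.destroyForcibly();
            throw new InterruptedException("Command timed out: " + command);
        }
        return p.exitValue();
    }
}
```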

If I launch only one job instance (with batch_id 2015-06-08 or 2015-06-09), everything works fine. But if I launch both simultaneously with their different input data, the steps of the two job instances get mixed up. I mean, this is what I get in the log files:

tail -f /var/log/st/batch.log.2015-06-08

14 23 1 27 19 9 15 24 2 10 28 20 25 16 3 21 29 11 26 17 4 30 12 22 18 5 44 45 46

And in /var/log/st/batch.log.2015-06-09:

52 13 47 31 37 6 53 57 48 32 38 54 7 49 58 33 39 55 8 59 50 34 40 56 60 51 35 42 41 36 43

So 44 45 46 went to batch.log.2015-06-08 when they should have gone to batch.log.2015-06-09, and 6 7 8 went to batch.log.2015-06-09 when they should have gone to batch.log.2015-06-08.

I pass the log file path to the item writer because I need a separate log file for each job, so I append the batch_id to the file name:

<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />

Is this happening because of the outbound and inbound gateways? Are separate instances of the Spring Integration channels, gateways, etc. created for each job instance, or are they shared by all job instances, like the RabbitMQ queues?

The inbound gateway has concurrent-consumers="6". Are these consumers shared across all job instances, or is a separate set of 6 consumers created for each job instance?

Can this partitioner partition for multiple jobs at the same time?

<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    scope="step" />

An application that reproduces this is here:

https://github.com/vishalmelmatti/spring-batch-remote-partition-test/tree/master

1 Answer


    This is all taken care of for you by the framework. When the partition handler sends out the step execution requests, it sets a correlation ID in the headers...

    .setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())
    

    The aggregator uses it to aggregate all the replies (for this job) into a single message, which is released to the partition handler once all the replies have been received...

    Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);
    

    This is achieved using the sequence size header.
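    The release logic can be pictured as a simple group-and-count: replies are grouped by the correlation ID (`jobExecutionId:stepName`) and a group is released only when it reaches the sequence size, so two concurrent job instances never see each other's replies even on a shared reply queue. A toy model of that idea (not the actual Spring Integration aggregator code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of correlation-based aggregation: each reply carries the
// correlation id of the job/step that produced it plus the expected
// sequence size; a group is released only when it is complete.
public class ReplyAggregator {

    private final Map<String, List<String>> groups = new HashMap<>();

    /** Returns the complete group when the last reply arrives, else null. */
    public synchronized List<String> onReply(String correlationId, int sequenceSize, String payload) {
        List<String> group = groups.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() == sequenceSize) {
            return groups.remove(correlationId);   // release to this job's partition handler
        }
        return null;                               // still waiting for more partitions
    }
}
```

    Because the correlation ID embeds the job execution ID, replies from the importExchangesJob runs for 2015-06-08 and 2015-06-09 land in different groups even though they travel over the same reply queue.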
