我一直在运行一个程序来读取一个包含600万行的文件并验证每一行并将包含验证消息的字符串写入输出文件 . 我使用 spring 集成来实现这一点,以及使用 spring 批量集成适配器从 spring 集成中调用 spring 批量 . 从 spring 集成调用的Spring批次在47s完成,而完全在spring集成中完成的处理需要接近3分钟 . 我的目标是确定改进 spring 集成代码的方法,以便接近 spring 批量性能 . 这是我的spring集成项目的样子 .

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch-int="http://www.springframework.org    /schema    /batch-integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/batch-integration http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
    http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream
        http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context"
xmlns:stream="http://www.springframework.org/schema/integration/stream">

<context:annotation-config />
<import resource="classpath*:/spring/batch/jobs/response-job.xml" />
<import
    resource="classpath*:/META-INF/spring/integration/integration-bean-config.xml" />

<context:component-scan
    base-package="com.foo.integration.framework" />
<int-file:inbound-channel-adapter id="inputFileChannel"
    directory="file:#{systemProperties['input.tsm.file.watcher.directory']} "
    channel="filesChannel" />
<int:poller id="tsmFilePoller" max-messages-per-poll="100"
    fixed-rate="1000" default="true" />

<int:channel id="filesChannel">
    <int:interceptors>
        <int:wire-tap channel="inputLogger" />
    </int:interceptors>
</int:channel>
<int:logging-channel-adapter id="inputLogger"
    level="DEBUG" expression="payload.toString()"
    logger-name="org.springframework.integration.handler.LoggingHandler"></int:logging-channel-adapter>
<int:logging-channel-adapter id="errorLogger"
    level="DEBUG"></int:logging-channel-adapter>


<int:router input-channel="filesChannel"
    expression="payload.getName().substring(payload.getName().lastIndexOf('.'))">
    <int:mapping value=".dat" channel="datFileChannel" />
    <int:mapping value=".txt" channel="errorFileChannel" />
    <int:mapping value=".csv" channel="tsmBatch" />
</int:router>

<int:channel id="errorFileChannel">
</int:channel>

<int:channel id="datFileChannel">
    <int:queue />
</int:channel>

<int-file:outbound-channel-adapter
    channel="errorFileChannel"
    directory="file:#{systemProperties['incorrect.tsm.file.drop.directory']} "
    auto-create-directory="true"></int-file:outbound-channel-adapter>

<int:channel id="tsmBatch"></int:channel>
<int:transformer id="fileToBatchTransformer" ref="tsmToBatchJobTransformerBean"
    input-channel="tsmBatch" method="toRequest" output-channel="invokeJob" />
<int:channel id="invokeJob"></int:channel>

<batch-int:job-launching-gateway
    request-channel="invokeJob" job-launcher="jobLauncher" reply-channel="nullChannel"></batch-int:job-launching-gateway>


<int:splitter output-channel="channel5" input-channel="channel3">
    <bean
        class="com.foo.integration.framework.splitter.FileSplitter" />
</int:splitter>
<int:channel id="channel3"></int:channel>
<int:channel id="channel5">
    <int:dispatcher task-executor="executor" />
</int:channel>

<int:chain input-channel="channel5" output-channel="preServiceChannel">

    <int:aggregator correlation-strategy-expression="headers['fileName']"

        release-strategy-expression="size() == 3000"
        expire-groups-upon-completion="true" send-partial-result-on-expiry="true"
        message-store="firstMessageStore"></int:aggregator>

</int:chain>

<int:channel id="preServiceChannel">
    <int:queue />
</int:channel>
<int:channel id="postServiceChannel">
    <int:queue />
</int:channel>
<int:service-activator ref="tsmServiceActivator"
    input-channel="preServiceChannel" method="handleMessage"
    output-channel="postServiceChannel">
    <int:poller max-messages-per-poll="1" fixed-rate="1"
        time-unit="MILLISECONDS" task-executor="executor"></int:poller>
</int:service-activator>

<int:chain input-channel="postServiceChannel">

    <int:header-enricher>

        <int:header name="processed" method="computeValue"
            ref="postHeaderEnrichingBean" />
    </int:header-enricher>
    <int:aggregator id="fileMsgAggregator"
        correlation-strategy-expression="headers['processed']"
        release-strategy-expression="size() == 3000"
        send-partial-result-on-expiry="true" message-store="messageStore"
        expire-groups-upon-completion="true"> <!-- release-strategy-expression="size() == 3000" -->
    </int:aggregator>
    <int:transformer method="transform" ref="payloadEnricher"></int:transformer>

    <int-file:outbound-channel-adapter
        id="validationService"
        directory="file:#{systemProperties['validation.error.tsm.file.drop.directory']} "
        auto-create-directory="true" filename-generator-expression="'TSMValidationsErrorMsgFile.dat'"
        mode="APPEND">
    </int-file:outbound-channel-adapter>
</int:chain>

<task:executor id="executor" pool-size="20"
    queue-capacity="0" rejection-policy="CALLER_RUNS" />

<bean id="firstMessageStore"
    class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="firstReaper"
    class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="firstMessageStore" />
    <property name="timeout" value="40000" />
</bean>
<task:scheduled-tasks>
    <task:scheduled ref="firstReaper" method="run"
        fixed-rate="20000" />
</task:scheduled-tasks>


<int:header-enricher input-channel="datFileChannel"
    output-channel="channel3">
    <int:header name="fileName" method="computeValue" ref="headerEnrichingBean" />
</int:header-enricher>

<bean id="headerEnrichingBean"
    class="com.foo.integration.framework.enricher.MessageHeaderEnricher" />
<bean id="postHeaderEnrichingBean"
    class="com.foo.integration.framework.enricher.PostMessageHeaderEnricher" />
<int:channel id="channel3"></int:channel>
<bean id="messageStore"
    class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="reaper"
    class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore" />
    <property name="timeout" value="30000" />
</bean>
<task:scheduled-tasks>
    <task:scheduled ref="reaper" method="run" fixed-rate="20000" />
</task:scheduled-tasks>


<bean id="payloadEnricher"
    class="com.foo.integration.framework.enricher.TSMPayloadEnricher" />

</beans>

STS结构:
enter image description here

我使用迭代器逐行拆分输入文件,并将其传递给聚合器,聚合器将大小为3000的记录分组并传递给执行行验证的service-activator,并将输出发送到聚合输出的聚合器和将其写入输出文件 .

如果有人能指出我可以执行或删除执行程序以提高性能的话,我将非常感激 . 如果您需要更多信息,请与我们联系 . 谢谢,祝福有个美好的一天 .