首页 文章

如何在 spring 集成中并行和同步处理?

提问于
浏览
4

是否有可能在Spring集成中保持通道同步(在发送消息后获得确认)但同时处理更多消息(并行处理)而不用线程创建自己的代码(即ExecutorService执行并提交worker)并等待它们?我想通过FTP上传文件,但同时上传更多文件而不在代码中创建自己的线程 . 我需要知道何时上传所有文件(这就是我希望它同步的原因) . 是否可以通过Spring集成配置,如果是,如何?

2 回答

  • 2

    好吧,看起来你需要一些流程:

    • <gateway> 将文件发送到通道并等待一些结果作为确认

    • <splitter>ExecutorChannel 并行处理每个文件

    • <int-ftp:outbound-gateway> 上传每个文件

    • <aggregator> 关联 <int-ftp:outbound-gateway> 的结果

    • <aggregator> 应该将结果发送到 <gateway> ,那时候是waitng .

    如果有什么不清楚,请告诉我 .

    UPDATE

    如何在Spring Integration Java DSL中执行此任何示例?

    像这样的东西:

    @Configuration
    @EnableIntegration
    @IntegrationComponentScan
    public class Configuration {
    
        @Bean
        public IntegrationFlow uploadFiles() {
            return f ->
                       f.split()
                           .handle(Ftp.outboundGateway(this.ftpSessionFactory,
                               AbstractRemoteFileOutboundGateway.Command.PUT, "'remoteDirectory'"))
                           .aggregate();
        }
    
    }
    
    @MessagingGateway(defaultRequestChannel = "uploadFiles.input") 
    interface FtpUploadGateway {
    
        List<String> upload(List<File> filesToUpload);
    
    }
    
  • 2

    通过使用 @Async 任务处理,这在Spring中非常有用 .

    首先创建一个将异步执行任务的服务 . 这里记下 @Async 方法的 @Async 注释,它将被spring扫描并标记为异步执行 .

    import java.util.concurrent.Future;
    
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Service;
    
    @Service
    public class AsyncTask {
    
        @Async
        public Future<Result> performTask(String someArgument) {
            // put the business logic here and collect the result below
            Result result = new Result(); // this is some custom bean holding your result
            return new AsyncResult<Result>(result);
        }
    }
    

    接下来创建一个组件(可选 - 可以来自任何其他现有服务),它将调用上述服务 .

    import java.util.concurrent.Future;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class AsyncClass {
    
        @Autowired
        private AsyncTask asyncTask;
    
        public void doAsyncOperation() throws Exception {
    
        List<Future<Result>> futures = new ArrayList<Future<Result>>();
    
        for (int i = 1; i < 10; i++) {
            // Simulate multiple calls
            Future<Result > future = doAsync(String.valueOf(i));            
            futures.add(future);
        }
    
        for (Future<Result > future : futures) {
                // fetch the result
                Result result = future.get();
                // process the result
        }
    }
    
        private Future<Result> doAsync(final String someArgument) {
    
            // this will immediately return with a placeholder Future object which
            // can be used later to fetch the result
            Future<Result> future = asyncTask.performAsync(someArgument);
            return future;
        }
    }
    

    启用异步所需的示例xml配置如下(对于基于注释的配置使用@EnableAsync)

    <task:annotation-driven executor="myExecutor" />
    <task:executor id="myExecutor" pool-size="30" rejection-policy="CALLER_RUNS"/>
    

    有关详细文档,请参阅here

相关问题