首页 文章

使用Spring Integration中的Java配置从SFTP复制并移动文件

提问于
浏览
0

我是Spring Integration的新手 . 我的需求是:先把文件从SFTP服务器上的 /files 文件夹移动到 /process 文件夹,然后再把该文件复制到本地 . 有人建议我使用网关,并且配置必须使用Java注解完成 . 我在stackoverflow上找过答案,但没有找到相关内容 . 不过,我已经能够使用@InboundChannelAdapter并配置其他bean来复制文件 .

下面是我到目前为止编写的代码

配置公共类SftpConfiguration {

@Value("${ftp.file.host}")
private String host;

@Value("${ftp.file.port")
private int port;

@Value("${ftp.file.user")
private String user;

@Value("${ftp.file.password")
private String password;

@Value("${directorry.file.remote")
private String remoteDir;

@Value("${directorry.file.in.pollerDelay")
final private String pollerDelay = "1000";

@Value("${directory.file.remote.move}")
private String toMoveDirectory;

@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(port);
    factory.setUser(user);
    factory.setPassword(password);
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(factory);
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(remoteDir);
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
    return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = pollerDelay))
public MessageSource<File> sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
            sftpInboundFileSynchronizer());
    source.setLocalDirectory(new File("ftp-inbound"));
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<File>());

    return source;
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            try {
                new FtpOrderRequestHandler().handle((File) message.getPayload());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    };
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handlerOut() {
    return new SftpOutboundGateway(sftpSessionFactory(), "mv", toMoveDirectory);
}

}

我将不胜感激任何提示或建议 . 谢谢 .

2 回答

  • 0

    是的,您需要为 SftpInboundFileSynchronizingMessageSource 使用带 @InboundChannelAdapter 的 @Bean,将文件从远程 /process 文件夹下载到本地目录 . 这是基于轮询器完成的,是一个独立的流程,与移动操作完全无关 . 移动逻辑可以通过 FtpOutboundGateway(SFTP 对应的是 SftpOutboundGateway)的 mv 命令来执行:

    mv命令没有选项 . expression属性定义“from”路径,rename-expression属性定义“to”路径 . 默认情况下,rename-expression是headers ['file_renameTo'] . 此表达式不能求值为null或空字符串 . 如有必要,将创建所需的任何远程目录 . 结果消息的有效负载是Boolean.TRUE . 原始远程目录在file_remoteDirectory标头中提供,文件名在file_remoteFile标头中提供 . 新路径位于file_renameTo标头中 .

    您需要按如下方式使用它:

    /**
     * Outbound gateway on {@code sftpChannel} that issues the {@code mv}
     * command; the literal SpEL expression {@code 'my_remote_dir/my_file'} is
     * the "from" path of the move.
     *
     * @return the outbound gateway
     */
    // NOTE(review): the "to" path is expected from the rename expression
    // (by default the file_renameTo header) - confirm the header is set upstream.
    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        // Use the SFTP session factory bean defined by the configuration
        // (sftpSessionFactory); no ftpSessionFactory() bean exists there.
        return new SftpOutboundGateway(sftpSessionFactory(), "mv", "'my_remote_dir/my_file'");
    }
    
  • 0

    我试过了,下面的配置对我有效 .

    /**
     * Spring Integration configuration that fetches {@code *.xlsx} files from
     * an SFTP source folder into a local folder, sends each fetched file to a
     * backup folder on the SFTP server, processes it and finally deletes the
     * local copy. A second outbound handler sends status files to a separate
     * SFTP destination.
     */
    @Configuration
    @EnableIntegration
    public class SftpConfiguration {

        private static final Logger LOGGER = LoggerFactory.getLogger(SftpConfiguration.class);

        @Value("${config.adapter.ftp.host}")
        private String host;

        @Value("${config.adapter.ftp.port}")
        private int port;

        @Value("${config.adapter.ftp.user}")
        private String user;

        @Value("${config.adapter.ftp.password}")
        private String password;

        @Autowired
        private FtpOrderRequestHandler handler;

        @Autowired
        ConfigurableApplicationContext context;

        @Value("${config.adapter.ftp.excel.sourceFolder}")
        private String sourceDir;

        @Value("${config.adapter.ftp.excel.localFolder}")
        private String localDir;

        @Value("${config.adapter.ftp.excel.backupFolder}")
        private String backupDir;

        @Value("${config.adapter.ftp.statusExcel.destinationFolder}")
        private String statusDest;

        /**
         * Builds the SFTP session factory from the injected host, port, user
         * and password, wrapped in a {@link CachingSessionFactory}.
         *
         * @return the session factory shared by all SFTP components in this class
         */
        @Bean
        public SessionFactory<LsEntry> sftpSessionFactory() {
            DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
            factory.setHost(host);
            factory.setPort(port);
            factory.setUser(user);
            factory.setPassword(password);
            factory.setAllowUnknownKeys(true);
            return new CachingSessionFactory<LsEntry>(factory);
        }

        /**
         * Configures the synchronizer for {@code sourceDir}, restricted to
         * {@code *.xlsx} files, with {@code deleteRemoteFiles} set to {@code true}.
         *
         * @return the inbound file synchronizer
         */
        @Bean
        public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
            SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
            fileSynchronizer.setDeleteRemoteFiles(true);
            fileSynchronizer.setRemoteDirectory(sourceDir);
            fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
            return fileSynchronizer;
        }

        /**
         * Cron-polled message source that emits files synchronized into
         * {@code localDir} (created automatically) onto {@code sftpChannel};
         * an {@link AcceptOnceFileListFilter} is applied to the local files.
         *
         * @return the message source backing the inbound channel adapter
         */
        @Bean
        @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(cron = "${config.cron.cataligent.order}"), autoStartup = "true")
        public MessageSource<File> sftpMessageSource() {
            SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
                    sftpInboundFileSynchronizer());
            source.setLocalDirectory(new File(localDir));
            source.setAutoCreateLocalDirectory(true);
            source.setLocalFilter(new AcceptOnceFileListFilter<File>());
            return source;
        }

        /**
         * Handler on {@code sftpChannel}. For each {@link File} payload it
         * sends the file through {@link SFTPGateway#sendToSftp(File)}, then
         * calls {@code handler.handle} and {@code handler.deleteFileFromLocal}
         * on it. An {@link IOException} is logged and not rethrown.
         *
         * @return the message handler
         */
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel")
        public MessageHandler handlerOrder() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    try {
                        // The gateway is looked up per message rather than injected.
                        SFTPGateway gateway = context.getBean(SFTPGateway.class);
                        final File orderFile = (File) message.getPayload();
                        gateway.sendToSftp(orderFile);
                        LOGGER.debug("{} is picked by scheduler and moved to {} folder",orderFile.getName(),backupDir);
                        handler.handle(orderFile);
                        handler.deleteFileFromLocal(orderFile);
                        LOGGER.info("Processing of file {} is completed",orderFile.getName());
                    } catch (IOException e) {
                        // Pass the exception itself as the last argument so the
                        // stack trace and cause are logged, not only the message.
                        LOGGER.error("Error occurred while processing order file exception is {}",e.getMessage(),e);
                    }
                }

            };
        }

        /**
         * Outbound handler on {@code sftpChannelDest} whose remote directory
         * is the literal {@code backupDir}.
         *
         * @return the SFTP message handler for backups
         */
        @Bean
        @ServiceActivator(inputChannel = "sftpChannelDest")
        public MessageHandler handlerOrderBackUp() {
            SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
            handler.setRemoteDirectoryExpression(new LiteralExpression(backupDir));
            return handler;
        }

        /**
         * Outbound handler on {@code sftpChannelStatus} whose remote directory
         * is the literal {@code statusDest}.
         *
         * @return the SFTP message handler for status files
         */
        @Bean
        @ServiceActivator(inputChannel = "sftpChannelStatus")
        public MessageHandler handlerOrderStatusUS() {
            SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
            handler.setRemoteDirectoryExpression(new LiteralExpression(statusDest));
            return handler;
        }

        /**
         * Messaging gateway exposing the two outbound SFTP channels as plain
         * method calls.
         */
        @MessagingGateway
        public interface SFTPGateway {

            /**
             * Sends the file to {@code sftpChannelDest}.
             *
             * @param file the file to send
             */
            @Gateway(requestChannel = "sftpChannelDest")
            void sendToSftp(File file);

            /**
             * Sends the file to {@code sftpChannelStatus}.
             *
             * @param file the status file to send
             */
            @Gateway(requestChannel = "sftpChannelStatus")
            void sendToSftpOrderStatus(File file);

        }

    }
    

    我用它来从SFTP位置获取文件 . 保存在本地 . 然后将文件移动到备份文件夹,然后处理整个文件 . 处理后从本地删除文件 .

    我使用网关(@MessagingGateway)进行文件移动,该接口中有两个不同的方法 . 一个方法用于将同一文件移动到备份文件夹,另一个方法用于将我的代码中的另一个文件(状态文件)移动到所需的SFTP位置 . 我尽量保持代码简洁,以便于理解 .

相关问题