首页 文章

如何在 Spring 季批量停止文件传输

提问于
浏览
0

我创建了一个弹出批处理作业,用于从本地目录读取文件,并使用Camel-spring-batch通过ftp将其上载到远程目录 . 我正在使用chunk做同样的事情 . 我的 Spring 季批处理作业配置如下:

<bean id="consumerTemplate" class="org.apache.camel.impl.DefaultConsumerTemplate" init-method="start" destroy-method="stop">
    <constructor-arg ref="camelContext"/>
</bean>

<bean id="producerTemplate" class="org.apache.camel.impl.DefaultProducerTemplate" scope="step" init-method="start" destroy-method="stop">
    <constructor-arg ref="camelContext"/>
</bean>

<bean id="localFileReader" class="com.camel.springbatch.reader.LocalFileReader" scope="step" destroy-method="stop">
    <constructor-arg value="file:#{jobParameters['dirPath']}"/>
    <constructor-arg ref="consumerTemplate"/>
</bean>

<bean id="ftpFileWriter" class="com.camel.springbatch.writer.FtpFileWriter" scope="step">
    <constructor-arg ref="producerTemplate"/>
    <constructor-arg value="ftp://#{jobParameters['host']}?username=#{jobParameters['user']}&amp;password=#{jobParameters['password']}"/>
</bean>

工作配置:

<batch:job id="ftpReadWrite">
    <batch:step id="readFromLocalWriteToFtp" next="readFromFtpWriteToLocal">
        <batch:tasklet>
            <batch:chunk reader="localFileReader" writer="ftpFileWriter"  commit-interval="5" />
        </batch:tasklet>
    </batch:step>

我的“Localfilereader”和“ftpFileWriter”看起来像:

import org.apache.camel.ConsumerTemplate;
import org.apache.camel.component.spring.batch.support.CamelItemReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalFileReader extends CamelItemReader {
private Logger log= LoggerFactory.getLogger(this.getClass());
ConsumerTemplate consumerTemplate;
String endpointUri;

public LocalFileReader(ConsumerTemplate consumerTemplate, String endpointUri) {
    super(consumerTemplate, endpointUri);
    this.consumerTemplate=consumerTemplate;
    this.endpointUri=endpointUri;
}

@Override
public Object read() throws Exception {
    Object item = consumerTemplate.receiveBody(endpointUri);
    return item;
}

}

“Ftp文件编写者”

import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.spring.batch.support.CamelItemWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class FtpFileWriter extends CamelItemWriter {
private Logger log= LoggerFactory.getLogger(this.getClass());
ProducerTemplate producerTemplate;
String endpointUri;
public FtpFileWriter(ProducerTemplate producerTemplate, String endpointUri) {
    super(producerTemplate, endpointUri);
    this.producerTemplate=producerTemplate;
    this.endpointUri=endpointUri;
}

@Override
public void write(List items) throws Exception {
    System.out.println("************************Writing item to ftp "+items);
    for (Object item : items) {
        System.out.println("writing item [{}]..."+item);
        producerTemplate.sendBody(endpointUri, item);
        log.debug("wrote item");
    }
}
}

如果我的本地目录中只有5个文件,它工作正常 . 它从我的本地目录中读取全部5个文件,然后发送到编写器和编写器,将其作为我的commit-interval = 5发送到ftp服务器 . 如果我在本地目录中有6个文件,那么它会向编写器发送第一个5个文件块,然后再次开始读取剩余文件,这次只剩下一个文件 . 它读取1个文件并开始等待4个文件,从不发送给编写器 . 我用commit-interval = 1尝试了它现在它将所有6个文件发送到服务器并再次开始等待下一个文件 . 这里我需要在处理完所有文件后停止进程 .

请帮我解决这个问题......

2 回答

  • 1

    ConsumerTemplate 的javadoc receiveBody 等到有响应;你需要使用超时(在spring-batch中检查TimeoutPolicy)或以不同的方式将阅读器标记为'exhausted'(从阅读器返回null)以阻止阅读器阅读

  • 1

    您可以使用receiveBodyNoWait而不是receiveBody . 然后,您必须检查消费者 endpoints 中是否还有文件 . 我将这个代码编写为一个将big-xml文件转换为较小块的tasklet .

    tasklet:

    public class MyCamelTasklet extends ServiceSupport implements Tasklet, InitializingBean{
    
    private static final Logger LOG = LoggerFactory.getLogger(MyCamelTasklet.class);
    
    private final CamelContext camelContext;
    private final ConsumerTemplate consumerTemplate;
    private final File workingDir;
    private final Route xmlSplitRoute;
    
    
    public MyCamelTasklet(ConsumerTemplate consumerTemplate) {
        super();        
        this.consumerTemplate = consumerTemplate;
        this.camelContext = consumerTemplate.getCamelContext();     
        this.xmlSplitRoute = this.camelContext.getRoutes().get(0);
        this.workingDir = new File(xmlSplitRoute.getRouteContext().getFrom().getUri().replace("file:", ""));
    }
    
    @Override
    public RepeatStatus execute(StepContribution arg0, ChunkContext arg1)
            throws Exception {
    
        LOG.debug("reading new item...");
    
        Endpoint endpointXmlSplitRoute = xmlSplitRoute.getEndpoint();
    
        while(getNbFilesToConsume(this.workingDir) > 0) {       
    
         consumerTemplate.receiveBodyNoWait(endpointXmlSplitRoute);
    
        }       
    
        return RepeatStatus.FINISHED;
    }
    
    private int getNbFilesToConsume(File workingDir){
        return FileUtils.listFiles(workingDir, new String[]{"xml"}, false).size();
    }
    
    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startService(consumerTemplate);
    
    }
    
    
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(consumerTemplate);
    
    }
    
    
    @Override
    public void afterPropertiesSet() throws Exception {
         ObjectHelper.notNull(camelContext, "CamelContext", this);         
         camelContext.addService(this);     
    }
    }
    

    前面的tasklet的单元测试:

    public class SplitTaskletTest {
    
    @Test public void execute() throws Exception {
        CamelContext camelContext = new DefaultCamelContext();      
    
        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
    
                Namespaces ns = new Namespaces("nsl", "http://www.toto.fr/orders");
                from("file:data/inbox").id("inbox-road").
                split().
                xtokenize("//nsl:order", 'w', ns, 1000).
                streaming().
                to("file:data/outbox?fileName=${file:name.noext}-${exchangeId}.${file:ext}"); 
    
            }
    
        });
    
        camelContext.start();
    
        ConsumerTemplate consumer =new DefaultConsumerTemplate(camelContext);
    
        consumer.start();
    
        MyCamelTasklet tasklet = new MyCamelTasklet(consumer);
    
        long debutTraitement = System.currentTimeMillis();
    
        tasklet.execute(null, null);
    
        long finTraitement = System.currentTimeMillis();
    
        long total = finTraitement-debutTraitement;
    
        File outputDir = new File("data/outbox");
        outputDir.mkdir();
    
        int nbGeneratesFiles = FileUtils.listFiles(outputDir, new String[]{"xml"}, false).size();
    
        System.out.println("Traitement total en secondes : "+total/1000);
    
        Assert.assertTrue(nbGeneratesFiles>0);
    
    }   
    }
    

相关问题