首页 文章

FlatFileItemWriter应该写出与输入文件相同的输出文件

提问于
浏览
0

我有一个Spring批处理作业,它从目录中读取与命名模式匹配的文件,进行一些处理,并在输入文件中写回每行处理的状态 . 编写器必须生成与输入文件同名的输出文件 . 对MultiResourceItemReader我传递模式:“files - * .txt”并期望FlatFileItemWriter使用输入文件的名称 . 如何在上下文xml文件中指定此约束?

读者 beans

<bean id="multiResourceReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step">
  <property name="resources" value="file:#{jobParameters['cwd']}/#{jobParameters['inputFolder']}/file-*.txt" />
  <property name="delegate" ref="itemReader" />
</bean>
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">

        <property name="lineMapper">
          <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

            <property name="lineTokenizer">
              <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value="," />
                <property name="names" value="paramA,paramB" />
              </bean>
            </property>
            <property name="fieldSetMapper" >
              <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                <property name="targetType" value="a.b.c.d.MyObject" />
              </bean>
            </property>
          </bean>
        </property>
    </bean>

我试过这个:在我的FlatFileItemReader实现中将输入文件路径作为字符串属性添加到ExecutionContext的映射中 . 在我的FlatFileItemWriter实现中 - 覆盖setResource并实际从ExecutionContext创建一个超出值的Resource对象 . 这好吗?

1 回答

  • 1

    我认为这个解决方案可能会产生错误的输出,因为 MultiResourceItemReader 从第一个文件中读取了一堆项目,如果这个集合与你的完成策略不匹配则从下一个资源读取;在这种情况下,您将拥有来自两个不同源的项目,并且 itemReader bean未同步以写入正确的输出资源 .
    您可以尝试解决让 a.b.c.d.MyObject 实现org.springframework.batch.item.ResourceAware并编写自定义多资源项编写器并在资源更改时管理资源打开/关闭(此解决方案并非易事,因为当然需要状态保存以实现可重新启动) .

    class MyItemWriter implements ItemWriter<MyObject>,ItemStream {
      private FlatFileItemWriter delegate;
      private String resourceName;
    
      public open(ExecutionContext e) {
        resourceName = e.getString("resourceName");
        delegate.open(e);
      }
      public close(ExecutionContext e) {
        e.putString("resourceName",resourceName);
        delegate.close(e);
      }
      public update(ExecutionContext e) {
        e.putString("resourceName",resourceName);
        delegate.update(e);
      }
      public void write(List<? extends MyObject> items) {
        for(final MyObject o : items) {
          String itemResource = o.getResource().toString();
          if(!itemResource.equals(resourceName)) {
            closeDelegate();
            openDelegate();
          }
          writeToDelegate(o);
        }
      }
    }
    

    当然,这没有经过测试,这只是一个想法 .
    另一种方式是完全不同的,并以这种方式使用decider模拟循环:

    • 创建一个bean(例如,名为 ResourceHolder ),用于存储来自 file:#{jobParameters['cwd']}/#{jobParameters['inputFolder']}/file-*.txt 的资源;这个bean没有存储到执行上下文中,而是在每个作业启动时都填充了JobExecutionListener,因为执行上下文应该保持较小 .

    • 使用决策程序来模拟循环!从decider store到执行上下文的当前资源的名称;您还需要存储当前资源的索引以移至下一个或停止工作

    • 读取/处理/写入步骤使用单个文件,一个存储在决策程序的执行上下文中


    <batch:job id="job">
      <batch:decision decider="decider" id="decider">
        <batch:next on="CONTINUABLE" to="readWriteStep" />
        <batch:end on="FINISHED" />
      </batch:decision>
      <batch:step id="readWriteStep" reader="itemReader" write="itemWriter" next="decider" />
      <batch:listeners>
        <batch:listener class="ResourceHolderFiller">
          <property name="resources" value="file:#{jobParameters['cwd']}/#{jobParameters['inputFolder']}/file-*.txt" />
          <property name="resourceHolderBean" ref="resourceHolderBean" />
        </batch:listener>
      </batch:listeners>
    </batch:job>
    <bean id="resourceHolder" class="ResourceHolderBean" />
    <bean id="decider" class="MyDecider">
      <property name="resourceHolderBean" ref="resourceHolderBean" />
    </bean>
    
    class ResourceHolderBean
    {
      Resource[] resource;
    
      public void setResource(Resource[] resource) {...}
      public Resource[] getResource() {...}
    }
    class MyDecider implements JobExecutionDecider {
      ResourceHolderBean resourceBean;
    
      public void setResourceBean(ResourceHolderBean resBean){...}
    
      FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        Integer resourceIndex = jobExecution.getExecutionContext().getInt("resourceIndex");
        if(resourceIndex == null)
        {
          resourceIndex = 0;
        }
        if(resourceIndex == resourceBean.getResources().size()) {
          return new FlowExecutionStatus(RepeatStatus.FINISHED.name());
        }
        else {
          jobExecution.getExecutionContext().putInt("resourceIndex",resourceIndex);
          // This is the key your reader/writer must use as inpu/output filename
          jobExecution.getExecutionContext().putString("resourceName",resourceBean.getResources()[resourceIndex].toString());
        }
      }
    }
    

    ResourceHolderFiller 是一个将资源注入ResourceHolderBean bean的作业侦听器 .

    希望能有所帮助 .

相关问题