使用Spring Batch实现ETL

我需要为正在进行的其中一个项目实现一个ETL应用程序 .

它有以下步骤:

  • 需要从表中读取以检索将作为作业参数传递的某些值 .

  • 步骤1的返回对象将进一步用于从第二个表中检索某些数据 .

  • 然后必须从将与步骤2中的值一起使用的平面文件中读取 . 应用业务逻辑 . 然后写入表格 .

我们正在使用Spring Data JPA,Spring集成 .

我面临的挑战是从表中读取值以检索作业的参数然后启动作业 .

然后,必须将步骤2的输出与文件信息一起发送以进行进一步处理 .

我知道如何独立实施上述步骤,但努力将它们从头到尾联系起来 .

分享任何想法来设计上述内容都会很棒 . 提前致谢 .

回答(1)

2 years ago

我会尝试给你一些不同点的想法 .

1 - Read table values and pass them as Job Parameters

我在这看到2个解决方案:

您可以执行"manual"查询(即没有springbatch),然后执行业务逻辑以将结果作为JobParameters传递(您只需要 JobLauncherCommandLineJobRunner ,请参阅Springbatch Documentation §4.4):

JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean(jobName);

// Do your business logic and your database query here.

// Create your parameters
JobParameter parameter = new JobParameter(resultOfQuery);

// Add them to a map
Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();
parameters.add("yourParameter", parameter);

// Pass them to the job
JobParameters jobParameters = new JobParameters(parameters);
JobExecution execution = jobLauncher.run(job, parameters);

另一种解决方案是添加 JobExecutionListener 并覆盖方法 beforeJob 来执行查询,然后将结果保存在 executionContext 中(然后可以使用: #{jobExecutionContext[name]} 进行访问) .

@Override
public void beforeJob(JobExecution jobExecution) {

    // Do your business logic and your database query here.

    jobExecution.getExecutionContext().put(key, value);
}

在每种情况下,您都可以使用SpringBatch ItemReader 来进行查询 . 例如,您可以将项目阅读器声明为侦听器的字段(不要忘记设置器)并将其配置为:

<batch:listener>
    <bean class="xx.xx.xx.YourListener">
        <property name="reader">
            <bean class="org.springframework.batch.item.database.JdbcCursorItemReader">
                <property name="dataSource" ref="dataSource"></property>
                <property name="sql" value="${yourSQL}"></property>
                 <property name="rowMapper">
                    <bean class="xx.xx.xx.YourRowMapper"></bean>
                </property>
            </bean>
        </property>
    </bean>
</batch:listener>

2 - Read a table depending on results from previous step

再一次,您可以使用 JobExecutionContext 在步骤之间存储和检索数据 . 然后,您可以实现 StepExecutionListener 以覆盖方法 beforeStep 并访问 StepExecution ,这将引导您 JobExecution .

3 - Send result from table reading along results of file reading

没有"default" CompositeItemReader 可以让你同时阅读2个来源,但我不想你真正想要做什么 .

对于你的情况,我会在 <batch:chunk> 中声明"table reader"为读者,然后声明一个自定义的 ItemProcessor ,它将有另一个 ItemReader 字段 . 这个读者将是你的 FlatFileItemReader . 然后,您可以手动启动读取并在 process 方法中应用业务逻辑 .