Part 1:
I need to develop a job with Spring Batch that reads data from a CSV file and writes it to an Oracle database. I need to implement multi-threading/parallel processing to make it faster, since the records are expected to run into the millions.
Question 1:
Is multi-threading (a task executor) or partitioning (a partitioner) more suitable for this purpose? Which would serve it better?
Part 2:
I am trying to use a partitioner. I need to skip records that cause an insert failure and print them to the log. I implemented a SkipListener to print them. But the problem I face with the partitioner is that my listener method is called by every thread for every skipped record. For example, with 4 threads and 4 skipped records, the console prints 4 * 4 = 16 records instead of just the 4 skipped ones.
Listener print statement:
@OnSkipInWrite
public void logWrite(Report item, Throwable t) {
    count++;
    System.out.println("record skipped before writing " + count + " : " + item.toString());
}
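As a side note, unrelated to the 16-vs-4 output itself: `count++` is a read-modify-write and is not atomic, so if one listener instance is shared across the partition threads, the printed numbers can repeat or skip. A minimal sketch of a thread-safe counter using `AtomicInteger` (the class and method names here are illustrative, not part of the original listener):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the listener's counter field. With a
// partitioned step, several partition threads can hit the same
// listener instance concurrently; AtomicInteger makes each
// increment atomic so the count stays accurate.
class SkipCounter {
    private final AtomicInteger count = new AtomicInteger();

    // In the real listener this logic would live in the
    // @OnSkipInWrite-annotated method.
    int recordSkip(String item) {
        int n = count.incrementAndGet();
        System.out.println("record skipped before writing " + n + " : " + item);
        return n;
    }

    int total() {
        return count.get();
    }
}
```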
Job XML for the partitioner:
<batch:step id="step1">
    <batch:partition step="partitionReadWrite" partitioner="rangePartitioner">
        <batch:handler grid-size="4" task-executor="task-executor"/>
    </batch:partition>
</batch:step>
</batch:job>
<batch:step id="partitionReadWrite">
    <batch:tasklet>
        <batch:chunk reader="cvsFileItemReader" writer="mysqlItemWriter"
                     commit-interval="10" skip-limit="50">
            <batch:skippable-exception-classes>
                <batch:include class="java.sql.SQLException"/>
                <batch:include class="org.springframework.dao.DataAccessException"/>
            </batch:skippable-exception-classes>
        </batch:chunk>
        <batch:listeners>
            <batch:listener ref="orderSkipListener"/>
        </batch:listeners>
    </batch:tasklet>
</batch:step>
<bean id="task-executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="10" />
    <property name="allowCoreThreadTimeOut" value="true" />
</bean>
Reader and writer:
<bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="linesToSkip" value="1"/>
    <!-- Read a csv file -->
    <property name="resource" value="classpath:cvs/report.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <!-- split it -->
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="date,impressions,clicks,earning" />
                    <property name="includedFields" value="0,1,2,3" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <!-- return the raw FieldSet back to the reader, rather than a mapped object -->
                <!-- <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" /> -->
                <!-- map to an object -->
                <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                    <property name="prototypeBeanName" value="report" />
                </bean>
            </property>
        </bean>
    </property>
</bean>
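One thing worth noting: this reader is step-scoped but nothing in it refers to the partition it runs in, so if every partition ends up reading the entire file, each skipped record is skipped once per partition, which would match the 4 * 4 = 16 log lines observed. A hedged sketch of what binding the reader to its partition might look like, assuming `rangePartitioner` puts hypothetical `firstLine`/`lastLine` keys into each partition's `ExecutionContext` (those key names are assumptions, not part of the original config):

```xml
<!-- Sketch only: "firstLine"/"lastLine" are assumed keys that the
     partitioner would store in each partition's ExecutionContext.
     currentItemCount/maxItemCount are inherited by FlatFileItemReader
     from AbstractItemCountingItemStreamItemReader and restrict each
     partition to its own slice of the file. -->
<bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="currentItemCount" value="#{stepExecutionContext['firstLine']}" />
    <property name="maxItemCount" value="#{stepExecutionContext['lastLine']}" />
    <!-- resource, lineMapper, etc. as above -->
</bean>
```

With each partition reading only its own range, a bad record would be seen, and skipped, by exactly one partition.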
Writer
I am using JdbcBatchItemWriter. Is there a thread-safe writer I should use instead?
<bean id="mysqlItemWriter"
      class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="sql">
        <value>{insertquery}</value>
    </property>
    <!-- It will take care of matching object properties to SQL named parameters -->
    <property name="itemSqlParameterSourceProvider">
        <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
</bean>
Question 2:
How should skip be used to handle failures? Is there another way to keep a failed database insert from failing the whole chunk?