如何从数据库分区读取器,编写不同文件并优化线程负载

编辑:

我认为这个条款有问题:

我尝试运行我的第一个运行单线程的测试,大约需要35分钟,其中whereCause和执行速度非常慢 . 当我从表中执行select *时,whitout whereClause正常进行该过程 .


我尝试使用Spring Batch在Job中使用Step Partitioning,但我没有意识到它是否适合我的情况:

我从数据库中读取了大约3000万条记录 . 在记录中,我有一个列bank_id,大约有23个不同的银行 .

我必须读取此列中的值,并将每个库中的记录分成不同的txt文件 .

我希望我的工作在4或8个线程中并行处理工作,在第一时间我尝试使用步骤分区,然后将作业分成4个从属并设置我在SqlPagingQueryProviderFactoryBean中的查询参数中处理的id_bank,我只使用4种不同的ID . 但是从一个bank_id到另一个bank_id的记录数量变化很大,导致奴隶完成他们在其他人之前工作 .

我希望当奴隶完成工作时,他开始处理另一个bank_id .

我需要帮助在 Spring 季批次中做这样的事情 . 我使用2.1版本的 spring 批次 .

这是我的文件:

<bean id="arquivoWriter"
        class="org.springframework.batch.item.file.FlatFileItemWriter"
        scope="step">
        <property name="encoding" value="ISO-8859-1" />
        <property name="lineAggregator">
            <bean
                class="org.springframework.batch.item.file.transform.FormatterLineAggregator">
                <property name="fieldExtractor">
                    <bean
                        class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names"
                            value="name_bank, id_bank, etc" />
                    </bean>
                </property>
                <property name="format"
                    value="..." />
            </bean>
        </property>
        <property name="resource"
            value="file:./arquivos/#{stepExecutionContext[faixa]}.txt" />
    </bean>



<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="masterStep"> 
        <partition step="slave" partitioner="rangePartitioner">
            <handler task-executor="taskExecutor" />
        </partition>
    </step>
</job>

<step id="slave" xmlns="http://www.springframework.org/schema/batch">
    <tasklet>
        <chunk reader="pagingReader" writer="arquivoWriter"
            commit-interval="#{jobParameters['commit.interval']}" />

        <listeners>
            <listener ref="myChunkListener"></listener>
        </listeners>        
    </tasklet>
</step>

<bean id="rangePartitioner" class="....RangePartitioner" />



<bean id="pagingReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="fetchSize" value="#{jobParameters['fetch.size']}"></property>
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause">
                <value>
                    <![CDATA[
                       SELECT ...
                    ]]>
                </value>
            </property>
            <property name="fromClause" value="FROM my_table" />
            <property name="whereClause" value="where id_bank = :id_op" /> 

        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="id_op" value="#{stepExecutionContext[id_op]}" />
        </map>
    </property>
    <property name="maxItemCount" value="#{jobParameters['max.rows']}"></property>
    <property name="rowMapper">
        <bean class="....reader.MyRowMapper" />
    </property>
</bean>

范围分区器:

public class RangePartitioner implements Partitioner {

@Autowired
BancoDao bancoDao;

final Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext> ();
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    List<OrgaoPagadorQuantidadeRegistrosTO> lista = bancoDao.findIdsOps();
    for (OrgaoPagadorQuantidadeRegistrosTO op:lista){
        String name = String.valueOf(op.getIdOrgaoPagador());
        ExecutionContext ex = new ExecutionContext();
        ex.putLong("id_op", op.getIdBank());
        ex.putString ("faixa", name);
        result.put("p"+name, ex);
    }
    return result;
}

}

回答(1)

2 years ago

假设你有足够的工作为每个奴隶工作,你所要求的应该是有用的 . 例如,如果你有23家银行,但其中一家拥有2000万条记录而其他银行都有10万条,那么不在大银行工作的奴隶就会迅速释放 .

您是在为每个银行还是每个线程创建StepExecution?我建议每家银行都这样做 . 这将允许线程在完成时接收工作 . 否则,您最终会通过实现执行此规范化的分区程序来负责该负载 balancer .