Entities not persisting. Are RepositoryItemWriter and SimpleJpaWriter thread-safe?

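I've run into a strange problem with RepositoryItemWriter: it does not appear to be persisting entities to the data source through my configured Spring Data JPA repository.

Step configuration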

@Bean
public Step orderStep(StepBuilderFactory stepBuilderFactory, ItemReader<OrderEncounter> orderEncounterReader, ItemWriter<List<Order>> orderWriter,
                      ItemProcessor<OrderEncounter, List<Order>> orderProcessor, TaskExecutor taskExecutor) {
    return stepBuilderFactory.get("orderStep")
            .<OrderEncounter, List<Order>> chunk(10)
            .reader(orderEncounterReader)
            .processor(orderProcessor)
            .writer(orderWriter)
            .taskExecutor(taskExecutor)
            .build();
}

TaskExecutor bean (configured with 1 thread for testing purposes)

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(1);
    taskExecutor.setMaxPoolSize(1);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

OrderRepository

@Repository
public interface OrderRepository extends PagingAndSortingRepository<Order, Long> {

}

orderWriter bean

@Bean
public ItemWriter<List<Order>> orderWriter(OrderRepository orderRepository) {
    RepositoryListItemWriter<List<Order>> writer = new RepositoryListItemWriter<>();
    writer.setRepository(orderRepository);
    writer.setMethodName("save");

    try {
        writer.afterPropertiesSet();
    } catch (Exception e) {
        e.printStackTrace();
    }

    return writer;
}

RepositoryListItemWriter (a modified RepositoryItemWriter that supports multiple items returned from a stored procedure result set)

public class RepositoryListItemWriter<T> implements ItemWriter<T>, InitializingBean {
    protected static final Log logger = LogFactory.getLog(RepositoryListItemWriter.class);
    private CrudRepository<?, ?> repository;
    private String methodName;

    public RepositoryListItemWriter() {
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public void setRepository(CrudRepository<?, ?> repository) {
        this.repository = repository;
    }

    public void write(List<? extends T> items) throws Exception {
        if(!CollectionUtils.isEmpty(items)) {
            this.doWrite(items);
        }

    }

    protected void doWrite(List<? extends T> items) throws Exception {
        if(logger.isDebugEnabled()) {
            logger.debug("Writing to the repository with " + items.size() + " items.");
        }

        MethodInvoker invoker = this.createMethodInvoker(this.repository, this.methodName);
        // Invoke the configured repository method once per item in the chunk
        for (Object object : items) {
            invoker.setArguments(new Object[]{object});
            this.doInvoke(invoker);
        }

    }

    public void afterPropertiesSet() throws Exception {
        Assert.state(this.repository != null, "A CrudRepository implementation is required");
    }

    private Object doInvoke(MethodInvoker invoker) throws Exception {
        try {
            invoker.prepare();
        } catch (ClassNotFoundException var3) {
            throw new DynamicMethodInvocationException(var3);
        } catch (NoSuchMethodException var4) {
            throw new DynamicMethodInvocationException(var4);
        }

        try {
            return invoker.invoke();
        } catch (InvocationTargetException var5) {
            if(var5.getCause() instanceof Exception) {
                throw (Exception)var5.getCause();
            } else {
                throw new InvocationTargetThrowableWrapper(var5.getCause());
            }
        } catch (IllegalAccessException var6) {
            throw new DynamicMethodInvocationException(var6);
        }
    }

    private MethodInvoker createMethodInvoker(Object targetObject, String targetMethod) {
        MethodInvoker invoker = new MethodInvoker();
        invoker.setTargetObject(targetObject);
        invoker.setTargetMethod(targetMethod);
        return invoker;
    }
}
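
For readers who want to see what doWrite boils down to, here is a minimal stand-alone sketch of the same idea using only java.lang.reflect instead of Spring's MethodInvoker. It is an illustration under assumptions, not the writer above: FakeRepository, writeAll and findSingleArgMethod are hypothetical names, and the "repository" simply records what it receives.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReflectiveWriteSketch {

    /** Hypothetical stand-in for a repository; records every entity passed to save. */
    public static class FakeRepository {
        public final List<Object> saved = new ArrayList<>();

        public void save(Iterable<?> entities) {
            for (Object entity : entities) {
                saved.add(entity);
            }
        }
    }

    /** Calls target.methodName(item) once per chunk item and unwraps reflective exceptions. */
    public static void writeAll(Object target, String methodName, List<?> items) throws Exception {
        for (Object item : items) {
            Method method = findSingleArgMethod(target.getClass(), methodName, item);
            try {
                method.invoke(target, item);
            } catch (InvocationTargetException e) {
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw new RuntimeException(cause);
            }
        }
    }

    /** Finds a public one-argument method with the given name that accepts the argument. */
    public static Method findSingleArgMethod(Class<?> type, String name, Object arg)
            throws NoSuchMethodException {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(arg)) {
                return m;
            }
        }
        throw new NoSuchMethodException(name);
    }

    public static void main(String[] args) throws Exception {
        FakeRepository repo = new FakeRepository();
        // Each chunk item is itself a list, mirroring ItemWriter<List<Order>>
        List<List<String>> chunk = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        writeAll(repo, "save", chunk);
        System.out.println(repo.saved); // prints [a, b, c]
    }
}
```

Because every chunk item is a whole list, the target method is called with a collection each time, so the repository has to expose an overload of the configured method that accepts one.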

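If orderStep is configured without a taskExecutor, the entities are persisted to the database through the RepositoryItemWriter as expected. However, as soon as it is configured with a taskExecutor, even a single-threaded one, nothing is persisted to the database at any point during job execution, including after completion.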

I've done a lot of research, and RepositoryItemWriter appears to be thread-safe, as do PagingAndSortingRepository and SimpleJpaWriter. Any suggestions?

Trace-level log output

Job without a TaskExecutor:

2015-11-23 20:51:07.589 TRACE 31126 --- [nio-8080-exec-1] o.s.beans.CachedIntrospectionResults     : Found bean property 'verifiedChangedByUsername' of type [java.lang.String]
2015-11-23 20:51:07.589 TRACE 31126 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@56ca0bbc] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@5ffffca4] bound to thread [http-nio-8080-exec-1]
2015-11-23 20:51:07.590 TRACE 31126 --- [nio-8080-exec-1] o.h.e.i.AbstractSaveEventListener        : Transient instance of: com.iimassociates.distiller.domain.Order
2015-11-23 20:51:07.590 TRACE 31126 --- [nio-8080-exec-1] o.h.e.i.DefaultPersistEventListener      : Saving transient instance
2015-11-23 20:51:07.590 TRACE 31126 --- [nio-8080-exec-1] o.h.e.i.AbstractSaveEventListener        : Saving [com.iimassociates.distiller.domain.Order#<null>]
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue     : Adding an EntityIdentityInsertAction for [com.iimassociates.distiller.domain.Order] object
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue     : Executing inserts before finding non-nullable transient entities for early insert: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<null>]]
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue     : Adding insert with no non-nullable, transient entities: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<null>]]
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue     : Executing insertions before resolved early-insert
2015-11-23 20:51:07.591 DEBUG 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue     : Executing identity-insert immediately
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] o.h.p.entity.AbstractEntityPersister     : Inserting entity: com.iimassociates.distiller.domain.Order (native id)
2015-11-23 20:51:07.592 DEBUG 31126 --- [nio-8080-exec-1] org.hibernate.SQL                        : insert into orders (blah blah SQL)

Job with a TaskExecutor:

2015-11-23 21:02:34.628 TRACE 31257 --- [ taskExecutor-1] o.s.beans.CachedIntrospectionResults     : Found bean property 'verifiedChangedByUsername' of type [java.lang.String]
2015-11-23 21:02:34.628 TRACE 31257 --- [ taskExecutor-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@2b151d8a] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@2706e09c] bound to thread [taskExecutor-1]
2015-11-23 21:02:34.629 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener        : Transient instance of: com.iimassociates.distiller.domain.Order
2015-11-23 21:02:34.629 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.DefaultPersistEventListener      : Saving transient instance
2015-11-23 21:02:34.629 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener        : Saving [com.iimassociates.distiller.domain.Order#<null>]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue     : Adding an EntityIdentityInsertAction for [com.iimassociates.distiller.domain.Order] object
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue     : Adding insert with no non-nullable, transient entities: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<delayed:1>]]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue     : Adding resolved non-early insert action.
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.a.i.UnresolvedEntityInsertActions    : No unresolved entity inserts that depended on [[com.iimassociates.distiller.domain.Order#<delayed:1>]]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.a.i.UnresolvedEntityInsertActions    : No entity insert actions have non-nullable, transient entity dependencies.
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@2b151d8a] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@2706e09c] bound to thread [taskExecutor-1]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener        : Transient instance of: com.iimassociates.distiller.domain.Order
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.DefaultPersistEventListener      : Saving transient instance
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener        : Saving [com.iimassociates.distiller.domain.Order#<null>]
2015-11-23 21:02:34.631 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue     : Adding an EntityIdentityInsertAction for [com.iimassociates.distiller.domain.Order] object
2015-11-23 21:02:34.631 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue     : Adding insert with no non-nullable, transient entities: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<delayed:2>]]
2015-11-23 21:02:34.631 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue     : Adding resolved non-early insert action.

1 Answer

2 years ago

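It looks like I fixed this by adding a PlatformTransactionManager to the step. See the changes below. Hopefully this helps someone, because it is something I had been fighting with for weeks. I still don't understand why Spring Boot was able to supply a JtaTransactionManager when my pom.xml contains neither Bitronix nor Atomikos.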

@SuppressWarnings("SpringJavaAutowiringInspection")
@Bean
public Step orderStep(StepBuilderFactory stepBuilderFactory, ItemReader<OrderEncounter> orderEncounterReader, ItemWriter<List<Order>> orderWriter,
                      ItemProcessor<OrderEncounter, List<Order>> orderProcessor, TaskExecutor taskExecutor, PlatformTransactionManager platformTransactionManager) {
    return stepBuilderFactory.get("orderStep")
            .<OrderEncounter, List<Order>> chunk(10)
            .reader(orderEncounterReader)
            .processor(orderProcessor)
            .writer(orderWriter)
            .taskExecutor(taskExecutor)
            .transactionManager(platformTransactionManager)
            .build();
}

And the configuration class:

@Configuration
public class JTOpenDataSourceConfiguration {

    @Bean(name="distillerDataSource")
    @Primary
    @ConfigurationProperties(prefix="spring.datasource.distiller")
    public DataSource distillerDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties(prefix="spring.datasource.target")
    public DataSource targetDataSource() {
        return DataSourceBuilder.create().build();
    }

    // JPA-aware transaction manager used by the step, bound to the target data source
    @Bean
    public PlatformTransactionManager platformTransactionManager(@Qualifier("targetDataSource") DataSource targetDataSource) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(targetDataSource);
        return transactionManager;
    }
}
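
One observation that seems consistent with this fix: in the TaskExecutor trace from the question, the identity inserts are logged as `<delayed:1>` and `<delayed:2>` instead of "Executing identity-insert immediately", which is what Hibernate tends to do when it does not see a transaction in progress on its session. A JPA-aware transaction manager on the step would address exactly that. As a variant of the configuration above, here is a sketch that hands the EntityManagerFactory to the transaction manager explicitly. It assumes a single EntityManagerFactory bean in the context and the javax.persistence API of that Spring Boot generation; TransactionManagerSketch is a hypothetical class name.

```java
import javax.persistence.EntityManagerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class TransactionManagerSketch {

    // Assumption: exactly one EntityManagerFactory bean is available for injection
    @Bean
    public PlatformTransactionManager platformTransactionManager(EntityManagerFactory entityManagerFactory) {
        // Binds transactions to the same factory the repositories use, so the
        // persistence context is flushed when the chunk transaction commits
        return new JpaTransactionManager(entityManagerFactory);
    }
}
```

This would replace, not sit alongside, the platformTransactionManager bean shown earlier; the step configuration stays the same.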