
Spring Cloud Data Flow with Spring Batch jobs - scaling considerations

We are currently evaluating moving our flows from Spring Batch Admin to a Spring Cloud based infrastructure.

Our main challenges / questions:

1. As part of the monolithic design of our Spring Batch jobs, we fetch some common MD and aggregate it into a common data structure that many jobs use to run in a more optimized way. Is the task nature of SCDF going to be a problem in our case? Should we reconsider moving to Streams instead? And how could that be done?

2. One of the main reasons for using SCDF is its support for scaling out for better performance. As a first POC it will be hard for us to stand up a real cloud infrastructure, so I am looking for a standalone SCDF that uses a remote partitioning design as the scaling solution. We are looking for a demo/intro GitHub project/guide - I did not find anything relevant. Does it also require, as in past years, that the communication between nodes be handled through a JMS infrastructure (Spring Integration)?

3. Our main challenge is refactoring our batch jobs so that they support both remote partitioning and multiple threads on each node. Is it possible to create a Spring Batch job with both aspects?

4. Splitting a monolithic jar with 20 jobs into separate Spring Boot über jars is not a simple task - any thoughts / ideas / best practices?

Best, Elad

1 Answer


    I ran into the same problem as point 3 of Elad's question and eventually solved it by using the basic framework demonstrated here, but with modified versions of DeployerPartitionHandler and DeployerStepExecutionHandler.

    I first tried the naive approach of creating a two-level partitioning, where the step executed by each worker is itself partitioned into sub-partitions. But the framework does not seem to support this; it gets confused about the state of the steps.

    So I went back to a flat set of partitions, but passed multiple step execution ids to each worker. For this to work, I created the DeployerMultiPartitionHandler, which launches the configured number of workers and passes each one a list of step execution ids. Note that there are now two degrees of freedom: the number of workers and the gridSize, which is the total number of partitions that is distributed to the workers as evenly as possible. Unfortunately, I had to duplicate a lot of DeployerPartitionHandler's code here.

    @Slf4j
    @Getter
    @Setter
    public class DeployerMultiPartitionHandler implements PartitionHandler, EnvironmentAware, InitializingBean {
    
        public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_IDS =
                "spring.cloud.task.step-execution-ids";
    
        public static final String SPRING_CLOUD_TASK_JOB_EXECUTION_ID =
                "spring.cloud.task.job-execution-id";
    
        public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_ID =
                "spring.cloud.task.step-execution-id";
    
        public static final String SPRING_CLOUD_TASK_STEP_NAME =
                "spring.cloud.task.step-name";
    
        public static final String SPRING_CLOUD_TASK_PARENT_EXECUTION_ID =
                "spring.cloud.task.parentExecutionId";
    
        public static final String SPRING_CLOUD_TASK_NAME = "spring.cloud.task.name";
    
        private int maxWorkers = -1;
    
        private int gridSize = 1;
    
        private int currentWorkers = 0;
    
        private TaskLauncher taskLauncher;
    
        private JobExplorer jobExplorer;
    
        private TaskExecution taskExecution;
    
        private Resource resource;
    
        private String stepName;
    
        private long pollInterval = 10000;
    
        private long timeout = -1;
    
        private Environment environment;
    
        private Map<String, String> deploymentProperties;
    
        private EnvironmentVariablesProvider environmentVariablesProvider;
    
        private String applicationName;
    
        private CommandLineArgsProvider commandLineArgsProvider;
    
        private boolean defaultArgsAsEnvironmentVars = false;
    
        public DeployerMultiPartitionHandler(TaskLauncher taskLauncher,
                                        JobExplorer jobExplorer,
                                        Resource resource,
                                        String stepName) {
                Assert.notNull(taskLauncher, "A taskLauncher is required");
                Assert.notNull(jobExplorer, "A jobExplorer is required");
                Assert.notNull(resource, "A resource is required");
                Assert.hasText(stepName, "A step name is required");
    
                this.taskLauncher = taskLauncher;
                this.jobExplorer = jobExplorer;
                this.resource = resource;
                this.stepName = stepName;
        }
    
        // Split the master step into gridSize partitions, launch up to maxWorkers
        // worker tasks over contiguous subsets of them, and poll until all complete.
        @Override
        public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
                                                StepExecution stepExecution) throws Exception {
    
    
            final Set<StepExecution> tempCandidates =
                    stepSplitter.split(stepExecution, this.gridSize);
    
            // Following two lines due to https://jira.spring.io/browse/BATCH-2490
            final List<StepExecution> candidates = new ArrayList<>(tempCandidates.size());
            candidates.addAll(tempCandidates);
    
            int partitions = candidates.size();
    
            log.debug(String.format("%s partitions were returned", partitions));
    
            final Set<StepExecution> executed = new HashSet<>(candidates.size());
    
            if (CollectionUtils.isEmpty(candidates)) {
                return null;
            }
    
            launchWorkers(candidates, executed);
    
            candidates.removeAll(executed);
    
            return pollReplies(stepExecution, executed, partitions);
        }
    
        // Distribute the candidate partitions across the workers in contiguous,
        // near-equal sublists and launch one worker task per non-empty sublist.
        private void launchWorkers(List<StepExecution> candidates, Set<StepExecution> executed) {
            int partitions = candidates.size();
            int numWorkers = this.maxWorkers != -1 ? Math.min(this.maxWorkers, partitions) : partitions;
            IntStream.range(0, numWorkers).boxed()
                    .map(i -> candidates.subList(partitionOffset(partitions, numWorkers, i), partitionOffset(partitions, numWorkers, i + 1)))
                    .filter(not(List::isEmpty))
                    .forEach(stepExecutions -> processStepExecutions(stepExecutions, executed));
        }
    
        private void processStepExecutions(List<StepExecution> stepExecutions, Set<StepExecution> executed) {
            launchWorker(stepExecutions);
            this.currentWorkers++;
            executed.addAll(stepExecutions);
        }
    
        // Build and launch the deployment request for a single worker, passing its
        // comma-separated step execution ids as command line args or env variables.
        private void launchWorker(List<StepExecution> workerStepExecutions) {
            List<String> arguments = new ArrayList<>();
    
            StepExecution firstWorkerStepExecution = workerStepExecutions.get(0);
            ExecutionContext copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());
    
            arguments.addAll(
                    this.commandLineArgsProvider
                            .getCommandLineArgs(copyContext));
    
            String jobExecutionId = String.valueOf(firstWorkerStepExecution.getJobExecution().getId());
            String stepExecutionIds = workerStepExecutions.stream().map(workerStepExecution -> String.valueOf(workerStepExecution.getId())).collect(joining(","));
            String taskName = String.format("%s_%s_%s",
                    taskExecution.getTaskName(),
                    firstWorkerStepExecution.getJobExecution().getJobInstance().getJobName(),
                    firstWorkerStepExecution.getStepName());
            String parentExecutionId = String.valueOf(taskExecution.getExecutionId());
    
            if(!this.defaultArgsAsEnvironmentVars) {
                arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
                        jobExecutionId));
                arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS,
                        stepExecutionIds));
                arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
                arguments.add(formatArgument(SPRING_CLOUD_TASK_NAME, taskName));
                arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
                        parentExecutionId));
            }
    
            copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());
    
            log.info("launchWorker context={}", copyContext);
    
            Map<String, String> environmentVariables = this.environmentVariablesProvider.getEnvironmentVariables(copyContext);
    
            if(this.defaultArgsAsEnvironmentVars) {
                environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
                        jobExecutionId);
                environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
                        String.valueOf(firstWorkerStepExecution.getId()));
                environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
                environmentVariables.put(SPRING_CLOUD_TASK_NAME, taskName);
                environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
                        parentExecutionId);
            }
    
            AppDefinition definition =
                    new AppDefinition(resolveApplicationName(),
                            environmentVariables);
    
            AppDeploymentRequest request =
                    new AppDeploymentRequest(definition,
                            this.resource,
                            this.deploymentProperties,
                            arguments);
    
            taskLauncher.launch(request);
        }
    
        private String resolveApplicationName() {
            if(StringUtils.hasText(this.applicationName)) {
                return this.applicationName;
            }
            else {
                return this.taskExecution.getTaskName();
            }
        }
    
        private String formatArgument(String key, String value) {
            return String.format("--%s=%s", key, value);
        }
    
        // Poll the JobExplorer until every partition has completed (or the optional
        // timeout expires), collecting the finished step executions as the result.
        private Collection<StepExecution> pollReplies(final StepExecution masterStepExecution,
                                                      final Set<StepExecution> executed,
                                                      final int size) throws Exception {
    
            final Collection<StepExecution> result = new ArrayList<>(executed.size());
    
            Callable<Collection<StepExecution>> callback = new Callable<Collection<StepExecution>>() {
                @Override
                public Collection<StepExecution> call() {
                    Set<StepExecution> newExecuted = new HashSet<>();
    
                    for (StepExecution curStepExecution : executed) {
                        if (!result.contains(curStepExecution)) {
                            StepExecution partitionStepExecution =
                                    jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId());
    
                            if (isComplete(partitionStepExecution.getStatus())) {
                                result.add(partitionStepExecution);
                                currentWorkers--;
                            }
                        }
                    }
    
                    executed.addAll(newExecuted);
    
                    if (result.size() == size) {
                        return result;
                    }
                    else {
                        return null;
                    }
                }
            };
    
            Poller<Collection<StepExecution>> poller = new DirectPoller<>(this.pollInterval);
            Future<Collection<StepExecution>> resultsFuture = poller.poll(callback);
    
            if (timeout >= 0) {
                return resultsFuture.get(timeout, TimeUnit.MILLISECONDS);
            }
            else {
                return resultsFuture.get();
            }
        }
    
        private boolean isComplete(BatchStatus status) {
            return status.equals(BatchStatus.COMPLETED) || status.isGreaterThan(BatchStatus.STARTED);
        }
    
        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    
        @Override
        public void afterPropertiesSet() {
            Assert.notNull(taskExecution, "A taskExecution is required");
    
            if(this.environmentVariablesProvider == null) {
                this.environmentVariablesProvider =
                        new CloudEnvironmentVariablesProvider(this.environment);
            }
    
            if(this.commandLineArgsProvider == null) {
                SimpleCommandLineArgsProvider simpleCommandLineArgsProvider = new SimpleCommandLineArgsProvider();
                simpleCommandLineArgsProvider.onTaskStartup(taskExecution);
                this.commandLineArgsProvider = simpleCommandLineArgsProvider;
            }
        }
    
    }
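
    To wire the handler into the master side of a job, something like the following configuration can be used. This is only a minimal sketch under my own assumptions: the jar location and the bean, step, and class names are illustrative, and a Partitioner bean is presumed to exist elsewhere:

    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.explore.JobExplorer;
    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.cloud.deployer.spi.task.TaskLauncher;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.core.io.Resource;

    @Configuration
    public class MasterBatchConfiguration {

        @Bean
        public DeployerMultiPartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                              JobExplorer jobExplorer) {
            // Hypothetical location of the worker über jar; adjust for your deployment.
            Resource workerJar = new FileSystemResource("target/worker-app.jar");
            DeployerMultiPartitionHandler handler =
                    new DeployerMultiPartitionHandler(taskLauncher, jobExplorer, workerJar, "workerStep");
            handler.setMaxWorkers(2); // launch two worker tasks...
            handler.setGridSize(6);   // ...sharing six partitions, three ids each
            // Note: setTaskExecution(...) must also be called with the current task's
            // TaskExecution before initialization, since afterPropertiesSet() asserts it.
            return handler;
        }

        // A standard Spring Batch partitioned master step delegating to the handler.
        @Bean
        public Step masterStep(StepBuilderFactory stepBuilderFactory,
                               Partitioner partitioner,
                               DeployerMultiPartitionHandler partitionHandler) {
            return stepBuilderFactory.get("masterStep")
                    .partitioner("workerStep", partitioner)
                    .partitionHandler(partitionHandler)
                    .build();
        }
    }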
    

    The partitions are distributed to the workers with the help of the static function partitionOffset, which ensures that the numbers of partitions the workers receive differ by at most one:

    static int partitionOffset(int length, int numberOfPartitions, int partitionIndex) {
        return partitionIndex * (length / numberOfPartitions) + Math.min(partitionIndex, length % numberOfPartitions);
    }
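
    For example, with 7 partitions and 3 workers, the offsets come out as 0, 3, 5 and 7, so the workers receive sublists of sizes 3, 2 and 2. A small illustrative check (it assumes partitionOffset from above is in scope):

    public static void main(String[] args) {
        int partitions = 7, workers = 3;
        for (int i = 0; i < workers; i++) {
            int from = partitionOffset(partitions, workers, i);
            int to = partitionOffset(partitions, workers, i + 1);
            // prints: worker 0 -> [0, 3), worker 1 -> [3, 5), worker 2 -> [5, 7)
            System.out.printf("worker %d -> [%d, %d)%n", i, from, to);
        }
    }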
    

    On the receiving side, I created the DeployerMultiStepExecutionHandler, which inherits the parallel execution of partitions from TaskExecutorPartitionHandler and additionally implements the command line interface matching the DeployerMultiPartitionHandler:

    @Slf4j
    public class DeployerMultiStepExecutionHandler extends TaskExecutorPartitionHandler implements CommandLineRunner {
    
        private JobExplorer jobExplorer;
    
        private JobRepository jobRepository;
    
        private Log logger = LogFactory.getLog(org.springframework.cloud.task.batch.partition.DeployerStepExecutionHandler.class);
    
        @Autowired
        private Environment environment;
    
        private StepLocator stepLocator;
    
        public DeployerMultiStepExecutionHandler(BeanFactory beanFactory, JobExplorer jobExplorer, JobRepository jobRepository) {
            Assert.notNull(beanFactory, "A beanFactory is required");
            Assert.notNull(jobExplorer, "A jobExplorer is required");
            Assert.notNull(jobRepository, "A jobRepository is required");
    
            this.stepLocator = new BeanFactoryStepLocator();
            ((BeanFactoryStepLocator) this.stepLocator).setBeanFactory(beanFactory);
    
            this.jobExplorer = jobExplorer;
            this.jobRepository = jobRepository;
        }
    
        // Invoked by Spring Boot on worker startup: reads the job execution id and the
        // list of step execution ids from the environment and executes them in parallel.
        @Override
        public void run(String... args) throws Exception {
    
            validateRequest();
    
            Long jobExecutionId = Long.parseLong(environment.getProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID));
            Stream<Long> stepExecutionIds = Stream.of(environment.getProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS).split(",")).map(Long::parseLong);
            Set<StepExecution> stepExecutions = stepExecutionIds.map(stepExecutionId -> jobExplorer.getStepExecution(jobExecutionId, stepExecutionId)).collect(Collectors.toSet());
    
            log.info("found stepExecutions:\n{}", stepExecutions.stream().map(stepExecution -> stepExecution.getId() + ":" + stepExecution.getExecutionContext()).collect(joining("\n")));
    
            if (stepExecutions.isEmpty()) {
                throw new NoSuchStepException(String.format("No StepExecution could be located for step execution id %s within job execution %s", stepExecutionIds, jobExecutionId));
            }
    
            String stepName = environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME);
            setStep(stepLocator.getStep(stepName));
    
            doHandle(null, stepExecutions);
        }
    
        private void validateRequest() {
            Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID), "A job execution id is required");
            Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS), "A step execution id is required");
            Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_NAME), "A step name is required");
    
            Assert.isTrue(this.stepLocator.getStepNames().contains(environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME)), "The step requested cannot be found in the provided BeanFactory");
        }
    }
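
    On the worker side, the handler can then be registered as a bean; since it implements CommandLineRunner, Spring Boot invokes it on startup of the worker task. Again a minimal sketch with assumed names, which also sets a multi-threaded TaskExecutor so that each worker additionally runs its assigned partitions in parallel (the combination asked about in point 3):

    import org.springframework.batch.core.explore.JobExplorer;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;

    @Configuration
    @Profile("worker") // hypothetical profile separating worker wiring from the master's
    public class WorkerBatchConfiguration {

        @Bean
        public DeployerMultiStepExecutionHandler stepExecutionHandler(BeanFactory beanFactory,
                                                                      JobExplorer jobExplorer,
                                                                      JobRepository jobRepository) {
            DeployerMultiStepExecutionHandler handler =
                    new DeployerMultiStepExecutionHandler(beanFactory, jobExplorer, jobRepository);
            // Inherited from TaskExecutorPartitionHandler: execute the step executions
            // assigned to this worker concurrently instead of sequentially.
            handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
            return handler;
        }
    }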
    
