首页 文章

在后台线程中运行的任务的可编辑队列

提问于
浏览
1

我知道这个问题已被多次回答,但我很难理解它是如何工作的 .

因此,在我的应用程序中,用户必须能够选择将添加到队列中的项目(使用 ObservableList<Task> 显示在 ListView 中),并且每个项目都需要由 ExecutorService 按顺序处理 .

该队列也应该是可编辑的(更改顺序并从列表中删除项目) .

private void handleItemClicked(MouseEvent event) {
    if (event.getClickCount() == 2) {
        File item = listView.getSelectionModel().getSelectedItem();
        Task<Void> task = createTask(item);
        facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
        Future result = executor.submit(task); 
        // where executor is an ExecutorService of which type?

        try {
            result.get();
        } catch (Exception e) {
            // ...
        }
    }
}

尝试使用 executor = Executors.newFixedThreadPool(1) ,但我无法控制队列 .
我读了 ThreadPoolExecutor 和队列,但我对并发很新 .

我需要在后台线程中运行该方法 handleItemClicked ,以便UI不会冻结,我该怎样才能做到最好?

Summed up: How can I implement a queue of tasks, which is editable and sequentially processed by a background thread?

请帮我搞清楚

EDIT 使用vanOekel的 SerialTaskQueue 类帮助了我,现在我想将任务列表绑定到我的 ListView .

ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty);

显然这不起作用,因为它期望一个ObservableList . 有一种优雅的方式吗?

1 回答

  • 1

    我能想到的最简单的解决方案是将任务列表保存在执行程序之外,并使用回调为执行程序提供下一个任务(如果可用) . 不幸的是,它涉及任务列表上的同步和 AtomicBoolean 以指示正在执行的任务 .

    回调只是一个包装原始任务的 Runnable ,然后"calls back"查看是否还有另一个要执行的任务,如果是,则使用(后台)执行程序执行它 .

    需要同步以使任务列表保持有序且处于已知状态 . 任务列表可以由两个线程同时修改:通过执行程序(后台)线程中运行的回调和通过UI前台线程执行的 handleItemClicked 方法 . 这反过来意味着,例如,当任务列表为空时,它永远不会完全知道 . 为了使任务列表按顺序保持在已知的固定状态,需要同步任务列表 .

    这仍然留下了一个模糊的时刻来决定任务何时可以执行 . 这就是 AtomicBoolean 的用武之地:值集总是可以立即获取并由任何其他线程读取, compareAndSet 方法将始终确保只有一个线程获得"OK" .

    结 Contract 步和 AtomicBoolean 的使用允许创建一个带有"critical section"的方法,可以同时由前台线程和后台线程调用,以便在可能的情况下触发新任务的执行 . 以下代码的设计和设置方式可以存在一种这样的方法( runNextTask ) . 优良作法是使并发代码中的"critical section"尽可能简单明了(反过来,通常会导致高效"critical section") .

    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class SerialTaskQueue {
    
        public static void main(String[] args) {
    
            ExecutorService executor = Executors.newSingleThreadExecutor();
            // all operations on this list must be synchronized on the list itself.
            SerialTaskQueue tq = new SerialTaskQueue(executor);
            try {
                // test running the tasks one by one
                tq.add(new SleepSome(10L));
                Thread.sleep(5L);
                tq.add(new SleepSome(20L));
                tq.add(new SleepSome(30L));
    
                Thread.sleep(100L);
                System.out.println("Queue size: " + tq.size()); // should be empty
                tq.add(new SleepSome(10L));
    
                Thread.sleep(100L);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                executor.shutdownNow();
            }
        }
    
        // all lookups and modifications to the list must be synchronized on the list.
        private final List<Runnable> tasks = new LinkedList<Runnable>();
        // atomic boolean used to ensure only 1 task is executed at any given time
        private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
        private final Executor executor;
    
        public SerialTaskQueue(Executor executor) {
            this.executor = executor;
        }
    
        public void add(Runnable task) {
    
            synchronized(tasks) { tasks.add(task); }
            runNextTask();
        }
    
        private void runNextTask() {
            // critical section that ensures one task is executed.
            synchronized(tasks) {
                if (!tasks.isEmpty()
                        && executeNextTask.compareAndSet(true, false)) {
                    executor.execute(wrapTask(tasks.remove(0)));
                }
            }
        }
    
        private CallbackTask wrapTask(Runnable task) {
    
            return new CallbackTask(task, new Runnable() {
                @Override public void run() {
                    if (!executeNextTask.compareAndSet(false, true)) {
                        System.out.println("ERROR: programming error, the callback should always run in execute state.");
                    }
                    runNextTask();
                }
            });
        }
    
        public int size() {
            synchronized(tasks) { return tasks.size(); }
        }
    
        public Runnable get(int index) {
            synchronized(tasks) { return tasks.get(index); }
        }
    
        public Runnable remove(int index) {
            synchronized(tasks) { return tasks.remove(index); }
        }
    
        // general callback-task, see https://stackoverflow.com/a/826283/3080094
        static class CallbackTask implements Runnable {
    
            private final Runnable task, callback;
    
            public CallbackTask(Runnable task, Runnable callback) {
                this.task = task;
                this.callback = callback;
            }
    
            @Override public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        callback.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        // task that just sleeps for a while
        static class SleepSome implements Runnable {
    
            static long startTime = System.currentTimeMillis();
    
            private final long sleepTimeMs;
            public SleepSome(long sleepTimeMs) {
                this.sleepTimeMs = sleepTimeMs;
            }
            @Override public void run() {
                try { 
                    System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
                    Thread.sleep(sleepTimeMs);
                    System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
        }
    }
    

    更新:如果任务组需要串行执行,请查看改编后的实现here .

相关问题