首页 文章

python multithreading等到所有线程都完成了

提问于
浏览
75

这可能是在类似的背景下提出的,但是在搜索约20分钟后我无法找到答案,所以我会问 .

我编写了一个Python脚本(比方说:scriptA.py)和一个脚本(比如说scriptB.py)

在scriptB中我想用不同的参数多次调用scriptA,每次运行大约需要一个小时,(它是一个巨大的脚本,做了很多东西......不用担心它)我希望能够运行scriptA同时具有所有不同的参数,但我需要等到所有这些都完成后再继续;我的代码:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

我想同时运行所有 subprocess.call() ,然后等到它们全部完成,我该怎么做?

我试图像示例here一样使用线程:

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

但我不认为这是对的 .

在我去 do_finish() 之前,我如何知道他们已经完成了所有运行?

8 回答

  • 4

    将线程放在列表中,然后使用Join method

    threads = []
    
     t = Thread(...)
     threads.append(t)
    
     ...repeat as often as necessary...
    
     # Start all threads
     for x in threads:
         x.start()
    
     # Wait for all of them to finish
     for x in threads:
         x.join()
    
  • 0

    我更喜欢使用基于输入列表的列表推导:

    inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
    threads = [Thread(target=call_script, args=(i)) for i in inputs]
    [t.start() for t in threads]
    [t.join() for t in threads]
    
  • 2

    您需要在脚本的末尾使用 Thread 对象的join方法 .

    t1 = Thread(target=call_script, args=(scriptA + argumentsA))
    t2 = Thread(target=call_script, args=(scriptA + argumentsB))
    t3 = Thread(target=call_script, args=(scriptA + argumentsC))
    
    t1.start()
    t2.start()
    t3.start()
    
    t1.join()
    t2.join()
    t3.join()
    

    因此主线程将等到 t1t2t3 完成执行 .

  • 106

    在Python3中,由于Python 3.2有一种新的方法可以达到相同的结果,我个人更喜欢传统的线程创建/启动/加入,包 concurrent.futureshttps://docs.python.org/3/library/concurrent.futures.html

    使用 ThreadPoolExecutor 代码将是:

    from concurrent.futures.thread import ThreadPoolExecutor
    
    def call_script(arg)
        subprocess.call(scriptA + arg)
    
    args = [argumentsA, argumentsB, argumentsC]
    with ThreadPoolExecutor(max_workers=2) as executor:
        for arg in args:
            executor.submit(call_script, arg)
    print('All tasks has been finished')
    

    其中一个优点是您可以控制吞吐量设置最大并发工作者 .

  • 0

    您可以使用类似下面的类,您可以在其中添加'n'个函数或者您想要以并行激情执行的console_scripts并开始执行并等待所有作业完成 .

    from multiprocessing import Process
    
    class ProcessParallel(object):
        """
        To Process the  functions parallely
    
        """    
        def __init__(self, *jobs):
            """
            """
            self.jobs = jobs
            self.processes = []
    
        def fork_processes(self):
            """
            Creates the process objects for given function deligates
            """
            for job in self.jobs:
                proc  = Process(target=job)
                self.processes.append(proc)
    
        def start_all(self):
            """
            Starts the functions process all together.
            """
            for proc in self.processes:
                proc.start()
    
        def join_all(self):
            """
            Waits untill all the functions executed.
            """
            for proc in self.processes:
                proc.join()
    
    
    def two_sum(a=2, b=2):
        return a + b
    
    def multiply(a=2, b=2):
        return a * b
    
    
    #How to run:
    if __name__ == '__main__':
        #note: two_sum, multiply can be replace with any python console scripts which
        #you wanted to run parallel..
        procs =  ProcessParallel(two_sum, multiply)
        #Add all the process in list
        procs.fork_processes()
        #starts  process execution 
        procs.start_all()
        #wait until all the process got executed
        procs.join_all()
    
  • 128

    也许,像

    for t in threading.enumerate():
        if t.daemon:
            t.join()
    
  • 18

    我刚遇到同样的问题,我需要等待使用for循环创建的所有线程 . 我只是尝试了下面的代码 . 它可能不是完美的解决方案,但我认为这将是一个简单的解决方案去测试:

    for t in threading.enumerate():
        try:
            t.join()
        except RuntimeError as err:
            if 'cannot join current thread' in err:
                continue
            else:
                raise
    
  • 13

    来自 threading module documentation

    有一个“主线程”对象;这对应于Python程序中的初始控制线程 . 它不是守护程序线程 . 有可能创建“虚拟线程对象” . 这些是对应于“外部线程”的线程对象,它们是在线程模块外部启动的控制线程,例如直接来自C代码 . 虚拟线程对象具有有限的功能;他们总是被认为是活着的和守护的,不能加入()编辑 . 它们永远不会被删除,因为无法检测外来线程的终止 .

    因此,当您不想保留您创建的线程列表时,要捕获这两种情况:

    import threading as thrd
    
    
    def alter_data(data, index):
        data[index] *= 2
    
    
    data = [0, 2, 6, 20]
    
    for i, value in enumerate(data):
        thrd.Thread(target=alter_data, args=[data, i]).start()
    
    for thread in thrd.enumerate():
        if thread.daemon:
            continue
        try:
            thread.join()
        except RuntimeError as err:
            if 'cannot join current thread' in err.args[0]:
                # catchs main thread
                continue
            else:
                raise
    

    于是:

    >>> print(data)
    [0, 4, 12, 40]
    

相关问题