import concurrent.futures
import urllib.request
# Pages fetched concurrently below; the last host is deliberately fake so
# that one of the downloads fails and the error path is exercised.
URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]
# Retrieve a single page and return its raw contents.
def load_url(url, timeout):
    """Fetch *url* and return the response body as bytes.

    *timeout* (seconds) is forwarded to urllib.request.urlopen. Any error
    raised by urlopen or read() propagates to the caller; in the script
    below it surfaces through future.result().
    """
    # The with block closes the connection even if read() raises.
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# Leaving the with block shuts the pool down, so its worker threads are
# cleaned up promptly once every download has been handled.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit every download up front and remember which URL each future is for.
    future_to_url = {
        executor.submit(load_url, url, 60): url
        for url in URLS
    }
    # as_completed() yields futures in the order they finish, not submission order.
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            # Report any failure for this URL and move on to the next one.
            print('%r generated an exception: %s' % (url, exc))
            continue
        print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor
import concurrent.futures
import math
# Large odd numbers to test for primality (112272535095293 appears twice).
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]
def is_prime(n):
    """Return True if *n* is a prime number, False otherwise.

    Trial division: rule out even numbers, then test odd divisors up to
    floor(sqrt(n)). Integers below 2 (0, 1 and negatives) are not prime,
    and 2 is the only even prime.
    """
    if n < 2:
        # Also avoids math.sqrt() raising ValueError for negative n.
        return False
    if n % 2 == 0:
        # Every even number except 2 itself has the factor 2.
        return n == 2
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
def main():
    """Check each number in PRIMES for primality in a pool of worker processes."""
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # executor.map yields results in input order, so zipping with PRIMES
        # pairs every number with its own verdict.
        verdicts = executor.map(is_prime, PRIMES)
        for number, prime in zip(PRIMES, verdicts):
            print('%d is prime: %s' % (number, prime))


if __name__ == '__main__':
    main()
import threading
class SummingThread(threading.Thread):
    """Thread that adds up the integers in the half-open range [low, high).

    Once the thread has been joined, the sum is available as ``total``.
    """

    def __init__(self, low, high):
        super(SummingThread, self).__init__()
        # Bounds of the half-open range that run() sums.
        self.low, self.high = low, high
        # Running sum, updated in place while the thread executes.
        self.total = 0

    def run(self):
        """Accumulate low + (low + 1) + ... + (high - 1) into ``total``."""
        for value in range(self.low, self.high):
            self.total += value
# Split [0, 1000000) into two halves and sum each half on its own thread.
thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start()  # start() makes the thread begin executing run()
thread2.start()
thread1.join()  # join() waits until that thread has completed
thread2.join()
# At this point both threads have completed, so their totals are final.
result = thread1.total + thread2.total
# Function-call form of print: `print result` is a SyntaxError on Python 3.
print(result)
也许可以考虑 Python 3 的 concurrent.futures.ThreadPoolExecutor 模块。结合 with 语句和列表推导式使用，它会非常好用。
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_url(url):
    """Placeholder task: do the real work for *url* and return its result.

    Replace this body with your actual program, guarding shared state with a
    threading.Lock() if necessary. As written it always returns "".
    """
    result = ""
    return result
# List of urls to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit one task per URL; each submit() call returns a Future.
    futures = set(executor.submit(get_url, url) for url in urls)
    # as_completed() hands back each future as soon as its task has finished.
    for f in as_completed(futures):
        # result() returns get_url's value (or re-raises its exception).
        rs = f.result()
16
自 2010 年提出这个问题以来，借助 map 和 pool 实现简单多线程的方式已经大大简化。
下面的代码来自一篇非常值得一读的文章/博客（本人与作者没有任何关联）：Parallelism in one line: A Better Model for Day to Day Threading Tasks。我将在下面进行总结——最终只需要几行代码：
from multiprocessing.dummy import Pool as ThreadPool
# Apply my_function to every element of my_array using 4 worker threads.
pool = ThreadPool(4)
try:
    # map() blocks until all calls are done and keeps the input order.
    results = pool.map(my_function, my_array)
finally:
    # Release the worker threads even if one of the calls raised.
    pool.close()
    pool.join()
它就是下面这段代码的多线程版本：
# Single-threaded equivalent of pool.map(my_function, my_array): apply
# my_function to each item in order and collect the results in a list.
results = [my_function(item) for item in my_array]
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
]

# Make the pool of 4 worker threads.
pool = ThreadPool(4)
try:
    # Open each URL in a worker thread; map() waits for all of them and
    # returns the results in the same order as urls. If any urlopen call
    # raises, map() raises too.
    results = pool.map(urllib2.urlopen, urls)
finally:
    # Close the pool and wait for the workers to exit, even on error.
    pool.close()
    pool.join()
# NOTE(review): the response objects in results are still open; close them
# once their contents have been read.
import threading
from random import randint
from time import sleep
def print_number(number):
    """Sleep for a random 1 to 10 seconds, then report how long thread *number* slept."""
    rand_int_var = randint(1, 10)  # randint includes both end points
    sleep(rand_int_var)
    # Function-call form of print: the print statement is a SyntaxError on Python 3.
    print("Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds")
thread_list = []

for i in range(1, 10):
    # Instantiate the thread. args must be a tuple: (i) is just i in
    # parentheses, so write (i,).
    t = threading.Thread(target=print_number, args=(i,))
    # Keep every thread in a list so it can be started and joined below.
    thread_list.append(t)

# Start all threads.
for thread in thread_list:
    thread.start()

# join() blocks the calling thread until the thread it is called on terminates.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main thread waited for all threads to complete.
# print() call form: `print "Done"` is a SyntaxError on Python 3.
print("Done")
from multiprocessing import Process
def f(name):
    """Print a greeting for *name*; below it runs in a child process."""
    # print() call form: `print 'hello', name` is a SyntaxError on Python 3.
    print('hello', name)


if __name__ == '__main__':
    # Keep process creation behind the __main__ guard: with the "spawn" start
    # method the child re-imports this module (see the multiprocessing
    # programming guidelines).
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
from threading import Thread
from project import app
import csv
def import_handler(csv_file_name):
    """Start importing *csv_file_name* on a background thread.

    Returns the started Thread so a caller can join() it when it needs to
    wait for the import to finish; callers that ignore the return value get
    fire-and-forget behavior.
    """
    thr = Thread(target=dump_async_csv_data, args=(csv_file_name,))
    thr.start()
    return thr
def dump_async_csv_data(csv_file_name):
    """Read *csv_file_name* row by row inside ``app.app_context()``.

    Meant to run on the thread started by import_handler(). Each row is a
    dict keyed by the CSV header; the per-row database work is still a
    placeholder.
    """
    # NOTE(review): the app context is presumably needed by the DB layer
    # because this runs outside a request — confirm against `project.app`.
    with app.app_context():
        # newline='' is what the csv module expects for files it reads, so
        # quoted fields containing newlines are parsed correctly.
        with open(csv_file_name, newline='') as csv_file:
            reader = csv.DictReader(csv_file)
            for row in reader:
                # DB operation/query for `row` goes here.
                pass
驱动函数：
# Kick off the background import. csv_file_name is not defined in this
# snippet — presumably the caller supplies the CSV path; confirm.
import_handler(csv_file_name=csv_file_name)
1122
Alex Martelli给出的答案对我有所帮助,但是这里的修改版本我认为更有用(至少对我而言) .
import Queue
import threading
import urllib2
worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load every work item into a Queue; Queue.Queue does its own locking, so the
# worker threads can share it safely.
q = Queue.Queue()
for url in worker_data:
    q.put(url)
# Worker function: take URLs off the shared queue until it is empty.
def worker(queue):
    """Fetch each URL taken from *queue* and print the length of its body.

    Uses a non-blocking get(), so the worker returns as soon as the queue is
    empty. Errors raised while fetching are not caught and end the thread.
    """
    while True:
        try:
            url = queue.get(False)
        except Queue.Empty:
            # Every item has been handed out; this worker is done.
            return
        data = urllib2.urlopen(url).read()
        # print() call form: `print len(data)` is a SyntaxError on Python 3.
        print(len(data))
# Create as many worker threads as you want; they all share the queue q.
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(args=(q,), target=worker)
    t.start()
6
这是一个简单的示例：你需要尝试若干个备用 URL，并返回最先响应的那个的内容。
import Queue
import threading
import urllib2
# Called by each thread: fetch one URL and put its body on the shared queue.
def get_url(q, url):
    """Read *url* and put the response body on queue *q*.

    The response is closed after reading so its connection is released. If
    the fetch raises, nothing is put on *q* and the exception ends the thread.
    """
    from contextlib import closing

    with closing(urllib2.urlopen(url)) as response:
        q.put(response.read())
theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args=(q, u))
    # Daemon threads don't keep the process alive, so slower fetches are
    # abandoned once the main thread has printed the first result and exits.
    t.daemon = True
    t.start()

# Block until the first thread has put a page body on the queue.
# NOTE(review): if every fetch fails nothing is ever put, and this get()
# blocks forever; consider q.get(timeout=...) — confirm desired behavior.
s = q.get()
# print() call form: `print s` is a SyntaxError on Python 3.
print(s)
这是一种将线程用作简单优化的情况：每个子线程都在等待某个 URL 完成解析并返回响应，以便将其内容放入队列；每个线程都是守护线程（如果主线程结束，它们不会让进程继续存活——这种做法比不这样做更常见）；主线程启动所有子线程后，在队列上执行 get，等待其中一个子线程完成 put，然后输出结果并终止（这会结束所有可能仍在运行的子线程，因为它们是守护线程）。
import Queue
import threading
import multiprocessing
import subprocess
# Queue of 30 task ids, 0 through 29.
q = Queue.Queue()
for i in range(30):
    q.put(i)
def worker():
    """Process tasks from the module-level queue ``q`` forever.

    Each task runs ``echo <item>`` through the shell and waits for it to
    finish. task_done() is called in a ``finally`` so that q.join() in the
    main thread is not left waiting on a task whose command raised.
    """
    while True:
        item = q.get()
        try:
            # Execute a task: call a shell program and wait until it completes.
            # Building a shell string is acceptable only because item is a
            # trusted int; never do this with untrusted input.
            subprocess.call("echo " + str(item), shell=True)
        finally:
            q.task_done()
# One worker thread per CPU core.
cpus = multiprocessing.cpu_count()
print("Creating %d threads" % cpus)
for i in range(cpus):
    t = threading.Thread(target=worker)
    # worker() loops forever; as a daemon thread it won't keep the process
    # alive once the main thread finishes.
    t.daemon = True
    t.start()

# Block until every queued task has been marked done via task_done().
q.join()
def sqr(val):
    """Return ``val * val`` after sleeping 0.1 s to simulate a slow task."""
    import time

    time.sleep(0.1)
    squared = val * val
    return squared
def process_result(result):
    """Consume one finished result; in this example that just means printing it."""
    print(result)
def process_these_asap(tasks):
    """Square every task in a process pool, handling each result as soon as it is ready.

    Results reach process_result() in completion order, which need not match
    the order of *tasks*.
    """
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(sqr, task) for task in tasks]
        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))
def main():
    """Square 0..9 in a process pool, reporting progress; return 0 as the exit status."""
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0


if __name__ == '__main__':
    import sys

    # main()'s return value becomes the process exit status.
    sys.exit(main())
对于以前用过 Java 的人来说，执行器（Executor）这种方式应该很眼熟。
另外还有一点需要注意：为了让世界保持清净，如果你没有使用 with 上下文管理器，别忘了关闭你的进程池/执行器（with 非常好用，它会自动帮你完成关闭）。
10
import threading
import requests
def send():
    """GET the Stack Overflow home page and return the requests Response."""
    return requests.get('https://www.stackoverflow.com')
thread = []
# Pass the function object itself: target=send() would perform the request
# right here on the main thread and hand Thread its return value instead.
t = threading.Thread(target=send)
thread.append(t)
t.start()
import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def time_stuff(fn):
    """
    Decorator: measure and print the execution time of ``fn``.

    The wrapped function's return value is passed through, and its name and
    docstring are preserved with functools.wraps.
    """
    import functools

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        result = fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
        return result
    return wrapper
def find_primes_in(nmin, nmax):
    """
    Compute the prime numbers in the inclusive range [nmin, nmax].

    Prints how many primes were found and returns them as an ascending list.
    Numbers below 2 are never prime and are skipped.
    """
    primes = []
    # 0, 1 and negative numbers are not prime (and negatives would make
    # math.sqrt raise), so the search starts at 2 at the earliest.
    for current in range(max(nmin, 2), nmax + 1):
        # A composite number always has a factor no larger than its square root.
        sqrt_n = int(math.sqrt(current))
        # for/else: the else branch runs only when no divisor was found (no break).
        for number in range(2, sqrt_n + 1):
            if current % number == 0:
                break
        else:
            primes.append(current)
    # Only the count is printed; the list itself is returned to the caller.
    print(len(primes))
    return primes
@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Baseline strategy: search the whole range in the main process, on the main thread.
    """
    find_primes_in(nmin, nmax)
@time_stuff
def threading_prime_finder(nmin, nmax, workers=8):
    """
    Split [nmin, nmax] into ``workers`` contiguous, non-overlapping chunks and
    search each chunk on its own thread.

    For example, with nmin=1000, nmax=2000 and workers=4:
        1000 - 1249 to worker 1
        1250 - 1499 to worker 2
        1500 - 1749 to worker 3
        1750 - 2000 to worker 4
    Parallel computation will not help here because of the GIL: this is a
    CPU-bound task, so the threads take turns instead of running at once.
    """
    nrange = nmax - nmin
    threads = []
    for i in range(workers):
        start = nmin + i * nrange // workers
        end = nmin + (i + 1) * nrange // workers
        # find_primes_in includes both ends, so stop one short of the next
        # chunk's start; the last chunk runs through nmax itself.
        stop = nmax if i == workers - 1 else end - 1
        t = threading.Thread(target=find_primes_in, args=(start, stop))
        threads.append(t)
        t.start()
    # Don't forget to wait for the threads to finish.
    for t in threads:
        t.join()
@time_stuff
def processing_prime_finder(nmin, nmax, workers=8):
    """
    Split [nmin, nmax] into ``workers`` contiguous, non-overlapping chunks and
    search each chunk in its own process.

    Separate processes are not bound by one interpreter's GIL, so the chunks
    can run in parallel on multiple cores.
    """
    nrange = nmax - nmin
    processes = []
    for i in range(workers):
        start = nmin + i * nrange // workers
        end = nmin + (i + 1) * nrange // workers
        # Inclusive chunk ends: stop just before the next chunk's start,
        # except for the last chunk, which runs through nmax.
        stop = nmax if i == workers - 1 else end - 1
        p = multiprocessing.Process(target=find_primes_in, args=(start, stop))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
@time_stuff
def thread_executor_prime_finder(nmin, nmax, workers=8):
    """
    Split [nmin, nmax] into ``workers`` non-overlapping chunks and run them on
    a ThreadPoolExecutor (``workers`` must be at least 1).

    The pool manages the threads for you, but this is still a CPU-bound task,
    so because of the GIL it is not expected to beat the sequential version.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers=workers) as e:
        futures = []
        for i in range(workers):
            start = nmin + i * nrange // workers
            end = nmin + (i + 1) * nrange // workers
            # Inclusive chunk ends: stop just before the next chunk's start,
            # except for the last chunk, which runs through nmax.
            stop = nmax if i == workers - 1 else end - 1
            futures.append(e.submit(find_primes_in, start, stop))
        # submit() stores any exception inside its Future; result() re-raises
        # it here instead of letting it be silently discarded.
        for future in futures:
            future.result()
@time_stuff
def process_executor_prime_finder(nmin, nmax, workers=8):
    """
    Split [nmin, nmax] into ``workers`` non-overlapping chunks and run them on
    a ProcessPoolExecutor (``workers`` must be at least 1).

    This was the fastest method measured: the pool manages the processes and
    separate processes overcome the GIL limitation.
    RECOMMENDED METHOD FOR CPU BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers=workers) as e:
        futures = []
        for i in range(workers):
            start = nmin + i * nrange // workers
            end = nmin + (i + 1) * nrange // workers
            # Inclusive chunk ends: stop just before the next chunk's start,
            # except for the last chunk, which runs through nmax.
            stop = nmax if i == workers - 1 else end - 1
            futures.append(e.submit(find_primes_in, start, stop))
        # Surface any exception raised in a worker process instead of losing it.
        for future in futures:
            future.result()
def main():
    """Time every prime-finding strategy on the range [10,000,000, 10,500,000]."""
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)


# The guard is required for the multiprocessing-based finders: with the
# "spawn" start method each child re-imports this module, and an unguarded
# main() call would start the whole benchmark again inside every child.
if __name__ == "__main__":
    main()
以下是我的Mac OSX 4核心机器上的结果
Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
17 回答
给定一个函数
f
,如下所示:将参数传递给
f
Python 3具有Launching parallel tasks的功能 . 这使我们的工作更轻松 .
它有thread pooling和Process pooling .
以下是一个见解:
ThreadPoolExecutor Example
ProcessPoolExecutor
注意：要在 Python 中实现真正的并行化，你应该使用 multiprocessing 模块派生多个并行执行的进程（由于全局解释器锁（GIL）的存在，Python 线程提供的是交错执行，实际上仍是串行而非并行执行，只有在交错进行 I/O 操作时才有用）。
但是,如果您只是在寻找交错(或者正在进行可以并行化的I / O操作,尽管全局解释器锁定),那么threading模块就是起点 . 作为一个非常简单的例子,让我们通过并行求和子范围来考虑求和大范围的问题:
请注意,上面是一个非常愚蠢的例子,因为它完全没有I / O,并且由于全局解释器锁定而在CPython中将被串行执行,尽管是交错的(带有上下文切换的额外开销) .
大多数文档和教程都使用Python的
Threading
和Queue
模块,对于初学者来说,它们看起来似乎无法应对 .也许考虑一下python 3的
concurrent.futures.ThreadPoolExecutor
模块 . 结合with
子句和列表推导式，它会非常好用。自 2010 年提出这个问题以来，借助 map 和 pool 实现简单多线程的方式已经大大简化。
下面的代码来自一篇非常值得一读的文章/博客（本人与作者没有任何关联）：Parallelism in one line: A Better Model for Day to Day Threading Tasks。我将在下面进行总结——最终只需要几行代码：
它就是下面这段代码的多线程版本：
Description
Implementation
multiprocessing.dummy
与 multiprocessing 模块完全相同，只是改用线程实现（这是一个重要的区别——CPU 密集型任务应使用多进程；I/O 期间则使用线程）。计时结果如下：
传递多个参数（仅在 Python 3.3 及更高版本中可以这样写）：
要传递多个数组:
或传递常量和数组:
如果您使用的是早期版本的Python,则可以通过this workaround传递多个参数 .
(感谢user136036的有用评论)
对我来说,线程的完美示例是监视异步事件 . 看看这段代码 .
您可以通过打开IPython会话并执行以下操作来使用此代码:
等几分钟
只需注意,线程不需要队列 .
这是我能想象的最简单的例子,它显示了10个并发运行的进程 .
正如其他人提到的，由于 GIL 的存在，CPython 只能利用线程来等待 I/O。如果想让 CPU 密集型任务受益于多个核心，请使用 multiprocessing：
下面这个简单的多线程示例应该会有所帮助。你可以运行它，轻松理解 Python 中的多线程是如何工作的。我使用锁来阻止其他线程访问，直到之前的线程完成工作。通过使用
这行代码，你可以限制同时运行的进程数量，让其余线程保持等待，直到之前的进程完成后再运行。
以下是使用线程导入 CSV 的一个非常简单的示例。[所需导入的库可能因用途不同而有所不同]
辅助函数：
驱动函数：
Alex Martelli给出的答案对我有所帮助,但是这里的修改版本我认为更有用(至少对我而言) .
这是一个简单的示例:您需要尝试一些备用URL并返回第一个要响应的内容 .
这是一种将线程用作简单优化的情况：每个子线程都在等待某个 URL 完成解析并返回响应，以便将其内容放入队列；每个线程都是守护线程（如果主线程结束，它们不会让进程继续存活——这种做法比不这样做更常见）；主线程启动所有子线程，在队列上执行
get
等待其中一个已完成put
，然后输出结果并终止（这会结束所有可能仍在运行的子线程，因为它们是守护线程）。在 Python 中，线程的正确用法总是与 I/O 操作相关（因为 CPython 无论如何都不会用多个核心来运行 CPU 密集型任务，使用线程的唯一理由就是在等待 I/O 时不阻塞进程）。顺便说一下，队列几乎总是把工作分配给线程和/或收集工作结果的最佳方式，而且它们本身就是线程安全的，因此你不必操心锁、条件变量、事件、信号量以及其他线程协调/通信机制。
我发现这非常有用:创建与核心一样多的线程并让它们执行(大量)任务(在这种情况下,调用shell程序):
以上解决方案都没有在我的GNU / Linux服务器上实际使用多个核心(我没有管理员权限) . 他们只是在一个核心上运行 . 我使用较低级别的
os.fork
接口来派生多个进程。这是对我有效的代码：使用全新的 concurrent.futures 模块
对于以前用过 Java 的人来说，执行器（Executor）这种方式应该很眼熟。
另外还有一点需要注意：为了让世界保持清净，如果你没有使用
with
上下文管理器，别忘了关闭你的进程池/执行器（with 非常好用，它会自动帮你完成关闭）。我在这里看到很多示例都没有执行真正的工作，而且它们大多是 CPU 密集型的。下面是一个 CPU 密集型任务的示例，它计算 1000 万到 1050 万之间的所有素数。我在这里使用了全部 4 种方法
以下是我的Mac OSX 4核心机器上的结果