首页 文章

在Python中发送100,000个HTTP请求的最快方法是什么?

提问于
浏览
204

我正在打开一个有100,000个网址的文件 . 我需要向每个URL发送一个http请求并打印状态代码 . 我使用的是Python 2.6,到目前为止,我看到了Python实现线程/并发的许多令人困惑的方式 . 我甚至看过python concurrence库,但无法弄清楚如何正确编写这个程序 . 有没有人遇到类似的问题?我想通常我需要知道如何尽快在Python中执行数千个任务 - 我想这意味着'concurrently' .

13 回答

  • 7

    对于您的情况,线程可能会起作用,因为您可能花费大部分时间等待响应 . 标准库中有一些有用的模块,如Queue,可能有所帮助 .

    我之前做过类似的并行下载文件,这对我来说已经足够了,但它并不是你所说的规模 .

    如果您的任务受CPU限制,您可能需要查看multiprocessing模块,这将允许您使用更多的CPU /核心/线程(由于每个进程的锁定,因此更多的进程不会相互阻塞)

  • 42

    解决此问题的一个好方法是首先编写获取一个结果所需的代码,然后合并线程代码以并行化应用程序 .

    在完美的世界中,这只是意味着同时启动100,000个线程,将其结果输出到字典或列表中以供稍后处理,但实际上,您可以以这种方式发出多少并行HTTP请求 . 在本地,您可以同时打开多少个套接字,Python解释器允许执行多少个线程 . 远程地,如果所有请求都针对一个服务器或多个请求,则可能会限制同时连接的数量 . 这些限制可能需要您编写脚本,以便在任何时候只调查一小部分URL(100,正如另一张海报所提到的,可能是一个不错的线程池大小,尽管你可能会发现你可以成功部署更多) .

    您可以按照此设计模式解决上述问题:

    • 启动一个启动新请求线程的线程,直到当前运行的线程数(您可以通过threading.active_count()或通过将线程对象推送到数据结构中来跟踪它们)> =您的最大并发请求数(比如100) ),然后睡一会儿 . 当没有更多URL要处理时,此线程应终止 . 因此,线程将继续唤醒,启动新线程,并在完成之前休眠 .

    • 让请求线程将其结果存储在某些数据结构中,以便以后检索和输出 . 如果您在CPython中存储结果的结构是 listdict ,则可以safely append or insert unique items from your threads without locks,但如果您写入文件或需要更复杂的跨线程数据交互,则应使用互斥锁来保护此状态腐败 .

    我建议你使用threading模块 . 您可以使用它来启动和跟踪正在运行的线程 . Python的线程支持是裸的,但是对你的问题的描述表明它完全足以满足你的需求 .

    最后,如果您希望看到用Python编写的并行网络应用程序的非常简单的应用程序,请查看ssh.py . 它是一个小型库,它使用Python线程来并行化许多SSH连接 . 设计非常接近您的要求,您可能会发现它是一个很好的资源 .

  • 0

    使用grequests,它是Gevent模块请求的组合 .

    GRequests允许您使用带有Gevent的请求轻松地生成异步HTTP请求 .

    用法很简单:

    import grequests
    
    urls = [
       'http://www.heroku.com',
       'http://tablib.org',
       'http://httpbin.org',
       'http://python-requests.org',
       'http://kennethreitz.com'
    ]
    

    创建一组未发送的请求:

    >>> rs = (grequests.get(u) for u in urls)
    

    同时发送所有内容:

    >>> grequests.map(rs)
    [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
    
  • 1

    一个办法:

    from twisted.internet import reactor, threads
    from urlparse import urlparse
    import httplib
    import itertools
    
    
    concurrent = 200
    finished=itertools.count(1)
    reactor.suggestThreadPoolSize(concurrent)
    
    def getStatus(ourl):
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status
    
    def processResponse(response,url):
        print response, url
        processedOne()
    
    def processError(error,url):
        print "error", url#, error
        processedOne()
    
    def processedOne():
        if finished.next()==added:
            reactor.stop()
    
    def addTask(url):
        req = threads.deferToThread(getStatus, url)
        req.addCallback(processResponse, url)
        req.addErrback(processError, url)   
    
    added=0
    for url in open('urllist.txt'):
        added+=1
        addTask(url.strip())
    
    try:
        reactor.run()
    except KeyboardInterrupt:
        reactor.stop()
    

    原料与材料:

    [kalmi@ubi1:~] wc -l urllist.txt
    10000 urllist.txt
    [kalmi@ubi1:~] time python f.py > /dev/null 
    
    real    1m10.682s
    user    0m16.020s
    sys 0m10.330s
    [kalmi@ubi1:~] head -n 6 urllist.txt
    http://www.google.com
    http://www.bix.hu
    http://www.godaddy.com
    http://www.google.com
    http://www.bix.hu
    http://www.godaddy.com
    [kalmi@ubi1:~] python f.py | head -n 6
    200 http://www.bix.hu
    200 http://www.bix.hu
    200 http://www.bix.hu
    200 http://www.bix.hu
    200 http://www.bix.hu
    200 http://www.bix.hu
    

    Pingtime:

    bix.hu is ~10 ms away from me
    godaddy.com: ~170 ms
    google.com: ~30 ms
    
  • 0

    线程绝对不是这里的答案 . 它们将提供进程和内核瓶颈,以及如果总体目标是“最快的方式”则不能接受的吞吐量限制 .

    一点 twisted 和它的异步 HTTP 客户端会给你更好的结果 .

  • 30

    考虑使用Windmill,虽然Windmill可能无法做那么多线程 .

    您可以在5台计算机上使用手动滚动的Python脚本执行此操作,每个计算机使用端口40000-60000连接出站,打开100,000个端口连接 .

    此外,使用一个很好的线程化QA应用程序(如OpenSTA)进行样本测试可能会有所帮助,以便了解每个服务器可以处理多少 .

    另外,尝试使用简单的Perl和LWP :: ConnCache类 . 您可能会以这种方式获得更多性能(更多连接) .

  • 14

    如果您希望获得最佳性能,可能需要考虑使用异步I / O而不是线程 . 成千上万的开销操作系统线程非常重要,Python解释器中的上下文切换在它之上增加了更多 . 线程肯定会完成工作,但我怀疑异步路由将提供更好的整体性能 .

    具体来说,我建议在Twisted库中使用异步Web客户端(http://www.twistedmatrix.com) . 它有一个公认的陡峭的学习曲线,但是一旦你很好地处理了Twisted的异步编程风格,它就很容易使用 .

    Twisted的异步Web客户端API上的HowTo可在以下位置获得:

    http://twistedmatrix.com/documents/current/web/howto/client.html

  • -2

    使用tornado异步网络库的解决方案

    from tornado import ioloop, httpclient
    
    i = 0
    
    def handle_request(response):
        print(response.code)
        global i
        i -= 1
        if i == 0:
            ioloop.IOLoop.instance().stop()
    
    http_client = httpclient.AsyncHTTPClient()
    for url in open('urls.txt'):
        i += 1
        http_client.fetch(url.strip(), handle_request, method='HEAD')
    ioloop.IOLoop.instance().start()
    
  • 7

    事情发生了很大变化,自2010年发布以来,我没有尝试过所有其他答案,但我尝试了一些,我发现这对我使用python3.6最好 .

    我能够在AWS上每秒获取约150个独特的域名 .

    import pandas as pd
    import concurrent.futures
    import requests
    import time
    
    out = []
    CONNECTIONS = 100
    TIMEOUT = 5
    
    tlds = open('../data/sample_1k.txt').read().splitlines()
    urls = ['http://{}'.format(x) for x in tlds[1:]]
    
    def load_url(url, timeout):
        ans = requests.head(url, timeout=timeout)
        return ans.status_code
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
        future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
        time1 = time.time()
        for future in concurrent.futures.as_completed(future_to_url):
            try:
                data = future.result()
            except Exception as exc:
                data = str(type(exc))
            finally:
                out.append(data)
    
                print(str(len(out)),end="\r")
    
        time2 = time.time()
    
    print(f'Took {time2-time1:.2f} s')
    print(pd.Series(out).value_counts())
    
  • 14

    无双解决方案:

    from urlparse import urlparse
    from threading import Thread
    import httplib, sys
    from Queue import Queue
    
    concurrent = 200
    
    def doWork():
        while True:
            url = q.get()
            status, url = getStatus(url)
            doSomethingWithResult(status, url)
            q.task_done()
    
    def getStatus(ourl):
        try:
            url = urlparse(ourl)
            conn = httplib.HTTPConnection(url.netloc)   
            conn.request("HEAD", url.path)
            res = conn.getresponse()
            return res.status, ourl
        except:
            return "error", ourl
    
    def doSomethingWithResult(status, url):
        print status, url
    
    q = Queue(concurrent * 2)
    for i in range(concurrent):
        t = Thread(target=doWork)
        t.daemon = True
        t.start()
    try:
        for url in open('urllist.txt'):
            q.put(url.strip())
        q.join()
    except KeyboardInterrupt:
        sys.exit(1)
    

    这个比扭曲的解决方案快一点,并且使用更少的CPU .

  • 5

    最简单的方法是使用Python的内置线程库 . 它们不是“真正的”/内核线程它们有问题(比如序列化),但它们已经足够好了 . 你想要一个队列和线程池 . 一个选项是here,但是's trivial to write your own. You can'并行化了所有100,000个调用,但是你可以同时触发它们中的100个(左右) .

  • 0

    使用thread pool是一个不错的选择,这将使这相当容易 . 不幸的是,python没有一个标准库,使线程池变得非常容易 . 但这里有一个像样的图书馆,可以帮助你入门:http://www.chrisarndt.de/projects/threadpool/

    他们网站的代码示例:

    pool = ThreadPool(poolsize)
    requests = makeRequests(some_callable, list_of_args, callback)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    

    希望这可以帮助 .

  • 162

    这个扭曲的异步Web客户端非常快 .

    #!/usr/bin/python2.7
    
    from twisted.internet import reactor
    from twisted.internet.defer import Deferred, DeferredList, DeferredLock
    from twisted.internet.defer import inlineCallbacks
    from twisted.web.client import Agent, HTTPConnectionPool
    from twisted.web.http_headers import Headers
    from pprint import pprint
    from collections import defaultdict
    from urlparse import urlparse
    from random import randrange
    import fileinput
    
    pool = HTTPConnectionPool(reactor)
    pool.maxPersistentPerHost = 16
    agent = Agent(reactor, pool)
    locks = defaultdict(DeferredLock)
    codes = {}
    
    def getLock(url, simultaneous = 1):
        return locks[urlparse(url).netloc, randrange(simultaneous)]
    
    @inlineCallbacks
    def getMapping(url):
        # Limit ourselves to 4 simultaneous connections per host
        # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
        lock = getLock(url,4)
        yield lock.acquire()
        try:
            resp = yield agent.request('HEAD', url)
            codes[url] = resp.code
        except Exception as e:
            codes[url] = str(e)
        finally:
            lock.release()
    
    
    dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
    dl.addCallback(lambda _: reactor.stop())
    
    reactor.run()
    pprint(codes)
    

相关问题