我正在编写一个简单的生产者/消费者应用程序,用于异步调用多个 URL。

在下面的代码中,如果我设置 conn_count = 1 并向 Queue 添加 2 个项目,程序可以正常工作,因为只创建了一个消费者。但是,如果我设置 conn_count = 2 并向 Queue 添加 4 个项目,则只有 3 个请求成功发出,另一个请求因 ClientConnectorError 而失败。

您能帮我排查使用多个消费者时请求失败的原因吗?谢谢。

我使用的是自己编写的 echo 服务器。

服务器:

import os
import logging.config
import yaml
from aiohttp import web
import json


def start():
    """Set up logging, register the two ``/`` handlers and run the app.

    ``do_get`` is bound to GET ``/`` and ``do_post`` to POST ``/``; the
    configured application is then handed to ``web.run_app``.
    """
    setup_logging()

    application = web.Application()
    routes = application.router
    routes.add_get('/', do_get)
    routes.add_post('/', do_post)

    web.run_app(application)


async def do_get(request):
    """Answer a GET request with the fixed text body ``hello``."""
    greeting = web.Response(text='hello')
    return greeting


async def do_post(request):
    """Echo the JSON body of a POST request back as response text.

    The body is parsed with ``request.json()`` and re-serialised with
    ``json.dumps`` before being returned.
    """
    payload = await request.json()
    echoed = json.dumps(payload)
    return web.Response(text=echoed)


def setup_logging(
        default_path='logging.yaml',
        default_level=logging.INFO,
        env_key='LOG_CFG'
):
    """Configure the ``logging`` module, preferring a YAML config file.

    Args:
        default_path: Config file used when the environment variable named
            by *env_key* is unset or empty.
        default_level: Level passed to ``logging.basicConfig`` when no
            config file exists at the chosen path.
        env_key: Name of the environment variable that may override
            *default_path*.

    If the chosen path exists, its content is parsed with
    ``yaml.safe_load`` and applied through ``logging.config.dictConfig``;
    otherwise ``logging.basicConfig`` is called with *default_level*.
    """
    # A non-empty environment value wins over the default path.
    path = os.getenv(env_key, None) or default_path

    # No config file: fall back to the basic setup and stop here.
    if not os.path.exists(path):
        logging.basicConfig(level=default_level)
        return

    with open(path, 'rt') as f:
        raw_config = f.read()

    logging.config.dictConfig(yaml.safe_load(raw_config))


if __name__ == "__main__":
    # Launch the server only when this file is executed as a script.
    start()

客户端:

import asyncio
import collections
import json
import sys
import async_timeout

from aiohttp import ClientSession, TCPConnector

# Connection ceiling constant; nothing in the visible code reads it.
MAX_CONNECTIONS = 100
# Endpoint that every consumer POSTs its payload to.
URL = 'http://localhost:8080'

# Immutable work item placed on the queue by the producer.
InventoryAccount = collections.namedtuple(
    "InventoryAccount",
    ["op_co", "customer_id"],
)


async def produce(queue, num_consumers):
    """Fill *queue* with work items followed by one stop marker per consumer.

    Args:
        queue: Queue object whose ``put`` coroutine receives the items.
        num_consumers: Number of consumers; ``2 * num_consumers`` accounts
            are enqueued, then ``num_consumers`` ``None`` sentinels.
    """
    item_total = num_consumers * 2
    for index in range(item_total):
        account = InventoryAccount(op_co=index, customer_id=index * 100)
        await queue.put(account)

    # One None per consumer so that each of them sees a stop marker.
    for _ in range(num_consumers):
        await queue.put(None)


async def consumer(n, queue, session, responses):
    """Drain *queue*, POSTing each account to ``URL`` until a ``None`` arrives.

    Args:
        n: Consumer number, used only to label the printed progress lines.
        queue: ``asyncio.Queue`` yielding account records (objects exposing
            ``op_co`` and ``customer_id``) or ``None`` as the stop marker.
        session: HTTP client session; its ``post(...)`` is used as an async
            context manager and its ``closed`` attribute is printed.
        responses: List that receives the body text of each 200 response.

    Every item taken from the queue is acknowledged with exactly one
    ``queue.task_done()`` call, whether handling succeeded or failed.
    Errors raised while handling an item are printed and the loop moves on
    to the next item. Task cancellation is never swallowed.

    Raises:
        asyncio.CancelledError: If the consumer task is cancelled.
    """
    print('consumer {}: starting'.format(n))

    while True:
        # Fetch outside the try block: if get() itself fails or is cancelled
        # there is no item to report on and nothing to mark as done.
        account = await queue.get()

        try:
            if account is None:
                # Stop marker; the finally clause still acknowledges it.
                break

            print(f"Consumer {n}, Updating cloud prices for account: opCo = {account.op_co!s}, customerId = {account.customer_id!s}")

            params = {'opCo': account.op_co, 'customerId': account.customer_id}
            headers = {'content-type': 'application/json'}

            # The synchronous ``with`` form of async_timeout is deprecated
            # and was removed in later releases; ``async with`` is the
            # supported spelling.
            async with async_timeout.timeout(10):
                print(f"Consumer {n}, session state " + str(session.closed))
                async with session.post(URL,
                                        headers=headers,
                                        data=json.dumps(params)) as response:
                    # Explicit check instead of ``assert`` so that it is not
                    # stripped when Python runs with -O.
                    if response.status != 200:
                        raise RuntimeError(
                            f"unexpected HTTP status {response.status}")

                    responses.append(await response.text())
        except asyncio.CancelledError:
            # Let cancellation propagate so the task really stops.
            raise
        except Exception as exc:
            # Report the exception instance (not just its class) so the
            # underlying cause, e.g. of a connection error, is visible.
            print(f"Consumer {n}, Error updating cloud prices for account: opCo = {account.op_co!s}, customerId = {account.customer_id!s}. {exc!r}")
        finally:
            # Exactly one acknowledgement per successful get().
            queue.task_done()

    print('consumer {}: ending'.format(n))


async def start(loop, session, num_consumers):
    """Run the producer against *num_consumers* consumers and collect results.

    Args:
        loop: Event loop on which the consumer tasks are scheduled.
        session: HTTP client session handed to every consumer.
        num_consumers: Number of consumer tasks; also used as the queue's
            ``maxsize``.

    Returns:
        The list that the consumers appended their response texts to.
    """
    responses = []
    queue = asyncio.Queue(maxsize=num_consumers)

    # Schedule the consumers before producing so they can drain the
    # bounded queue while the producer is filling it.
    consumers = []
    for index in range(num_consumers):
        worker = consumer(index, queue, session, responses)
        consumers.append(asyncio.ensure_future(worker, loop=loop))

    await produce(queue, num_consumers)
    await queue.join()

    for task in consumers:
        task.cancel()

    return responses


async def run(loop, conn_count):
    """Open a client session limited to *conn_count* connections and run.

    Args:
        loop: Event loop passed to the session and on to ``start``.
        conn_count: Used both as the connector's ``limit`` and as the
            number of consumers given to ``start``.

    The list returned by ``start`` is printed, prefixed with ``Result: ``.
    """
    connector = TCPConnector(verify_ssl=False, limit=conn_count)
    async with ClientSession(loop=loop, connector=connector) as session:
        result = await start(loop, session, conn_count)
        print("Result: " + str(result))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # Number of consumers, which is also the connector's connection limit.
    conn_count = 2

    try:
        loop.run_until_complete(run(loop, conn_count))
    finally:
        # Close the loop even when the run raised.
        loop.close()

参考: