首页 文章

如何使用asyncio和redis创建全局连接

提问于
浏览
4

我是python 3的新手,来自gevent和2.7的asyncio ....

如何创建所有人都可以使用的全局连接?例如 . 我将进行1个过程,例如10个asyncio线程,但我不希望每个线程单独连接 . 为什么?..会有... 100个内核,每个内核有10个线程,并且不希望有多个连接到redis

import asyncio
import asyncio_redis

async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)
        res = blocking_code(data)
        await connection.set('test',res)

#Process raw data here and all code is blocking
def blocking_code(data):
    results = {}
    return results

if __name__ == '__main__':
    connection = asyncio_redis.Connection.create(host='127.0.0.1', port=6379, poolsize=2)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(worker()), asyncio.ensure_future(worker())]
    loop.run_until_complete(asyncio.gather(*tasks))
    connection.close()


    Traceback (most recent call last):
      File "/Users//worker.py", line 14, in <module>
        loop.run_until_complete(example())
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
        return future.result()
      File "/Users//worker.py", line 7, in example
        data = yield from connection.brpop(['queue'], timeout=0)
    AttributeError: 'generator' object has no attribute 'brpop'

所以在上面我有两个任务,但我只想要1个redis连接

1 回答

  • 1

    10个asyncio线程

    以防万一 - asyncio coroutines在一个线程中运行 . 通过在I / O操作时在协同程序之间切换来实现并发 .

    为什么你的代码不起作用?

    asyncio_redis.Connection.create - 是一个协程,您应该使用 yield from 等待此操作从中获取结果:

    connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
    

    如何创建全局连接

    如果你'll have only one connection, you' ll可能从使用asyncio没有任何好处 . 并发请求可能需要可以使用的连接池 . asyncio_redis has简单的方法,例如:

    import asyncio
    import asyncio_redis
    
    
    @asyncio.coroutine
    def main():
        connection = yield from asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
        try:
            # 3 requests running concurrently in single thread using connections from pool: 
            yield from asyncio.gather(
                connection.brpop(['queue:pixel'], timeout=0),
                connection.brpop(['queue:pixel'], timeout=0),
                connection.brpop(['queue:pixel'], timeout=0),
            )
        finally:
            connection.close()
    
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()
    

    Python 3.5

    如果您正在使用Python 3.5,请考虑使用newer syntax来定义和等待协同程序 .

    Upd:

    阻止代码(例如,需要很多CPU时间的代码)不能直接在协程内部使用:它会冻结你的事件循环,你将无法获得asyncio的好处 . 它与连接数无关 .

    您可以使用run_in_executor在单独的进程中运行此代码而不阻塞事件循环:

    from concurrent.futures import ProcessPoolExecutor
    
    
    executor = ProcessPoolExecutor(max_workers=10)  # use number of cores here
    
    
    async def worker():
        while True:
            data = await connection.brpop(['queue'], timeout=0)
            print(data)
    
            # await blocking_code from separate process:
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(executor, blocking_code, data)
    

相关问题