首页 文章

如何将python asyncio与线程结合起来?

提问于
浏览
32

我已成功构建了一个带有Python asyncio和aiohttp的RESTful microservice,它可以监听POST事件以从各种馈送器收集实时事件 .

然后,它构建一个内存中结构,以便在嵌套的defaultdict / deque结构中缓存最后24h的事件 .

现在我想定期检查该结构到光盘,最好使用泡菜 .

由于内存结构可能> 100MB,我想避免在检查结构所需的时间内阻止我的传入事件处理 .

我宁愿创建结构的快照拷贝(例如深度拷贝),然后花时间将其写入磁盘并在预设的时间间隔内重复 .

我一直在寻找关于如何组合线程的例子(并且是一个线程甚至是最好的解决方案吗?)和asyncio用于那个目的但找不到能帮助我的东西 .

任何开始使用的指针都非常感谢!

2 回答

  • 3

    使用BaseEventLoop.run_in_executor将方法委托给线程或子进程非常简单:

    import asyncio
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    def cpu_bound_operation(x):
        time.sleep(x) # This is some operation that is CPU-bound
    
    @asyncio.coroutine
    def main():
        # Run cpu_bound_operation in the ProcessPoolExecutor
        # This will make your coroutine block, but won't block
        # the event loop; other coroutines can run in meantime.
        yield from loop.run_in_executor(p, cpu_bound_operation, 5)
    
    
    loop = asyncio.get_event_loop()
    p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
    loop.run_until_complete(main())
    

    至于是否使用 ProcessPoolExecutorThreadPoolExecutor ,这很难说;腌制一个大型物体肯定会占用一些CPU周期,最初你会认为 ProcessPoolExecutor 是要走的路 . 但是,将100MB对象传递给池中的 Process 将需要在主进程中对实例进行pickle,通过IPC将字节发送到子进程,在子进程中取消对其进行取消,然后再次对其进行pickle以便将其写入磁盘 . 考虑到这一点,我的猜测是,酸洗/去除开销会足够大,以至于你最好使用 ThreadPoolExecutor ,即使你因GIL而受到性能影响 .

    也就是说,测试两种方式并确定无疑是非常简单的,所以你不妨这样做 .

  • 50

    我也使用了 run_in_executor ,但是在大多数情况下我发现这个函数有点粗糙,因为它需要 partial() 用于关键字args并且我永远不会用除了单个 Actuator 和默认事件循环之外的任何东西来调用它 . 所以我用一个合理的默认值和自动关键字参数处理为它做了一个方便的包装 .

    from time import sleep
    import asyncio as aio
    loop = aio.get_event_loop()
    
    class Executor:
        """In most cases, you can just use the 'execute' instance as a
        function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
        the executor, assign result to y. The defaults can be changed, though,
        with your own instantiation of Executor, i.e. execute =
        Executor(nthreads=4)"""
        def __init__(self, loop=loop, nthreads=1):
            from concurrent.futures import ThreadPoolExecutor
            self._ex = ThreadPoolExecutor(nthreads)
            self._loop = loop
        def __call__(self, f, *args, **kw):
            from functools import partial
            return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))
    execute = Executor()
    
    ...
    
    def cpu_bound_operation(t, alpha=30):
        sleep(t)
        return 20*alpha
    
    async def main():
        y = await execute(cpu_bound_operation, 5, alpha=-2)
    
    loop.run_until_complete(main())
    

相关问题