首页 文章

Python 3.5 asyncio在事件循环中从不同线程中的同步代码执行协程

提问于
浏览
4

我希望有人可以帮助我 .

我有一个对象,能够具有返回协程对象的属性 . 这很好用,但是我需要在单独的线程中从同步代码获取coroutine对象的结果,而事件循环当前正在运行 . 我想出的代码是:

def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
    """
    Get an attribute synchronously and safely.

    Note:
        This does nothing special if an attribute is synchronous. It only
        really has a use for asynchronous attributes. It processes
        asynchronous attributes synchronously, blocking everything until
        the attribute is processed. This helps when running SQL code that
        cannot run asynchronously in coroutines.

    Args:
        key (str): The Config object's attribute name, as a string.
        default (Any): The value to use if the Config object does not have
            the given attribute. Defaults to None.

    Returns:
        Any: The vale of the Config object's attribute, or the default
        value if the Config object does not have the given attribute.
    """
    ret = self.get(key, default)

    if asyncio.iscoroutine(ret):
        if loop.is_running():
            loop2 = asyncio.new_event_loop()
            try:
                ret = loop2.run_until_complete(ret)

            finally:
                loop2.close()
        else:
            ret = loop.run_until_complete(ret)

    return ret

我正在寻找的是一种在多线程环境中同步获取协程对象结果的安全方法 . self.get() 可以返回一个coroutine对象,用于我设置为它们提供的属性 . 我发现的问题是:如果事件循环正在运行 . 在堆栈溢出和其他一些站点上搜索了几个小时之后,我的(破碎的)解决方案就在上面 . 如果循环正在运行,我创建一个新的事件循环并在新的事件循环中运行我的协同程序 . 除了代码永远挂在 ret = loop2.run_until_complete(ret) 行上之外,这是有效的 .

现在,我有以下带有结果的场景:

_149003_的

  • 结果不是协程

  • 返回结果 . [Good]

_149005_的

  • 结果是一个协程和事件循环没有运行(基本上与事件循环在同一个线程中)

  • 返回结果 . [Good]

_149007的

  • 结果是一个协程和事件循环正在运行(基本上在与事件循环不同的线程中)

  • 永远等待结果 . [Bad]

有谁知道如何修复坏结果,这样我才能得到我需要的 Value ?谢谢 .

我希望我在这里有所了解 .

我确实有一个好的,有效的理由来使用线程;特别是我使用的SQLAlchemy不是异步的,我将SQLAlchemy代码发送到ThreadPoolExecutor来安全地处理它 . 但是,我需要能够从这些线程中查询这些异步属性,以便SQLAlchemy代码安全地获取某些配置值 . 不,我不会为了完成我需要的东西而从SQLAlchemy转移到另一个系统,所以请不要提供它的替代品 . 该项目太过遥远,无法切换一些如此重要的东西 .

我尝试使用 asyncio.run_coroutine_threadsafe()loop.call_soon_threadsafe() 并且都失败了 . 到目前为止,它已经最大限度地使它工作,我觉得我只是遗漏了一些明显的东西 .

当我有机会时,我会编写一些代码来提供问题的示例 .

好的,我实现了一个示例案例,它按照我期望的方式工作 . 所以我的问题很可能是代码中的其他地方 . 保持开放状态,如果需要,将改变问题以适应我的真实问题 .

有没有人有任何可能的想法,为什么 asyncio.run_coroutine_threadsafe()asyncio.run_coroutine_threadsafe() 将永远挂起而不是返回结果?

不幸的是,我的示例代码不会重复我的错误,如下所示:

import asyncio
import typing

loop = asyncio.get_event_loop()

class ConfigSimpleAttr:
    __slots__ = ('value', '_is_async')

    def __init__(
        self,
        value: typing.Any,
        is_async: bool=False
    ):
        self.value = value
        self._is_async = is_async

    async def _get_async(self):
        return self.value

    def __get__(self, inst, cls):
        if self._is_async and loop.is_running():
            return self._get_async()
        else:
            return self.value

class BaseConfig:
    __slots__ = ()

    attr1 = ConfigSimpleAttr(10, True)
    attr2 = ConfigSimpleAttr(20, True)    

    def get(self, key: str, default: typing.Any=None) -> typing.Any:
        return getattr(self, key, default)

    def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
        ret = self.get(key, default)

        if asyncio.iscoroutine(ret):
            if loop.is_running():
                fut = asyncio.run_coroutine_threadsafe(ret, loop)
                print(fut, fut.running())
                ret = fut.result()
            else:
                ret = loop.run_until_complete(ret)

        return ret

config = BaseConfig()

def example_func():
    return config.get_sync('attr1')

async def main():
    a1 = await loop.run_in_executor(None, example_func)
    a2 = await config.attr2
    val = a1 + a2
    print('{a1} + {a2} = {val}'.format(a1=a1, a2=a2, val=val))
    return val

loop.run_until_complete(main())

这是我的代码正在做的精简版,并且该示例有效,即使我的实际应用程序没有 . 我被困在哪里寻找答案 . 欢迎建议在哪里尝试追踪我的“永远卡住”问题,即使我上面的代码实际上没有复制问题 .

2 回答

  • 1

    您不太可能同时运行多个事件循环,因此这部分看起来非常错误:

    if loop.is_running():
            loop2 = asyncio.new_event_loop()
            try:
                ret = loop2.run_until_complete(ret)
    
            finally:
                loop2.close()
        else:
            ret = loop.run_until_complete(ret)
    

    即使测试循环是否正在运行,最好还是明确地将(仅)运行循环给 get_sync 并使用run_coroutine_threadsafe安排协程:

    def get_sync(self, key, loop):
        ret = self.get(key, default)
        if not asyncio.iscoroutine(ret):
            return ret
        future = asyncio.run_coroutine_threadsafe(ret, loop)
        return future.result()
    

    编辑:挂起问题可能与在错误循环中调度的任务有关(例如,在调用协程时忘记了可选的 loop 参数) . 使用PR 303(现在合并)应该更容易调试这种问题:当循环和未来不匹配时,会引发 RuntimeError . 因此,您可能希望使用最新版本的asyncio运行测试 .

  • 0

    好的,我通过采用不同的方法让我的代码工作 . 问题与使用具有文件IO的东西有关,我正在使用文件IO组件上的loop.run_in_executor()将其转换为协程 . 然后,我试图在从另一个线程调用的同步函数中使用它,在该函数上使用另一个loop.run_in_executor()进行处理 . 这是我的代码中一个非常重要的例程(在执行我的短代码时可能会被称为百万次或更多),我做出了一个决定,我的逻辑变得太复杂了 . 所以......我简单了 . 现在,如果我想异步使用文件IO组件,我明确使用我的“get_async()”方法,否则,我通过普通的属性访问使用我的属性 .

    通过消除逻辑的复杂性,它使代码更清晰,更容易理解,更重要的是,它实际上是有效的 . 虽然我并不是100%确定我知道问题的根本原因(我相信它与处理属性的线程有关,然后又会启动另一个尝试在处理属性之前读取属性的线程,导致类似竞争条件和停止我的代码,但我永远不能复制我的应用程序之外的错误,不幸的是完全证明了它,我能够通过它并继续我的开发工作 .

相关问题