首页 文章

Python asyncio任务收益率很低

提问于
浏览
12

我对如何在Python 3.4中使用 asyncio 模块感到困惑 . 我有一个用于搜索引擎的 searching API,并且希望每个搜索请求可以并行或异步运行,这样我就不必等待一次搜索完成另一次搜索 .

这是我的高级搜索API,用于使用原始搜索结果构建一些对象 . 搜索引擎本身正在使用某种asyncio机制,所以我不会打扰它 .

# No asyncio module used here now
class search(object):
  ...
  self.s = some_search_engine()
  ...
  def searching(self, *args, **kwargs):
    ret = {}
    # do some raw searching according to args and kwargs and build the wrapped results
    ...
    return ret

为了尝试异步请求,我编写了以下测试用例来测试如何将我的东西与 asyncio 模块进行交互 .

# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
  r = yield from f(*args, **kwargs)
  return r

s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()

通过运行pytest,当它到达 r = yield from ... 行时,它将返回 RuntimeError: Task got bad yield : {results from searching...} .

我也试过另一种方式 .

# same handle as above
def handle(..):
  ....
s = search()
loop = asyncio.get_event_loop()
tasks = [
        asyncio.async(handle(s.searching, arg11, arg12, ...)),
        asyncio.async(handle(s.searching, arg21, arg22, ...)),
        ...
        ]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

通过pytest运行这个测试用例,它会通过,但搜索引擎会产生一些奇怪的异常 . 它说 Future/Task exception was never retrieved .

我想问的事情:

  • 对于我的第一次尝试,通过返回函数调用的实际结果,是使用 yield from 的正确方法吗?

  • 我想我需要为我的第二个测试用例添加一些睡眠以等待任务完成,但是我应该怎么做?如何在第二个测试用例中调用函数调用?

  • 这是通过创建异步处理程序来处理请求,通过现有模块实现asyncio的好方法吗?

  • 如果对问题2的回答为NO,那么每个客户端调用类 search 是否需要包含 loop = get_event_loop() 这类东西来同步请求?

1 回答

  • 16

    问题是你不能只调用现有的同步代码,就好像它是一个 asyncio.coroutine 并获得异步行为 . 当你调用 yield from searching(...) 时,如果 searching 本身实际上是 asyncio.coroutine ,或者至少返回 asyncio.Future ,那么你只会得到异步行为 . 现在, searching 只是一个常规的同步函数,所以调用 yield from searching(...) 只会抛出一个错误,因为它不会返回 Future 或协同程序 .

    要获得所需的行为,除了 synchronous 版本之外,您还需要具有 searching 的异步版本(或者如果您不需要,则只需删除同步版本) . 您有几个选项可以支持两者:

    • searching 重写为 asyncio.coroutine ,它使用 asyncio 兼容调用来执行其I / O,而不是阻止I / O.这将使它在 asyncio 上下文中工作,但这意味着您还需要提供另一个同步 searching 方法来启动 asyncio 事件循环并调用 return loop.run_until_complete(self.searching(...)) . 有关详细信息,请参阅this question .

    • 保持 searching 的同步实现,并提供另一个使用BaseEventLoop.run_in_executor在后台线程中运行 searching 方法的异步API:

    class search(object):
      ...
      self.s = some_search_engine()
      ...
      def searching(self, *args, **kwargs):
        ret = {}
        ...
        return ret
    
       @asyncio.coroutine
       def searching_async(self, *args, **kwargs):
          loop = kwargs.get('loop', asyncio.get_event_loop())
          try:
              del kwargs['loop']  # assuming searching doesn't take loop as an arg
          except KeyError:
              pass
          r = yield from loop.run_in_executor(None, self.searching, *args)  # Passing None tells asyncio to use the default ThreadPoolExecutor
          return r
    

    测试脚本:

    s = search()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s.searching_async(arg1, arg2, ...))
    loop.close()
    

    这样,您可以保持同步代码不变,并且至少提供可以在 asyncio 代码中使用的方法,而不会阻止事件循环 . 它不是像你在代码中实际使用异步I / O那样干净的解决方案,但它总比没有好 .

    • 提供两个完全独立的 searching 版本,一个使用阻塞I / O,另一个使用 asyncio 兼容 . 这为两个上下文提供了理想的实现,但需要两倍的工作 .

相关问题