首页 文章

在asyncio.Protocol.data_received中调用协同程序

提问于
浏览
11

我在新的Python asyncio 模块的 asyncio.Protocol.data_received 回调中执行异步操作时遇到问题 .

考虑以下服务器:

class MathServer(asyncio.Protocol):

   @asyncio.coroutine
   def slow_sqrt(self, x):
      yield from asyncio.sleep(1)
      return math.sqrt(x)

   def fast_sqrt(self, x):
      return math.sqrt(x)

   def connection_made(self, transport):
      self.transport = transport

   #@asyncio.coroutine
   def data_received(self, data):
      print('data received: {}'.format(data.decode()))
      x = json.loads(data.decode())
      #res = self.fast_sqrt(x)
      res = yield from self.slow_sqrt(x)
      self.transport.write(json.dumps(res).encode('utf8'))
      self.transport.close()

与以下客户一起使用:

class MathClient(asyncio.Protocol):

   def connection_made(self, transport):
      transport.write(json.dumps(2.).encode('utf8'))

   def data_received(self, data):
      print('data received: {}'.format(data.decode()))

   def connection_lost(self, exc):
      asyncio.get_event_loop().stop()

随着 self.fast_sqrt 被调用,一切都按预期工作 .

使用 self.slow_sqrt ,它不起作用 .

它也不适用于 self.fast_sqrt@asyncio.coroutine 上的 @asyncio.coroutine 装饰器 .

我觉得我在这里缺少一些基本的东西 .

完整的代码在这里:

经测试:

  • Python 3.4.0b1(Windows)

  • Python 3.3.3 asyncio-0.2.1(FreeBSD)

两者的问题都是一样的:使用 slow_sqrt ,客户端/服务器只会挂起什么都不做 .

1 回答

  • 8

    看来,这需要通过 Future 解耦 - 尽管我仍然不确定这是否是正确的方法 .

    class MathServer(asyncio.Protocol):
    
       @asyncio.coroutine
       def slow_sqrt(self, x):
          yield from asyncio.sleep(2)
          return math.sqrt(x)
    
       def fast_sqrt(self, x):
          return math.sqrt(x)
    
       def consume(self):
          while True:
             self.waiter = asyncio.Future()
             yield from self.waiter
             while len(self.receive_queue):
                data = self.receive_queue.popleft()
                if self.transport:
                   try:
                      res = self.process(data)
                      if isinstance(res, asyncio.Future) or \
                         inspect.isgenerator(res):
                         res = yield from res
                   except Exception as e:
                      print(e)
    
       def connection_made(self, transport):
          self.transport = transport
          self.receive_queue = deque()
          asyncio.Task(self.consume())
    
       def data_received(self, data):
          self.receive_queue.append(data)
          if not self.waiter.done():
             self.waiter.set_result(None)
          print("data_received {} {}".format(len(data), len(self.receive_queue)))
    
       def process(self, data):
          x = json.loads(data.decode())
          #res = self.fast_sqrt(x)
          res = yield from self.slow_sqrt(x)
          self.transport.write(json.dumps(res).encode('utf8'))
          #self.transport.close()
    
       def connection_lost(self, exc):
          self.transport = None
    

    这是Guido van Rossum的answer

    解决方案很简单:将该逻辑写为使用@coroutine标记的单独方法,并使用async()(在这种情况下为== Task())在data_received()中将其关闭 . 之所以没有内置到协议中的原因是,如果是,它将需要备用事件循环实现来处理协同程序 .

    def data_received(self, data):
        asyncio.ensure_future(self.process_data(data))
    
    @asyncio.coroutine
    def process_data(self, data):
        # ...stuff using yield from...
    

    完整代码在这里: - Client - Server

相关问题