首页 文章

Python 3.4 Comms Stream Delegate - 非阻塞recv和send - 数据从asyncio输出

提问于
浏览
0

我正在将一个客户端服务器应用程序放在RPi上 . 它有一个主线程,可以创建一个与iOS设备通信的通信线程 . 主线程创建一个asyncio事件循环和一个sendQ和一个recvQ,并将它们作为args传递给comms线程中的commsDelegate main方法 .

我遇到的麻烦是当iOS设备连接时,它需要在数据可用时立即从此Python应用程序接收未经请求的数据,并且需要能够将数据发送到Python应用程序 . 所以发送和接收需要是非阻塞的 .

那里有很棒的echo服务器教程 . 但在服务器对数据做有用的事情方面却很少 .

任何人都可以协助我让asyncio读取我的发送队列并在主线程排队后立即转发数据吗?我收到了很好的工作 .

Main Thread creates a loop and starts the comms thread:

commsLoop = asyncio.new_event_loop()
  commsMainThread = threading.Thread(target=CommsDelegate.commsDelegate, args=(commsInQ,commsOutQ,commsLoop,commsPort,), daemon=True)
  commsMainThread.start()

然后模块CommsDelegate中的asyncio应该运行循环作为loop.run_forever()服务器任务从套接字流读取和写入并使用队列发送接收消息备份到主线程 .

到目前为止,这是我的代码 . 我发现如果我为协议生成器创建一个工厂,我可以将它传递给队列名称,以便现在收到的消息都很好 . 当他们从客户端到达时,他们排队_nowait并且主线程接收它们就好了 .

我只需要asyncio来处理来自主线程的出站消息队列,因为它们到达sendQ,因此它可以将它们发送到连接的客户端 .

#!/usr/bin/env python3.6
import asyncio

class ServerProtocol(asyncio.Protocol):

  def __init__(self, loop, recvQ, sendQ):
    self.loop = loop
    self.recvQ = recvQ
    self.sendQ = sendQ

  def connection_made(self, transport):
    peername = transport.get_extra_info('peername')
    print('Connection from {}'.format(peername))
    self.transport = transport

  def data_received(self, data):
    message = data.decode()
    print('Data received: {!r}'.format(message))
    self.recvQ.put_nowait(message.rstrip())

  # Needs work... I think the queue.get_nowait should be a co-ro maybe? 
  def unknownAtTheMo():
    dataToSend = sendQ.get_nowait()
    print('Send: {!r}'.format(message))
    self.transport.write(dataToSend)

  # Needs work to close on request from client or server or exc...
  def handleCloseSocket(self):
    print('Close the client socket')
    self.transport.close()

# async co-routine to consume the send message Q from Main Thread
async def consume(sendQ):
  print("In consume coro")
  while True:
    outboundData = await self.sendQ.get()
    print("Consumed", outboundData)
    self.transport.write(outboundData.encode('ascii'))

def commsDelegate(recvQ, sendQ, loop, port):
  asyncio.set_event_loop(loop)

  # Connection coroutine - Create a factory to assist the protocol in receipt of the queues as args
  factory = lambda: ProveItServerProtocol(loop, recvQ, sendQ)
  # Each client connection will create a new protocol instance
  connection = loop.run_until_complete(loop.create_server(factory, host='192.168.1.199', port=port))

  # Outgoing message queue handler
  consumer = asyncio.ensure_future(consume(sendQ))

  # Set up connection
  loop.run_until_complete(connection)

  # Wait until the connection is closed
  loop.run_forever()

  # Wait until the queue is empty
  loop.run_until_complete(queue.join())

  # Cancel the consumer
  consumer.cancel()

  # Let the consumer terminate
  loop.run_until_complete(consumer)

  # Close the connection
  connection.close()

  # Close the loop
  loop.close()

我将所有数据消息发送为json,CommsDelegate执行编码和解码,然后将它们作为中继进行中继 .

更新:asyncio线程似乎适用于传入流量 . 服务器接收json并通过队列中继它 - 非阻塞 .

一旦发送工作,我将在一个线程上有一个可重用的黑盒服务器 .

1 回答

  • 0

    我可以看到你的方法有两个问题 . 首先,所有客户端都使用相同的 recvsend 队列,因此 consume 协程无法知道应答谁 .

    第二个问题与您使用队列作为同步和异步世界之间的桥梁有关 . 看到你的代码的这一部分:

    await self.sendQ.get()
    

    如果 sendQ 是常规队列(来自 queue 模块),则此行将失败,因为 sendQ 不是协程 . 另一方面,如果 sendQasyncio.Queue ,主线程将无法使用 sendQ.put ,因为它是一个协程 . 可以使用 put_nowait ,但asyncio不保证线程安全 . 相反,你必须使用loop.call_soon_threadsafe

    loop.call_soon_threadsafe(sendQ.put_nowait, message)
    

    通常,请记住asyncio旨在作为主应用程序运行 . 它应该在主线程中运行,并通过 ThreadPoolExecutor 与同步代码通信(参见loop.run_in_executor) .

    有关asyncio documentation中多线程的更多信息 . 您可能还想查看asyncio stream API,它提供了一个更好的界面来使用TCP .

相关问题