首页 文章

Asyncio检测断开挂起

提问于
浏览
4

我在Python 3.4中使用Asyncio,我将尝试解释我在这一点上做了什么,以及我(认为)导致问题的原因 .

一方面,我有一个带阻塞操作的UDP连接框架,我正在从这个流中获取数据并创建以SSE格式传递给客户端的json . 这一切都很有效 .

我遇到的问题是,如果我不做任何事情并且客户端断开连接我无法正常处理客户端断开连接我将开始收到此错误:

WARNING [selector_events:613] socket.send() raised exception.

因为循环仍然在运行,我一直在研究如何干净地打破循环并触发.close(),但是我遇到了我发现的例子的问题,并且在线资源不多 .

似乎实际工作的一个例子是尝试从客户端读取一行,如果它是一个空字符串,则意味着客户端断开连接 .

while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break

然而,在大约十条消息之后,所有发送给客户端的消息都停止了,我认为这是因为它挂起了“data =(从client_reader.readline()得到的产品)”,如果我关闭客户端然后它正确地关闭并且“结束”它挂起连接“确实被调用了 . 任何想法为什么它可能挂?我觉得我现在对Asyncio有很好的处理能力,但这个让我很困惑 .

注意:location()和status()是我从UDP套接字获取信息的两次调用 - 我使用相同的代码成功运行它们几个小时没有问题 - 减去客户端断开线 .

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)


def client_done(task):
    del clients[task]
    client_writer.close()
    log.info("End Connection")

log.info("New Connection")
task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)
    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break
        data = yield from asyncio.wait_for(location(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(status(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

更新:如果我在“yield from client_reader”上添加超时,当它达到通常挂起的点时,我会收到以下错误 .

2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = next(coro)
  File "try.py", line 35, in handle_client
    timeout=1.0))
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

这是一个示例脚本,显示了操作中的错误 - 只需在python 3.4.2中运行它,经过9次迭代后,它将挂起来自客户端的读取 .

(该脚本已完成,因此您可以运行它以便自己查看)

import asyncio
import logging

import json
import time

log = logging.getLogger(__name__)

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)

    def client_done(task):
        del clients[task]
        client_writer.close()
        log.info("End Connection")

    log.info("New Connection")
    task.add_done_callback(client_done)


@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
        if not data: #client disconnected
            break

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

@asyncio.coroutine
def test1():
        data = {'result':{
                        'test1':{ }
                    }
                }
        data = json.dumps(data)
        return data

@asyncio.coroutine
def test2():
        data = {'result':{ 
                    'test2':{ }
                    }
                }
        data = json.dumps(data)
        return data 

def main():
    loop = asyncio.get_event_loop()
    f = asyncio.start_server(accept_client, host=None, port=2991)
    loop.run_until_complete(f)
    loop.run_forever()

if __name__ == '__main__':
    log = logging.getLogger("")
    formatter = logging.Formatter("%(asctime)s %(levelname)s " +
                                  "[%(module)s:%(lineno)d] %(message)s")
    # log the things
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()

另一个更新:我发现它死了,因为它从客户端的 Headers 中读取所有行,然后在它用完行时超时 . 我想,我正在寻找的真正答案是当你实际上不需要从客户端接收数据时(如初始连接),如何检测客户端断开连接 .

3 回答

  • 3

    好的,我想我理解你的问题 . 您正在从客户端读取以查明客户端是否已断开连接,但是一旦客户端发送了其标头, readline() 将在客户端仍然连接时无限期地阻塞,这将阻止您实际执行任何工作 . 使用超时来避免阻塞很好,你只需要处理 TimeoutError ,因为当它发生时你可以假设客户端没有断开连接:

    from concurrent.futures import TimeoutError
    
    @asyncio.coroutine
    def handle_client(client_reader, client_writer):
        data = {'result':{'status':'Connection Ready'}}
        postmessage(data,client_writer)
        count = 0
        while True:
            try:
                # See if client has disconnected.
                data = (yield from asyncio.wait_for(client_reader.readline(),timeout=0.01))
                if not data: # Client disconnected
                    break
            except TimeoutError:
                pass  # Client hasn't disconnected.
    
            data = yield from asyncio.wait_for(test1(),timeout=1.0)
            yield from postmessage(data,client_writer)
    
            data = yield from asyncio.wait_for(test2(),timeout=1.0)
            yield from postmessage(data,client_writer)
    

    请注意,我在这里的暂停时间非常短,因为我们真的根本不想阻止,我们只想知道连接是否已关闭 .

    但更好的解决方案是不明确检查连接是否已关闭,而是处理在连接关闭时尝试通过套接字发送数据时遇到的任何异常:

    @asyncio.coroutine
    def handle_client(client_reader, client_writer):
        data = {'result':{'status':'Connection Ready'}}
        postmessage(data,client_writer)
        count = 0
        while True:
            try:
                data = yield from asyncio.wait_for(test1(),timeout=1.0)
                yield from postmessage(data,client_writer)
    
                data = yield from asyncio.wait_for(test2(),timeout=1.0)
                yield from postmessage(data,client_writer)
            except ConnectionResetError:  # And/or whatever other exceptions you see.
                break
    
  • 1

    我阅读了源代码并找到了一种简单的方法:

    if client_writer.transport._conn_lost:
        print('Connection is lost')
        # break some loop
    
  • 0

    我遇到了同样的问题, select() 在等待事件时也无法检测到死套接字 .

    我使用非阻塞 read() 解决了我的问题,并且考虑到 select() 的读取事件产生,映射到收到的 0 字节,是一个死连接 .

    在内核模式(sys调用)中执行代码时,套接字的死亡不会生成信号,也不会产生任何异常 . 您似乎是使用启发式方法,考虑到来自零字节的 read() 的产量应该被视为死链接/套接字......不是我认为优雅,但我无法找到任何东西更好(目前) .

相关问题