Python asyncio: will the garbage collector destroy unreferenced tasks?

I am writing a program that accepts RPC requests over AMQP in order to perform network requests (CoAP). While handling an RPC request, the aioamqp callback spawns a task that takes care of the network I/O. These tasks can be considered background tasks that run indefinitely, streaming network responses back over AMQP (in this case, a single RPC request triggers an RPC response plus a stream of data).

I noticed that in my original code the network task would get destroyed after seemingly random intervals (before it had finished), and asyncio would then print the warning "Task was destroyed but it is pending". The problem looks similar to the one described here: https://bugs.python.org/issue21163.

For now I have worked around the issue by keeping a hard reference to each task in a module-level list, which prevents the GC from destroying the task objects. But I wonder whether there is a better workaround. Ideally I would like to await the task in the RPC callback, but I noticed that this prevents any further AMQP operations from completing: e.g. creating a new AMQP channel stalls, and receiving further RPC requests over AMQP stalls as well. I am not sure what causes this stall (since the callback itself is a coroutine, I would not expect awaiting to hold up the whole aioamqp library).
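
For reference, the hard-reference workaround amounts to the pattern sketched below (a minimal sketch with illustrative names, not my exact code); the done callback is one way to answer the "when do we remove the task from the list?" question:

import asyncio

# Strong references to in-flight background tasks, so the garbage collector
# cannot destroy them while they are still pending.
background_tasks = set()

def spawn_background_task(coro):
    """Schedule coro and keep a hard reference to the task until it finishes."""
    task = asyncio.ensure_future(coro)
    background_tasks.add(task)
    # Drop the reference automatically once the task completes, so the set
    # does not grow without bound.
    task.add_done_callback(background_tasks.discard)
    return task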

I have posted the RPC client and server sources below, both based on the aioamqp/aiocoap examples. In the server, on_rpc_request is the AMQP RPC callback, and send_coap_obs_request is the networking coroutine that gets destroyed when the 'obs_tasks.append(task)' statement is removed.

client.py:

"""
    CoAP RPC client, based on aioamqp implementation of RPC examples from RabbitMQ tutorial
"""

import base64
import json
import uuid

import asyncio
import aioamqp


class CoAPRpcClient(object):
    def __init__(self):
        self.transport = None
        self.protocol = None
        self.channel = None
        self.callback_queue = None
        self.waiter = asyncio.Event()

    async def connect(self):
        """ an `__init__` method can't be a coroutine"""
        self.transport, self.protocol = await aioamqp.connect()
        self.channel = await self.protocol.channel()

        result = await self.channel.queue_declare(queue_name='', exclusive=True)
        self.callback_queue = result['queue']

        await self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue_name=self.callback_queue,
        )

    async def on_response(self, channel, body, envelope, properties):
        if self.corr_id == properties.correlation_id:
            self.response = body

        self.waiter.set()

    async def call(self, n):
        if not self.protocol:
            await self.connect()
        self.response = None
        self.corr_id = str(uuid.uuid4())
        await self.channel.basic_publish(
            payload=str(n),
            exchange_name='',
            routing_key='coap_request_rpc_queue',
            properties={
                'reply_to': self.callback_queue,
                'correlation_id': self.corr_id,
            },
        )
        await self.waiter.wait()

        await self.protocol.close()
        return json.loads(self.response)


async def rpc_client():
    coap_rpc = CoAPRpcClient()

    request_dict = {}
    request_dict_json = json.dumps(request_dict)

    print(" [x] Send RPC coap_request({})".format(request_dict_json))
    response_dict = await coap_rpc.call(request_dict_json)
    print(" [.] Got {}".format(response_dict))


asyncio.get_event_loop().run_until_complete(rpc_client())

server.py:

"""
CoAP RPC server, based on aioamqp implementation of RPC examples from RabbitMQ tutorial
"""

import base64
import json
import socket
import sys

import logging
import warnings

import asyncio
import aioamqp
import aiocoap

amqp_protocol = None
coap_client_context = None
obs_tasks = []

AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME = 'topic_coap'
AMQP_COAP_NOTIFICATIONS_TOPIC_NAME = 'topic'
AMQP_COAP_NOTIFICATIONS_ROUTING_KEY = 'coap.response'

def create_response_dict(coap_request, coap_response):
    response_dict = {'request_uri': "", 'code': 0}
    response_dict['request_uri'] = coap_request.get_request_uri()
    response_dict['code'] = coap_response.code

    if len(coap_response.payload) > 0:
        response_dict['payload'] = base64.b64encode(coap_response.payload).decode('utf-8')

    return response_dict


async def handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response):
    # create response dict:
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # create new channel:
    global amqp_protocol
    amqp_channel = await amqp_protocol.channel()

    await amqp_channel.basic_publish(
        payload=message,
        exchange_name='',
        routing_key=amqp_properties.reply_to,
        properties={
            'correlation_id': amqp_properties.correlation_id,
        },
    )

    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag)

    print(" [.] handle_coap_response() published response: {}".format(response_dict))


def incoming_observation(coap_request, coap_response):
    asyncio.ensure_future(handle_coap_notification(coap_request, coap_response))


async def handle_coap_notification(coap_request, coap_response):
    # create response dict:
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # create new channel:
    global amqp_protocol
    amqp_channel = await amqp_protocol.channel()

    await amqp_channel.exchange(AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, AMQP_COAP_NOTIFICATIONS_TOPIC_NAME)

    await amqp_channel.publish(message, exchange_name=AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, routing_key=AMQP_COAP_NOTIFICATIONS_ROUTING_KEY)

    print(" [.] handle_coap_notification() published response: {}".format(response_dict))


async def send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request):
    observation_is_over = asyncio.Future()
    try:
        global coap_client_context
        requester = coap_client_context.request(coap_request)
        requester.observation.register_errback(observation_is_over.set_result)
        requester.observation.register_callback(lambda data, coap_request=coap_request: incoming_observation(coap_request, data))

        try:
            print(" [..] Sending CoAP obs request: {}".format(request_dict))
            coap_response = await requester.response
        except socket.gaierror as e:
            print("Name resolution error:", e, file=sys.stderr)
            return
        except OSError as e:
            print("Error:", e, file=sys.stderr)
            return

        if coap_response.code.is_successful():
            print(" [..] Received CoAP response: {}".format(coap_response))
            await handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response)
        else:
            print(coap_response.code, file=sys.stderr)
            if coap_response.payload:
                print(coap_response.payload.decode('utf-8'), file=sys.stderr)
            sys.exit(1)

        exit_reason = await observation_is_over
        print("Observation is over: %r"%(exit_reason,), file=sys.stderr)

    finally:
        if not requester.response.done():
            requester.response.cancel()
        if not requester.observation.cancelled:
            requester.observation.cancel()


async def on_rpc_request(amqp_channel, amqp_body, amqp_envelope, amqp_properties):
    print(" [.] on_rpc_request(): received RPC request: {}".format(amqp_body))

    request_dict = {} # hardcoded to vdna.be for SO example
    aiocoap_code = aiocoap.GET
    aiocoap_uri = "coap://vdna.be/obs"
    aiocoap_payload = ""

    # as we are ready to send the CoAP request, ack the client already indicating we have received the RPC request
    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag)

    coap_request = aiocoap.Message(code=aiocoap_code, uri=aiocoap_uri, payload=aiocoap_payload)
    coap_request.opt.observe = 0

    task = asyncio.ensure_future(send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request))
    # we have to keep a hard ref to this task, otherwise the python garbage collector destroys the task before it is completed. See https://bugs.python.org/issue21163
    # this is apparent from the "Task was destroyed but it is pending" exception thrown after random (lengthy) time intervals, probably the time interval is related to when the gc is triggered
    # await task # this does not seem to work, as it prevents new amqp operations from executing (e.g. amqp channels do not get created)
    # we are actually not interested in waiting for the task anyway, so instead just keep a hard ref to the task in the obs_tasks list
    obs_tasks.append(task) # TODO: when do we remove the task from the list?


async def amqp_connect():
    try:
        (transport, protocol) = await aioamqp.connect('localhost', 5672)
        print(" [x] Connected to AMQP broker")
        return (transport, protocol)
    except aioamqp.AmqpClosedConnection as ex:
        print("closed connections: {}".format(ex))
        raise ex


async def main():
    """Open AMQP connection to broker, subscribe to coap_request_rpc_queue and setup aiocoap client context """

    try:
        global amqp_protocol
        (amqp_transport, amqp_protocol) = await amqp_connect()

        channel = await amqp_protocol.channel()

        await channel.queue_declare(queue_name='coap_request_rpc_queue')
        await channel.basic_qos(prefetch_count=10, prefetch_size=0, connection_global=False)
        await channel.basic_consume(on_rpc_request, queue_name='coap_request_rpc_queue')

        print(" [x] Awaiting CoAP request RPC requests")
    except aioamqp.AmqpClosedConnection as ex:
        print("amqp_connect: closed connections: {}".format(ex))
        exit()

    global coap_client_context
    coap_client_context = await aiocoap.Context.create_client_context()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    loop.set_debug(True)

    asyncio.ensure_future(main())
    loop.run_forever()

1 Answer


When a task is scheduled, a _step callback is scheduled on the loop. That callback keeps a reference to the task through self. I have not checked the code, but I am fairly confident that the loop keeps references to its callbacks. However, while a task is waiting on an awaitable or a future, the _step callback is not scheduled. In that case, the task adds a done callback to the future, and that callback keeps a reference to the task, but the loop itself keeps no reference to a task that is waiting on a future.

As long as something keeps a reference to the future the task is waiting on, everything is fine. But if nothing keeps a hard reference to that future, the future can be garbage collected, and when that happens, the task can be garbage collected as well.

So, I would look for things your task calls where the future the task is waiting on might end up unreferenced. In general, a future needs to be referenced so that something can eventually set its result, so an unreferenced future is most likely a bug.
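
To illustrate the mechanism, here is a minimal, self-contained sketch (names are illustrative; whether and when the task actually gets collected, and where the "Task was destroyed but it is pending!" warning shows up, can vary between Python versions):

import asyncio
import gc

async def waits_forever():
    # The future is referenced only from this coroutine frame, and the frame
    # is referenced only from the task; nothing outside holds either of them.
    fut = asyncio.Future()
    await fut

async def main():
    asyncio.ensure_future(waits_forever())  # return value deliberately not kept
    await asyncio.sleep(0.1)   # let the task start and reach its await
    gc.collect()               # the task/future reference cycle may now be collected
    await asyncio.sleep(0.1)

asyncio.get_event_loop().run_until_complete(main())
# asyncio may log "Task was destroyed but it is pending!" here or at interpreter
# exit, because a still-pending task was garbage collected rather than finished.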
