
How to cache asyncio coroutines


I'm using aiohttp to make a simple HTTP request in Python 3.4, like this:

response = yield from aiohttp.get(url)

The application requests the same URL over and over, so naturally I wanted to cache it. My first attempt was something like this:

@functools.lru_cache(maxsize=128)
def cached_request(url):
    return aiohttp.get(url)

The first call to cached_request works fine, but on later calls I end up with None instead of the response object.
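What goes wrong here (a minimal demonstration, independent of aiohttp): lru_cache stores the coroutine object returned by aiohttp.get, not its result. The first yield from consumes that object; later cache hits hand back the same exhausted coroutine, which immediately finishes with None. Newer Pythons raise a RuntimeError instead:

```python
import asyncio

async def fetch():
    return "response"

async def main():
    coro = fetch()            # a coroutine object, like what lru_cache would store
    print(await coro)         # first await runs the coroutine -> "response"
    try:
        await coro            # the same object is already exhausted
    except RuntimeError as exc:
        print("second await:", exc)

asyncio.run(main())
```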

I'm relatively new to asyncio, so I tried many combinations of the asyncio.coroutine decorator, yield from, and a few other things, but none seemed to work.

So how does caching coroutines work?

7 Answers

  • 3

I wrote a simple cache decorator myself:

    def async_cache(maxsize=128):
        cache = {}
    
        def decorator(fn):
            def wrapper(*args):
                # join the positional args into a cache key (assumes str()-able args)
                key = ':'.join(map(str, args))

                if key not in cache:
                    if len(cache) >= maxsize:
                        # dict views have no .next() in Python 3; evict the oldest key
                        del cache[next(iter(cache))]

                    cache[key] = yield from fn(*args)

                return cache[key]
    
            return wrapper
    
        return decorator
    
    
    @async_cache()
    @asyncio.coroutine
    def expensive_io():
        ....
    

This kind of works. But many aspects could be improved. For example: if the cached function is called a second time before the first call has returned, it will execute again.

  • 0

Maybe a bit late, but I've started a new package that may help: https://github.com/argaen/aiocache. Contributions/comments are always welcome.

An example:

    import asyncio
    
    from collections import namedtuple
    
    from aiocache import cached
    from aiocache.serializers import PickleSerializer
    
    Result = namedtuple('Result', "content, status")
    
    
    @cached(ttl=10, serializer=PickleSerializer())
    async def async_main():
        print("First ASYNC non cached call...")
        await asyncio.sleep(1)
        return Result("content", 200)
    
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        print(loop.run_until_complete(async_main()))
        print(loop.run_until_complete(async_main()))
        print(loop.run_until_complete(async_main()))
        print(loop.run_until_complete(async_main()))
    

Note that, as an extra, it can cache any python object into redis using Pickle serialization. If you just want to work with memory, you can use the SimpleMemoryCache backend :).

  • 0

To use functools.lru_cache with coroutines, the following code works.

    import asyncio
    import functools

    import aiohttp

    class Cacheable:
        def __init__(self, co):
            self.co = co
            self.done = False
            self.result = None
            self.lock = asyncio.Lock()
    
        def __await__(self):
            with (yield from self.lock):
                if self.done:
                    return self.result
                self.result = yield from self.co.__await__()
                self.done = True
                return self.result
    
    def cacheable(f):
        def wrapped(*args, **kwargs):
            r = f(*args, **kwargs)
            return Cacheable(r)
        return wrapped
    
    
    @functools.lru_cache()
    @cacheable
    async def foo():
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()
    

The following is thread-safe:

    import asyncio
    import threading

    class ThreadSafeCacheable:
        def __init__(self, co):
            self.co = co
            self.done = False
            self.result = None
            self.lock = threading.Lock()
    
        def __await__(self):
            while True:
                if self.done:
                    return self.result
                if self.lock.acquire(blocking=False):
                    self.result = yield from self.co.__await__()
                    self.done = True
                    return self.result
                else:
                    yield from asyncio.sleep(0.005)
    
  • 3

I'm not familiar with aiohttp, so I'm not sure exactly what is happening that causes Nones to be returned, but the lru_cache decorator will not work with async functions.

I use a decorator which does essentially the same thing; note that it differs from tobib's decorator above in that it will always return a future or a task, rather than the value:

    import asyncio

    from collections import OrderedDict
    from functools import _make_key, wraps
    
    def future_lru_cache(maxsize=128):
        # support use as decorator without calling, for this case maxsize will
        # not be an int
        try:
            real_max_size = int(maxsize)
        except (ValueError, TypeError):
            # int() of a function raises TypeError, not ValueError
            real_max_size = 128
    
        cache = OrderedDict()
    
        async def run_and_cache(func, args, kwargs):
            """Run func with the specified arguments and store the result
            in cache."""
            result = await func(*args, **kwargs)
            cache[_make_key(args, kwargs, False)] = result
            if len(cache) > real_max_size:
                cache.popitem(False)
            return result
    
        def wrapper(func):
            @wraps(func)
            def decorator(*args, **kwargs):
                key = _make_key(args, kwargs, False)
                if key in cache:
                    # Some protection against duplicating calls already in
                    # progress: when starting the call cache the future, and if
                    # the same thing is requested again return that future.
                    if isinstance(cache[key], asyncio.Future):
                        return cache[key]
                    else:
                        f = asyncio.Future()
                        f.set_result(cache[key])
                        return f
                else:
                    task = asyncio.Task(run_and_cache(func, args, kwargs))
                    cache[key] = task
                    return task
            return decorator
    
        if callable(maxsize):
            return wrapper(maxsize)
        else:
            return wrapper
    

    I used _make_key from functools, as lru_cache does. I guess it is meant to be private, so it is probably safer to copy it.
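For reference, this is roughly what _make_key produces (checked against CPython's functools; being private, it can change between versions):

```python
from functools import _make_key

# _make_key flattens (args, kwargs) into one hashable key,
# exactly as lru_cache does internally
k1 = _make_key((1, 2), {'a': 3}, typed=False)
k2 = _make_key((1, 2), {'a': 3}, typed=False)
print(k1 == k2)                                         # -> True: same call, same key
print(k1 == _make_key((1, 2), {'a': 4}, typed=False))   # -> False: different kwargs
```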

  • 0

Another variant of the lru decorator, which caches coroutines that have not completed yet; very useful for parallel requests for the same key:

    import asyncio
    from collections import OrderedDict
    from functools import _make_key, wraps
    
    def async_cache(maxsize=128, event_loop=None):
        cache = OrderedDict()
        if event_loop is None:
            event_loop = asyncio.get_event_loop()
        awaiting = dict()
    
        async def run_and_cache(func, args, kwargs):
            """await func with the specified arguments and store the result
            in cache."""
            result = await func(*args, **kwargs)
            key = _make_key(args, kwargs, False)
            cache[key] = result
            if len(cache) > maxsize:
                cache.popitem(False)
            cache.move_to_end(key)
            return result
    
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                key = _make_key(args, kwargs, False)
                if key in cache:
                    return cache[key]
                if key in awaiting:
                    task = awaiting[key]
                    return await asyncio.wait_for(task, timeout=None, loop=event_loop)
                task = asyncio.ensure_future(run_and_cache(func, args, kwargs), loop=event_loop)
                awaiting[key] = task
                result = await asyncio.wait_for(task, timeout=None, loop=event_loop)
                del awaiting[key]
                return result
            return wrapper
    
        return decorator
    
    
    async def test_async_cache(event_loop):
        counter = 0
        n, m = 10, 3
    
        @async_cache(maxsize=n, event_loop=event_loop)
        async def cached_function(x):
            nonlocal counter
            await asyncio.sleep(0)  # making event loop switch to other coroutine
            counter += 1
            return x
    
        tasks = [asyncio.ensure_future(cached_function(x), loop=event_loop)
                 for x in list(range(n)) * m]
        done, pending = await asyncio.wait(tasks, loop=event_loop, timeout=1)
        assert len(done) == n * m
        assert counter == n
    
    event_loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(test_async_cache(event_loop))
    event_loop.run_until_complete(task)
    
  • 2

I think the simplest way is to use aiohttp-cache (see its documentation):

    pip install aiohttp-cache
    

and use it in your code:

    from aiohttp import web
    from aiohttp_cache import cache, setup_cache
    
    @cache()  # <-- DECORATED FUNCTION
    async def example_1(request):
        return web.Response(text="Example")
    
    
    app = web.Application()
    
    app.router.add_route('GET', "/", example_1)
    
    setup_cache(app)  # <-- INITIALIZED aiohttp-cache
    
    web.run_app(app, host="127.0.0.1")
    
  • 0

A popular async version of lru_cache exists here: async_lru
