如何实现asyncpg事务装饰器?

我试图实现asyncpg事务包装器,如下所示:

def asyncpg_tx(autocommit=True, ignore_err=False):
'''
:param autocommit:
:param ignore_err:
:return:
'''
async def decorator(func):
    try:
        @functools.wraps(func)
        async def func_ex(*args, **kwargs):
            async with pool.acquire() as con:
                tx = con.transaction()
                await tx.start()
                logger.debug("tx %s started", tx)
                try:
                    return await func(con, *args, **kwargs)
                except Exception as e:
                    logger.exception(e)
                    await tx.rollback()
                    logger.info("tx %s rollbacked", tx)
                    raise_with_traceback(e)
                finally:
                    logger.debug("tx %s is released", tx)
                    await tx.commit()

        return await func_ex()
    except Exception as e:
            logger.exception(e)

return decorator

如果函数被装饰没有参数,这是有效的:

@asyncpg_tx()

async def load_latest_group_stats(con):

now = datetime.datetime.now()
slot = int(2 * (now.hour + now.minute / 60))  # range(0~47)

cur = await con.cursor('select * from table where $1=?', slot)
rows = await cur.fetch(5)
return rows

但是,如果fucn有params,它就不起作用:

@asyncpg_tx()

async def save_group_stats(con,stats):sql ='...'result = await con.execute(sql,stats)print(result)

错误消息是:TypeError:save_group_stats()缺少1个必需的位置参数:'stats'

调用脚本是:

result = asyncio.get_event_loop().run_until_complete(load_latest_group_stats)
print(result) # this will prints records, all ok here

stats = ... # assignment here


#async def save():
    #return await save_group_stats(stats.dict())

result = asyncio.get_event_loop().run_until_complete(save_group_stats)

如果我取消注释#async def save,并调用run_until_complete(save()),事情仍然无法解决,错误信息是:

File "C:/workspace/parrots/dal/postgres/group_stats_dao.py", line 55, in save
return await save_group_stats(stats.dict())

TypeError:'coroutine'对象不可调用sys:1:RuntimeWarning:coroutine'asyncpg_tx..decorator'从未等待过

问题是,如何正确实现包装器?

回答(0)