首页 文章

如何使用python asyncio从EC2实例调用AWS Lambda函数

提问于
浏览
1

我最近发布了一个关于How to allow invoking an AWS Lambda function only from EC2 instances inside a VPC的问题 . 我设法通过将带有"AWS lambda role"策略的IAM角色附加到EC2实例来实现它,现在我可以使用boto3调用lambda函数 .

现在,我想使用asyncio await语法异步调用lambda函数 . 我读到lambda函数通过设置 InvokeType='Event' 提供异步版本,但这实际上使得调用立即返回而不获取函数的结果 .

由于该函数需要一些时间并且我想并行启动多个函数,因此我希望避免在等待函数返回时阻止执行 .

我尝试使用aiobotocore,但这只支持基本的's3'服务功能 .

解决这个问题的最佳方法(简而言之)是使用AWS API网关服务通过GET / POST请求调用lambda函数,该请求可以使用aiohttp轻松处理 .

然而,我无法使其发挥作用 .

我在EC2 IAM角色中添加了策略“AmazonAPIGatewayInvokeFullAccess”,但每次我尝试:

import requests
r = requests.get('https://url_to_api_gateway_for_function')

我得到一个禁止的回应 <Response [403]> .

我直接使用lambda函数中的触发器创建了API网关 .

我还尝试编辑API网关设置,方法是在函数路径中添加post方法并设置“AWS_IAM”身份验证,然后将其部署为“prod”部署......没有运气 . 仍然是相同的禁止回应 . 当我通过“API网关上的测试屏幕测试它时,它工作正常” .

知道如何解决这个问题吗?我错过了一步吗?

1 回答

  • 3

    经过一番努力,我设法解决了我的问题 .

    问题是像python的请求这样的curl和python模块没有使用运行它们的EC2机器的IAM凭证来签署http请求 . 必须使用AWS v4登录协议对AWS GATEWAY API的http请求进行签名 .

    这里有一个例子:http://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html

    幸运的是,为了简单起见,有一些辅助模块,如requests-aws-sign:https://github.com/jmenga/requests-aws-sign

    最后代码看起来像:

    import aiohttp
    import asyncio
    from requests_aws_sign import AWSV4Sign
    from boto3 import session
    
    session = session.Session()
    credentials = session.get_credentials()
    region = session.region_name or 'ap-southeast-2'
    service = 'execute-api'
    url = "get_it_from_api->stages->your_deployment->invoke_url"
    auth=AWSV4Sign(credentials, region, service)
    
    async def invoke_func(loop):
        async with aiohttp.request('GET', url, auth=auth, loop=loop) as resp:
            html = await resp.text()
            print(html)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    

    希望这会节省别人的时间!

    EDIT:

    为了完整性和帮助他人,我不得不说上面的代码不起作用,因为requests_aws_sign与aiohttp不兼容 . 我得到了一些“auth字段错误” .

    我通过使用以下方法来解决它:

    async with session.get(url, headers=update_headers()) as resp:
    

    其中update_headers()是一个简单的函数,它模仿requests_aws_sign对 Headers 做了什么(这样我就可以使用header参数将它们直接设置为上面的请求) . 它看起来像这样:

    def update_headers(sim_id):
        url = urlparse("get_it_from_api->stages->your_deployment->invoke_url")
        path = url.path or '/'
        querystring = ''
        if url.query:
            querystring = '?' + urlencode(parse_qs(url.query), doseq=True)
        safe_url = url.scheme + '://' + url.netloc.split(':')[0] + path + querystring
        request = AWSRequest(method='GET', url=safe_url)
        SigV4Auth(credentials, service, region).add_auth(request)
        return dict(request.headers.items())
    

相关问题