首页 文章

如何使用带有moto的boto3测试方法

提问于
浏览
6

我正在编写测试用例,以便使用boto3从s3中查找/获取密钥 . 我过去使用过moto来测试boto(而不是3)代码,但我正试图通过这个项目转移到boto3,并遇到了一个问题:

class TestS3Actor(unittest.TestCase):
    @mock_s3
    def setUp(self):
        self.bucket_name = 'test_bucket_01'
        self.key_name = 'stats_com/fake_fake/test.json'
        self.key_contents = 'This is test data.'
        s3 = boto3.session.Session().resource('s3')
        s3.create_bucket(Bucket=self.bucket_name)
        s3.Object(self.bucket_name, self.key_name).put(Body=self.key_contents)

错误:

...
File "/Library/Python/2.7/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py", line 344, in _make_request
self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
File "/Library/Python/2.7/site-packages/botocore/vendored/requests/packages/urllib3/connectionpool.py", line 314, in _raise_timeout
if 'timed out' in str(err) or 'did not complete (read)' in str(err):  # Python 2.6
TypeError: __str__ returned non-string (type WantWriteError)
botocore.hooks: DEBUG: Event needs-retry.s3.CreateBucket: calling handler <botocore.retryhandler.RetryHandler object at 0x10ce75310>

看起来moto没有正确地模仿boto3调用 - 我该如何工作呢?

2 回答

  • 7

    对我来说有用的是在使用 boto3 运行模拟测试之前使用 boto 设置环境 .

    这是一个工作片段:

    import unittest
    import boto
    from boto.s3.key import Key
    from moto import mock_s3
    import boto3
    
    
    class TestS3Actor(unittest.TestCase):
        mock_s3 = mock_s3()
    
        def setUp(self):
            self.mock_s3.start()
            self.location = "eu-west-1"
            self.bucket_name = 'test_bucket_01'
            self.key_name = 'stats_com/fake_fake/test.json'
            self.key_contents = 'This is test data.'
            s3 = boto.connect_s3()
            bucket = s3.create_bucket(self.bucket_name, location=self.location)
            k = Key(bucket)
            k.key = self.key_name
            k.set_contents_from_string(self.key_contents)
    
        def tearDown(self):
            self.mock_s3.stop()
    
        def test_s3_boto3(self):
            s3 = boto3.resource('s3', region_name=self.location)
            bucket = s3.Bucket(self.bucket_name)
            assert bucket.name == self.bucket_name
            # retrieve already setup keys
            keys = list(bucket.objects.filter(Prefix=self.key_name))
            assert len(keys) == 1
            assert keys[0].key == self.key_name
            # update key
            s3.Object(self.bucket_name, self.key_name).put(Body='new')
            key = s3.Object(self.bucket_name, self.key_name).get()
            assert 'new' == key['Body'].read()
    

    使用 py.test test.py 运行时,您将获得以下输出:

    collected 1 items 
    
    test.py .
    
    ========================================================================================= 1 passed in 2.22 seconds =========================================================================================
    
  • 1

    根据this信息,它似乎尚不支持使用Boto3 S3 Put流式传输到s3 .

    就我而言,我使用以下命令将对象成功上传到存储桶:

    s3.Object(self.s3_bucket_name, self.s3_key).put(Body=open("file_to_upload", 'rb'))
    

    其中“file_to_upload”是您要上传到s3存储桶的本地文件 . 对于您的测试用例,您只需创建一个临时文件来检查此功能:

    test_file = open("test_file.json", "w")
    test_file.write("some test contents")
    test_file.close()
    s3.Object(self.s3_bucket_name, self.s3_key).put(Body=open("test_file", 'rb'))
    

相关问题