
Asyncio IMAP: fetching mails with Python 3

I'm experimenting with the asyncio module, but I need a hint/suggestion on how to fetch large emails asynchronously.

I have a list with the usernames and passwords of the mail accounts.

data = [
    {'usern': 'foo@bar.de', 'passw': 'x'},
    {'usern': 'foo2@bar.de', 'passw': 'y'},
    {'usern': 'foo3@bar.de', 'passw': 'z'},
    # (...)
]

I thought about:

import asyncio

async def main():
    await asyncio.gather(*(get_attachment(d) for d in data))

asyncio.run(main())

However, the slow part is downloading the email attachments.

Fetching the emails:

import email
import imaplib

async def get_attachment(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()

    # list all available mails
    typ, data = connection.search(None, 'ALL')

    for num in data[0].split():
        # fetching each mail
        typ, msg_data = connection.fetch(num, '(RFC822)')
        # parse the raw bytes; decoding them as UTF-8 first fails on 8-bit mails
        msg = email.message_from_bytes(msg_data[0][1])
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()

How can I process all the mails (download the attachments) asynchronously?
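One thing to keep in mind with the plan above: a coroutine that only calls blocking imaplib functions never awaits anything, so the event loop cannot switch between accounts and asyncio.gather ends up running them one after another. The minimal sketch below shows this with time.sleep standing in for a blocking IMAP round trip; blocking_fetch and get_attachment_blocking are hypothetical names used only for illustration.

```python
import asyncio
import time

def blocking_fetch(num):
    # Hypothetical stand-in for a blocking imaplib call such as connection.fetch()
    time.sleep(0.2)
    return num

async def get_attachment_blocking(num):
    # No await anywhere: the event loop cannot run another coroutine
    # until this blocking call returns
    return blocking_fetch(num)

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(*(get_attachment_blocking(n) for n in range(3)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
# elapsed is roughly 3 x 0.2 s because the three calls ran back to back
print(results, f"{elapsed:.2f}s")
```

Wrapping blocking code in `async def` therefore gives no concurrency by itself; the blocking work has to move off the event loop (for example into threads) or be replaced by a library that does asynchronous I/O.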

2 Answers

  • 6

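    If you don't have an IMAP library based on asynchronous I/O, you can use a concurrent.futures.ThreadPoolExecutor to do the I/O in threads. Python releases the GIL during blocking I/O, so you get real concurrency: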

    import asyncio
    import email
    import imaplib
    import threading
    from concurrent.futures import ThreadPoolExecutor
    
    def init_connection(d):
        username = d['usern']
        password = d['passw']
    
        connection = imaplib.IMAP4_SSL('imap.bar.de')
        connection.login(username, password)
        connection.select()
        return connection
    
    # imaplib is not thread-safe, so each worker thread keeps its own connections
    # (one per account) in thread-local storage and reuses them across fetches.
    local = threading.local()
    def do_fetch(num, d, rfc):
        conns = getattr(local, 'connections', None)
        if conns is None:
            conns = local.connections = {}
        connection = conns.get(d['usern'])
        if connection is None:
            connection = conns[d['usern']] = init_connection(d)
        return connection.fetch(num, rfc)
    
    async def get_attachment(d, pool):
        loop = asyncio.get_running_loop()
        # login and search block too, so run them in the pool as well
        connection = await loop.run_in_executor(pool, init_connection, d)
        # list all available mails
        typ, data = await loop.run_in_executor(pool, connection.search, None, 'ALL')
    
        # Kick off asynchronous tasks for all the fetches
        # (run_in_executor already returns an asyncio.Future)
        futs = [loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)')
                for num in data[0].split()]
    
        # Process each fetch as it completes
        for fut in asyncio.as_completed(futs):
            typ, msg_data = await fut
            msg = email.message_from_bytes(msg_data[0][1])
            for part in msg.walk():
                if part.get_content_maintype() == 'multipart':
                    continue
    
                if part.get('Content-Disposition') is None:
                    continue
    
                if part.get_filename():
                    body = part.get_payload(decode=True)
                    # do something with the body, async?
    
        connection.close()
        connection.logout()
    
    async def main(data):
        # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
        with ThreadPoolExecutor(max_workers=5) as pool:
            await asyncio.gather(*(get_attachment(d, pool) for d in data))
    
    asyncio.run(main(data))
    

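    This isn't quite as good as an asyncio-based I/O library, because you still pay for creating the threads, which limits scalability and adds memory overhead. You also get some GIL slowdown from all the Python code wrapped around the actual I/O calls. Still, if you're dealing with fewer than thousands of mails, it should perform fine.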

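    We use run_in_executor to run the blocking calls in the ThreadPoolExecutor as part of the asyncio event loop, and as_completed to iterate over the resulting futures in the order they complete. run_in_executor already returns an asyncio.Future, so wrapping it in asyncio.ensure_future() (spelled asyncio.async() before Python 3.4.4, and a syntax error since async became a reserved keyword in Python 3.7) is optional.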
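The run_in_executor/as_completed pattern can be tried without a mail server. In this minimal sketch a hypothetical fake_fetch sleeps to simulate blocking fetches of different durations; the results arrive in completion order, and the total time is close to the slowest fetch rather than the sum of all of them.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def fake_fetch(num, delay):
    # Hypothetical stand-in for connection.fetch(num, '(RFC822)'): blocks for `delay` s
    time.sleep(delay)
    return num

async def fetch_all(delays):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        # Each call returns an asyncio.Future backed by a worker thread
        futs = [loop.run_in_executor(pool, fake_fetch, num, delay)
                for num, delay in enumerate(delays)]
        order = []
        # as_completed yields in completion order, not submission order
        for fut in asyncio.as_completed(futs):
            order.append(await fut)
    return order

start = time.perf_counter()
order = asyncio.run(fetch_all([0.3, 0.05, 0.15]))
elapsed = time.perf_counter() - start
print(order)              # [1, 2, 0]: the shortest fetch finishes first
print(f"{elapsed:.2f}s")  # close to the longest delay (0.3 s), not the 0.5 s total
```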

    Edit

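    It seems imaplib isn't thread-safe. I've edited my answer to use thread-local storage via threading.local, which lets each thread create a connection object once and reuse it for the thread's whole lifetime (so you create at most max_workers connections per account, instead of a new connection for every fetch). Because a single pool serves several accounts here, the thread-local connection has to be keyed by account as well; otherwise a worker could fetch one account's message numbers over another account's connection.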
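A minimal offline sketch of that per-thread connection cache, keyed by account as well as by thread. FakeConnection is a hypothetical stand-in for imaplib.IMAP4_SSL that only records which thread opened it, so the code runs without a server:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()
created = []                     # (thread id, account) for every connection opened
created_lock = threading.Lock()

class FakeConnection:
    """Hypothetical stand-in for imaplib.IMAP4_SSL so the sketch runs offline."""

    def __init__(self, account):
        self.account = account
        with created_lock:
            created.append((threading.get_ident(), account))

    def fetch(self, num, spec):
        # Mimic imaplib's (typ, data) return shape
        return 'OK', [(self.account, num)]

def do_fetch(num, account):
    # Each thread holds its own dict: account -> connection opened by that thread
    conns = getattr(local, 'connections', None)
    if conns is None:
        conns = local.connections = {}
    connection = conns.get(account)
    if connection is None:
        connection = conns[account] = FakeConnection(account)
    return connection.fetch(num, '(RFC822)')

accounts = ['foo@bar.de', 'foo2@bar.de']
jobs = [(n, acct) for acct in accounts for n in range(20)]

with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(lambda job: do_fetch(*job), jobs))

print(len(created), 'connections for', len(jobs), 'fetches')
```

With three workers and two accounts, at most six connections are opened for the 40 fetches, and every fetch goes over a connection logged in to the right account.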

  • 1

    I had the same need: fetching emails fully asynchronously with Python 3. In case anyone else is interested, I pushed an asyncio IMAP library here: https://github.com/bamthomas/aioimaplib

    You can use it like this:

    import asyncio
    import email
    from aioimaplib import aioimaplib
    
    async def wait_for_new_message(host, user, password):
        imap_client = aioimaplib.IMAP4(host=host)
        await imap_client.wait_hello_from_server()
    
        await imap_client.login(user, password)
        await imap_client.select()
    
        # keep a reference so the IDLE task is not garbage-collected
        idle_task = asyncio.ensure_future(imap_client.idle())
        msg_id = 0
        while True:
            msg = await imap_client.wait_server_push()
            print('--> received from server: %s' % msg)
            if 'EXISTS' in msg:
                msg_id = msg.split()[0]
                imap_client.idle_done()
                break
    
        result, data = await imap_client.fetch(msg_id, '(RFC822)')
        email_message = email.message_from_bytes(data[0])
    
        attachments = []
        body = ''
        for part in email_message.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
                body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
            else:
                attachments.append(
                    {'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})
    
        print('attachments : %s' % attachments)
        print('body : %s' % body)
        await imap_client.logout()
    
    
    if __name__ == '__main__':
        asyncio.run(wait_for_new_message('my.imap.server', 'user', 'pass'))
    

    Large emails with attachments are downloaded with asyncio as well.
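The part-walking filter used in both answers (skip multipart containers, keep parts that have a Content-Disposition and a filename, then decode them with get_payload(decode=True)) can be checked offline. This minimal sketch builds a hypothetical sample message with email.message.EmailMessage instead of downloading one; build_sample and extract_attachments are illustrative names, not part of either answer:

```python
import email
from email.message import EmailMessage

def build_sample():
    # Hypothetical sample mail: a text body plus one small PDF-like attachment
    msg = EmailMessage()
    msg['Subject'] = 'report'
    msg.set_content('see attached')
    msg.add_attachment(b'%PDF-1.4 fake', maintype='application',
                       subtype='pdf', filename='report.pdf')
    return msg.as_bytes()

def extract_attachments(raw_bytes):
    # Same filtering as the fetch loops: parse raw RFC822 bytes, walk all parts
    msg = email.message_from_bytes(raw_bytes)
    found = []
    for part in msg.walk():
        if part.get_content_maintype() == 'multipart':
            continue  # container only, no payload of its own
        if part.get('Content-Disposition') is None:
            continue  # plain body part, not an attachment
        filename = part.get_filename()
        if filename:
            # decode=True undoes the base64 transfer encoding
            found.append((filename, part.get_payload(decode=True)))
    return found

print(extract_attachments(build_sample()))
# [('report.pdf', b'%PDF-1.4 fake')]
```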
