
Create a zip file from a generator in Python?


I have a large amount of data (a couple of gigs) that I need to write to a zip file in Python. I can't load it all into memory at once to pass to the .writestr method of ZipFile, and I really don't want to spool it all out to disk using temporary files and then read it back.

Is there a way to feed a generator or a file-like object to the ZipFile library? Or is there some reason this capability doesn't seem to be supported?

By zip file, I mean zip file. As supported in the Python zipfile package.

10 Answers

  • 2

    The gzip library will take a file-like object for compression.

    class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])
    

    You still have to supply a nominal filename to be included in the zip file, but you can pass your data source to fileobj.

    (This answer differs from that of Damnsweet, in that the focus should be on the data source being incrementally read, not the compressed file being incrementally written.)

    I see now the original questioner won't accept Gzip :-(
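
    A minimal sketch of that approach, assuming the data comes from a generator; the member name 'data.txt' and the chunks() generator are placeholders:

    import gzip

    def chunks():
        # hypothetical data source yielding bytes incrementally
        for i in range(1000):
            yield b'some large data chunk\n'

    with open('data.txt.gz', 'wb') as out:
        # 'data.txt' is only the nominal name recorded in the gzip header;
        # the compressed bytes are written to the fileobj instead.
        with gzip.GzipFile(filename='data.txt', mode='wb', fileobj=out) as gz:
            for chunk in chunks():
                gz.write(chunk)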

  • 0

    Now with Python 2.7 you can add data directly to the zipfile, instead of going through a file:

    http://docs.python.org/2/library/zipfile#zipfile.ZipFile.writestr
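
    For completeness, a small sketch of what that looks like (the archive name and payload are made up). Note that writestr still takes the whole payload at once, so each individual member must fit in memory:

    import zipfile

    with zipfile.ZipFile('out.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
        # the data is passed directly, no intermediate file on disk
        zf.writestr('hello.txt', b'Hello, world!')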

  • 2

    It's 2017. If you are still looking to do this elegantly, use Python Zipstream by allanlei. So far it is probably the only well-written library to accomplish that. A usage sketch follows.
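
    A sketch of how that library is used, assuming the write_iter API from the project's README; the names below are illustrative:

    import zipstream  # pip install zipstream

    def gen():
        # hypothetical generator producing one file's bytes incrementally
        yield b'chunk 1'
        yield b'chunk 2'

    zs = zipstream.ZipFile(mode='w', compression=zipstream.ZIP_DEFLATED)
    zs.write_iter('big.bin', gen())

    with open('out.zip', 'wb') as f:
        for chunk in zs:  # the archive itself is produced incrementally
            f.write(chunk)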

  • 0

    In case anyone stumbles upon this question, which is still relevant in 2017 for Python 2.7, here's a working solution for a truly streaming zip file, with no requirement for the output to be seekable as in the other cases. The secret is to set bit 3 of the general purpose bit flag (see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.1).

    Note that this implementation will always create a ZIP64-style file, allowing the streaming to work for arbitrarily large files. It includes an ugly hack to force the zip64 end of central directory record, so be aware it will cause all zipfiles written by your process to become ZIP64-style.

    import io
    import time  # needed by streambuffer's default ZipInfo timestamp
    import zipfile
    import zlib
    import binascii
    import struct
    
    class ByteStreamer(io.BytesIO):
        '''
        Variant on BytesIO which lets you write and consume data while
        keeping track of the total filesize written. When data is consumed
        it is removed from memory, keeping the memory requirements low.
        '''
        def __init__(self):
            super(ByteStreamer, self).__init__()
            self._tellall = 0
    
        def tell(self):
            return self._tellall
    
        def write(self, b):
            orig_size = super(ByteStreamer, self).tell()
            super(ByteStreamer, self).write(b)
            new_size = super(ByteStreamer, self).tell()
            self._tellall += (new_size - orig_size)
    
        def consume(self):
            bytes = self.getvalue()
            self.seek(0)
            self.truncate(0)
            return bytes
    
    class BufferedZipFileWriter(zipfile.ZipFile):
        '''
        ZipFile writer with true streaming (input and output).
        Created zip files are always ZIP64-style because it is the only safe way to stream
        potentially large zip files without knowing the full size ahead of time.
    
        Example usage:
        >>> def stream():
        >>>     bzfw = BufferedZipFileWriter()
        >>>     for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
        >>>         for chunk in bzfw.streambuffer(arc_path, buffer):
        >>>             yield chunk
        >>>     yield bzfw.close()
        '''
        def __init__(self, compression=zipfile.ZIP_DEFLATED):
            self._buffer = ByteStreamer()
            super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)
    
        def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
            if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
                zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                        date_time=time.localtime(time.time())[:6])
                zinfo.compress_type = self.compression
                zinfo.external_attr = 0o600 << 16     # ?rw-------
            else:
                zinfo = zinfo_or_arcname
    
            zinfo.file_size = file_size = 0
            zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
            zinfo.header_offset = self.fp.tell()
    
            self._writecheck(zinfo)
            self._didModify = True
    
            zinfo.CRC = CRC = 0
            zinfo.compress_size = compress_size = 0
            self.fp.write(zinfo.FileHeader())
            if zinfo.compress_type == zipfile.ZIP_DEFLATED:
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None
    
            while True:
                buf = buffer.read(chunksize)
                if not buf:
                    break
    
                file_size += len(buf)
                CRC = binascii.crc32(buf, CRC) & 0xffffffff
                if cmpr:
                    buf = cmpr.compress(buf)
                    compress_size += len(buf)
    
                self.fp.write(buf)
                compressed_bytes = self._buffer.consume()
                if compressed_bytes:
                    yield compressed_bytes
    
            if cmpr:
                buf = cmpr.flush()
                compress_size += len(buf)
                self.fp.write(buf)
                zinfo.compress_size = compress_size
                compressed_bytes = self._buffer.consume()
                if compressed_bytes:
                    yield compressed_bytes
            else:
                zinfo.compress_size = file_size
    
            zinfo.CRC = CRC
            zinfo.file_size = file_size
    
            # Write CRC and file sizes after the file data
            # Always write as zip64 -- only safe way to stream what might become a large zipfile
            fmt = '<LQQ'
            self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))
    
            self.fp.flush()
            self.filelist.append(zinfo)
            self.NameToInfo[zinfo.filename] = zinfo
            yield self._buffer.consume()
    
        # The close method needs to be patched to force writing a ZIP64 file
        # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
        def close(self):
            tmp = zipfile.ZIP_FILECOUNT_LIMIT
            zipfile.ZIP_FILECOUNT_LIMIT = 0
            super(BufferedZipFileWriter, self).close()
            zipfile.ZIP_FILECOUNT_LIMIT = tmp
            return self._buffer.consume()
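
    A usage sketch along the lines of the docstring above; 'input.bin' and 'out.zip' are placeholders:

    def stream_zip(paths):
        bzfw = BufferedZipFileWriter()
        for path in paths:
            with open(path, 'rb') as f:
                # streambuffer yields compressed chunks as they are produced
                for chunk in bzfw.streambuffer(path, f):
                    yield chunk
        yield bzfw.close()  # the central directory comes out with the last chunk

    with open('out.zip', 'wb') as out:
        for chunk in stream_zip(['input.bin']):
            out.write(chunk)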
    
  • 9

    The only solution is to rewrite the method it uses for zipping files so that it reads from a buffer. It would be trivial to add this to the standard libraries; I'm kind of amazed it hasn't been done yet. I gather there's a lot of agreement that the entire interface needs to be overhauled, and that seems to be blocking any incremental improvements.

    import zipfile, zlib, binascii, struct
    class BufferedZipFile(zipfile.ZipFile):
        def writebuffered(self, zipinfo, buffer):
            zinfo = zipinfo
    
            zinfo.file_size = file_size = 0
            zinfo.flag_bits = 0x00
            zinfo.header_offset = self.fp.tell()
    
            self._writecheck(zinfo)
            self._didModify = True
    
            zinfo.CRC = CRC = 0
            zinfo.compress_size = compress_size = 0
            self.fp.write(zinfo.FileHeader())
            if zinfo.compress_type == zipfile.ZIP_DEFLATED:
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None
    
            while True:
                buf = buffer.read(1024 * 8)
                if not buf:
                    break
    
                file_size = file_size + len(buf)
                CRC = binascii.crc32(buf, CRC) & 0xffffffff
                if cmpr:
                    buf = cmpr.compress(buf)
                    compress_size = compress_size + len(buf)
    
                self.fp.write(buf)
    
            if cmpr:
                buf = cmpr.flush()
                compress_size = compress_size + len(buf)
                self.fp.write(buf)
                zinfo.compress_size = compress_size
            else:
                zinfo.compress_size = file_size
    
            zinfo.CRC = CRC
            zinfo.file_size = file_size
    
            position = self.fp.tell()
            self.fp.seek(zinfo.header_offset + 14, 0)
            self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
            self.fp.seek(position, 0)
            self.filelist.append(zinfo)
            self.NameToInfo[zinfo.filename] = zinfo
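
    Usage would look roughly like this (a sketch; the ZipInfo setup mirrors what zipfile does internally, and 'big.bin' is a placeholder). Note the output file must still be seekable here, since writebuffered seeks back to patch the header:

    import time

    zf = BufferedZipFile('out.zip', mode='w', compression=zipfile.ZIP_DEFLATED)
    zinfo = zipfile.ZipInfo('big.bin', date_time=time.localtime()[:6])
    zinfo.compress_type = zipfile.ZIP_DEFLATED
    with open('big.bin', 'rb') as f:
        zf.writebuffered(zinfo, f)  # reads 8k at a time, never the whole file
    zf.close()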
    
  • 3

    I took Chris B.'s answer and created a complete solution. Here it is, in case anyone else is interested:

    import os
    import threading
    from zipfile import *
    import zlib, binascii, struct
    
    class ZipEntryWriter(threading.Thread):
        def __init__(self, zf, zinfo, fileobj):
            self.zf = zf
            self.zinfo = zinfo
            self.fileobj = fileobj
    
            zinfo.file_size = 0
            zinfo.flag_bits = 0x00
            zinfo.header_offset = zf.fp.tell()
    
            zf._writecheck(zinfo)
            zf._didModify = True
    
            zinfo.CRC = 0
            zinfo.compress_size = compress_size = 0
            zf.fp.write(zinfo.FileHeader())
    
            super(ZipEntryWriter, self).__init__()
    
        def run(self):
            zinfo = self.zinfo
            zf = self.zf
            file_size = 0
            CRC = 0
    
            if zinfo.compress_type == ZIP_DEFLATED:
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None
            while True:
                buf = self.fileobj.read(1024 * 8)
                if not buf:
                    self.fileobj.close()
                    break
    
                file_size = file_size + len(buf)
                CRC = binascii.crc32(buf, CRC)
                if cmpr:
                    buf = cmpr.compress(buf)
                    compress_size = compress_size + len(buf)
    
                zf.fp.write(buf)
    
            if cmpr:
                buf = cmpr.flush()
                compress_size = compress_size + len(buf)
                zf.fp.write(buf)
                zinfo.compress_size = compress_size
            else:
                zinfo.compress_size = file_size
    
            zinfo.CRC = CRC
            zinfo.file_size = file_size
    
            position = zf.fp.tell()
            zf.fp.seek(zinfo.header_offset + 14, 0)
            zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
            zf.fp.seek(position, 0)
            zf.filelist.append(zinfo)
            zf.NameToInfo[zinfo.filename] = zinfo
    
    class EnhZipFile(ZipFile, object):
    
        def _current_writer(self):
            return hasattr(self, 'cur_writer') and self.cur_writer or None
    
        def assert_no_current_writer(self):
            cur_writer = self._current_writer()
            if cur_writer and cur_writer.isAlive():
                raise ValueError('An entry is already started for name: %s' % cur_writer.zinfo.filename)
    
        def write(self, filename, arcname=None, compress_type=None):
            self.assert_no_current_writer()
            super(EnhZipFile, self).write(filename, arcname, compress_type)
    
        def writestr(self, zinfo_or_arcname, bytes):
            self.assert_no_current_writer()
            super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)
    
        def close(self):
            self.finish_entry()
            super(EnhZipFile, self).close()
    
        def start_entry(self, zipinfo):
            """
            Start writing a new entry with the specified ZipInfo and return a
            file like object. Any data written to the file like object is
            read by a background thread and written directly to the zip file.
            Make sure to close the returned file object, before closing the
            zipfile, or the close() would end up hanging indefinitely.
    
            Only one entry can be open at any time. If multiple entries need to
            be written, make sure to call finish_entry() before calling any of
            these methods:
            - start_entry
            - write
            - writestr
            It is not necessary to explicitly call finish_entry() before closing
            zipfile.
    
            Example:
                zf = EnhZipFile('tmp.zip', 'w')
                w = zf.start_entry(ZipInfo('t.txt'))
                w.write("some text")
                w.close()
                zf.close()
            """
            self.assert_no_current_writer()
            r, w = os.pipe()
            self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
            self.cur_writer.start()
            return os.fdopen(w, 'w')
    
        def finish_entry(self, timeout=None):
            """
            Ensure that the ZipEntry that is currently being written is finished.
            Joins on any background thread to exit. It is safe to call this method
            multiple times.
            """
            cur_writer = self._current_writer()
            if not cur_writer or not cur_writer.isAlive():
                return
            cur_writer.join(timeout)
    
    if __name__ == "__main__":
        zf = EnhZipFile('c:/tmp/t.zip', 'w')
        import time
        w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
        w.write("Line1\n")
        w.write("Line2\n")
        w.close()
        zf.finish_entry()
        w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
        w.write("Some text\n")
        w.close()
        zf.close()
    
  • 12

    gzip.GzipFile writes the data in gzipped chunks, and you can set the size of the chunks according to the number of lines read from the files.

    An example:

    file = gzip.GzipFile('blah.gz', 'wb')
    sourcefile = open('source', 'rb')
    chunks = []
    for line in sourcefile:
        chunks.append(line)
        if len(chunks) >= X:  # X is however many lines you want to buffer per chunk
            file.write("".join(chunks))
            file.flush()
            chunks = []
    
  • 0

    The essential compression is done by zlib.compressobj. ZipFile (under Python 2.5 on MacOSX it appears to be compiled). The Python 2.3 version is shown below.

    You can see that it builds the compressed file in 8k chunks. Taking out the source file information is complex, because a lot of the source file attributes (like the uncompressed size) are recorded in the zip file header.

    def write(self, filename, arcname=None, compress_type=None):
        """Put the bytes from filename into the archive under the name
        arcname."""
    
        st = os.stat(filename)
        mtime = time.localtime(st.st_mtime)
        date_time = mtime[0:6]
        # Create ZipInfo instance to store file information
        if arcname is None:
            zinfo = ZipInfo(filename, date_time)
        else:
            zinfo = ZipInfo(arcname, date_time)
        zinfo.external_attr = st[0] << 16L      # Unix attributes
        if compress_type is None:
            zinfo.compress_type = self.compression
        else:
            zinfo.compress_type = compress_type
        self._writecheck(zinfo)
        fp = open(filename, "rb")
    
        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()    # Start of header bytes
        # Must overwrite CRC and sizes with correct data later
        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        zinfo.file_size = file_size = 0
        self.fp.write(zinfo.FileHeader())
        zinfo.file_offset = self.fp.tell()      # Start of file bytes
        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                 zlib.DEFLATED, -15)
        else:
            cmpr = None
        while 1:
            buf = fp.read(1024 * 8)
            if not buf:
                break
            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)
            self.fp.write(buf)
        fp.close()
        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size
        zinfo.CRC = CRC
        zinfo.file_size = file_size
        # Seek backwards and write CRC and file sizes
        position = self.fp.tell()       # Preserve current position in file
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
              zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
    
  • 0

    Some (many? most?) compression algorithms are based on looking at redundancies across the entire file.

    Some compression libraries will choose between several compression algorithms based on which works best on the file.

    I believe the ZipFile module does this, so it wants to see the entire file, not just pieces at a time.

    Hence, it won't work with generators or files too big to load into memory. That would explain the limitation of the ZipFile library.

  • 3

    Changed in Python 3.5 (from the official docs): added support for writing to unseekable streams.

    This means that now for zipfile.ZipFile we can use streams which do not store the entire file in memory. Such streams do not support seeking over the whole data volume.

    So here is a simple generator:

    from zipfile import ZipFile, ZipInfo
    
    def zipfile_generator(path, stream):
        with ZipFile(stream, mode='w') as zf:
            z_info = ZipInfo.from_file(path)
            with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
                for chunk in iter(lambda: entry.read(16384), b''):
                    dest.write(chunk)
                    # Yield chunk of the zip file stream in bytes.
                    yield stream.get()
        # ZipFile was closed.
        yield stream.get()
    

    path is a string path (or path-like object) of the large file or directory.

    stream is an instance of an unseekable stream class like the following (designed according to the official docs):

    from io import RawIOBase
    
    class UnseekableStream(RawIOBase):
        def __init__(self):
            self._buffer = b''
    
        def writable(self):
            return True
    
        def write(self, b):
            if self.closed:
                raise ValueError('Stream was closed!')
            self._buffer += b
            return len(b)
    
        def get(self):
            chunk = self._buffer
            self._buffer = b''
            return chunk
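
    Putting the two together, a minimal usage sketch ('big_file.bin' and 'out.zip' are placeholders):

    stream = UnseekableStream()
    with open('out.zip', 'wb') as out:
        for chunk in zipfile_generator('big_file.bin', stream):
            out.write(chunk)  # or send the chunk over the network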
    

    You can try this code online: https://repl.it/@IvanErgunov/zipfilegenerator


    There is also another way to create a generator without ZipInfo, reading and splitting your large file manually. You can pass a queue.Queue() object to your UnseekableStream() object and write to this queue in another thread. Then, in the current thread, you can simply read chunks from this queue in an iterable way. See the docs. A sketch of that idea is shown below.
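
    A minimal sketch of the queue-based variant, assuming Python 3.6+ (for ZipInfo.from_file and ZipFile.open(mode='w')); QueueStream and 'big_file.bin' are illustrative names:

    import queue
    import threading
    from io import RawIOBase
    from zipfile import ZipFile, ZipInfo

    class QueueStream(RawIOBase):
        # pushes every chunk written by ZipFile straight into a queue
        def __init__(self, q):
            self.q = q

        def writable(self):
            return True

        def write(self, b):
            self.q.put(bytes(b))
            return len(b)

    def produce(q, path):
        with ZipFile(QueueStream(q), mode='w') as zf:
            z_info = ZipInfo.from_file(path)
            with open(path, 'rb') as src, zf.open(z_info, mode='w') as dest:
                for chunk in iter(lambda: src.read(16384), b''):
                    dest.write(chunk)
        q.put(None)  # sentinel: the archive is complete

    q = queue.Queue()
    threading.Thread(target=produce, args=(q, 'big_file.bin')).start()
    for chunk in iter(q.get, None):  # consume chunks as they arrive
        ...  # e.g. write to a socket or HTTP response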

    P.S. Python Zipstream by allanlei is outdated and unreliable. It was an attempt to add support for unseekable streams before it was done officially.
