import io
import zipfile
import zlib
import binascii
import struct
class ByteStreamer(io.BytesIO):
'''
Variant on BytesIO which lets you write and consume data while
keeping track of the total filesize written. When data is consumed
it is removed from memory, keeping the memory requirements low.
'''
def __init__(self):
super(ByteStreamer, self).__init__()
self._tellall = 0
def tell(self):
return self._tellall
def write(self, b):
orig_size = super(ByteStreamer, self).tell()
super(ByteStreamer, self).write(b)
new_size = super(ByteStreamer, self).tell()
self._tellall += (new_size - orig_size)
def consume(self):
bytes = self.getvalue()
self.seek(0)
self.truncate(0)
return bytes
class BufferedZipFileWriter(zipfile.ZipFile):
'''
ZipFile writer with true streaming (input and output).
Created zip files are always ZIP64-style because it is the only safe way to stream
potentially large zip files without knowing the full size ahead of time.
Example usage:
>>> def stream():
>>> bzfw = BufferedZip64FileWriter()
>>> for arc_path, buffer in inputs: # buffer is a file-like object which supports read(size)
>>> for chunk in bzfw.streambuffer(arc_path, buffer):
>>> yield chunk
>>> yield bzfw.close()
'''
def __init__(self, compression=zipfile.ZIP_DEFLATED):
self._buffer = ByteStreamer()
super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)
def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
date_time=time.localtime(time.time())[:6])
zinfo.compress_type = self.compression
zinfo.external_attr = 0o600 << 16 # ?rw-------
else:
zinfo = zinfo_or_arcname
zinfo.file_size = file_size = 0
zinfo.flag_bits = 0x08 # Streaming mode: crc and size come after the data
zinfo.header_offset = self.fp.tell()
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
self.fp.write(zinfo.FileHeader())
if zinfo.compress_type == zipfile.ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = buffer.read(chunksize)
if not buf:
break
file_size += len(buf)
CRC = binascii.crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size += len(buf)
self.fp.write(buf)
compressed_bytes = self._buffer.consume()
if compressed_bytes:
yield compressed_bytes
if cmpr:
buf = cmpr.flush()
compress_size += len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
compressed_bytes = self._buffer.consume()
if compressed_bytes:
yield compressed_bytes
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
# Write CRC and file sizes after the file data
# Always write as zip64 -- only safe way to stream what might become a large zipfile
fmt = '<LQQ'
self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))
self.fp.flush()
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
yield self._buffer.consume()
# The close method needs to be patched to force writing a ZIP64 file
# We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
def close(self):
tmp = zipfile.ZIP_FILECOUNT_LIMIT
zipfile.ZIP_FILECOUNT_LIMIT = 0
super(BufferedZipFileWriter, self).close()
zipfile.ZIP_FILECOUNT_LIMIT = tmp
return self._buffer.consume()
import os
import threading
from zipfile import *
import zlib, binascii, struct
class ZipEntryWriter(threading.Thread):
def __init__(self, zf, zinfo, fileobj):
self.zf = zf
self.zinfo = zinfo
self.fileobj = fileobj
zinfo.file_size = 0
zinfo.flag_bits = 0x00
zinfo.header_offset = zf.fp.tell()
zf._writecheck(zinfo)
zf._didModify = True
zinfo.CRC = 0
zinfo.compress_size = compress_size = 0
zf.fp.write(zinfo.FileHeader())
super(ZipEntryWriter, self).__init__()
def run(self):
zinfo = self.zinfo
zf = self.zf
file_size = 0
CRC = 0
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = self.fileobj.read(1024 * 8)
if not buf:
self.fileobj.close()
break
file_size = file_size + len(buf)
CRC = binascii.crc32(buf, CRC)
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
zf.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
zf.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
position = zf.fp.tell()
zf.fp.seek(zinfo.header_offset + 14, 0)
zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
zf.fp.seek(position, 0)
zf.filelist.append(zinfo)
zf.NameToInfo[zinfo.filename] = zinfo
class EnhZipFile(ZipFile, object):
def _current_writer(self):
return hasattr(self, 'cur_writer') and self.cur_writer or None
def assert_no_current_writer(self):
cur_writer = self._current_writer()
if cur_writer and cur_writer.isAlive():
raise ValueError('An entry is already started for name: %s' % cur_write.zinfo.filename)
def write(self, filename, arcname=None, compress_type=None):
self.assert_no_current_writer()
super(EnhZipFile, self).write(filename, arcname, compress_type)
def writestr(self, zinfo_or_arcname, bytes):
self.assert_no_current_writer()
super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)
def close(self):
self.finish_entry()
super(EnhZipFile, self).close()
def start_entry(self, zipinfo):
"""
Start writing a new entry with the specified ZipInfo and return a
file like object. Any data written to the file like object is
read by a background thread and written directly to the zip file.
Make sure to close the returned file object, before closing the
zipfile, or the close() would end up hanging indefinitely.
Only one entry can be open at any time. If multiple entries need to
be written, make sure to call finish_entry() before calling any of
these methods:
- start_entry
- write
- writestr
It is not necessary to explicitly call finish_entry() before closing
zipfile.
Example:
zf = EnhZipFile('tmp.zip', 'w')
w = zf.start_entry(ZipInfo('t.txt'))
w.write("some text")
w.close()
zf.close()
"""
self.assert_no_current_writer()
r, w = os.pipe()
self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
self.cur_writer.start()
return os.fdopen(w, 'w')
def finish_entry(self, timeout=None):
"""
Ensure that the ZipEntry that is currently being written is finished.
Joins on any background thread to exit. It is safe to call this method
multiple times.
"""
cur_writer = self._current_writer()
if not cur_writer or not cur_writer.isAlive():
return
cur_writer.join(timeout)
if __name__ == "__main__":
zf = EnhZipFile('c:/tmp/t.zip', 'w')
import time
w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
w.write("Line1\n")
w.write("Line2\n")
w.close()
zf.finish_entry()
w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
w.write("Some text\n")
w.close()
zf.close()
这意味着现在对于 zipfile.ZipFile ,我们可以使用不将整个文件存储在内存中的流 . 这样的流do not support在整个数据卷上移动 .
所以这是简单的发电机:
from zipfile import ZipFile, ZipInfo
def zipfile_generator(path, stream):
with ZipFile(stream, mode='w') as zf:
z_info = ZipInfo.from_file(path)
with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
for chunk in iter(lambda: entry.read(16384), b''):
dest.write(chunk)
# Yield chunk of the zip file stream in bytes.
yield stream.get()
# ZipFile was closed.
yield stream.get()
10 回答
gzip库将采用类似文件的对象进行压缩 .
您仍然需要提供一个名义文件名以包含在zip文件中,但您可以将数据源传递给fileobj .
(这个答案与Damnsweet的答案不同,因为重点应放在逐步读取的数据源上,而不是逐步写入的压缩文件 . )
我现在看到原来的提问者不会接受Gzip :-(
现在使用python 2.7,您可以将数据添加到文件的zipfile中:
http://docs.python.org/2/library/zipfile#zipfile.ZipFile.writestr
这是2017年 . 如果您仍然希望优雅地使用Python Zipstream by allanlei . 到目前为止,它可能是唯一一个写得很好的库 .
如果有人偶然发现这个问题,这在2017年仍然与Python 2.7相关,这里是一个真正的流式压缩文件的工作解决方案,不需要像其他情况那样可以搜索输出 . 秘诀是设置通用位标志的第3位(参见https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT第4.3.9.1节) .
请注意,此实现将始终创建ZIP64样式的文件,允许流式传输适用于任意大的文件 . 它包含一个丑陋的黑客来强制zip64结束中央目录记录,因此请注意它将导致您的进程编写的所有zipfiles变为ZIP64风格 .
唯一的解决方案是重写用于压缩文件以从缓冲区读取的方法 . 将它添加到标准库中是微不足道的;我有点惊讶它还没有完成 . 我认为整个界面需要彻底检查,并且这似乎阻止了任何增量改进 .
我拿了Chris B.'s answer并创建了一个完整的解决方案 . 这是以防其他人感兴趣的:
gzip.GzipFile以gzip压缩块的形式写入数据,您可以根据从文件中读取的行数来设置块的大小 .
一个例子:
基本压缩由zlib.compressobj完成 . ZipFile(在MacOSX上的Python 2.5下似乎是编译的) . Python 2.3版本如下 .
您可以看到它以8k块的形式构建压缩文件 . 取出源文件信息很复杂,因为许多源文件属性(如未压缩的大小)都记录在zip文件头中 .
一些(许多?大多数?)压缩算法基于查看整个文件中的冗余 .
一些压缩库将在几种压缩算法之间进行选择,这些压缩算法最适合文件 .
我相信ZipFile模块可以做到这一点,所以它希望看到整个文件,而不是一次只看一块 .
因此,它不适用于生成器或文件大到加载内存 . 这可以解释Zipfile库的局限性 .
已更改 Python 3.5 (来自官方文档):已添加support以写入 unseekable 流 .
这意味着现在对于
zipfile.ZipFile
,我们可以使用不将整个文件存储在内存中的流 . 这样的流do not support在整个数据卷上移动 .所以这是简单的发电机:
path
是大文件或目录或pathlike
对象的字符串路径 .stream
是类的 unseekable 流实例(根据official docs设计):您可以在线试用此代码:https://repl.it/@IvanErgunov/zipfilegenerator
还有另一种方法可以创建没有
ZipInfo
的生成器并手动读取和分割大文件 . 您可以将queue.Queue()
对象传递给UnseekableStream()
对象,并在另一个线程中写入此队列 . 然后在当前线程中,您可以以可迭代的方式简单地从此队列中读取块 . 见docsP.S. Python Zipstream by allanlei已经过时且不可靠 . 它是在正式完成之前尝试添加对不可见流的支持 .