You can use Function Compute to automatically decompress ZIP files that you upload to Object Storage Service (OSS). When a ZIP file that matches the specified decompression rules is uploaded to OSS, Function Compute is automatically triggered to decompress the file. After the decompression is complete, Function Compute uploads the decompressed files to a specified directory in OSS.
Usage notes
Use UTF-8 or GB 2312 encoding for file and folder names to prevent garbled characters or decompression failures.
Archive or Cold Archive files must be restored before decompression.
The maximum decompression time for a single ZIP package is 2 hours. Any task exceeding this limit will fail.
To prevent decompression failure, ensure each file in a ZIP package does not exceed 1 GB.
Set the function timeout to more than 2 hours, up to a maximum of 24 hours.
Create the function in the same region as the Object Storage Service bucket.
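The size and encoding notes above can be checked locally before an archive is uploaded. The following is a minimal sketch that uses only the Python standard library. The names check_archive, name_is_decodable, MAX_MEMBER_BYTES, and ACCEPTED_ENCODINGS are hypothetical and are not part of the sample function. The sketch assumes that the 1 GB limit applies to the uncompressed size of each member.

```python
import io
import zipfile

# Assumption: the 1 GB per-file limit refers to the uncompressed member size.
MAX_MEMBER_BYTES = 1024 ** 3
ACCEPTED_ENCODINGS = ("utf-8", "gb2312")


def name_is_decodable(info):
    """Return True if the member name is UTF-8 or GB 2312 encoded."""
    # Flag bit 0x800 marks a name that is stored as UTF-8 in the archive.
    if info.flag_bits & 0x800:
        return True
    # Without that flag, the standard library decodes the raw bytes as cp437.
    # Re-encoding with cp437 recovers the bytes stored in the archive.
    try:
        raw = info.filename.encode("cp437")
    except UnicodeEncodeError:
        return False
    for encoding in ACCEPTED_ENCODINGS:
        try:
            raw.decode(encoding)
            return True
        except UnicodeDecodeError:
            continue
    return False


def check_archive(zip_source, max_member_bytes=MAX_MEMBER_BYTES):
    """Return a list of problems found in a ZIP file path or file-like object."""
    problems = []
    with zipfile.ZipFile(zip_source) as archive:
        for info in archive.infolist():
            if info.file_size > max_member_bytes:
                problems.append(
                    "%s: %d bytes exceeds the per-file limit"
                    % (info.filename, info.file_size))
            if not name_is_decodable(info):
                problems.append(
                    "%r: name is neither UTF-8 nor GB 2312" % info.filename)
    return problems


if __name__ == "__main__":
    # Build a small in-memory archive to demonstrate the check.
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("docs/readme.txt", b"hello")
    buf.seek(0)
    print(check_archive(buf))
```

An empty list means that no problem was detected by these two checks. It does not guarantee that decompression finishes within the time limit.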
Prerequisites
Procedure
Step 1: Create a function
Log in to the Function Compute console, and in the left-side navigation pane, select Function Management > Functions.
In the top navigation bar, select a region. On the Functions page, click Create Function.
On the Create Function page, select a creation method, configure the following settings, and click Create.
This section describes the key settings. For other settings, see Create a function.
Runtime: Select Python 3.10.
Function Role: Select an existing role or create a new one. Ensure the role has permissions to access the OSS bucket by attaching the AliyunOSSFullAccess policy.
On the Function Details page, select the Code tab, create a file, enter your code, and click Deploy.
The sample code is as follows. This sample code also supports one-click deployment to Function Compute through start-unzip-oss-v3, which does not require you to manually create a trigger.
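Before you read the handler in index.py, it can help to see how an OSS event maps to an output location. The following is a minimal sketch and is not part of the deployed function. parse_event and destination_prefix are hypothetical helper names, SAMPLE_EVENT is trimmed to the fields that the sample handler reads, and the bucket name and region are placeholders. The prefix logic mirrors how the handler uses the PROCESSED_DIR and RETAIN_FILE_NAME environment variables.

```python
import json

# Hypothetical, trimmed event: a real OSS event carries more fields.
SAMPLE_EVENT = json.dumps({
    "events": [{
        "eventName": "ObjectCreated:PutObject",
        "region": "cn-hangzhou",
        "oss": {
            "bucket": {"name": "example-bucket"},
            "object": {"key": "source/archive.zip"},
        },
    }]
})


def parse_event(event):
    """Extract the fields that the sample handler reads from the first event."""
    evt = json.loads(event)["events"][0]
    return {
        "event_name": evt["eventName"],
        "bucket": evt["oss"]["bucket"]["name"],
        "key": evt["oss"]["object"]["key"],
        # The handler builds an internal endpoint from the event region.
        "endpoint": "oss-" + evt["region"] + "-internal.aliyuncs.com",
    }


def destination_prefix(object_key, processed_dir="", retain_file_name=""):
    """Compute the key prefix under which decompressed files are stored."""
    if processed_dir and not processed_dir.endswith("/"):
        processed_dir += "/"
    zip_name = object_key.split("/")[-1]
    if retain_file_name == "false":
        # Files go directly into the destination directory.
        new_key = processed_dir
    else:
        # Files go into a subdirectory named after the archive.
        new_key = processed_dir + zip_name
    return new_key.replace(".zip", "/")


if __name__ == "__main__":
    info = parse_event(SAMPLE_EVENT)
    print(info["endpoint"])
    print(destination_prefix(info["key"], "processed"))           # processed/archive/
    print(destination_prefix(info["key"], "processed", "false"))  # processed/
```

Because the final replace call rewrites every occurrence of ".zip" in the prefix, a PROCESSED_DIR value that itself contains ".zip" would also be changed. Choosing a destination directory name without that substring avoids the surprise.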
index.py

# -*- coding: utf-8 -*-
"""
NOTE: This function handles character encoding for file and folder names within ZIP archives.
The encoding varies depending on the operating system used to create the archive.
- For archives created on macOS or Linux, filenames are typically encoded in UTF-8.
- For archives created on Windows, filenames are often encoded in GB2312 but can also be UTF-8.
To handle these variations, this function uses the 'chardet' library to automatically detect the encoding.
However, detection is not guaranteed to be 100% accurate. If you encounter issues with garbled filenames
after decompression, you may need to modify the encoding logic in this function and redeploy it.
"""
import helper
import oss2
import json
import os
import time
import logging
import chardet

"""
This function runs when a .zip file is uploaded to a specified source prefix in an OSS bucket.
It decompresses the file and uploads the contents to a destination prefix.
For example, a file uploaded as `source/archive.zip` will be decompressed to `processed/archive/`.
The source and destination prefixes can be configured using environment variables.
"""

# Suppress the info logs printed by the OSS SDK.
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

LOGGER = logging.getLogger()


# Prints the execution time of a function.
def print_excute_time(func):
    def wrapper(*args, **kwargs):
        local_time = time.time()
        ret = func(*args, **kwargs)
        LOGGER.info('current Function [%s] excute time is %.2f' %
                    (func.__name__, time.time() - local_time))
        return ret
    return wrapper


def get_zipfile_name(origin_name):
    # Handles potential garbled text in filenames, especially for non-UTF-8 encodings.
    name = origin_name
    try:
        name_bytes = origin_name.encode(encoding="cp437")
    except:
        name_bytes = origin_name.encode(encoding="utf-8")

    # Detection is more accurate for longer strings.
    detect = chardet.detect(name_bytes)
    confidence = detect["confidence"]
    detect_encoding = detect["encoding"]
    if confidence > 0.75 and (detect_encoding.lower() in ["gb2312", "gbk", "gb18030", "ascii", "utf-8"]):
        try:
            if detect_encoding.lower() in ["gb2312", "gbk", "gb18030"]:
                detect_encoding = "gb18030"
            name = name_bytes.decode(detect_encoding)
        except:
            name = name_bytes.decode(encoding="gb18030")
    else:
        try:
            name = name_bytes.decode(encoding="gb18030")
        except:
            name = name_bytes.decode(encoding="utf-8")

    # Replace Windows-style backslashes with forward slashes for path consistency.
    name = name.replace("\\", "/")
    return name


@print_excute_time
def handler(event, context):
    """
    The main handler function executed by Function Compute.
    It automatically decompresses a .zip object from an OSS bucket.
    :param event: A JSON string representing the OSS event, which includes the object's key and other metadata.
    :param context: The function context, which provides credential and runtime information.
    """
    evt_lst = json.loads(event)
    creds = context.credentials
    auth = oss2.StsAuth(
        creds.access_key_id,
        creds.access_key_secret,
        creds.security_token)

    evt = evt_lst['events'][0]
    bucket_name = evt['oss']['bucket']['name']
    endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com'
    bucket = oss2.Bucket(auth, endpoint, bucket_name)
    object_name = evt['oss']['object']['key']

    if "ObjectCreated:PutSymlink" == evt['eventName']:
        object_name = bucket.get_symlink(object_name).target_key
        if not object_name:
            raise RuntimeError('{} is an invalid symlink file'.format(
                evt['oss']['object']['key']))

    file_type = os.path.splitext(object_name)[1]
    if file_type != ".zip":
        raise RuntimeError('{} is not a .zip file'.format(object_name))

    LOGGER.info("Starting to decompress zip file: {}".format(object_name))

    lst = object_name.split("/")
    zip_name = lst[-1]
    PROCESSED_DIR = os.environ.get("PROCESSED_DIR", "")
    RETAIN_FILE_NAME = os.environ.get("RETAIN_FILE_NAME", "")
    if PROCESSED_DIR and PROCESSED_DIR[-1] != "/":
        PROCESSED_DIR += "/"
    if RETAIN_FILE_NAME == "false":
        newKey = PROCESSED_DIR
    else:
        newKey = PROCESSED_DIR + zip_name

    zip_fp = helper.OssStreamFileLikeObject(bucket, object_name)
    newKey = newKey.replace(".zip", "/")

    with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file:
        for name in zip_file.namelist():
            with zip_file.open(name) as file_obj:
                name = get_zipfile_name(name)
                bucket.put_object(newKey + name, file_obj)

helper.py

# -*- coding: utf-8 -*-
import oss2
from oss2 import utils, models
import ossZipfile as zipfile

zipfile_support_oss = zipfile


# Supports uploading to OSS as a file-like object.
def make_crc_adapter(data, init_crc=0):
    data = utils.to_bytes(data)
    # file-like object
    if hasattr(data, 'read'):
        return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc))


utils.make_crc_adapter = make_crc_adapter


class OssStreamFileLikeObject(object):
    def __init__(self, bucket, key):
        super(OssStreamFileLikeObject, self).__init__()
        self._bucket = bucket
        self._key = key
        self._meta_data = self._bucket.get_object_meta(self._key)

    @property
    def bucket(self):
        return self._bucket

    @property
    def key(self):
        return self._key

    @property
    def filesize(self):
        return self._meta_data.content_length

    def get_reader(self, begin, end):
        begin = begin if begin >= 0 else 0
        end = end if end > 0 else self.filesize - 1
        end = end if end < self.filesize else self.filesize - 1
        begin = begin if begin < end else end
        return self._bucket.get_object(self._key, byte_range=(begin, end))

    def get_content_bytes(self, begin, end):
        reader = self.get_reader(begin, end)
        return reader.read()

    def get_last_content_bytes(self, offset):
        return self.get_content_bytes(self.filesize-offset, self.filesize-1)

ossZipfile.py

""" Read and write ZIP files. XXX references to UTF-8 need further investigation. """ import io import os import importlib.util import sys import time import stat import shutil import struct import binascii import threading try: import zlib # We may need its compression method crc32 = zlib.crc32 except ImportError: zlib = None crc32 = binascii.crc32 try: import bz2 # We may need its compression method except ImportError: bz2 = None try: import lzma # We may need its compression method except ImportError: lzma = None __all__ = ["BadZipFile", "BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA", "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"] class BadZipFile(Exception): pass class LargeZipFile(Exception): """ Raised when writing a ZIP file that requires ZIP64 extensions, but those extensions are disabled. """ error = BadZipfile = BadZipFile # Pre-3.2 compatibility names ZIP64_LIMIT = (1 << 31) - 1 ZIP_FILECOUNT_LIMIT = (1 << 16) - 1 ZIP_MAX_COMMENT = (1 << 16) - 1 # Constants for ZIP file compression methods ZIP_STORED = 0 ZIP_DEFLATED = 8 ZIP_BZIP2 = 12 ZIP_LZMA = 14 # Other ZIP compression methods are not supported DEFAULT_VERSION = 20 ZIP64_VERSION = 45 BZIP2_VERSION = 46 LZMA_VERSION = 63 # We recognize (but not necessarily support) all features up to that version MAX_EXTRACT_VERSION = 63 # Below are some formats and associated data for reading/writing headers using # the struct module. 
The names and structures of headers/records are those used # in the PKWARE description of the ZIP file format: # http://www.pkware.com/documents/casestudies/APPNOTE.TXT # (URL valid as of January 2008) # The "end of central directory" structure, magic number, size, and indices # (section V.I in the format document) structEndArchive = b"<4s4H2LH" stringEndArchive = b"PK\005\006" sizeEndCentDir = struct.calcsize(structEndArchive) _ECD_SIGNATURE = 0 _ECD_DISK_NUMBER = 1 _ECD_DISK_START = 2 _ECD_ENTRIES_THIS_DISK = 3 _ECD_ENTRIES_TOTAL = 4 _ECD_SIZE = 5 _ECD_OFFSET = 6 _ECD_COMMENT_SIZE = 7 # These last two indices are not part of the structure as defined in the # spec, but they are used internally by this module as a convenience _ECD_COMMENT = 8 _ECD_LOCATION = 9 # The "central directory" structure, magic number, size, and indices # of entries in the structure (section V.F in the format document) structCentralDir = "<4s4B4HL2L5H2L" stringCentralDir = b"PK\001\002" sizeCentralDir = struct.calcsize(structCentralDir) # Indexes of entries in the central directory structure _CD_SIGNATURE = 0 _CD_CREATE_VERSION = 1 _CD_CREATE_SYSTEM = 2 _CD_EXTRACT_VERSION = 3 _CD_EXTRACT_SYSTEM = 4 _CD_FLAG_BITS = 5 _CD_COMPRESS_TYPE = 6 _CD_TIME = 7 _CD_DATE = 8 _CD_CRC = 9 _CD_COMPRESSED_SIZE = 10 _CD_UNCOMPRESSED_SIZE = 11 _CD_FILENAME_LENGTH = 12 _CD_EXTRA_FIELD_LENGTH = 13 _CD_COMMENT_LENGTH = 14 _CD_DISK_NUMBER_START = 15 _CD_INTERNAL_FILE_ATTRIBUTES = 16 _CD_EXTERNAL_FILE_ATTRIBUTES = 17 _CD_LOCAL_HEADER_OFFSET = 18 # The "local file header" structure, magic number, size, and indices # (section V.A in the format document) structFileHeader = "<4s2B4HL2L2H" stringFileHeader = b"PK\003\004" sizeFileHeader = struct.calcsize(structFileHeader) _FH_SIGNATURE = 0 _FH_EXTRACT_VERSION = 1 _FH_EXTRACT_SYSTEM = 2 _FH_GENERAL_PURPOSE_FLAG_BITS = 3 _FH_COMPRESSION_METHOD = 4 _FH_LAST_MOD_TIME = 5 _FH_LAST_MOD_DATE = 6 _FH_CRC = 7 _FH_COMPRESSED_SIZE = 8 _FH_UNCOMPRESSED_SIZE = 9 
_FH_FILENAME_LENGTH = 10 _FH_EXTRA_FIELD_LENGTH = 11 # The "Zip64 end of central directory locator" structure, magic number, and size structEndArchive64Locator = "<4sLQL" stringEndArchive64Locator = b"PK\x06\x07" sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator) # The "Zip64 end of central directory" record, magic number, size, and indices # (section V.G in the format document) structEndArchive64 = "<4sQ2H2L4Q" stringEndArchive64 = b"PK\x06\x06" sizeEndCentDir64 = struct.calcsize(structEndArchive64) _CD64_SIGNATURE = 0 _CD64_DIRECTORY_RECSIZE = 1 _CD64_CREATE_VERSION = 2 _CD64_EXTRACT_VERSION = 3 _CD64_DISK_NUMBER = 4 _CD64_DISK_NUMBER_START = 5 _CD64_NUMBER_ENTRIES_THIS_DISK = 6 _CD64_NUMBER_ENTRIES_TOTAL = 7 _CD64_DIRECTORY_SIZE = 8 _CD64_OFFSET_START_CENTDIR = 9 def _check_zipfile(fp): try: if _EndRecData(fp): return True # file has correct magic number except OSError: pass return False def is_zipfile(filename): """Quickly determine if a file is a ZIP file by checking the magic number. The filename argument may be a file or a file-like object. """ result = False try: if hasattr(filename, "read"): result = _check_zipfile(fp=filename) else: with open(filename, "rb") as fp: result = _check_zipfile(fp) except OSError: pass return result def _EndRecData64(fpin, offset, endrec): """ Read the ZIP64 end-of-archive records and use them to update endrec. """ if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes( fpin.filesize+offset-sizeEndCentDir64Locator, fpin.filesize+offset-1) else: try: fpin.seek(offset - sizeEndCentDir64Locator, 2) except OSError: # If the seek fails, the file is not large enough to contain a ZIP64 # end-of-archive record, so just return the end record we were given. 
return endrec data = fpin.read(sizeEndCentDir64Locator) if len(data) != sizeEndCentDir64Locator: return endrec sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data) if sig != stringEndArchive64Locator: return endrec if diskno != 0 or disks != 1: raise BadZipFile("ZIP files that span multiple disks are not supported") # Assume no 'zip64 extensible data' if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes( fpin.filesize + offset - sizeEndCentDir64Locator - sizeEndCentDir64, fpin.filesize+offset-sizeEndCentDir64Locator-1) else: fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2) data = fpin.read(sizeEndCentDir64) if len(data) != sizeEndCentDir64: return endrec sig, sz, create_version, read_version, disk_num, disk_dir, \ dircount, dircount2, dirsize, diroffset = \ struct.unpack(structEndArchive64, data) if sig != stringEndArchive64: return endrec # Update the original endrec using data from the ZIP64 record endrec[_ECD_SIGNATURE] = sig endrec[_ECD_DISK_NUMBER] = disk_num endrec[_ECD_DISK_START] = disk_dir endrec[_ECD_ENTRIES_THIS_DISK] = dircount endrec[_ECD_ENTRIES_TOTAL] = dircount2 endrec[_ECD_SIZE] = dirsize endrec[_ECD_OFFSET] = diroffset return endrec def _EndRecData(fpin): """Return data from the "End of Central Directory" record, or None. The data is a list of the nine items in the ZIP "End of Central Directory" record followed by a tenth item, the file seek offset of this record.""" # Determine file size if hasattr(fpin, 'bucket'): filesize = fpin.filesize data = fpin.get_last_content_bytes(sizeEndCentDir) else: fpin.seek(0, 2) filesize = fpin.tell() # Check to see if this is a ZIP file with no archive comment (the # "end of central directory" structure should be the last item in the # file if this is the case). 
try: fpin.seek(-sizeEndCentDir, 2) except OSError: return None data = fpin.read() if (len(data) == sizeEndCentDir and data[0:4] == stringEndArchive and data[-2:] == b"\000\000"): # The signature is correct and there's no comment; unpack the structure endrec = struct.unpack(structEndArchive, data) endrec = list(endrec) # Append a blank comment and record start offset endrec.append(b"") endrec.append(filesize - sizeEndCentDir) # Try to read the "Zip64 end of central directory" structure return _EndRecData64(fpin, -sizeEndCentDir, endrec) # Either this is not a ZIP file, or it is a ZIP file with an archive # comment. Search the end of the file for the "end of central directory" # record signature. The comment is the last item in the ZIP file and may be # up to 64K long. It is assumed that the "end of central directory" magic # number does not appear in the comment. maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0) if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes(maxCommentStart, -1) else: fpin.seek(maxCommentStart, 0) data = fpin.read() start = data.rfind(stringEndArchive) if start >= 0: # Found the magic number; attempt to unpack and interpret recData = data[start:start+sizeEndCentDir] if len(recData) != sizeEndCentDir: # ZIP file is corrupted. 
return None endrec = list(struct.unpack(structEndArchive, recData)) commentSize = endrec[_ECD_COMMENT_SIZE] # as claimed by the ZIP file comment = data[start+sizeEndCentDir:start+sizeEndCentDir+commentSize] endrec.append(comment) endrec.append(maxCommentStart + start) # Try to read the "Zip64 end of central directory" structure return _EndRecData64(fpin, maxCommentStart + start - filesize, endrec) # Unable to find a valid end of central directory structure return None class ZipInfo (object): """Class with attributes describing each file in a ZIP archive.""" __slots__ = ( 'orig_filename', 'filename', 'date_time', 'compress_type', 'comment', 'extra', 'create_system', 'create_version', 'extract_version', 'reserved', 'flag_bits', 'volume', 'internal_attr', 'external_attr', 'header_offset', 'CRC', 'compress_size', 'file_size', '_raw_time', ) def __init__(self, filename="NoName", date_time=(1980, 1, 1, 0, 0, 0)): self.orig_filename = filename # Original file name in archive # Terminate the file name at the first null byte. Null bytes in file # names are used as tricks by viruses in archives. null_byte = filename.find(chr(0)) if null_byte >= 0: filename = filename[0:null_byte] # This is used to ensure paths in generated ZIP files always use # forward slashes as the directory separator, as required by the # ZIP format specification. 
if os.sep != "/" and os.sep in filename: filename = filename.replace(os.sep, "/") self.filename = filename # Normalized file name self.date_time = date_time # year, month, day, hour, min, sec if date_time[0] < 1980: raise ValueError('ZIP does not support timestamps before 1980') # Standard values: self.compress_type = ZIP_STORED # Type of compression for the file self.comment = b"" # Comment for each file self.extra = b"" # ZIP extra data if sys.platform == 'win32': self.create_system = 0 # System that created the ZIP archive else: # Assume everything else is unix-y self.create_system = 3 # System that created the ZIP archive self.create_version = DEFAULT_VERSION # Version that created the ZIP archive self.extract_version = DEFAULT_VERSION # Version needed to extract the archive self.reserved = 0 # Must be zero self.flag_bits = 0 # ZIP flag bits self.volume = 0 # Volume number of file header self.internal_attr = 0 # Internal attributes self.external_attr = 0 # External file attributes # Other attributes are set by class ZipFile: # header_offset Byte offset to the file header # CRC CRC-32 of the uncompressed file # compress_size Size of the compressed file # file_size Size of the uncompressed file def __repr__(self): result = ['<%s filename=%r' % (self.__class__.__name__, self.filename)] if self.compress_type != ZIP_STORED: result.append(' compress_type=%s' % compressor_names.get(self.compress_type, self.compress_type)) hi = self.external_attr >> 16 lo = self.external_attr & 0xFFFF if hi: result.append(' filemode=%r' % stat.filemode(hi)) if lo: result.append(' external_attr=%#x' % lo) isdir = self.is_dir() if not isdir or self.file_size: result.append(' file_size=%r' % self.file_size) if ((not isdir or self.compress_size) and (self.compress_type != ZIP_STORED or self.file_size != self.compress_size)): result.append(' compress_size=%r' % self.compress_size) result.append('>') return ''.join(result) def FileHeader(self, zip64=None): """Return the per-file header as a 
string.""" dt = self.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) if self.flag_bits & 0x08: # Set these to zero because we write them after the file data CRC = compress_size = file_size = 0 else: CRC = self.CRC compress_size = self.compress_size file_size = self.file_size extra = self.extra min_version = 0 if zip64 is None: zip64 = file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT if zip64: fmt = ' ZIP64_LIMIT or compress_size > ZIP64_LIMIT: if not zip64: raise LargeZipFile("File size would require ZIP64 extensions") # File is larger than what fits into a 4-byte integer; # fall back to the ZIP64 extension. file_size = 0xffffffff compress_size = 0xffffffff min_version = ZIP64_VERSION if self.compress_type == ZIP_BZIP2: min_version = max(BZIP2_VERSION, min_version) elif self.compress_type == ZIP_LZMA: min_version = max(LZMA_VERSION, min_version) self.extract_version = max(min_version, self.extract_version) self.create_version = max(min_version, self.create_version) filename, flag_bits = self._encodeFilenameFlags() header = struct.pack(structFileHeader, stringFileHeader, self.extract_version, self.reserved, flag_bits, self.compress_type, dostime, dosdate, CRC, compress_size, file_size, len(filename), len(extra)) return header + filename + extra def _encodeFilenameFlags(self): try: return self.filename.encode('ascii'), self.flag_bits except UnicodeEncodeError: return self.filename.encode('utf-8'), self.flag_bits | 0x800 def _decodeExtra(self): # Try to decode the extra field. extra = self.extra unpack = struct.unpack while len(extra) >= 4: tp, ln = unpack(' len(extra): raise BadZipFile( "Corrupt extra field %04x (size=%d)" % (tp, ln)) if tp == 0x0001: if ln >= 24: counts = unpack('> 1) ^ 0xEDB88320 else: crc >>= 1 return crc # ZIP supports a password-based form of encryption. 
Even though known # plaintext attacks have been found against it, it is still useful # to be able to get data out of such a file. # # Usage: # zd = _ZipDecrypter(mypwd) # plain_bytes = zd(cipher_bytes) def _ZipDecrypter(pwd): key0 = 305419896 key1 = 591751049 key2 = 878082192 global _crctable if _crctable is None: _crctable = list(map(_gen_crc, range(256))) crctable = _crctable def crc32(ch, crc): """Compute the CRC32 primitive on one byte.""" return (crc >> 8) ^ crctable[(crc ^ ch) & 0xFF] def update_keys(c): nonlocal key0, key1, key2 key0 = crc32(c, key0) key1 = (key1 + (key0 & 0xFF)) & 0xFFFFFFFF key1 = (key1 * 134775813 + 1) & 0xFFFFFFFF key2 = crc32(key1 >> 24, key2) for p in pwd: update_keys(p) def decrypter(data): """Decrypt a bytes object.""" result = bytearray() append = result.append for c in data: k = key2 | 2 c ^= ((k * (k ^ 1)) >> 8) & 0xFF update_keys(c) append(c) return bytes(result) return decrypter class LZMACompressor: def __init__(self): self._comp = None def _init(self): props = lzma._encode_filter_properties({'id': lzma.FILTER_LZMA1}) self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[ lzma._decode_filter_properties(lzma.FILTER_LZMA1, props) ]) return struct.pack('<< 31 - 1 # Read from compressed files in 4k blocks. MIN_READ_SIZE = 4096 def __init__(self, fileobj, mode, zipinfo, decrypter=None, close_fileobj=False): self._fileobj = fileobj self._decrypter = decrypter self._close_fileobj = close_fileobj self._compress_type = zipinfo.compress_type self._compress_left = zipinfo.compress_size self._left = zipinfo.file_size self._decompressor = _get_decompressor(self._compress_type) self._eof = False self._readbuffer = b'' self._offset = 0 self.newlines = None # Adjust read size for encrypted files, since the first 12 bytes # are for the encryption/password information. 
if self._decrypter is not None: self._compress_left -= 12 self.mode = mode self.name = zipinfo.filename if hasattr(zipinfo, 'CRC'): self._expected_crc = zipinfo.CRC self._running_crc = crc32(b'') else: self._expected_crc = None def __repr__(self): result = ['<%s.%s' % (self.__class__.__module__, self.__class__.__qualname__)] if not self.closed: result.append(' name=%r mode=%r' % (self.name, self.mode)) if self._compress_type != ZIP_STORED: result.append(' compress_type=%s' % compressor_names.get(self._compress_type, self._compress_type)) else: result.append(' [closed]') result.append('>') return ''.join(result) def readline(self, limit=-1): """Read and return a line from the stream. If limit is specified, at most limit bytes will be read. """ if limit < 0: # Shortcut for common case: newline found in buffer. i = self._readbuffer.find(b'\n', self._offset) + 1 if i > 0: line = self._readbuffer[self._offset: i] self._offset = i return line return io.BufferedIOBase.readline(self, limit) def peek(self, n=1): """Return buffered bytes without advancing the position.""" if n > len(self._readbuffer) - self._offset: chunk = self.read(n) if len(chunk) > self._offset: self._readbuffer = chunk + self._readbuffer[self._offset:] self._offset = 0 else: self._offset -= len(chunk) # Return up to 512 bytes to reduce allocation overhead for tight loops. return self._readbuffer[self._offset: self._offset + 512] def readable(self): return True def read(self, n=-1): """Read and return up to n bytes. If the argument is omitted, None, or negative, data is read and returned until EOF is reached. 
""" if n is None or n < 0: buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while not self._eof: buf += self._read1(self.MAX_N) return buf end = n + self._offset if end < len(self._readbuffer): buf = self._readbuffer[self._offset:end] self._offset = end return buf n = end - len(self._readbuffer) buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while n > 0 and not self._eof: data = self._read1(n) if n < len(data): self._readbuffer = data self._offset = n buf += data[:n] break buf += data n -= len(data) return buf def _update_crc(self, newdata): # Update the CRC using the given data. if self._expected_crc is None: # No need to compute the CRC if we don't have a reference value. return self._running_crc = crc32(newdata, self._running_crc) # Check the CRC if we're at the end of the file. if self._eof and self._running_crc != self._expected_crc: raise BadZipFile("Bad CRC-32 for file %r" % self.name) def read1(self, n): """Read up to n bytes with at most one read() system call.""" if n is None or n < 0: buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while not self._eof: data = self._read1(self.MAX_N) if data: buf += data break return buf end = n + self._offset if end < len(self._readbuffer): buf = self._readbuffer[self._offset:end] self._offset = end return buf n = end - len(self._readbuffer) buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 if n > 0: while not self._eof: data = self._read1(n) if n < len(data): self._readbuffer = data self._offset = n buf += data[:n] break if data: buf += data break return buf def _read1(self, n): # Read up to n compressed bytes with at most one read() system call, # then decrypt and decompress them. if self._eof or n <= 0: return b'' # Read from file. if self._compress_type == ZIP_DEFLATED: # Handle unconsumed data. 
data = self._decompressor.unconsumed_tail if n > len(data): data += self._read2(n - len(data)) else: data = self._read2(n) if self._compress_type == ZIP_STORED: self._eof = self._compress_left <= 0 elif self._compress_type == ZIP_DEFLATED: n = max(n, self.MIN_READ_SIZE) data = self._decompressor.decompress(data, n) self._eof = (self._decompressor.eof or self._compress_left <= 0 and not self._decompressor.unconsumed_tail) if self._eof: data += self._decompressor.flush() else: data = self._decompressor.decompress(data) self._eof = self._decompressor.eof or self._compress_left <= 0 data = data[:self._left] self._left -= len(data) if self._left <= 0: self._eof = True self._update_crc(data) return data def _read2(self, n): if self._compress_left <= 0: return b'' n = max(n, self.MIN_READ_SIZE) n = min(n, self._compress_left) data = self._fileobj.read(n) self._compress_left -= len(data) if not data: raise EOFError if self._decrypter is not None: data = self._decrypter(data) return data def close(self): try: if self._close_fileobj: self._fileobj.close() finally: super().close() class _ZipWriteFile(io.BufferedIOBase): def __init__(self, zf, zinfo, zip64): self._zinfo = zinfo self._zip64 = zip64 self._zipfile = zf self._compressor = _get_compressor(zinfo.compress_type) self._file_size = 0 self._compress_size = 0 self._crc = 0 @property def _fileobj(self): return self._zipfile.fp def writable(self): return True def write(self, data): if self.closed: raise ValueError('I/O operation on closed file.') nbytes = len(data) self._file_size += nbytes self._crc = crc32(data, self._crc) if self._compressor: data = self._compressor.compress(data) self._compress_size += len(data) self._fileobj.write(data) return nbytes def close(self): if self.closed: return super().close() # Flush any data from the compressor and update header info. 
if self._compressor: buf = self._compressor.flush() self._compress_size += len(buf) self._fileobj.write(buf) self._zinfo.compress_size = self._compress_size else: self._zinfo.compress_size = self._file_size self._zinfo.CRC = self._crc self._zinfo.file_size = self._file_size # Write updated header info. if self._zinfo.flag_bits & 0x08: # Write CRC and file sizes after the file data fmt = ' ZIP64_LIMIT: raise RuntimeError('File size unexpectedly exceeded ZIP64 ' 'limit') if self._compress_size > ZIP64_LIMIT: raise RuntimeError('Compressed size unexpectedly exceeded ' 'ZIP64 limit') # Seek backwards and write the file header (which will now include # the correct CRC and file sizes). # Preserve current position in file. self._zipfile.start_dir = self._fileobj.tell() self._fileobj.seek(self._zinfo.header_offset) self._fileobj.write(self._zinfo.FileHeader(self._zip64)) self._fileobj.seek(self._zipfile.start_dir) self._zipfile._writing = False # Successfully written: Add file to our caches. self._zipfile.filelist.append(self._zinfo) self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo class ZipFile: """Class with methods to open, read, write, close, and list ZIP files. z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=True) file: Either the path to the file, or a file-like object. If it is a path, the file is opened and closed by ZipFile. mode: Can be 'r' for reading, 'w' for writing, 'x' for exclusive creation, or 'a' for appending. compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib), ZIP_BZIP2 (requires bz2), or ZIP_LZMA (requires lzma). allowZip64: If True, ZipFile creates files with ZIP64 extensions when needed. Otherwise, it raises an exception if ZIP64 is required. 
""" fp = None # Set here since __del__ checks it _windows_illegal_name_trans_table = None def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True): """Open a ZIP file in read ('r'), write ('w'), exclusive create ('x'), or append ('a') mode.""" if mode not in ('r', 'w', 'x', 'a'): raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'") _check_compression(compression) self._allowZip64 = allowZip64 self._didModify = False self.debug = 0 # Level of printing: 0 through 3 self.NameToInfo = {} # Maps file names to ZipInfo instances self.filelist = [] # List of ZipInfo instances for archive self.compression = compression # Default compression method self.mode = mode self.pwd = None self._comment = b'' # Check if we were passed a file-like object if isinstance(file, os.PathLike): file = os.fspath(file) if isinstance(file, str): # It's a filename self._filePassed = 0 self.filename = file modeDict = {'r': 'rb', 'w': 'w+b', 'x': 'x+b', 'a': 'r+b', 'r+b': 'w+b', 'w+b': 'wb', 'x+b': 'xb'} filemode = modeDict[mode] while True: try: self.fp = io.open(file, filemode) except OSError: if filemode in modeDict: filemode = modeDict[filemode] continue raise break else: self._filePassed = 1 self.fp = file self.filename = getattr(file, 'name', None) self._fileRefCnt = 1 self._lock = threading.RLock() self._seekable = True self._writing = False try: if mode == 'r': self._RealGetContents() elif mode in ('w', 'x'): # Set the modified flag so the central directory gets written # even if no files are added to the archive. self._didModify = True try: self.start_dir = self.fp.tell() except (AttributeError, OSError): self.fp = _Tellable(self.fp) self.start_dir = 0 self._seekable = False else: # Some file-like objects can provide tell() but not seek(). try: self.fp.seek(self.start_dir) except (AttributeError, OSError): self._seekable = False elif mode == 'a': try: # See if the file is a ZIP file. 
self._RealGetContents() # Seek to the start of the central directory and overwrite it. self.fp.seek(self.start_dir) except BadZipFile: # File is not a ZIP file, just append to it. self.fp.seek(0, 2) # Set the modified flag so the central directory gets written # even if no files are added to the archive. self._didModify = True self.start_dir = self.fp.tell() else: raise ValueError("Mode must be 'r', 'w', 'x', or 'a'") except: fp = self.fp self.fp = None self._fpclose(fp) raise def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def __repr__(self): result = ['<%s.%s' % (self.__class__.__module__, self.__class__.__qualname__)] if self.fp is not None: if self._filePassed: result.append(' file=%r' % self.fp) elif self.filename is not None: result.append(' filename=%r' % self.filename) result.append(' mode=%r' % self.mode) else: result.append(' [closed]') result.append('>') return ''.join(result) def _RealGetContents(self): """Read the table of contents from the ZIP file.""" fp = self.fp try: endrec = _EndRecData(fp) except OSError: raise BadZipFile("File is not a ZIP file") if not endrec: raise BadZipFile("File is not a ZIP file") if self.debug > 1: print(endrec) size_cd = endrec[_ECD_SIZE] # Bytes in central directory offset_cd = endrec[_ECD_OFFSET] # Offset of central directory self._comment = endrec[_ECD_COMMENT] # Archive comment # "concat" is zero, unless the ZIP file was concatenated to another file. concat = endrec[_ECD_LOCATION] - size_cd - offset_cd if endrec[_ECD_SIGNATURE] == stringEndArchive64: # If ZIP64 extension structures are present, account for them. concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2: inferred = concat + offset_cd print("given, inferred, offset", offset_cd, inferred, concat) # self.start_dir: Position of the start of the central directory. 
self.start_dir = offset_cd + concat if hasattr(fp, "bucket"): data = fp.get_content_bytes( self.start_dir, self.start_dir+size_cd-1) else: fp.seek(self.start_dir, 0) data = fp.read(size_cd) fp = io.BytesIO(data) total = 0 while total < size_cd: centdir = fp.read(sizeCentralDir) if len(centdir) != sizeCentralDir: raise BadZipFile("Truncated central directory") centdir = struct.unpack(structCentralDir, centdir) if centdir[_CD_SIGNATURE] != stringCentralDir: raise BadZipFile("Bad magic number for central directory") if self.debug > 2: print(centdir) filename = fp.read(centdir[_CD_FILENAME_LENGTH]) flags = centdir[5] if flags & 0x800: # UTF-8 file names extension filename = filename.decode('utf-8') else: # Historical ZIP filename encoding filename = filename.decode('cp437') # Create a ZipInfo instance to store file information. x = ZipInfo(filename) x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET] (x.create_version, x.create_system, x.extract_version, x.reserved, x.flag_bits, x.compress_type, t, d, x.CRC, x.compress_size, x.file_size) = centdir[1:12] if x.extract_version > MAX_EXTRACT_VERSION: raise NotImplementedError("ZIP file version %.1f" % (x.extract_version / 10)) x.volume, x.internal_attr, x.external_attr = centdir[15:18] # Convert date/time code to (year, month, day, hour, min, sec). x._raw_time = t x.date_time = ((d >> 9)+1980, (d >> 5) & 0xF, d & 0x1F, t >> 11, (t >> 5) & 0x3F, (t & 0x1F) * 2) x._decodeExtra() x.header_offset = x.header_offset + concat self.filelist.append(x) self.NameToInfo[x.filename] = x # Update total bytes read from the central directory. 
total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH] + centdir[_CD_EXTRA_FIELD_LENGTH] + centdir[_CD_COMMENT_LENGTH]) if self.debug > 2: print("total", total) def namelist(self): """Return a list of file names in the archive.""" return [data.filename for data in self.filelist] def infolist(self): """Return a list of ZipInfo instances for files in the archive.""" return self.filelist def printdir(self, file=None): """Print a table of contents for the ZIP file.""" print("%-46s %19s %12s" % ("File Name", "Modified ", "Size"), file=file) for zinfo in self.filelist: date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6] print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size), file=file) def testzip(self): """Read all the files and check their CRC.""" chunk_size = 2 ** 20 for zinfo in self.filelist: try: # Read by chunks to avoid an OverflowError or a # MemoryError with very large embedded files. with self.open(zinfo.filename, "r") as f: while f.read(chunk_size): # Check CRC-32 pass except BadZipFile: return zinfo.filename def getinfo(self, name): """Return the ZipInfo for a member `name`.""" info = self.NameToInfo.get(name) if info is None: raise KeyError( 'There is no item named %r in the archive' % name) return info def setpassword(self, pwd): """Set the default password for encrypted files.""" if pwd and not isinstance(pwd, bytes): raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__) if pwd: self.pwd = pwd else: self.pwd = None @property def comment(self): """The comment text associated with the ZIP file.""" return self._comment @comment.setter def comment(self, comment): if not isinstance(comment, bytes): raise TypeError("comment: expected bytes, got %s" % type(comment).__name__) # Check for a valid comment length. 
if len(comment) > ZIP_MAX_COMMENT: import warnings warnings.warn('Archive comment is too long; truncating to %d bytes' % ZIP_MAX_COMMENT, stacklevel=2) comment = comment[:ZIP_MAX_COMMENT] self._comment = comment self._didModify = True def read(self, name, pwd=None): """Return the bytes of the file `name`.""" with self.open(name, "r", pwd) as fp: return fp.read() def open(self, name, mode="r", pwd=None, *, force_zip64=False): """Return a file-like object for `name`. `name` can be a string specifying the file name within the ZIP file, or a ZipInfo object. `mode` must be 'r' to read a file from the archive, or 'w' to write a new file to the archive. `pwd` is the password to decrypt files (only used for reading). When writing, if the file size is not known in advance but may exceed 2 GiB, pass `force_zip64=True` to use the ZIP64 format, which supports large files. If the size is known in advance, it is best to pass a ZipInfo instance for `name` with `zinfo.file_size` set. """ if mode not in {"r", "w"}: raise ValueError('open() requires mode "r" or "w"') if pwd and not isinstance(pwd, bytes): raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__) if pwd and (mode == "w"): raise ValueError("pwd is only supported for reading files") if not self.fp: raise ValueError( "Attempt to use a ZIP archive that was already closed") # Make sure we have an info object if isinstance(name, ZipInfo): # 'name' is already an info object zinfo = name elif mode == 'w': zinfo = ZipInfo(name) zinfo.compress_type = self.compression else: # Get info object for name zinfo = self.getinfo(name) if mode == 'w': return self._open_to_write(zinfo, force_zip64=force_zip64) if self._writing: raise ValueError("Cannot read from a ZIP file while there " "is an open writing handle on it. 
" "Close the writing handle before trying to read.") # Open for reading: self._fileRefCnt += 1 zef_file = _SharedFile(self.fp, zinfo.header_offset, self._fpclose, self._lock, lambda: self._writing) try: # Skip the file header: fheader = zef_file.read(sizeFileHeader) if len(fheader) != sizeFileHeader: raise BadZipFile("Truncated file header") fheader = struct.unpack(structFileHeader, fheader) if fheader[_FH_SIGNATURE] != stringFileHeader: raise BadZipFile("Bad magic number for file header") fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) if fheader[_FH_EXTRA_FIELD_LENGTH]: zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) if zinfo.flag_bits & 0x20: # Zip 2.7: compressed patched data raise NotImplementedError( "compressed patched data (flag bit 5)") if zinfo.flag_bits & 0x40: # strong encryption raise NotImplementedError("strong encryption (flag bit 6)") if zinfo.flag_bits & 0x800: # UTF-8 filename fname_str = fname.decode("utf-8") else: fname_str = fname.decode("cp437") if fname_str != zinfo.orig_filename: raise BadZipFile( 'File name in directory %r and header %r differ.' % (zinfo.orig_filename, fname)) # check for encrypted flag & handle password is_encrypted = zinfo.flag_bits & 0x1 zd = None if is_encrypted: if not pwd: pwd = self.pwd if not pwd: raise RuntimeError("File %r is encrypted, password " "required for extraction" % name) zd = _ZipDecrypter(pwd) # The first 12 bytes in the cipher stream form an encryption header # used to strengthen the algorithm. The first 11 bytes are # completely random, while the 12th contains the MSB of the CRC # or the file time, depending on the header type, and is used to # check the correctness of the password. 
header = zef_file.read(12) h = zd(header[0:12]) if zinfo.flag_bits & 0x8: # compare against the file type from extended local headers check_byte = (zinfo._raw_time >> 8) & 0xff else: # compare against the CRC otherwise check_byte = (zinfo.CRC >> 24) & 0xff if h[11] != check_byte: raise RuntimeError("Bad password for file %r" % name) return ZipExtFile(zef_file, mode, zinfo, zd, True) except: zef_file.close() raise def _open_to_write(self, zinfo, force_zip64=False): if force_zip64 and not self._allowZip64: raise ValueError( "force_zip64 is True, but allowZip64 was False when opening " "the ZIP file." ) if self._writing: raise ValueError("Cannot write to a ZIP file while another " "write handle is open on it. " "Close the first handle before opening another.") # Sizes and CRC are overwritten with correct data after processing the file. if not hasattr(zinfo, 'file_size'): zinfo.file_size = 0 zinfo.compress_size = 0 zinfo.CRC = 0 zinfo.flag_bits = 0x00 if zinfo.compress_type == ZIP_LZMA: # Compressed data includes an end-of-stream (EOS) marker zinfo.flag_bits |= 0x02 if not self._seekable: zinfo.flag_bits |= 0x08 if not zinfo.external_attr: zinfo.external_attr = 0o600 << 16 # permissions: ?rw------- # Compressed size can be larger than uncompressed size. zip64 = self._allowZip64 and \ (force_zip64 or zinfo.file_size * 1.05 > ZIP64_LIMIT) if self._seekable: self.fp.seek(self.start_dir) zinfo.header_offset = self.fp.tell() self._writecheck(zinfo) self._didModify = True self.fp.write(zinfo.FileHeader(zip64)) self._writing = True return _ZipWriteFile(self, zinfo, zip64) def extract(self, member, path=None, pwd=None): """Extract a member from the archive to the current working directory. `member` may be a filename or a ZipInfo object. You can specify a different directory using `path`. Its file information is extracted as accurately as possible. 
""" if path is None: path = os.getcwd() else: path = os.fspath(path) return self._extract_member(member, path, pwd) def extractall(self, path=None, members=None, pwd=None): """Extract all members from the archive to the current working directory. `path` specifies a different directory to extract to. `members` is optional and must be a subset of the list returned by `namelist()`. """ if members is None: members = self.namelist() if path is None: path = os.getcwd() else: path = os.fspath(path) for zipinfo in members: self._extract_member(zipinfo, path, pwd) @classmethod def _sanitize_windows_name(cls, arcname, pathsep): """Replace invalid characters and remove trailing dots from parts.""" table = cls._windows_illegal_name_trans_table if not table: illegal = ':<>|"?*' table = str.maketrans(illegal, '_' * len(illegal)) cls._windows_illegal_name_trans_table = table arcname = arcname.translate(table) # remove trailing dots arcname = (x.rstrip('.') for x in arcname.split(pathsep)) # rejoin, removing empty parts arcname = pathsep.join(x for x in arcname if x) return arcname def _extract_member(self, member, targetpath, pwd): """Extract the ZipInfo object `member` to a physical file on the path `targetpath`. """ if not isinstance(member, ZipInfo): member = self.getinfo(member) # build the destination pathname, replacing # forward slashes with platform-specific separators. arcname = member.filename.replace('/', os.path.sep) if os.path.altsep: arcname = arcname.replace(os.path.altsep, os.path.sep) # interpret absolute pathname as relative, remove drive letter or # UNC path, and redundant separators, "." and ".." components. 
arcname = os.path.splitdrive(arcname)[1] invalid_path_parts = ('', os.path.curdir, os.path.pardir) arcname = os.path.sep.join(x for x in arcname.split(os.path.sep) if x not in invalid_path_parts) if os.path.sep == '\\': # filter illegal characters on Windows arcname = self._sanitize_windows_name(arcname, os.path.sep) targetpath = os.path.join(targetpath, arcname) targetpath = os.path.normpath(targetpath) # Create all upper directories if necessary. upperdirs = os.path.dirname(targetpath) if upperdirs and not os.path.exists(upperdirs): os.makedirs(upperdirs) if member.is_dir(): if not os.path.isdir(targetpath): os.mkdir(targetpath) return targetpath with self.open(member, pwd=pwd) as source, \ open(targetpath, "wb") as target: shutil.copyfileobj(source, target) return targetpath def _writecheck(self, zinfo): """Check for errors before writing a file to the archive.""" if zinfo.filename in self.NameToInfo: import warnings warnings.warn('Duplicate name: %r' % zinfo.filename, stacklevel=3) if self.mode not in ('w', 'x', 'a'): raise ValueError("write() requires mode 'w', 'x', or 'a'") if not self.fp: raise ValueError( "Attempt to write to a ZIP archive that was already closed") _check_compression(zinfo.compress_type) if not self._allowZip64: requires_zip64 = None if len(self.filelist) >= ZIP_FILECOUNT_LIMIT: requires_zip64 = "Files count" elif zinfo.file_size > ZIP64_LIMIT: requires_zip64 = "File size" elif zinfo.header_offset > ZIP64_LIMIT: requires_zip64 = "ZIP file size" if requires_zip64: raise LargeZipFile(requires_zip64 + " would require ZIP64 extensions") def write(self, filename, arcname=None, compress_type=None): """Write the file `filename` to the archive under the name `arcname`.""" if not self.fp: raise ValueError( "Attempt to write to a ZIP archive that was already closed") if self._writing: raise ValueError( "Cannot write to a ZIP archive while an open writing handle exists" ) zinfo = ZipInfo.from_file(filename, arcname) if zinfo.is_dir(): 
zinfo.compress_size = 0 zinfo.CRC = 0 else: if compress_type is not None: zinfo.compress_type = compress_type else: zinfo.compress_type = self.compression if zinfo.is_dir(): with self._lock: if self._seekable: self.fp.seek(self.start_dir) zinfo.header_offset = self.fp.tell() # Start of header bytes if zinfo.compress_type == ZIP_LZMA: # Compressed data includes an end-of-stream (EOS) marker zinfo.flag_bits |= 0x02 self._writecheck(zinfo) self._didModify = True self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo self.fp.write(zinfo.FileHeader(False)) self.start_dir = self.fp.tell() else: with open(filename, "rb") as src, self.open(zinfo, 'w') as dest: shutil.copyfileobj(src, dest, 1024*8) def writestr(self, zinfo_or_arcname, data, compress_type=None): """Write a file into the archive. The content is `data`, which can be a `str` or a `bytes` instance; if it is a `str`, it is first encoded as UTF-8. `zinfo_or_arcname` is either a ZipInfo instance or the name of the file in the archive.""" if isinstance(data, str): data = data.encode("utf-8") if not isinstance(zinfo_or_arcname, ZipInfo): zinfo = ZipInfo(filename=zinfo_or_arcname, date_time=time.localtime(time.time())[:6]) zinfo.compress_type = self.compression if zinfo.filename[-1] == '/': zinfo.external_attr = 0o40775 << 16 # drwxrwxr-x zinfo.external_attr |= 0x10 # MS-DOS directory flag else: zinfo.external_attr = 0o600 << 16 # ?rw------- else: zinfo = zinfo_or_arcname if not self.fp: raise ValueError( "Attempt to write to a ZIP archive that was already closed") if self._writing: raise ValueError( "Cannot write to a ZIP archive while an open writing handle exists." 
) if compress_type is not None: zinfo.compress_type = compress_type zinfo.file_size = len(data) # Uncompressed size with self._lock: with self.open(zinfo, mode='w') as dest: dest.write(data) def __del__(self): """Call the "close()" method in case the user forgot.""" self.close() def close(self): """Close the file, and for 'w', 'x', and 'a' modes, write the ending records.""" if self.fp is None: return if self._writing: raise ValueError("Cannot close the ZIP file while there is " "an open writing handle on it. " "Close the writing handle before closing the ZIP file.") try: if self.mode in ('w', 'x', 'a') and self._didModify: # write ending records with self._lock: if self._seekable: self.fp.seek(self.start_dir) self._write_end_record() finally: fp = self.fp self.fp = None self._fpclose(fp) def _write_end_record(self): for zinfo in self.filelist: # write central directory dt = zinfo.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) extra = [] if zinfo.file_size > ZIP64_LIMIT \ or zinfo.compress_size > ZIP64_LIMIT: extra.append(zinfo.file_size) extra.append(zinfo.compress_size) file_size = 0xffffffff compress_size = 0xffffffff else: file_size = zinfo.file_size compress_size = zinfo.compress_size if zinfo.header_offset > ZIP64_LIMIT: extra.append(zinfo.header_offset) header_offset = 0xffffffff else: header_offset = zinfo.header_offset extra_data = zinfo.extra min_version = 0 if extra: # Append a ZIP64 field to the extra data extra_data = struct.pack( ' ZIP_FILECOUNT_LIMIT: requires_zip64 = "Files count" elif centDirOffset > ZIP64_LIMIT: requires_zip64 = "Central directory offset" elif centDirSize > ZIP64_LIMIT: requires_zip64 = "Central directory size" if requires_zip64: # Need to write the ZIP64 end-of-archive records if not self._allowZip64: raise LargeZipFile(requires_zip64 + " would require ZIP64 extensions") zip64endrec = struct.pack( structEndArchive64, stringEndArchive64, 44, 45, 45, 0, 0, 
centDirCount, centDirCount, centDirSize, centDirOffset) self.fp.write(zip64endrec) zip64locrec = struct.pack( structEndArchive64Locator, stringEndArchive64Locator, 0, pos2, 1) self.fp.write(zip64locrec) centDirCount = min(centDirCount, 0xFFFF) centDirSize = min(centDirSize, 0xFFFFFFFF) centDirOffset = min(centDirOffset, 0xFFFFFFFF) endrec = struct.pack(structEndArchive, stringEndArchive, 0, 0, centDirCount, centDirCount, centDirSize, centDirOffset, len(self._comment)) self.fp.write(endrec) self.fp.write(self._comment) self.fp.flush() def _fpclose(self, fp): assert self._fileRefCnt > 0 self._fileRefCnt -= 1 if not self._fileRefCnt and not self._filePassed: fp.close() class PyZipFile(ZipFile): """Class to create ZIP archives with Python library files and packages.""" def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True, optimize=-1): ZipFile.__init__(self, file, mode=mode, compression=compression, allowZip64=allowZip64) self._optimize = optimize def writepy(self, pathname, basename="", filterfunc=None): """Add all files from `pathname` to the ZIP archive. If `pathname` is a package directory, recursively search the directory and all subdirectories for *.py files and add the modules to the archive. If `pathname` is a plain directory, add all *.py files from the directory. If `pathname` is a Python *.py file, this method adds the module to the archive. This method always stores added modules as `module.pyc`. This method will compile the `module.py` into `module.pyc` if necessary. If `filterfunc(pathname)` is provided, it is called for every path. This method skips the file or directory if `filterfunc` returns False. 
""" pathname = os.fspath(pathname) if filterfunc and not filterfunc(pathname): if self.debug: label = 'path' if os.path.isdir(pathname) else 'file' print('%s %r skipped by filterfunc' % (label, pathname)) return dir, name = os.path.split(pathname) if os.path.isdir(pathname): initname = os.path.join(pathname, "__init__.py") if os.path.isfile(initname): # This is a package directory, add it if basename: basename = "%s/%s" % (basename, name) else: basename = name if self.debug: print("Adding package in", pathname, "as", basename) fname, arcname = self._get_codename(initname[0:-3], basename) if self.debug: print("Adding", arcname) self.write(fname, arcname) dirlist = os.listdir(pathname) dirlist.remove("__init__.py") # Add all *.py files and package subdirectories for filename in dirlist: path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if os.path.isdir(path): if os.path.isfile(os.path.join(path, "__init__.py")): # This is a package directory, add it self.writepy(path, basename, filterfunc=filterfunc) # Recursive call elif ext == ".py": if filterfunc and not filterfunc(path): if self.debug: print('file %r skipped by filterfunc' % path) continue fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print("Adding", arcname) self.write(fname, arcname) else: # This is NOT a package directory; add its files at the top level if self.debug: print("Adding files from directory", pathname) for filename in os.listdir(pathname): path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if ext == ".py": if filterfunc and not filterfunc(path): if self.debug: print('file %r skipped by filterfunc' % path) continue fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print("Adding", arcname) self.write(fname, arcname) else: if pathname[-3:] != ".py": raise RuntimeError( 'Files added with writepy() must end with ".py"') fname, arcname = self._get_codename(pathname[0:-3], basename) if self.debug: 
print("Adding file", arcname) self.write(fname, arcname) def _get_codename(self, pathname, basename): """Return (filename, archivename) for the given path. Given a module path, return the correct file path and archive name, compiling if necessary. For example, given `/python/lib/string`, return (`/python/lib/string.pyc`, `string`). """ def _compile(file, optimize=-1): import py_compile if self.debug: print("Compiling", file) try: py_compile.compile(file, doraise=True, optimize=optimize) except py_compile.PyCompileError as err: print(err.msg) return False return True file_py = pathname + ".py" file_pyc = pathname + ".pyc" pycache_opt0 = importlib.util.cache_from_source( file_py, optimization='') pycache_opt1 = importlib.util.cache_from_source( file_py, optimization=1) pycache_opt2 = importlib.util.cache_from_source( file_py, optimization=2) if self._optimize == -1: # legacy mode: use whatever file is present if (os.path.isfile(file_pyc) and os.stat(file_pyc).st_mtime >= os.stat(file_py).st_mtime): # Use .pyc file. arcname = fname = file_pyc elif (os.path.isfile(pycache_opt0) and os.stat(pycache_opt0).st_mtime >= os.stat(file_py).st_mtime): # Use the __pycache__/*.pyc file, but write it to the legacy pyc # file name in the archive. fname = pycache_opt0 arcname = file_pyc elif (os.path.isfile(pycache_opt1) and os.stat(pycache_opt1).st_mtime >= os.stat(file_py).st_mtime): # Use the __pycache__/*.pyc file, but write it to the legacy pyc # file name in the archive. fname = pycache_opt1 arcname = file_pyc elif (os.path.isfile(pycache_opt2) and os.stat(pycache_opt2).st_mtime >= os.stat(file_py).st_mtime): # Use the __pycache__/*.pyc file, but write it to the legacy pyc # file name in the archive. fname = pycache_opt2 arcname = file_pyc else: # Compile .py into a PEP 3147 .pyc file. 
if _compile(file_py): if sys.flags.optimize == 0: fname = pycache_opt0 elif sys.flags.optimize == 1: fname = pycache_opt1 else: fname = pycache_opt2 arcname = file_pyc else: fname = arcname = file_py else: # new mode: use given optimization level if self._optimize == 0: fname = pycache_opt0 arcname = file_pyc else: arcname = file_pyc if self._optimize == 1: fname = pycache_opt1 elif self._optimize == 2: fname = pycache_opt2 else: msg = "invalid value for 'optimize': {!r}".format( self._optimize) raise ValueError(msg) if not (os.path.isfile(fname) and os.stat(fname).st_mtime >= os.stat(file_py).st_mtime): if not _compile(file_py, optimize=self._optimize): fname = arcname = file_py archivename = os.path.split(arcname)[1] if basename: archivename = "%s/%s" % (basename, archivename) return (fname, archivename) def main(args=None): import argparse description = 'A simple command-line interface for the zipfile module.' parser = argparse.ArgumentParser(description=description) group = parser.add_mutually_exclusive_group(required=True) group.add_argument('-l', '--list', metavar='', help='Show listing of a ZIP file') group.add_argument('-e', '--extract', nargs=2, metavar=('', ''), help='Extract a ZIP file into a target directory') group.add_argument('-c', '--create', nargs='+', metavar=('', ''), help='Create a ZIP file from sources') group.add_argument('-t', '--test', metavar='', help='Test if a ZIP file is valid') args = parser.parse_args(args) if args.test is not None: src = args.test with ZipFile(src, 'r') as zf: badfile = zf.testzip() if badfile: print( "The following enclosed file is corrupted: {!r}".format(badfile)) print("Done testing") elif args.list is not None: src = args.list with ZipFile(src, 'r') as zf: zf.printdir() elif args.extract is not None: src, curdir = args.extract with ZipFile(src, 'r') as zf: zf.extractall(curdir) elif args.create is not None: zip_name = args.create.pop(0) files = args.create def addToZip(zf, path, zippath): if os.path.isfile(path): 
zf.write(path, zippath, ZIP_DEFLATED) elif os.path.isdir(path): if zippath: zf.write(path, zippath) for nm in os.listdir(path): addToZip(zf, os.path.join(path, nm), os.path.join(zippath, nm)) # else: ignore with ZipFile(zip_name, 'w') as zf: for path in files: zippath = os.path.basename(path) if not zippath: zippath = os.path.basename(os.path.dirname(path)) if zippath in ('', os.curdir, os.pardir): zippath = '' addToZip(zf, path, zippath) if __name__ == "__main__": main()
Step 2: Create an OSS trigger
On the function details page, select the Trigger tab and click Create Trigger.
In the Create Trigger panel, select the OSS trigger type, configure the settings, and then click OK.
Key settings are described below. For other settings, see Configure a native OSS trigger.
Bucket Name: Select the bucket that you created.
Object Prefix: Enter src for this example.
Object Suffix: Enter zip for this example.
Trigger Event: For this example, select the following events: oss:ObjectCreated:PutObject, oss:ObjectCreated:PostObject, oss:ObjectCreated:CompleteMultipartUpload, and oss:ObjectCreated:PutSymlink.
Role Name: Select a role with permissions to invoke the function. You can grant these permissions by attaching the AliyunFCFullAccess policy to the role.
Once created, the trigger appears on the Triggers page.
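To reason about which uploads fire the trigger, the following minimal sketch mirrors the example settings in Python. It assumes that the prefix and suffix are compared as plain strings against the object key. The names TRIGGER_PREFIX, TRIGGER_SUFFIX, TRIGGER_EVENTS, and would_trigger are hypothetical and are not part of Function Compute or the OSS SDK.

```python
# Hypothetical helper that mirrors the example trigger settings.
# Assumption: the prefix and suffix are plain string matches on the object key.
TRIGGER_PREFIX = "src"
TRIGGER_SUFFIX = "zip"
TRIGGER_EVENTS = {
    "oss:ObjectCreated:PutObject",
    "oss:ObjectCreated:PostObject",
    "oss:ObjectCreated:CompleteMultipartUpload",
    "oss:ObjectCreated:PutSymlink",
}


def would_trigger(object_key, event_name):
    """Return True if the key and event match the example trigger rule."""
    return (
        event_name in TRIGGER_EVENTS
        and object_key.startswith(TRIGGER_PREFIX)
        and object_key.endswith(TRIGGER_SUFFIX)
    )


print(would_trigger("src/code.zip", "oss:ObjectCreated:PutObject"))   # True
print(would_trigger("docs/code.zip", "oss:ObjectCreated:PutObject"))  # False
print(would_trigger("src/code.zip", "oss:ObjectRemoved:DeleteObject"))  # False
```

Under this assumption, the prefix src also matches keys such as src-backup/code.zip. If that is not intended, a prefix of src/ is the tighter choice.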
Step 3: Verification
You can verify the configuration in either of the following ways:
Method 1: Upload a file via the console
Log in to the Object Storage Service console and upload a ZIP file, such as code.zip, to the src directory of the bucket you selected in Step 2: Create an OSS trigger. The upload automatically triggers the function to extract the ZIP file to the bucket's root directory.
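If you do not have a ZIP file at hand, the following sketch builds a small code.zip with the Python standard library. It is only one way to produce test input. The helper name make_sample_zip and the member names are hypothetical. The archive includes a non-ASCII file name so that you can check how the function handles UTF-8 names.

```python
import os
import tempfile
import zipfile


def make_sample_zip(zip_path):
    """Create a small ZIP archive with a folder and a non-ASCII file name."""
    # Hypothetical sample members; replace them with your own files.
    members = {
        "readme.txt": "hello from the root of the archive\n",
        "docs/guide.txt": "a file inside a folder\n",
        "docs/说明.txt": "a file with a UTF-8 name\n",
    }
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name, text in members.items():
            zf.writestr(name, text)
    return sorted(members)


zip_path = os.path.join(tempfile.mkdtemp(), "code.zip")
expected = make_sample_zip(zip_path)

# Read the archive back to confirm its contents before uploading it.
with zipfile.ZipFile(zip_path) as zf:
    print(sorted(zf.namelist()))
    # Flag bit 0x800 marks a member whose name is stored as UTF-8.
    print(bool(zf.getinfo("docs/说明.txt").flag_bits & 0x800))
```

You can then upload the resulting code.zip to the src directory in the console as described above.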
Method 2: Test with event parameters
After configuring the event parameters, click Test Function to run the function. Then, go to the Object Storage Service console and check the file list in the target bucket to verify that the ZIP file has been decompressed.
On the function details page, click the Code tab, click the icon next to Test Function, and select Configure Test Parameters from the drop-down list.
In the Configure Test Parameters panel, select an Event Template, enter an Event Name and the event content, and click OK.
The following is a sample event.
{
    "events": [
        {
            "eventName": "ObjectCreated:PutObject",
            "eventSource": "acs:oss",
            "eventTime": "2023-08-13T06:45:43.000Z",
            "eventVersion": "1.0",
            "oss": {
                "bucket": {
                    "arn": "acs:oss:cn-hangzhou:10343546824****:bucket****",
                    "name": "bucket****",
                    "ownerIdentity": "10343546824****"
                },
                "object": {
                    "deltaSize": 122539,
                    "eTag": "688A7BF4F233DC9C88A80BF985AB****",
                    "key": "src/test.zip",
                    "size": 122539
                },
                "ossSchemaVersion": "1.0",
                "ruleId": "9adac8e253828f4f7c0466d941fa3db81161****"
            },
            "region": "cn-hangzhou",
            "requestParameters": {
                "sourceIPAddress": "140.205.XX.XX"
            },
            "responseElements": {
                "requestId": "58F9FF2D3DF792092E12044C"
            },
            "userIdentity": {
                "principalId": "10343546824****"
            }
        }
    ]
}
For an explanation of the fields in the event parameter, see Configure Function Input Parameters.
Important: The preceding event content is for demonstration purposes only. You must modify some parameters based on your actual information and ensure that the specified file (in this example, src/test.zip) exists in the configured bucket. Otherwise, the function cannot be triggered or the function execution will fail.
The following parameters must be modified based on your actual configuration:
bucket.arn: For example, acs:oss:<region>:<your_account_id>:<your_bucket>. Replace <region> with the region where you created the function, <your_account_id> with your Alibaba Cloud account ID, and <your_bucket> with the name of a bucket that you have created in the same region. You can find your Alibaba Cloud account ID in the Common Information section of the Overview page in the Function Compute console.
bucket.name: Replace with the name of your bucket.
bucket.ownerIdentity: Replace with your Alibaba Cloud account ID.
object.key: Replace with the key of the object in the target bucket.
region: Replace with the function's region.
userIdentity.principalId: Replace with your Alibaba Cloud account ID.
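Editing these fields by hand is error-prone because the account ID, bucket name, and region each appear more than once. The following sketch generates the event content from a single set of values. The function name build_test_event and its parameters are hypothetical. The fields that the list above does not mention (eventTime, eTag, ruleId, requestParameters, and responseElements) are copied unchanged from the sample event, on the assumption that their exact values do not matter for a test run.

```python
import json


def build_test_event(region, account_id, bucket, object_key, size):
    """Return a test event shaped like the sample, with your values filled in."""
    return {
        "events": [
            {
                "eventName": "ObjectCreated:PutObject",
                "eventSource": "acs:oss",
                "eventTime": "2023-08-13T06:45:43.000Z",
                "eventVersion": "1.0",
                "oss": {
                    "bucket": {
                        # The ARN repeats the region, account ID, and bucket name.
                        "arn": f"acs:oss:{region}:{account_id}:{bucket}",
                        "name": bucket,
                        "ownerIdentity": account_id,
                    },
                    "object": {
                        "deltaSize": size,
                        "eTag": "688A7BF4F233DC9C88A80BF985AB****",
                        "key": object_key,
                        "size": size,
                    },
                    "ossSchemaVersion": "1.0",
                    "ruleId": "9adac8e253828f4f7c0466d941fa3db81161****",
                },
                "region": region,
                "requestParameters": {"sourceIPAddress": "140.205.XX.XX"},
                "responseElements": {"requestId": "58F9FF2D3DF792092E12044C"},
                "userIdentity": {"principalId": account_id},
            }
        ]
    }


# Placeholder values: replace them with your own region, account ID, and bucket.
event = build_test_event(
    region="cn-hangzhou",
    account_id="<your_account_id>",
    bucket="<your_bucket>",
    object_key="src/test.zip",
    size=122539,
)
print(json.dumps(event, indent=4))
```

Paste the printed JSON into the event content field of the Configure Test Parameters panel.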
On the Code tab, click Test Function to run the test.
After the operation is complete, log in to the Object Storage Service console and check whether the target ZIP file (src/test.zip in this example) in the target bucket is decompressed. For more information, see Query files by using the OSS console.
When you have finished testing, delete the application and its associated resources if you no longer need them to avoid unnecessary charges.
References
To learn more about OSS triggers, see OSS Trigger Overview.
To use Function Compute to package and download files from OSS, see Download multiple files as a ZIP package.
For more information about object storage, see Bucket and Object.