Write an output file by fixed length chunks.

Gribouillis 3 Tallied Votes 4K Views Share

When working with large data files, it may be desirable to output a sequence of bytes in large chunks. This snippet defines a file adapter class that handles this transparently. Writing bytes to a ChunkedOutputFile will automatically write to the underlying file object in fixed-length chunks.

TrustyTony commented: looks useful +13
#!/usr/bin/env python
# -*-coding: utf8-*-
# Title: writechunks.py
# Author: Gribouillis for the python forum at www.daniweb.com
# Created: 2012-03-18 11:49:50.489168 (isoformat date)
# License: Public Domain
# Use this code freely.

import mmap
import platform
import sys

# Release number of this snippet, as a tuple and as a dotted string.
version_info = (0, 1)
version = ".".join(str(part) for part in version_info)

# Byte-size units: each one is 1024 times the previous one.
_K = 1024
B = 1
KB = B * _K
MB = KB * _K
GB = MB * _K

def new_mmap(chunk_size):
    """Return a new anonymous, writable memory map of ``chunk_size`` bytes.

    The map is not backed by a file (file descriptor -1).  The positional
    arguments differ between Windows and the other platforms because
    mmap.mmap() has a different signature on each of them.
    """
    if platform.system() != 'Windows':
        return mmap.mmap(-1, chunk_size, mmap.MAP_PRIVATE,
                         mmap.PROT_READ | mmap.PROT_WRITE)
    return mmap.mmap(-1, chunk_size, None, mmap.ACCESS_WRITE)

        
def chunked(iterable, chunk_size):
    """Adapter to convert a sequence of strings to another sequence of
    strings with fixed length (the last string may be smaller).

    The items are copied into an anonymous memory map of ``chunk_size``
    bytes; every time the map is full its content is yielded and the map is
    reused for the next chunk.  Empty items are ignored.

    Note: mmap.write() only accepts bytes-like objects under Python 3, so
    the items must be byte strings there (the chunks are then ``bytes``).

    Parameters:
        iterable: any iterable of strings.
        chunk_size: length of the produced chunks (size of the buffer).

    Yields:
        Strings of exactly ``chunk_size`` characters, except possibly the
        last one which holds the remaining data.

    Example:

        >>> data = ["give ", "me ", "bacon ", "and ", "eggs ", "said ", "the ", "other ", "man."]
        >>> for s in chunked(data, 9):
        ...     print(s)
        ...
        give me b
        acon and 
        eggs said
         the othe
        r man.

    """
    buf = new_mmap(chunk_size)
    try:
        for chars in iterable:
            while chars:
                room = chunk_size - buf.tell()
                if len(chars) < room:
                    # The whole item fits and leaves free space in the buffer.
                    buf.write(chars)
                    break
                # Fill the buffer up to its end, emit it, keep the remainder.
                buf.write(chars[:room])
                yield buf[:]
                buf.seek(0)
                chars = chars[room:]
        if buf.tell():
            # Left-over data shorter than a full chunk.
            yield buf[:buf.tell()]
    finally:
        # Release the memory map even when the consumer stops early or the
        # source iterable raises.
        buf.close()

class ChunkedOutputFile(object):
    """Adapter class to write an output file by chunks of fixed length.

    Data passed to write() or writelines() is accumulated in an anonymous
    memory map of ``chunk_size`` bytes.  When the map is full, its whole
    content is written to the underlying file object in a single call.
    A shorter chunk is only written by flush(), seek() or close().

    Note: mmap.write() only accepts bytes-like objects under Python 3, so
    the data must be byte strings there.

    Typical use:

        # This code writes a sequence of strings to myfile.txt by chunks of 32 megabytes.

        with open("myfile.txt", "w") as handle:
            with ChunkedOutputFile(handle, 32 * MB) as ofh:
                for string in data_source():
                    ofh.write(string)

    Many file methods are left unimplemented in this class. They could be implemented in subclasses.

    """

    # Becomes True on the instance once close() has run, so that calling
    # close() again is a no-op instead of failing on the closed memory map.
    _map_closed = False

    def __init__(self, ofh, chunk_size):
        """Wrap the file object ``ofh`` with a buffer of ``chunk_size`` bytes."""
        self.ofh = ofh
        self.chunk_size = chunk_size
        self.map = new_mmap(chunk_size)

    def _drain(self):
        """Write the buffered data, if any, to the underlying file and rewind the buffer."""
        pending = self.map.tell()
        if pending:
            self.ofh.write(self.map[:pending])
            self.map.seek(0)

    def close(self):
        """Flush the pending data, then close the buffer and the underlying file.

        Calling close() more than once is allowed.  The buffer and the
        underlying file are closed even if the final flush fails.
        """
        if self._map_closed:
            return
        try:
            self.flush()
        finally:
            self._map_closed = True
            try:
                self.map.close()
            finally:
                self.ofh.close()

    def fileno(self):
        """Return the file descriptor of the underlying file."""
        return self.ofh.fileno()

    def isatty(self):
        """Return the isatty() result of the underlying file."""
        return self.ofh.isatty()

    def flush(self):
        """Write the buffered data (possibly a short chunk) and flush the underlying file."""
        self._drain()
        self.ofh.flush()

    def next(self):
        raise NotImplementedError

    def read(self, size=-1):
        raise NotImplementedError

    def readline(self, size=-1):
        raise NotImplementedError

    def readlines(self, sizehint=None):
        raise NotImplementedError

    def seek(self, offset, whence=0):
        """Write the buffered data, then move the position of the underlying file."""
        self._drain()
        self.ofh.seek(offset, whence)

    def tell(self):
        """Return the logical position: underlying position plus buffered length."""
        return self.ofh.tell() + self.map.tell()

    def truncate(self, size):
        raise NotImplementedError

    def write(self, chars):
        """Append ``chars`` to the buffer.

        A full buffer is written to the underlying file as one chunk just
        before more data has to be stored in it.
        """
        while chars:
            room = self.chunk_size - self.map.tell()
            if not room:
                # Buffer full: emit one complete chunk and start again.
                self._drain()
                continue
            self.map.write(chars[:room])
            chars = chars[room:]

    def writelines(self, sequence):
        """Write every string of ``sequence``, exactly as repeated write() calls.

        The data left in the buffer at the end stays buffered, so that
        successive calls keep producing chunks of fixed length.
        """
        for chars in sequence:
            self.write(chars)

    @property
    def closed(self):
        return self.ofh.closed

    @property
    def encoding(self):
        raise NotImplementedError

    @property
    def errors(self):
        raise NotImplementedError

    @property
    def mode(self):
        return self.ofh.mode

    @property
    def name(self):
        return self.ofh.name

    @property
    def newlines(self):
        return self.ofh.newlines

    def _get_softspace(self):
        return self.ofh.softspace

    def _set_softspace(self, value):
        self.ofh.softspace = value

    # Forwarded to the underlying file object.
    softspace = property(_get_softspace, _set_softspace)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

if __name__ == "__main__":
    # Self-test: cut the content of this very file into chunks and check
    # that the chunks have the expected length and rebuild the input.
    # The file is read in binary mode because mmap.write() only accepts
    # bytes under Python 3; under Python 2 this makes no difference.
    with open(__file__, "rb") as src:
        test_str = src.read()
    data = test_str.split()
    chunk = 9
    L = list(chunked(data, chunk))

    def check_list(alist, chunk_size, expected):
        # All the chunks but the last one must be full.
        assert all(len(x) == chunk_size for x in alist[:-1])
        assert b''.join(alist) == expected

    check_list(L, chunk, b''.join(data))

    class _MockFile(list):
        # Minimal output file which records every write() call.
        def write(self, chars):
            self.append(chars)

        def close(self):
            pass

        def flush(self):
            pass

    # Floor division: the size of a memory map must be an integer.
    chunk = 3 * KB // 2

    with open(__file__, "rb") as ifh:
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            for line in ifh:
                ofh.write(line)
        check_list(ofh.ofh, chunk, test_str)
        ifh.seek(0)
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            ofh.writelines(ifh)
        check_list(ofh.ofh, chunk, test_str)
Gribouillis 1,391 Programming Explorer Team Colleague

I missed an optimization trick in write(). Here is the improved version:

#!/usr/bin/env python
# -*-coding: utf8-*-
# Title: writechunks.py
# Author: Gribouillis for the python forum at www.daniweb.com
# Created: 2012-03-18 11:49:50.489168 (isoformat date)
# License: Public Domain
# Use this code freely.

import mmap
import platform
import sys

# Release number of this snippet: (major, minor) and its dotted form.
version_info = (0, 2)
version = "%d.%d" % version_info

# Byte-size units, as successive powers of 1024.
_K = 1024
B, KB, MB, GB = (_K ** power for power in range(4))

# mmap.mmap() has a different signature on Windows and on the other
# platforms, so the suitable new_mmap() is selected once, at import time.
if platform.system() != 'Windows':
    def new_mmap(chunk_size):
        """Return a new anonymous, writable memory map of ``chunk_size`` bytes."""
        return mmap.mmap(
            -1,
            chunk_size,
            flags=mmap.MAP_PRIVATE,
            prot=mmap.PROT_READ | mmap.PROT_WRITE,
        )
else:
    def new_mmap(chunk_size):
        """Return a new anonymous, writable memory map of ``chunk_size`` bytes."""
        return mmap.mmap(-1, chunk_size, tagname=None, access=mmap.ACCESS_WRITE)
        
def chunked(iterable, chunk_size):
    """Adapter to convert a sequence of strings to another sequence of
    strings with fixed length (the last string may be smaller).

    Each item is copied into a reusable anonymous memory map of
    ``chunk_size`` bytes, which is yielded and rewound every time it becomes
    full.  Empty items produce nothing.

    Note: under Python 3 mmap.write() only accepts bytes-like objects, so
    the items must be byte strings there (and the chunks are ``bytes``).

    Parameters:
        iterable: any iterable of strings.
        chunk_size: length of the produced chunks (size of the buffer).

    Yields:
        Strings of exactly ``chunk_size`` characters, followed by one
        shorter string when the total length is not a multiple of
        ``chunk_size``.

    Example:

        >>> data = ["give ", "me ", "bacon ", "and ", "eggs ", "said ", "the ", "other ", "man."]
        >>> for s in chunked(data, 9):
        ...     print(s)
        ...
        give me b
        acon and 
        eggs said
         the othe
        r man.

    """
    buf = new_mmap(chunk_size)
    try:
        for chars in iterable:
            while chars:
                room = chunk_size - buf.tell()
                if len(chars) < room:
                    # Fits entirely with space to spare: just store it.
                    buf.write(chars)
                    break
                # Complete the current chunk, hand it out, then go on with
                # what remains of this item.
                buf.write(chars[:room])
                yield buf[:]
                buf.seek(0)
                chars = chars[room:]
        if buf.tell():
            # Final, shorter chunk.
            yield buf[:buf.tell()]
    finally:
        # Always release the memory map, including when the generator is
        # abandoned before exhaustion or the source iterable raises.
        buf.close()

class ChunkedOutputFile(object):
    """Adapter class to write an output file by chunks of fixed length.

    Data passed to write() or writelines() goes to an anonymous memory map
    of ``chunk_size`` bytes which acts as a buffer.  The buffer is written
    to the underlying file object, in a single call, when incoming data
    does not fit in it any more.  A shorter chunk is only written by
    flush(), seek() or close().

    Note: mmap.write() only accepts bytes-like objects under Python 3, so
    the data must be byte strings there.

    Typical use:

        # This code writes a sequence of strings to myfile.txt by chunks of 32 megabytes.

        with open("myfile.txt", "w") as handle:
            with ChunkedOutputFile(handle, 32 * MB) as ofh:
                for string in data_source():
                    ofh.write(string)

    Many file methods are left unimplemented in this class. They could be implemented in subclasses.

    """

    # Becomes True on the instance once close() has run, so that calling
    # close() again is a no-op instead of failing on the closed memory map.
    _map_closed = False

    def __init__(self, ofh, chunk_size):
        """Wrap the file object ``ofh`` with a buffer of ``chunk_size`` bytes."""
        self.ofh = ofh
        self.chunk_size = chunk_size
        self.map = new_mmap(chunk_size)

    def _drain(self):
        """Write the buffered data, if any, to the underlying file and rewind the buffer."""
        pending = self.map.tell()
        if pending:
            self.ofh.write(self.map[:pending])
            self.map.seek(0)

    def close(self):
        """Flush the pending data, then close the buffer and the underlying file.

        Calling close() more than once is allowed.  The buffer and the
        underlying file are closed even if the final flush fails.
        """
        if self._map_closed:
            return
        try:
            self.flush()
        finally:
            self._map_closed = True
            try:
                self.map.close()
            finally:
                self.ofh.close()

    def fileno(self):
        """Return the file descriptor of the underlying file."""
        return self.ofh.fileno()

    def isatty(self):
        """Return the isatty() result of the underlying file."""
        return self.ofh.isatty()

    def flush(self):
        """Write the buffered data (possibly a short chunk) and flush the underlying file."""
        self._drain()
        self.ofh.flush()

    def next(self):
        raise NotImplementedError

    def read(self, size=-1):
        raise NotImplementedError

    def readline(self, size=-1):
        raise NotImplementedError

    def readlines(self, sizehint=None):
        raise NotImplementedError

    def seek(self, offset, whence=0):
        """Write the buffered data, then move the position of the underlying file."""
        self._drain()
        self.ofh.seek(offset, whence)

    def tell(self):
        """Return the logical position: underlying position plus buffered length."""
        return self.ofh.tell() + self.map.tell()

    def truncate(self, size):
        raise NotImplementedError

    def write(self, chars):
        """Append ``chars`` to the buffer, writing out full chunks as needed.

        Data that fits in the free space of the buffer is stored with a
        single copy.  Otherwise the buffer is completed, written to the
        underlying file as one chunk, and the remainder is processed the
        same way.
        """
        while chars:
            room = self.chunk_size - self.map.tell()
            if len(chars) <= room:
                # Fast path: no slicing, no output.
                self.map.write(chars)
                return
            if room:
                self.map.write(chars[:room])
                chars = chars[room:]
            # The buffer is full here: emit one complete chunk.
            self._drain()

    def writelines(self, sequence):
        """Write every string of ``sequence``, exactly as repeated write() calls.

        The data left in the buffer at the end stays buffered, so that
        successive calls keep producing chunks of fixed length.
        """
        for chars in sequence:
            self.write(chars)

    @property
    def closed(self):
        return self.ofh.closed

    @property
    def encoding(self):
        raise NotImplementedError

    @property
    def errors(self):
        raise NotImplementedError

    @property
    def mode(self):
        return self.ofh.mode

    @property
    def name(self):
        return self.ofh.name

    @property
    def newlines(self):
        return self.ofh.newlines

    def _get_softspace(self):
        return self.ofh.softspace

    def _set_softspace(self, value):
        self.ofh.softspace = value

    # Forwarded to the underlying file object.
    softspace = property(_get_softspace, _set_softspace)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

if __name__ == "__main__":
    # Self-test: cut the content of this very file into chunks and check
    # that the chunks have the expected length and rebuild the input.
    # The file is read in binary mode because mmap.write() only accepts
    # bytes under Python 3; under Python 2 this makes no difference.
    with open(__file__, "rb") as src:
        test_str = src.read()
    data = test_str.split()
    chunk = 9
    L = list(chunked(data, chunk))

    def check_list(alist, chunk_size, expected):
        # All the chunks but the last one must be full.
        assert all(len(x) == chunk_size for x in alist[:-1])
        assert b''.join(alist) == expected

    check_list(L, chunk, b''.join(data))

    class _MockFile(list):
        # Minimal output file which records every write() call.
        def write(self, chars):
            self.append(chars)

        def close(self):
            pass

        def flush(self):
            pass

    # Floor division: the size of a memory map must be an integer.
    chunk = 3 * KB // 2

    with open(__file__, "rb") as ifh:
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            for line in ifh:
                ofh.write(line)
        check_list(ofh.ofh, chunk, test_str)
        ifh.seek(0)
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            ofh.writelines(ifh)
        check_list(ofh.ofh, chunk, test_str)
TrustyTony 888 ex-Moderator Team Colleague Featured Poster

This snippet could probably be a good foundation for file copy/installation with a progress bar, by adding a function to call for each block. It could take parameters for the current block number and the total number of blocks.

TrustyTony 888 ex-Moderator Team Colleague Featured Poster

What is the benefit of using this snippet compared to io module's buffered io?

Gribouillis 1,391 Programming Explorer Team Colleague

What is the benefit of using this snippet compared to io module's buffered io?

You are right, there may be no advantage at all. I didn't think of that when I wrote the snippet.

Be a part of the DaniWeb community

We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.