Any sequence of Python strings can be disguised to look like a file opened for reading, which can then be passed on the fly to functions expecting such an input file. This code snippet defines the necessary adapter class. The resulting file-like objects store only one string of the sequence at a time, which limits some file operations (for example, seek() does not work backwards). This code was inspired by the standard library's StringIO module. Please note that when memory or disk access is not an issue, it may be more efficient to write a temporary file than to use this adapter class (for example, on Linux one could write a file in /dev/shm).
Adapt an iterable to look like a file.
# Name: module adaptstrings.py
# Date: october 29 2011
# Author: Gribouillis for the python forum at www.daniweb.com
# License: public domain
# Use this code freely in your programs
"""Module to adapt sequences of strings to a file-like interface.

Two public entry points are exported:

* adapt_as_file(iterable) wraps an iterable of strings in a read-only
  file-like object.
* adapt_as_opener(func) is a decorator which turns a function returning
  such an iterable (typically a generator function) into a function
  returning the file-like object.
"""
# Names exported by "from adaptstrings import *".
__all__ = ["adapt_as_file", "adapt_as_opener"]
def adapt_as_file(iterable):
    """Wrap an iterable of strings in a read-only file-like object.

    The items produced by ``iterable`` are assumed to be strings. The
    returned object is an IterableAsFileAdapter instance, which offers
    close(), read(), readline(), readlines(), seek() and tell(); refer to
    that class for the details of each method.

    Typical usage:

        f = adapt_as_file(sequence_of_strings)
        for line in f:
            ...
        # use f as if it were a file opened for reading
    """
    adapter = IterableAsFileAdapter(iterable)
    return adapter
def adapt_as_opener(func):
    """Decorator turning a string-producing function into a file opener.

    ``func`` is any callable returning an iterable of strings (typically a
    generator function). The decorated function accepts the same arguments
    as ``func`` and returns the result of adapt_as_file() applied to the
    iterable that ``func`` returned, i.e. a file-like object opened for
    reading. The name and docstring of ``func`` are copied to the wrapper.

    Typical usage:

        @adapt_as_opener
        def data_source(args):
            yield a_string
            yield another_string

        f = data_source(args)
        for line in f:
            ...
        # use f as if it were a file opened for reading
    """
    # Imported locally because the module has no top-level import of
    # functools; the original code referenced update_wrapper without
    # importing it, which raised NameError at decoration time.
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwd):
        return adapt_as_file(func(*args, **kwd))

    return wrapper
def _complain_ifclosed(closed):
if closed:
raise ValueError, "I/O operation on closed file"
# This class implementation was inspired by the pure python implementation
# of the StringIO class in the python standard library
class IterableAsFileAdapter(object):
    """Read-only, forward-only file-like object over an iterable of strings.

    Only one item of the iterable is buffered at a time, so the file
    position can only move forward.

    Internal state:
        source -- iterator over the items not yet consumed; None once the
                  iterator is exhausted or the object is closed
        buf    -- the item currently buffered ('' before the first read and
                  after the end of the data)
        start  -- absolute file offset of buf[0]
        end    -- absolute file offset just past the end of buf
        pos    -- current absolute file position, start <= pos <= end
    """

    def __init__(self, iterable):
        self.source = iter(iterable)
        self.start = self.pos = self.end = 0
        self.buf = ''
        self._closed = False

    def _check_open(self):
        """Raise ValueError if the object has been closed."""
        if self._closed:
            raise ValueError("I/O operation on closed file")

    def is_sane(self):
        """Return a bool telling whether the class invariants hold.

        A False value should never happen; it would reveal an internal
        implementation error.
        """
        return bool(self._closed or (
            (self.start <= self.pos <= self.end)
            and (self.start + len(self.buf) == self.end)
            and ((self.source is not None) or self.buf == '')
        ))

    def __iter__(self):
        return self

    def next(self):
        """Return the next input line, or raise StopIteration at EOF.

        The object is its own iterator, so it can be used directly in a
        for loop. Raises ValueError if the object is closed.
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    # Python 3 spells the iterator protocol method __next__.
    __next__ = next

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    @property
    def closed(self):
        """bool indicating whether close() has been called (read-only)."""
        return self._closed

    def close(self):
        """Drop the buffer and the source iterator; may be called twice."""
        if not self._closed:
            self._closed, self.buf, self.source = True, '', None
            self.start = self.pos = self.end = 0

    def isatty(self):
        """Return False. Raises ValueError if the object is closed."""
        self._check_open()
        return False

    def _go_next(self):
        """Move the position to the start of the next item of the source.

        The current buffer is discarded. When the source is exhausted the
        buffer is left empty and self.source is set to None. For internal
        use only.
        """
        self.start = self.pos = self.end
        self.buf = ''
        if self.source is not None:
            try:
                self.buf = next(self.source)
            except StopIteration:
                self.source = None
            else:
                self.end += len(self.buf)

    def seek(self, where, mode=0):
        """Set the file's current position.

        mode 0 (default) means absolute positioning and mode 1 means
        relative to the current position. Only forward seeks are
        supported; seeking past the end of the data leaves the position at
        the end. There is no return value.

        Raises ValueError for mode 2 (relative to the end), for a target
        before the current position, or if the object is closed.
        """
        self._check_open()
        if mode == 2:
            raise ValueError(
                "File-like object does not support seek() from eof.")
        elif mode == 1:
            where += self.pos  # convert to absolute positioning
        if where < self.pos:
            raise ValueError(
                "File-like object does not support backward seek().")
        while where > self.end:
            self._go_next()
            if self.source is None:
                # Ran out of data: the position stays at end of file.
                return
        self.pos = where

    def tell(self):
        """Return the current position. Raises ValueError if closed."""
        self._check_open()
        return self.pos

    def read(self, n=-1):
        """Read at most n characters and return them as a string.

        Fewer characters are returned if EOF is hit first. If n is
        negative, None or omitted, everything up to EOF is returned. An
        empty string is returned when EOF is encountered immediately.
        Raises ValueError if the object is closed.
        """
        self._check_open()
        if n is None:
            # Python 2 ordered None below every int, so read(None) meant
            # "read everything"; keep that meaning explicitly.
            n = -1
        chunks = []
        if n < 0:
            chunks.append(self.buf[self.pos - self.start:])
            while self.source is not None:
                self._go_next()
                chunks.append(self.buf)
        else:
            target = self.pos + n
            while target > self.end:
                chunks.append(self.buf[self.pos - self.start:])
                self._go_next()
                if self.source is None:
                    break
            else:
                # The target lies inside the current buffer.
                chunks.append(
                    self.buf[self.pos - self.start:target - self.start])
                self.pos = target
        return ''.join(chunks)

    def readline(self, length=None):
        r"""Read one line from the file and return it.

        The trailing '\n' is kept (it may be absent on the last line). If
        length is given and non-negative, at most length characters are
        returned, so the line may be incomplete; readline(0) returns ''.
        Without a length limit an empty string is returned only at EOF.
        Raises ValueError if the object is closed.
        """
        self._check_open()
        # Absolute offset at which reading must stop, or None for no limit.
        if length is not None and length >= 0:
            limit = self.pos + length
        else:
            limit = None
        pieces = []
        while True:
            offset = self.pos - self.start
            newline = self.buf.find('\n', offset)
            if newline >= 0:
                line_end = self.start + newline + 1
                if limit is not None and limit < line_end:
                    line_end = limit
                pieces.append(self.buf[offset:line_end - self.start])
                self.pos = line_end
                break
            # No newline in the rest of the buffer.
            if limit is not None and limit <= self.end:
                pieces.append(self.buf[offset:limit - self.start])
                self.pos = limit
                break
            pieces.append(self.buf[offset:])
            self._go_next()
            if self.source is None:
                break
        return ''.join(pieces)

    def readlines(self, sizehint=0):
        """Read lines with readline() until EOF and return them as a list.

        If sizehint is a positive number, reading stops after the first
        whole line that brings the total number of characters read to at
        least sizehint. Raises ValueError if the object is closed.
        """
        if sizehint is None:
            sizehint = 0
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def flush(self):
        """Do nothing. Raises ValueError if the object is closed."""
        self._check_open()

    @property
    def name(self):
        """Placeholder file name built from the class name."""
        return "<%s>" % self.__class__.__name__
# test code
def test():
    """Self-test of IterableAsFileAdapter against a known text.

    The text is read from the file named by the first command line
    argument, or from /etc/passwd when no argument is given. When the
    default file cannot be read, or when the file is empty, a synthetic
    text is used instead. The text is then cut into random-sized pieces
    and the adapter's read(), seek(), tell(), readlines() and iteration
    are checked against it.

    Raises AssertionError on a failed check, and IOError if a file named
    explicitly on the command line cannot be read.
    """
    import sys

    explicit = bool(sys.argv[1:])
    path = sys.argv[1] if explicit else '/etc/passwd'
    text = ''
    try:
        # "with" closes the file; 99999 keeps len(text) strictly below the
        # 100000 bound asserted further down.
        with open(path, 'r') as handle:
            text = handle.read(99999)
    except IOError:
        # Only the implicit default may be missing (e.g. non-Unix systems).
        if explicit:
            raise
    if not text:
        text = "x" * 70 + "\n"
    while len(text) < 10000:
        text += text
    assert 10000 <= len(text) < 100000

    def sample_iterable():
        # Deterministic pseudo-random slicing of the text.
        from random import Random
        ra = Random()
        ra.seed(12345678901234567890)
        i = 0
        while i < len(text):
            d = ra.randint(10, 120)
            yield text[i: i + d]
            i += d

    def new():
        return IterableAsFileAdapter(sample_iterable())

    # read() of everything
    assert new().read() == text

    # read(n) in fixed-size chunks
    f = new()
    chunks = []
    while True:
        chunks.append(f.read(31))
        assert f.is_sane()
        if len(chunks[-1]) != 31:
            assert f.tell() == len(text)
            break
    assert f.tell() == len(text)
    assert ''.join(chunks) == text

    # relative seek
    f = new()
    while True:
        p = f.tell()
        f.seek(47, 1)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # absolute seek
    f = new()
    while True:
        p = f.tell()
        f.seek(p + 47)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # readlines(): every piece but the last ends with a newline; the last
    # one is kept only when the text does not end with a newline.
    pieces = text.split('\n')
    expected = [piece + '\n' for piece in pieces[:-1]]
    if pieces[-1]:
        expected.append(pieces[-1])
    assert new().readlines() == expected

    # context manager and iteration protocols
    with new() as f:
        for line in f:
            pass
# Run the self-test when this module is executed as a script.
if __name__ == "__main__":
    test()
Gribouillis 1,391 Programming Explorer Team Colleague
Gribouillis 1,391 Programming Explorer Team Colleague
Gribouillis 1,391 Programming Explorer Team Colleague
Be a part of the DaniWeb community
We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.