Created
November 13, 2012 07:05
-
-
Save jrydberg/4064426 to your computer and use it in GitHub Desktop.
Functional-programming style sequences.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Functional-programming style sequences.""" | |
import itertools | |
import functools | |
import operator as op | |
class Blank(object): | |
def __init__(self, pos=None): | |
self.pos = pos | |
# Our "blank" objects | |
_1, _2, _3, _4 = Blank(0), Blank(1), Blank(2), Blank(3) | |
class BlankedInvocation(object): | |
def __init__(self, fn, posargs, kwargs): | |
self.fn = fn | |
self.posargs = posargs | |
self.kwargs = kwargs | |
def padnone(self, iterable): | |
"""Returns the sequence elements and then returns None indefinitely. | |
Useful for emulating the behavior of the built-in map() function. | |
""" | |
return itertools.chain(iterable, itertools.repeat(None)) | |
def __call__(self, *values): | |
values = list(itertools.islice(self.padnone(values), | |
0, len(self.posargs))) | |
args = [] | |
for arg in self.posargs: | |
if isinstance(arg, Blank): | |
args.append(values[arg.pos]) | |
else: | |
args.append(arg) | |
return self.fn(*args, **self.kwargs) | |
def fnliteral(*args, **kw): | |
"""Create a function literal based on the provided arguments. | |
Will return a callable with signature C{(value)}. | |
This acts like L{functools.partial} in some ways, except that | |
it allows the value to be placed at any position. Example: | |
>>> fnliteral(operator.add, _, 20)(10) | |
30 | |
""" | |
if len(args) == 0: | |
return lambda x: x if x else None | |
if len(args) == 1: | |
return lambda *a: args[0](*a, **kw) | |
else: | |
if any(isinstance(arg, Blank) for arg in args[1:]): | |
return BlankedInvocation(args[0], args[1:], kw) | |
else: | |
return functools.partial(*args, **kw) | |
class _BaseSeq: | |
"""Base class of our sequences that expects that C{self} is an | |
iterable of some form. | |
""" | |
def take(self, n): | |
return self.slice(None, n) | |
def takewhile(self, *args, **kw): | |
predicate = fnliteral(*args, **kw) | |
return self.__class__(itertools.takewhile(predicate, self)) | |
def some(self, *args, **kw): | |
predicate = fnliteral(*args, **kw) | |
for value in self: | |
if predicate(value): | |
return value | |
return None | |
def filter(self, *args, **kw): | |
fn = fnliteral(*args, **kw) | |
return self.__class__(elem for elem in self if fn(elem)) | |
def filterfalse(self, *args, **kw): | |
fn = fnliteral(*args, **kw) | |
return self.__class__(elem for elem in self if not fn(elem)) | |
def map(self, *args, **kw): | |
fn = fnliteral(*args, **kw) | |
return self.__class__(fn(elem) for elem in self) | |
def concat(self, *other): | |
return self.__class__(itertools.chain(self, *other)) | |
def count(self, fn=bool): | |
return sum(1 for elem in self if fn(elem)) | |
def reduce(self, *args, **kw): | |
"""Reduce content of sequence down to a single value. | |
""" | |
fn = fnliteral(*args, **kw) | |
it = iter(self) | |
ret = next(it) | |
for value in it: | |
ret = fn(ret, value) | |
return ret | |
def first(self): | |
"""Return the first element or None.""" | |
return next(self, None) | |
def dropwhile(self, *args, **kw): | |
predicate = fnliteral(*args, **kw) | |
return self.__class__(itertools.dropwhile(predicate, self)) | |
def slice(self, *args): | |
return self.__class__(itertools.islice(self, *args)) | |
def zip(self, *iterables): | |
return self.__class__(itertools.izip(self, *iterables)) | |
def enumerate(self): | |
return self.__class__(enumerate(self)) | |
def append(self, elem): | |
return self.__class__(itertools.chain(self, [elem])) | |
def prepend(self, elem): | |
return self.__class__(itertools.chain([elem], self)) | |
def rest(self): | |
"""Return a sequence of all but the first element.""" | |
it = iter(self) | |
try: | |
next(it) | |
except StopIteration: | |
it = iter([]) | |
return self.__class__(it) | |
def nth(self, n, default=None): | |
"""Returns the nth element or a default value""" | |
return next(itertools.islice(self, n, None), default) | |
def reverse(self): | |
"""Return reversed sequence.""" | |
return self.__class__(reversed(self)) | |
def remove(self, toremove): | |
def produce(): | |
it = iter(self) | |
for elem in it: | |
if elem == toremove: | |
break | |
yield elem | |
for elem in it: | |
yield elem | |
return self.__class__(produce()) | |
def flatten(self): | |
"""Flatten one level of nesting.""" | |
return self.__class__(itertools.chain.from_iterable(self)) | |
def sort(self, cmp=None, key=None, reverse=False): | |
"""Return sorted sequence.""" | |
return self.__class__(sorted(self, cmp=cmp, key=key, | |
reverse=reverse)) | |
def do(self, *args, **kw): | |
fn = fnliteral(*args, **kw) | |
for value in self: | |
fn(value) | |
class lazy_seq(object, _BaseSeq): | |
"""A lazy sequence, that operate on iterable. | |
Note that this sequence is "one shot" in the sense that it | |
is only possible to iterate over it once. | |
If you want to make the content permanent, use the C{save} method. | |
""" | |
def __init__(self, iterable): | |
self._iterable = iter(iterable) | |
def __iter__(self): | |
return self._iterable | |
def __repr__(self): | |
return 'lazy_seq(...)' | |
def tee(self, n=2): | |
return map(lazy_seq, itertools.tee(self, n)) | |
def save(self): | |
return seq(self) | |
seq = persist = save | |
class seq(_BaseSeq, list): | |
"""An immutable sequence of elements.""" | |
def __getitem__(self, elem): | |
if type(elem) == slice: | |
return seq(list(self)[elem]) | |
return list.__getitem__(self, elem) | |
def __setitem__(self, elem): | |
raise TypeError | |
def insert(self, index, elem): | |
# FIXME: do this more elegantly | |
data = list(self) | |
data.insert(index, elem) | |
return seq(data) | |
def __repr__(self): | |
return 'seq(%r)' % (list(self),) | |
def test(): | |
print (lazy_seq(itertools.count(1)) | |
.slice(0, 2) | |
.prepend(20) | |
.rest() | |
.map(op.add, _1, _1) # x + x | |
.map(op.div, _1, 0.5) # x / 0.5 | |
.map(int) | |
).seq().reduce(op.add, _1, _2) | |
if __name__ == '__main__': | |
test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment