# Source code for parts.parts

"""
Minimal library that enables partitioning of iterable objects in a concise
manner.
"""
import doctest
from typing import Tuple, Union, Optional, Iterable
import collections.abc
from itertools import islice, chain

def _empty(xs: Iterable) -> Tuple[Iterable, bool]:
    """
    Determine whether a sequential type instance is empty.

    >>> def error():
    ...     for i in range(2):
    ...         if i == 0:
    ...             yield i
    ...         else:
    ...             raise RuntimeError('error in generator')
    ...
    >>> list(parts(error(), length=1))
    Traceback (most recent call last):
      ...
    RuntimeError: error in generator
    """
    try:
        return (xs, len(xs) == 0)
    except TypeError:
        try:
            x = next(xs)
            return (iter(chain([x], xs)), False)
        except StopIteration:
            return (xs, True)
        except Exception as e:
            raise e from None

def _slice(xs: Iterable, lower: int, upper: int) -> Iterable:
    """
    Attempt to retrieve a subsequence of a sequential type instance
    using slice notation or :obj:`itertools.islice`.
    """
    try:
        return xs[lower: min(len(xs), upper)]
    except TypeError:
        try:
            return islice(xs, 0, upper - lower)
        except:
            raise TypeError(
                'object does not support retrieval of slices'
            ) from None

def parts( # pylint: disable=R0912,R0915
        xs: Iterable,
        number: Optional[int] = None,
        length: Union[int, Iterable[int], None] = None
    ) -> Iterable:
    """
    This function splits an :obj:`~collections.abc.Iterable` object into either
    the specified number of parts or a number of parts each of the specified
    length. When input parameters lead to ambiguous or conflicting constraints,
    either elements are distributed in a best-effort manner or an exception
    is raised (depending on the specific scenario).

    :param xs: Iterable to split into parts.
    :param number: Number of parts.
    :param length: Length of every part or iterable of part lengths.
    :return: Generator that yields the parts in order.
    :raises TypeError: If a parameter has an unsupported type, or if ``xs``
        has no length but ``number`` is supplied.
    :raises ValueError: If neither ``number`` nor ``length`` is supplied, or
        if the supplied constraints cannot be satisfied by ``xs``.

    Because this function is a generator, parameters are only checked (and
    exceptions are only raised) once the result is iterated.

    In the simplest case, the target number of parts can be specified.

    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 1))
    [[1, 2, 3, 4, 5, 6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 2))
    [[1, 2, 3], [4, 5, 6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 3))
    [[1, 2], [3, 4], [5, 6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 4))
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 5))
    [[1], [2], [3], [4, 5], [6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 6))
    [[1], [2], [3], [4], [5], [6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 7))
    [[1], [2], [3], [4], [5], [6], [7]]

    The target length for each part can be specified; the number of parts
    will be determined based on the length and the available number of items.

    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=1))
    [[1], [2], [3], [4], [5], [6], [7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=2))
    [[1, 2], [3, 4], [5, 6], [7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=3))
    [[1, 2, 3], [4, 5, 6], [7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=4))
    [[1, 2, 3, 4], [5, 6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=5))
    [[1, 2, 3, 4, 5], [6, 7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=6))
    [[1, 2, 3, 4, 5, 6], [7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=7))
    [[1, 2, 3, 4, 5, 6, 7]]
    >>> list(map(list, parts(iter([1, 2, 3, 4, 5, 6, 7]), length=4)))
    [[1, 2, 3, 4], [5, 6, 7]]

    An iterable of length values can be specified. The entry at each position
    in the iterable of lengths dictates the length of the part in the
    corresponding position in the output.

    >>> list(parts([1, 2, 3, 4, 5, 6, 7], 7, [1, 1, 1, 1, 1, 1, 1]))
    [[1], [2], [3], [4], [5], [6], [7]]
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], length=[1, 1, 1, 1, 1, 1, 1]))
    [[1], [2], [3], [4], [5], [6], [7]]
    >>> list(parts([1, 2, 3, 4, 5, 6], length=[2, 2, 2]))
    [[1, 2], [3, 4], [5, 6]]
    >>> list(parts([1, 2, 3, 4, 5, 6], length=[1, 2, 3]))
    [[1], [2, 3], [4, 5, 6]]

    When both a number of parts and individual part lengths are supplied,
    the lengths must be a sequence of integers (such as a list, tuple, or
    range) so that they can be checked against the number of parts and the
    length of the input before any part is produced.

    >>> list(parts([1, 2, 3], number=2, length=(1, 2)))
    [[1], [2, 3]]

    The type of input objects (for built-in types) is preserved in the output.

    >>> isinstance(next(parts([1, 2, 3, 4, 5], length=2)), list)
    True
    >>> isinstance(next(parts([1, 2, 3, 4, 5], length=[2, 2, 1])), list)
    True
    >>> isinstance(next(parts([1, 2, 3, 4, 5], number=2)), list)
    True
    >>> isinstance(next(parts([1, 2, 3, 4], number=2, length=2)), list)
    True
    >>> isinstance(next(parts([1, 2, 3, 4], number=2, length=[2, 2])), list)
    True
    >>> isinstance(next(parts((1, 2, 3, 4, 5), length=2)), tuple)
    True
    >>> isinstance(next(parts((1, 2, 3, 4, 5), length=[2, 2, 1])), tuple)
    True
    >>> isinstance(next(parts((1, 2, 3, 4, 5), number=2)), tuple)
    True
    >>> isinstance(next(parts((1, 2, 3, 4), number=2, length=2)), tuple)
    True
    >>> isinstance(next(parts((1, 2, 3, 4), number=2, length=[2, 2])), tuple)
    True
    >>> isinstance(next(parts('abc', length=2)), str)
    True
    >>> isinstance(next(parts('abc', length=[2, 1])), str)
    True
    >>> isinstance(next(parts('abc', number=2)), str)
    True
    >>> isinstance(next(parts('abcd', number=2, length=2)), str)
    True
    >>> isinstance(next(parts('abcd', number=2, length=[2, 2])), str)
    True
    >>> isinstance(next(parts(bytes([1, 2, 3, 4, 5]), length=2)), bytes)
    True
    >>> isinstance(next(parts(bytes([1, 2, 3, 4, 5]), length=[2, 2, 1])), bytes)
    True
    >>> isinstance(next(parts(bytes([1, 2, 3, 4, 5]), number=2)), bytes)
    True
    >>> isinstance(next(parts(bytes([1, 2, 3, 4]), number=2, length=2)), bytes)
    True
    >>> isinstance(next(parts(bytes([1, 2, 3, 4]), number=2, length=[2, 2])), bytes)
    True
    >>> isinstance(next(parts(bytearray([1, 2, 3, 4, 5]), length=2)), bytearray)
    True
    >>> isinstance(next(parts(bytearray([1, 2, 3, 4, 5]), length=[2, 2, 1])), bytearray)
    True
    >>> isinstance(next(parts(bytearray([1, 2, 3, 4, 5]), number=2)), bytearray)
    True
    >>> isinstance(next(parts(bytearray([1, 2, 3, 4]), number=2, length=2)), bytearray)
    True
    >>> isinstance(next(parts(bytearray([1, 2, 3, 4]), number=2, length=[2, 2])), bytearray)
    True
    >>> isinstance(next(parts(range(0, 10), length=2)), range)
    True
    >>> isinstance(next(parts(range(0, 5), length=[2, 2, 1])), range)
    True
    >>> isinstance(next(parts(range(0, 10), number=2)), range)
    True
    >>> isinstance(next(parts(range(0, 4), number=2, length=2)), range)
    True
    >>> isinstance(next(parts(range(0, 4), number=2, length=[2, 2])), range)
    True
    >>> isinstance(next(parts(iter([1, 2, 3, 4]), length=2)), Iterable)
    True

    The type of input :obj:`~collections.abc.Sequence` objects is preserved
    in the output.

    >>> class wrap:
    ...     def __init__(self, xs): self.xs = xs
    ...     def __len__(self): return len(self.xs)
    ...     def __getitem__(self, key): return wrap(self.xs[key])
    ...     def __repr__(self): return 'wrap(' + str(self.xs) + ')'
    >>> isinstance(next(parts(wrap([1, 2, 3, 4]), number=2)), wrap)
    True
    >>> list(parts(wrap([1, 2, 3, 4]), number=2))
    [wrap([1, 2]), wrap([3, 4])]
    >>> list(parts(wrap([1, 2, 3, 4]), length=2))
    [wrap([1, 2]), wrap([3, 4])]
    >>> list(parts(wrap([1, 2, 3, 4]), length=[2, 2]))
    [wrap([1, 2]), wrap([3, 4])]
    >>> list(parts(wrap([1, 2, 3, 4]), number=2, length=2))
    [wrap([1, 2]), wrap([3, 4])]
    >>> list(parts(wrap([1, 2, 3, 4]), number=2, length=[2, 2]))
    [wrap([1, 2]), wrap([3, 4])]

    The type of input objects *derived* from a :obj:`~collections.abc.Sequence`
    type is also preserved in the output.

    >>> class inherit(tuple):
    ...     def __getitem__(self, key): return inherit(tuple(self)[key])
    ...     def __repr__(self): return 'inherit' + str(tuple(self))
    >>> isinstance(next(parts(inherit([1, 2, 3, 4]), 2)), inherit)
    True
    >>> list(parts(inherit([1, 2, 3, 4]), number=2))
    [inherit(1, 2), inherit(3, 4)]
    >>> list(parts(inherit([1, 2, 3, 4]), length=2))
    [inherit(1, 2), inherit(3, 4)]
    >>> list(parts(inherit([1, 2, 3, 4]), length=[2, 2]))
    [inherit(1, 2), inherit(3, 4)]
    >>> list(parts(inherit([1, 2, 3, 4]), number=2, length=2))
    [inherit(1, 2), inherit(3, 4)]
    >>> list(parts(inherit([1, 2, 3, 4]), number=2, length=[2, 2]))
    [inherit(1, 2), inherit(3, 4)]

    Iterable inputs yield iterable outputs when possible. For iterator
    inputs, every part draws its items lazily from the same underlying
    iterator, so each part should be consumed before the next is requested.

    >>> def iterable():
    ...     for i in range(10):
    ...         yield i
    >>> isinstance((next(parts(iterable(), number=2))), Iterable)
    Traceback (most recent call last):
      ...
    TypeError: object must have length to determine part lengths from number parameter
    >>> isinstance((next(parts(iterable(), length=2))), Iterable)
    True
    >>> isinstance((next(parts(iterable(), length=[2, 2]))), Iterable)
    True
    >>> ps = parts(iterable(), number=2, length=2)
    >>> isinstance(next(ps), Iterable) # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
      ...
    TypeError: object must have length to determine if number of \
    parts having specified length(s) can be retrieved
    >>> ps = parts(iterable(), number=2, length=[2, 2])
    >>> isinstance(next(ps), Iterable) # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
      ...
    TypeError: object must have length to determine if number of \
    parts having specified length(s) can be retrieved
    >>> not isinstance((next(parts(iterable(), length=2))), list)
    True
    >>> list(parts(123, length=2))
    Traceback (most recent call last):
      ...
    TypeError: object does not support retrieval of slices

    A descriptive exception is raised when parameter values cannot be
    satisfied, cause a conflict, or have an incorrect type.

    >>> list(parts([1, 2, 3, 4, 5, 6], 2, 3))
    [[1, 2, 3], [4, 5, 6]]
    >>> list(parts([1, 2, 3, 4, 5, 6], number=3, length=2))
    [[1, 2], [3, 4], [5, 6]]
    >>> ps = parts(iter([1, 2, 3, 4, 5, 6]), number=3, length=2)
    >>> list(ps) # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
      ...
    TypeError: object must have length to determine if number of \
    parts having specified length(s) can be retrieved
    >>> list(parts(iter([1, 2, 3, 4, 5, 6, 7]), 1))
    Traceback (most recent call last):
      ...
    TypeError: object must have length to determine part lengths from number parameter
    >>> list(parts([1, 2, 3, 4, 5, 6], 1.2))
    Traceback (most recent call last):
      ...
    TypeError: number parameter must be an integer
    >>> list(parts([1, 2, 3, 4, 5, 6], length=1.23))
    Traceback (most recent call last):
      ...
    TypeError: length parameter must be an integer or iterable of integers
    >>> list(parts([1, 2, 3, 4, 5, 6], length=[1.23]))
    Traceback (most recent call last):
      ...
    TypeError: length parameter must be an integer or iterable of integers
    >>> list(parts([1, 2, 3, 4, 5, 6], 2, length=[1, 2, 3]))
    Traceback (most recent call last):
      ...
    ValueError: number parameter does not match number of specified part lengths
    >>> list(parts([1, 2, 3, 4, 5, 6, 7], number=3, length=2))
    Traceback (most recent call last):
      ...
    ValueError: cannot retrieve 3 parts from object given part length parameter of 2
    >>> list(parts([1, 2, 3], length=4))
    [[1, 2, 3]]
    >>> list(parts([1, 2, 3], number=2, length=[1, 2]))
    [[1], [2, 3]]
    >>> list(parts([1, 2, 3], number=3, length=[1, 2, 3]))
    Traceback (most recent call last):
      ...
    ValueError: object has too few items to retrieve parts having specified part lengths
    >>> list(parts([1, 2, 3]))
    Traceback (most recent call last):
      ...
    ValueError: missing number of parts parameter and part length(s) parameter
    >>> list(parts([1, 2, 3], length=[4]))
    [[1, 2, 3]]
    >>> list(parts([1, 2, 3], length=[3, 1]))
    Traceback (most recent call last):
      ...
    ValueError: object has too few items to retrieve parts having specified part lengths
    >>> list(parts([1, 2, 3], number=2, length=[1, 1]))
    Traceback (most recent call last):
      ...
    ValueError: object has too many items to retrieve parts having specified part lengths
    >>> list(parts([1, 2, 3], number=1, length=[4]))
    [[1, 2, 3]]
    >>> list(parts([1, 2, 3], number=2, length=[3, 1]))
    Traceback (most recent call last):
      ...
    ValueError: object has too few items to retrieve parts having specified part lengths
    >>> list(parts([1, 2, 3], length=[1, 1, 1, 1]))
    Traceback (most recent call last):
      ...
    ValueError: object has too few items to retrieve parts having specified part lengths
    >>> list(parts([1, 2, 3], number=1, length=[1.2]))
    Traceback (most recent call last):
      ...
    TypeError: length parameter must be an integer or list of integers
    """
    if number is not None and not isinstance(number, int):
        raise TypeError('number parameter must be an integer')

    if length is not None and \
       not isinstance(length, (int, collections.abc.Iterable)):
        raise TypeError(
            'length parameter must be an integer or iterable of integers'
        )

    if number is None and length is None:
        raise ValueError('missing number of parts parameter and part length(s) parameter')

    if length is None:
        # Only the number of parts is specified.
        try:
            size = len(xs)
        except TypeError:
            raise TypeError(
                'object must have length to determine part lengths from number parameter'
            ) from None

        remaining = max(1, min(size, number)) # Number should be reasonable.
        step = size // remaining

        # Produce parts by updating the part length after each part to
        # ensure an even distribution.
        start = 0
        while remaining > 0 and start < size:
            remaining -= 1
            if remaining == 0:
                # The final part receives every item that is left.
                yield xs[start:]
                break
            yield xs[start:start + step]
            start += step
            step = (size - start) // remaining

    elif number is None:
        # Only the part length(s) are specified.
        index = 0
        if isinstance(length, int):
            part_length = max(1, length)
            while True:
                part = _slice(xs, index, index + part_length)
                index += part_length

                # The type of each part will match that of the original
                # object to the extent that `_slice` can do so.
                (part, empty) = _empty(part)
                if empty:
                    break
                yield part
        else: # Length can only be an iterable of integers.
            for part_length in length:
                if not isinstance(part_length, int):
                    raise TypeError(
                        'length parameter must be an integer or iterable of integers'
                    )
                part = _slice(xs, index, index + part_length)
                index += part_length

                # The type of each part will match that of the original
                # object to the extent that `_slice` can do so.
                (part, empty) = _empty(part)
                if empty:
                    raise ValueError(
                        'object has too few items to retrieve parts having ' + \
                        'specified part lengths'
                    )
                yield part

    else:
        # Both the number of parts and the part length(s) are specified.
        try:
            size = len(xs)
        except TypeError:
            raise TypeError(
                'object must have length to determine if number of ' + \
                'parts having specified length(s) can be retrieved'
            ) from None

        if isinstance(length, int):
            part_length = max(1, length)
            if size > (part_length * number) or size <= (part_length * (number - 1)):
                raise ValueError(
                    'cannot retrieve ' + str(number) + ' parts from object ' + \
                    'given part length parameter of ' + str(part_length)
                )

            for start in range(0, size, part_length): # Yield parts of specified length.
                yield xs[start:start + part_length]
        else:
            # The lengths must be sized and sliceable so that they can be
            # validated up front; any sequence of integers qualifies.
            if (not isinstance(length, collections.abc.Sequence)) or \
               (not all(isinstance(item, int) for item in length)):
                raise TypeError(
                    'length parameter must be an integer or list of integers'
                )

            if len(length) != number:
                raise ValueError(
                    'number parameter does not match number of specified part lengths'
                )

            # Every part except the last must be full and the last part
            # must contain at least one item.
            if size <= sum(length[:-1]):
                raise ValueError(
                    'object has too few items to retrieve parts having ' + \
                    'specified part lengths'
                )

            if size > sum(length):
                raise ValueError(
                    'object has too many items to retrieve parts having ' + \
                    'specified part lengths'
                )

            index = 0
            for part_length in length:
                yield _slice(xs, index, index + part_length)
                index += part_length
# Run the doctests embedded in this module when it is executed as a script.
if __name__ == '__main__':
    doctest.testmod() # pragma: no cover