-
Notifications
You must be signed in to change notification settings - Fork 129
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #59 from CoffeaTeam/processor
Introduce some processor framework ideas
- Loading branch information
Showing
13 changed files
with
466 additions
and
26 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from .processor import ProcessorABC | ||
from .dataframe import ( | ||
LazyDataFrame, | ||
PreloadedDataFrame, | ||
) | ||
from .helpers import Weights, PackedSelection | ||
from .executor import ( | ||
iterative_executor, | ||
futures_executor, | ||
condor_executor, | ||
) | ||
from .accumulator import ( | ||
accumulator, | ||
set_accumulator, | ||
dict_accumulator, | ||
defaultdict_accumulator, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from six import with_metaclass | ||
from abc import ABCMeta, abstractmethod | ||
import collections | ||
|
||
try: | ||
from collections.abc import Set | ||
except ImportError: | ||
from collections import Set | ||
|
||
|
||
class AccumulatorABC(with_metaclass(ABCMeta)):
    '''
    Abstract interface shared by every accumulator.

    Subclasses must supply:
        identity(): return a new object of the same type as self that
            acts as the additive zero, i.e. self + self.identity() == self
        add(other): add an object of the same type as self to self

    The ``+`` and ``+=`` operators are implemented here in terms of those
    two methods.
    '''
    @abstractmethod
    def identity(self):
        '''Return a new zero-valued accumulator of the same type as self.'''

    @abstractmethod
    def add(self, other):
        '''Add ``other`` to self.'''

    def __iadd__(self, other):
        # ``+=`` delegates to add() and hands self back
        self.add(other)
        return self

    def __add__(self, other):
        # ``+`` starts from a fresh identity and adds both operands to it
        total = self.identity()
        for operand in (self, other):
            total.add(operand)
        return total
|
||
|
||
class accumulator(AccumulatorABC):
    '''
    Holds a value, of type and identity as provided to initializer

    The running value starts as an independent copy of ``identity`` so that
    in-place addition on a mutable identity (a list, an array, ...) cannot
    modify the stored identity that identity() hands out later.
    '''
    def __init__(self, identity):
        '''
        identity: the zero element for the accumulated type (e.g. 0, 0.,
            an empty list); anything that supports ``+=``
        '''
        # Imported locally so this class does not depend on a module-level
        # import being present
        import copy
        self._identity = identity
        # Copy, do not alias: ``self.value += x`` mutates mutable objects
        # in place and would otherwise corrupt self._identity
        self.value = copy.deepcopy(identity)

    def identity(self):
        '''Return a new accumulator initialized with the stored identity.'''
        return accumulator(self._identity)

    def add(self, other):
        '''
        Add ``other`` to the held value in place.

        For an AccumulatorABC instance its ``value`` attribute is added;
        any other object is added directly.
        '''
        if isinstance(other, AccumulatorABC):
            self.value += other.value
        else:
            self.value += other
|
||
|
||
class set_accumulator(set, AccumulatorABC):
    '''
    A builtin set that also provides the accumulator interface
    '''
    def identity(self):
        '''Return a new, empty set_accumulator.'''
        return set_accumulator()

    def add(self, other):
        '''
        Add ``other`` to this set.

        A Set instance is merged via set.update; any other object is
        inserted as a single element via set.add.
        '''
        merge = set.update if isinstance(other, Set) else set.add
        merge(self, other)
|
||
|
||
class dict_accumulator(dict, AccumulatorABC):
    '''
    A dict that also provides the accumulator interface
    The values stored in the dict are assumed to be accumulators themselves
    '''
    def identity(self):
        '''Return a dict_accumulator with the same keys, each mapped to its value's identity.'''
        return dict_accumulator(
            (name, item.identity()) for name, item in self.items()
        )

    def add(self, other):
        '''
        Add every entry of ``other`` into self.

        Raises ValueError when ``other`` is not a dict_accumulator.
        '''
        if not isinstance(other, dict_accumulator):
            raise ValueError
        for name, item in other.items():
            # Keys not yet present start from the incoming value's identity
            if name not in self:
                self[name] = item.identity()
            self[name] += item
|
||
|
||
class defaultdict_accumulator(collections.defaultdict, AccumulatorABC):
    '''
    A defaultdict that also provides the accumulator interface
    The values stored in the dict are assumed to be accumulators themselves
    '''
    def identity(self):
        '''Return an empty defaultdict_accumulator with the same default_factory.'''
        factory = self.default_factory
        return defaultdict_accumulator(factory)

    def add(self, other):
        '''Add each value of ``other`` to the entry stored under the same key.'''
        for name, item in other.items():
            # Missing keys are filled in by default_factory on lookup
            self[name] += item
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import warnings | ||
from ..util import awkward | ||
|
||
try: | ||
from collections.abc import MutableMapping | ||
except ImportError: | ||
from collections import MutableMapping | ||
|
||
|
||
class LazyDataFrame(MutableMapping):
    """
    Simple delayed uproot reader (a la lazyarrays)
    Keeps track of values accessed, for later parsing.

    Columns are read from ``tree`` the first time they are requested and
    cached; values assigned with ``df[key] = value`` are stored in the same
    cache.
    """
    def __init__(self, tree, stride=None, index=None, preload_items=None):
        """
        tree: object supporting ``key in tree``, ``tree[key].array(**kwargs)``,
            ``tree.keys()`` and ``tree.numentries``
        stride, index: when both are given, reads are restricted to entries
            [index*stride, min(numentries, (index+1)*stride))
        preload_items: optional iterable of column names to read immediately
        """
        self._tree = tree
        self._branchargs = {'awkwardlib': awkward}
        self._stride = None
        if (stride is not None) and (index is not None):
            self._stride = stride
            self._branchargs['entrystart'] = index*stride
            self._branchargs['entrystop'] = min(self._tree.numentries, (index+1)*stride)
        self._dict = {}
        self._materialized = set()
        if preload_items:
            self.preload(preload_items)

    def __delitem__(self, key):
        del self._dict[key]

    def __getitem__(self, key):
        """
        Return the cached column, reading it from the tree on first access.

        Raises KeyError when the key is neither cached nor in the tree.
        """
        if key in self._dict:
            return self._dict[key]
        elif key in self._tree:
            # Record that this branch was actually read from the tree
            self._materialized.add(key)
            self._dict[key] = self._tree[key].array(**self._branchargs)
            return self._dict[key]
        else:
            raise KeyError(key)

    def __iter__(self):
        """Yield the keys currently held in the cache, marking each as materialized."""
        # warnings.warn is the stdlib entry point; warnings.warning does not exist
        warnings.warn("An iterator has requested to read all branches from the tree", RuntimeWarning)
        for item in self._dict:
            # item is the key itself; item[0] would only record its first character
            self._materialized.add(item)
            yield item

    def __len__(self):
        return len(self._dict)

    def __setitem__(self, key, value):
        self._dict[key] = value

    @property
    def available(self):
        """Keys offered by the underlying tree."""
        return self._tree.keys()

    @property
    def materialized(self):
        """Set of keys recorded as accessed."""
        return self._materialized

    @property
    def size(self):
        """Number of entries covered: the whole tree, or the selected stride window."""
        if self._stride is None:
            return self._tree.numentries
        return (self._branchargs['entrystop'] - self._branchargs['entrystart'])

    def preload(self, columns):
        """Read every name in ``columns`` that exists in the tree; others are skipped."""
        for name in columns:
            if name in self._tree:
                # Lookup is done for its side effect of reading and caching the column
                self[name]
|
||
|
||
class PreloadedDataFrame(MutableMapping):
    """
    For instances like spark where the columns are preloaded
    Require input number of rows (don't want to implicitly rely on picking a random item)
    Still keep track of what was accessed in case it is of use
    """
    def __init__(self, size, items):
        """
        size: number of rows, reported back unchanged by ``size``
        items: mapping of column name to preloaded column; stored by
            reference, not copied
        """
        self._size = size
        self._dict = items
        self._accessed = set()

    def __delitem__(self, key):
        del self._dict[key]

    def __getitem__(self, key):
        # The key is recorded before the lookup, so a missing key is still
        # recorded as accessed even though KeyError propagates
        self._accessed.add(key)
        return self._dict[key]

    def __iter__(self):
        """Yield every column name, marking each one as accessed."""
        for key in self._dict:
            self._accessed.add(key)
            yield key

    def __len__(self):
        return len(self._dict)

    def __setitem__(self, key, value):
        self._dict[key] = value

    @property
    def available(self):
        """Keys of the preloaded columns."""
        # This class has no _tree attribute; the columns live in _dict
        return self._dict.keys()

    @property
    def materialized(self):
        """Set of keys recorded as accessed."""
        return self._accessed

    @property
    def size(self):
        """Number of rows as given to the initializer."""
        return self._size
Oops, something went wrong.