# -*- coding: utf-8 -*-
"""
lantz.processors
~~~~~~~~~~~~~~~~
:copyright: 2015 by Lantz Authors, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
"""
import warnings
from . import Q_
from .log import LOGGER as _LOG
from stringparser import Parser
class DimensionalityWarning(Warning):
pass
def _do_nothing(value):
return value
def _getitem(a, b):
"""Return a[b] or if not found a[type(b)]
"""
try:
return a[b]
except KeyError:
return a[type(b)]
getitem = _getitem
[docs]def convert_to(units, on_dimensionless='warn', on_incompatible='raise',
return_float=False):
"""Return a function that convert a Quantity to to another units.
:param units: string or Quantity specifying the target units
:param on_dimensionless: how to proceed when a dimensionless number
number is given.
'raise' to raise an exception,
'warn' to log a warning and proceed,
'ignore' to silently proceed
:param on_incompatible: how to proceed when source and target units are
incompatible. Same options as `on_dimensionless`
:raises: :class:`ValueError` if the incoming value cannot be
properly converted
>>> convert_to('mV')(Q_(1, 'V'))
<Quantity(1000.0, 'millivolt')>
>>> convert_to('mV', return_float=True)(Q_(1, 'V'))
1000.0
"""
if on_dimensionless not in ('ignore', 'warn', 'raise'):
raise ValueError("{} is not a valid value for 'on_dimensionless'. "
"It should be either 'ignore', 'warn' or 'raise'".format(on_dimensionless))
if on_incompatible not in ('ignore', 'warn', 'raise'):
raise ValueError("{} is not a valid value for 'on_incompatible'. "
"It should be either 'ignore', 'warn' or 'raise'".format(on_dimensionless))
if isinstance(units, str):
units = Q_(1, units)
elif not isinstance(units, Q_):
raise ValueError("{} is not a valid value for 'units'. "
"It should be either str or Quantity")
if return_float:
def _inner(value):
if isinstance(value, Q_):
try:
return value.to(units).magnitude
except ValueError as e:
if on_incompatible == 'raise':
raise ValueError(e)
elif on_incompatible == 'warn':
msg = 'Unable to convert {} to {}. Ignoring source units.'.format(value, units)
warnings.warn(msg, DimensionalityWarning)
_LOG.warn(msg)
# on_incompatible == 'ignore'
return value.magnitude
else:
if not units.dimensionless:
if on_dimensionless == 'raise':
raise ValueError('Unable to convert {} to {}'.format(value, units))
elif on_dimensionless == 'warn':
msg = 'Assuming units `{1.units}` for {0}'.format(value, units)
warnings.warn(msg, DimensionalityWarning)
_LOG.warn(msg)
# on_incompatible == 'ignore'
return float(value)
return _inner
else:
def _inner(value):
if isinstance(value, Q_):
try:
return value.to(units)
except ValueError as e:
if on_incompatible == 'raise':
raise ValueError(e)
elif on_incompatible == 'warn':
msg = 'Assuming units `{1.units}` for {0}'.format(value, units)
warnings.warn(msg, DimensionalityWarning)
_LOG.warn(msg)
# on_incompatible == 'ignore'
return float(value.magnitude) * units
else:
if not units.dimensionless:
if on_dimensionless == 'raise':
raise ValueError('Unable to convert {} to {}'.format(value, units))
elif on_dimensionless == 'warn':
msg = 'Assuming units `{1.units}` for {0}'.format(value, units)
warnings.warn(msg, DimensionalityWarning)
_LOG.warn(msg)
# on_incompatible == 'ignore'
return float(value) * units
return _inner
[docs]class Processor(object):
"""Processor to convert the function parameters.
A `callable` argument will be used to convert the corresponding
function argument.
For example, here `x` will be converted to float, before entering
the function body::
>>> conv = Processor(float)
>>> conv
<class 'float'>
>>> conv('10')
10.0
The processor supports multiple argument conversion in a tuple::
>>> conv = Processor((float, str))
>>> type(conv)
<class 'lantz.processors.Processor'>
>>> conv(('10', 10))
(10.0, '10')
"""
def __new__(cls, processors):
if isinstance(processors, (tuple, list)):
if len(processors) > 1:
inst = super().__new__(cls)
inst.processors = tuple(cls._to_callable(processor)
for processor in processors)
return inst
else:
return cls._to_callable(processors[0])
else:
return cls._to_callable(processors)
def __call__(self, values):
return tuple(processor(value)
for processor, value in zip(self.processors, values))
@classmethod
def _to_callable(cls, obj):
if callable(obj):
return obj
if obj is None:
return _do_nothing
return cls.to_callable(obj)
@classmethod
def to_callable(cls, obj):
raise TypeError('Preprocessor argument must callable, not {}'.format(obj))
def __len__(self):
if isinstance(self.processors, tuple):
return len(self.processors)
return 1
[docs]class FromQuantityProcessor(Processor):
"""Processor to convert the units the function arguments.
The syntax is equal to `Processor` except that strings are interpreted
as units.
>>> conv = FromQuantityProcessor('ms')
>>> conv(Q_(1, 's'))
1000.0
"""
@classmethod
def to_callable(cls, obj):
if isinstance(obj, (str, Q_)):
return convert_to(obj, return_float=True)
raise TypeError('FromQuantityProcessor argument must be a string '
' or a callable, not {}'.format(obj))
[docs]class ToQuantityProcessor(Processor):
"""Decorator to convert the units the function arguments.
The syntax is equal to `Processor` except that strings are interpreted
as units.
>>> conv = ToQuantityProcessor('ms')
>>> conv(Q_(1, 's'))
<Quantity(1000.0, 'millisecond')>
>>> conv(1)
<Quantity(1.0, 'millisecond')>
"""
@classmethod
def to_callable(cls, obj):
if isinstance(obj, (str, Q_)):
return convert_to(obj, on_dimensionless='ignore')
raise TypeError('ToQuantityProcessor argument must be a string '
' or a callable, not {}'.format(obj))
[docs]class ParseProcessor(Processor):
"""Processor to convert/parse the function parameters.
The syntax is equal to `Processor` except that strings are interpreted
as a :class:Parser expression.
>>> conv = ParseProcessor('spam {:s} eggs')
>>> conv('spam ham eggs')
'ham'
>>> conv = ParseProcessor(('hi {:d}', 'bye {:s}'))
>>> conv(('hi 42', 'bye Brian'))
(42, 'Brian')
"""
@classmethod
def to_callable(cls, obj):
if isinstance(obj, str):
return Parser(obj)
raise TypeError('parse_params argument must be a string or a callable, '
'not {}'.format(obj))
[docs]class MapProcessor(Processor):
"""Processor to map the function parameter values.
The syntax is equal to `Processor` except that a dict is used as
mapping table.
Examples::
>>> conv = MapProcessor({True: 42})
>>> conv(True)
42
"""
@classmethod
def to_callable(cls, obj):
if isinstance(obj, dict):
return get_mapping(obj)
if isinstance(obj, set):
return check_membership(obj)
raise TypeError('MapProcessor argument must be a dict or a callable, '
'not {}'.format(obj))
[docs]class ReverseMapProcessor(Processor):
"""Processor to map the function parameter values.
The syntax is equal to `Processor` except that a dict is used as
mapping table.
Examples::
>>> conv = ReverseMapProcessor({True: 42})
>>> conv(42)
True
"""
#: Shared cache of reversed dictionaries indexed by the id()
__reversed_cache = {}
@classmethod
def to_callable(cls, obj):
if isinstance(obj, dict):
obj = cls.__reversed_cache.setdefault(id(obj),
{value: key for key, value
in obj.items()})
return get_mapping(obj)
if isinstance(obj, set):
return check_membership(obj)
raise TypeError('ReverseMapProcessor argument must be a dict or a callable, '
'not {}'.format(obj))
[docs]class RangeProcessor(Processor):
"""Processor to convert the units the function arguments.
The syntax is equal to `Processor` except that iterables are interpreted
as (low, high, step) specified ranges. Step is optional and max is included
>>> conv = RangeProcessor(((1, 2, .5), ))
>>> conv(1.7)
1.5
"""
@classmethod
def to_callable(cls, obj):
if not isinstance(obj, (list, tuple)):
raise TypeError('RangeProcessor argument must be a tuple/list '
'or a callable, not {}'.format(obj))
if not len(obj) in (1, 2, 3):
raise TypeError('RangeProcessor argument must be a tuple/list '
'with 1, 2 or 3 elements ([low,] high[, step]) '
'not {}'.format(len(obj)))
if len(obj) == 1:
return check_range_and_coerce_step(0, *obj)
return check_range_and_coerce_step(*obj)
[docs]def check_range_and_coerce_step(low, high, step=None):
"""
:param low:
:param high:
:param step:
:return:
>>> checker = check_range_and_coerce_step(1, 10)
>>> checker(1), checker(5), checker(10)
(1, 5, 10)
>>> checker(11)
Traceback (most recent call last):
...
ValueError: 11 not in range (1, 10)
>>> checker = check_range_and_coerce_step(1, 10, 1)
>>> checker(1), checker(5.4), checker(10)
(1, 5, 10)
"""
def _inner(value):
if not (low <= value <= high):
raise ValueError('{} not in range ({}, {})'.format(value, low, high))
if step:
value = round((value - low) / step) * step + low
return value
return _inner
[docs]def check_membership(container):
"""
:param container:
:return:
>>> checker = check_membership((1, 2, 3))
>>> checker(1)
1
>>> checker(0)
Traceback (most recent call last):
...
ValueError: 0 not in (1, 2, 3)
"""
def _inner(value):
if value not in container:
raise ValueError('{!r} not in {}'.format(value, container))
return value
return _inner
[docs]def get_mapping(container):
"""
>>> getter = get_mapping({'A': 42, 'B': 43})
>>> getter('A')
42
>>> getter(0)
Traceback (most recent call last):
...
ValueError: 0 not in ('A', 'B')
"""
def _inner(key):
if key not in container:
raise ValueError("{!r} not in {}".format(key, tuple(container.keys())))
return container[key]
return _inner