Source code for lantz.driver

# -*- coding: utf-8 -*-
"""
    lantz.driver
    ~~~~~~~~~~~~

    Implements the Driver base class.

    :copyright: 2015 by Lantz Authors, see AUTHORS for more details.
    :license: BSD, see LICENSE for more details.
"""
import copy
import atexit
import logging
import threading
from functools import wraps
from concurrent import futures
from collections import defaultdict

from .utils.qt import MetaQObject, SuperQObject, QtCore
from .feat import Feat, DictFeat, MISSING, FeatProxy
from .action import Action, ActionProxy
from .stats import RunningStats
from .log import get_logger

# Module-level logger used by Driver.log for every driver instance; the
# per-instance context is passed on each call through the ``extra`` argument.
# NOTE(review): the meaning of the second positional argument (False) is
# defined by lantz.log.get_logger, which is not visible here -- confirm.
logger = get_logger('lantz.driver', False)

def _merge_dicts(*args):
    """ _merge_dicts(dict1, [dict2 [...]]) -> new merged dictionary

    Falsy arguments (``None`` or empty dictionaries) are skipped. The result
    starts as a shallow copy of the first remaining dictionary and is then
    updated, left to right, with the others, so later values win. None of
    the arguments is modified.

    :param args: dictionaries (or None) to merge.
    :return: the merged dictionary; an empty dict when nothing remains.
    """
    usable = list(filter(None, args))
    if not usable:
        return {}

    first, *rest = usable
    merged = copy.copy(first)
    for extra in rest:
        merged.update(extra)
    return merged


class MetaSelf(type):
    """Metaclass for Self: attribute access on the class builds a Self object.
    """

    def __getattr__(cls, name):
        # Only reached when the regular class attribute lookup fails, so
        # ``Self.units`` ends up here and produces ``Self('units')``.
        placeholder = Self(name)
        return placeholder


class Self(metaclass=MetaSelf):
    """Placeholder used during Driver class declarations to refer to an
    attribute of the object that is going to be instantiated.

    Attribute access on the class creates the placeholder and calling it
    stores a default value:

    >>> Self.units('s')
    <Self.units('s')>
    """

    def __init__(self, item, default=MISSING):
        self.item, self.default = item, default

    def __call__(self, default_value):
        # Returns the same object so the call can be used inline in a
        # class declaration.
        self.default = default_value
        return self

    def __get__(self, instance, owner=None):
        # Resolve the placeholder against a concrete instance.
        attr_name = self.item
        return getattr(instance, attr_name)

    def __repr__(self):
        return "<Self.{0}('{1}')>".format(self.item, self.default)


class Proxy(object):
    """Read only, dictionary-like view over a collection.

    Every value looked up (by key or by attribute name) is returned as
    ``callable(instance, collection[name])``.

    :param instance: object handed to `callable` as first argument.
    :param collection: mapping from names to the underlying objects.
    :param callable: two-argument factory building the returned wrapper.
    """

    def __init__(self, instance, collection, callable):
        self.instance, self.collection, self.callable = instance, collection, callable

    def __contains__(self, item):
        return item in self.collection

    def __getitem__(self, item):
        target = self.collection[item]
        return self.callable(self.instance, target)

    def __getattr__(self, item):
        # Attribute access mirrors item access; a missing name therefore
        # surfaces as the KeyError raised by the collection lookup.
        target = self.collection[item]
        return self.callable(self.instance, target)

    def keys(self):
        yield from self.collection.keys()

    def items(self):
        for name, target in self.collection.items():
            wrapped = self.callable(self.instance, target)
            yield name, wrapped


def repartial(func, *parameters, **kparms):
    """Well behaved partial for bound methods.

    The returned function takes ``self`` first, appends `parameters` after
    the positional arguments given at call time and lets `kparms` override
    any keyword argument with the same name.

    :param func: function to wrap; its metadata is copied to the wrapper.
    :param parameters: positional arguments appended on every call.
    :param kparms: keyword arguments forced on every call.
    """
    @wraps(func)
    def wrapped(self, *args, **kw):
        positional = args + parameters
        keywords = dict(kw, **kparms)
        return func(self, *positional, **keywords)
    return wrapped


def repartial_submit(fname):
    """Used to create an async bound method in Driver.

    :param fname: name of the attribute, looked up on ``self`` at call time,
                  that is handed to ``self._submit``.
    :return: a function taking ``self`` plus the arguments to forward; it
             returns whatever ``self._submit`` returns.
    """
    def wrapped(self, *args, **kwargs):
        submit = self._submit
        target = getattr(self, fname)
        return submit(target, *args, **kwargs)
    return wrapped


class _DriverType(MetaQObject):
    """Base metaclass for all drivers.

    ``__new__`` injects ``<name>_changed`` Qt signals into the class
    dictionary before the class object is created; ``__init__`` then collects
    Feats and Actions into ``_lantz_features`` / ``_lantz_actions`` and adds
    ``<action>_async`` helpers.
    """

    def __new__(cls, classname, bases, class_dict):
        """Create the class after adding the ``*_changed`` signals.

        :param classname: name of the class being created.
        :param bases: tuple of base classes.
        :param class_dict: namespace of the class body; updated in place
                           with the generated signals.
        """


        # Qt Signals need to be added to the class before it is created.
        # We loop through all members of the class and add a changed event
        # for each Feat/DictFeat.

        # We do the same thing for all base classes which are not derived from
        # Driver (checking for the attribute _lantz_features) to enable base
        # classes for drivers that do not derive from Driver.

        signals = {}

        for d in [class_dict] + [b.__dict__ for b in bases
                                 if not hasattr(b, '_lantz_features')]:
            for feat_name, feat in d.items():
                if isinstance(feat, DictFeat):
                    # The signature is new value, old value, dictionary of other stuff such as keys
                    signals[feat_name + '_changed'] = QtCore.Signal(object, object, dict)
                else:
                    # The signature is new value, old value
                    # NOTE(review): this branch runs for every member that is
                    # not a DictFeat (methods, dunder entries, ...), not only
                    # for Feat instances as the comment above suggests --
                    # confirm whether ``elif isinstance(feat, Feat)`` was meant.
                    signals[feat_name + '_changed'] = QtCore.Signal(object, object)

        # Generated signals overwrite any same-named entry of the class body.
        class_dict.update(signals)

        return super().__new__(cls, classname, bases, class_dict)

    def __init__(self, classname, bases, class_dict):
        """Register the Feats and Actions of the freshly created class.

        :param classname: name of the class being created.
        :param bases: tuple of base classes.
        :param class_dict: namespace of the class body.
        """
        super().__init__(classname, bases, class_dict)

        feats = {}
        actions = {}

        # We add Feats and Actions to the corresponding dictionaries.
        # Only the class body and bases without _lantz_features are scanned
        # here; bases that already have the attribute are merged further down.

        for d in [class_dict] + [b.__dict__ for b in bases
                                 if not hasattr(b, '_lantz_features')]:
            for key, value in d.items():
                if isinstance(value, (Feat, DictFeat)):
                    value.name = key
                    feats[key] = value
                elif isinstance(value, Action):
                    value.rebuild()
                    actions[key] = value

        # We create async versions of each Action if it does not exist.

        for key, action in actions.items():
            if not hasattr(self, key + '_async'):
                async_action = repartial_submit(key)
                # Parsed as ``('(Async) ' + doc) if doc else ''``: actions
                # without a docstring get an empty docstring.
                async_action.__doc__ = '(Async) ' + action.__doc__ if action.__doc__ else ''
                setattr(self, key + '_async', async_action)

        # We update the feats and actions dictionaries with the ones
        # from the base classes. Entries defined above take precedence
        # (``key not in ...``), and earlier bases win over later ones.

        for base in bases:
            for key, value in getattr(base, '_lantz_features', {}).items():
                if isinstance(value, (Feat, DictFeat)) and key not in feats:
                    feats[key] = value
            for key, value in getattr(base, '_lantz_actions', {}).items():
                if isinstance(value, Action) and key not in actions:
                    actions[key] = value

        self._lantz_features = feats
        self._lantz_actions = actions


# Per-class instance counter keyed by class name. Driver.__new__ reads and
# increments it to build default instance names (class name + running number).
_REGISTERED = defaultdict(int)

def _set(inst, feat_name, feat_attr):
    """Build a callback that stores a value on one attribute of a feat proxy.

    :param inst: object exposing a ``feats`` mapping.
    :param feat_name: key of the proxy inside ``inst.feats``.
    :param feat_attr: name of the proxy attribute to assign.
    :return: a function ``f(value, *args)``; the first argument is assigned,
             any further arguments are ignored.
    """
    def _inner(value, *args):
        # The proxy is looked up on every call, not when the callback is built.
        setattr(inst.feats[feat_name], feat_attr, value)
    return _inner

def _raise_must_change(dependent, feat_name, operation):
    """Build a one-argument function that always raises.

    :param dependent: name of the attribute that has to be read or written
                      first.
    :param feat_name: name of the feat that cannot be used yet.
    :param operation: attempted operation, inserted verbatim in the message.
    :return: a function ``f(value)`` that raises Exception with an
             explanatory message whenever it is called.
    """
    template = "You must get or set '{}' before trying to {} '{}'"

    def _inner(value):
        message = template.format(dependent, operation, feat_name)
        raise Exception(message)
    return _inner


class Driver(SuperQObject, metaclass=_DriverType):
    """Base class for all drivers.

    :param name: easy to remember identifier given to the instance for logging
                 purposes
    """

    _lantz_features = {}
    _lantz_actions = {}

    __name = ''

    def __new__(cls, *args, **kwargs):
        """Create the instance and set up its bookkeeping attributes.

        The instance gets an executor slot, a reentrant lock, a task counter,
        timing statistics, a name and the ``log_extra`` dictionary before any
        ``__init__`` runs.

        :param name: (keyword) instance name. It is only used when the
                     instance does not already report a non-empty name; if
                     neither is available the name is the class name followed
                     by a per-class running number.
        """
        inst = SuperQObject.__new__(cls)
        name = kwargs.pop('name', None)
        inst._executor = None
        inst._lock = threading.RLock()
        # Name-mangled to _Driver__unfinished_tasks, like every other use
        # inside this class body.
        inst.__unfinished_tasks = 0
        inst.timing = RunningStats()

        if hasattr(inst, 'name') and inst.name:
            pass
        elif name:
            inst.name = name
        else:
            inst.name = '{}{:d}'.format(cls.__name__, _REGISTERED[cls.__name__])
            # NOTE(review): nesting reconstructed from collapsed source; the
            # counter is assumed to advance only when an automatic name is
            # generated -- confirm.
            _REGISTERED[cls.__name__] += 1

        inst.log_extra = {'lantz_driver': cls.__name__,
                          'lantz_name': inst.name}

        # Wire every modifier declared through a Self placeholder to the
        # ``<item>_changed`` signal of the attribute it refers to.
        # NOTE(review): ``feat`` comes from cls._lantz_features, so the
        # assignments below appear to touch state shared through the class --
        # confirm against lantz.feat.
        for feat_name, feat in cls._lantz_features.items():
            for attr_name, attr_value in feat.modifiers[MISSING][MISSING].items():
                if not isinstance(attr_value, Self):
                    continue
                getattr(inst, attr_value.item + '_changed').connect(_set(inst, feat_name, attr_name))
                if attr_value.default is MISSING:
                    # No default: using the feat must fail until the
                    # attribute it depends on has been read or written.
                    feat.get_processors[MISSING][MISSING] = (_raise_must_change(attr_value.item, feat_name, 'get'), )
                    feat.set_processors[MISSING][MISSING] = (_raise_must_change(attr_value.item, feat_name, 'set'), )
                else:
                    feat.modifiers[MISSING][MISSING][attr_name] = attr_value.default
                    # NOTE(review): nesting reconstructed from collapsed
                    # source; rebuild is assumed to belong to this branch.
                    feat.rebuild(build_doc=False, store=True)

        inst.log_info('Created ' + inst.name)
        return inst

    @property
    def name(self):
        """Identifier of this instance, used for logging and in str/repr."""
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value

    def __submit_by_name(self, fname, *args, **kwargs):
        """Submit the attribute called `fname` with the given arguments."""
        return self._submit(getattr(self, fname), *args, **kwargs)

    def _first_submit(self, fn, *args, **kwargs):
        """Create the single-worker executor on first use, then submit.

        ``_submit`` is rebound on the instance so later calls go straight to
        `_notfirst_submit`.
        """
        self._executor = futures.ThreadPoolExecutor(max_workers=1)
        self._submit = self._notfirst_submit
        return self._notfirst_submit(fn, *args, **kwargs)

    def _notfirst_submit(self, fn, *args, **kwargs):
        """Submit `fn` to the executor and count it as unfinished.

        :return: the future returned by the executor.
        """
        self.__unfinished_tasks += 1
        fut = self._executor.submit(fn, *args, **kwargs)
        fut.add_done_callback(self._decrease_unfinished_tasks)
        return fut

    # Class-level default; replaced per instance by _first_submit.
    _submit = _first_submit

    def _decrease_unfinished_tasks(self, *args):
        """Done-callback that marks one submitted task as finished."""
        self.__unfinished_tasks -= 1

    @property
    def unfinished_tasks(self):
        """Number of submitted tasks whose future has not completed yet."""
        return self.__unfinished_tasks

    def log(self, level, msg, *args, **kwargs):
        """Log with the integer severity 'level'
        on the logger corresponding to this instrument.

        :param level: severity level for this event.
        :param msg: message to be logged (can contain PEP3101 formatting codes)
        """
        if kwargs:
            # Entries of log_extra override caller-supplied keys.
            kwargs.update(self.log_extra)
            logger.log(level, msg, *args, extra=kwargs)
        else:
            logger.log(level, msg, *args, extra=self.log_extra)

    def log_info(self, msg, *args, **kwargs):
        """Log with the severity 'INFO'
        on the logger corresponding to this instrument.

        :param msg: message to be logged (can contain PEP3101 formatting codes)
        """
        self.log(logging.INFO, msg, *args, **kwargs)

    def log_debug(self, msg, *args, **kwargs):
        """Log with the severity 'DEBUG'
        on the logger corresponding to this instrument.

        :param msg: message to be logged (can contain PEP3101 formatting codes)
        """
        self.log(logging.DEBUG, msg, *args, **kwargs)

    def log_error(self, msg, *args, **kwargs):
        """Log with the severity 'ERROR'
        on the logger corresponding to this instrument.

        :param msg: message to be logged (can contain PEP3101 formatting codes)
        """
        self.log(logging.ERROR, msg, *args, **kwargs)

    def log_warning(self, msg, *args, **kwargs):
        """Log with the severity 'WARNING'
        on the logger corresponding to this instrument.

        :param msg: message to be logged (can contain PEP3101 formatting codes)
        """
        self.log(logging.WARNING, msg, *args, **kwargs)

    def log_critical(self, msg, *args, **kwargs):
        """Log with the severity 'CRITICAL'
        on the logger corresponding to this instrument.

        :param msg: message to be logged (can contain PEP3101 formatting codes)
        """
        self.log(logging.CRITICAL, msg, *args, **kwargs)

    def __str__(self):
        classname = self.__class__.__name__
        return "{} {}".format(classname, self.name)

    def __repr__(self):
        classname = self.__class__.__name__
        return "<{}('{}')>".format(classname, self.name)

    def __enter__(self):
        # Context manager support: initialize on entry, finalize on exit.
        self.initialize()
        return self

    def __exit__(self, *args):
        self.finalize()

    @Action()
    def initialize(self):
        pass

    @Action()
    def finalize(self):
        pass

    def update(self, newstate=None, *, force=False, **kwargs):
        """Update driver.

        :param newstate: a dictionary containing the new driver state.
        :type newstate: dict.
        :param force: apply change even when the cache says it is not necessary.
        :type force: boolean.
        :param kwargs: additional ``feat_name=value`` pairs; they take
                       precedence over entries of `newstate`.
        :raises: ValueError if called with an empty dictionary.
        """
        newstate = _merge_dicts(newstate, kwargs)
        if not newstate:
            raise ValueError("update() called with an empty dictionary")

        for key, value in newstate.items():
            self._lantz_features[key].set(self, value, force)

    def update_async(self, newstate=None, *, force=False, callback=None, **kwargs):
        """Asynchronous update driver.

        :param newstate: driver state.
        :type newstate: dict.
        :param force: apply change even when the cache says it is not necessary.
        :type force: boolean.
        :param callback: Called when the update finishes.
        :type callback: callable.

        :return type: concurrent.future

        :raises: ValueError if called with an empty dictionary.
        """
        newstate = _merge_dicts(newstate, kwargs)
        if not newstate:
            raise ValueError("update() called with an empty dictionary")

        fut = self._submit(self.update, newstate, force=force)
        if callback is not None:
            fut.add_done_callback(callback)
        return fut

    def refresh(self, keys=None):
        """Refresh cache by reading values from the instrument.

        :param keys: a string or list of strings with the properties to refresh.
                     Default None, meaning all properties.
                     If keys is a string, returns the value.
                     If keys is a list/tuple, returns a tuple.
                     If keys is a dict, returns a dict.
        :type keys: str or list or tuple or dict
        :raises: ValueError if keys is of any other (non-empty) type.
        """
        if keys:
            if isinstance(keys, (list, tuple)):
                return tuple(getattr(self, key) for key in keys)
            elif isinstance(keys, dict):
                return {key: getattr(self, key) for key in keys.keys()}
            elif isinstance(keys, str):
                return getattr(self, keys)
            else:
                raise ValueError('keys must be a (str, list, tuple or dict)')
        # No (or empty) keys: read every registered feat.
        return {key: getattr(self, key) for key in self._lantz_features}

    def refresh_async(self, keys=None, *, callback=None):
        """Asynchronous refresh cache by reading values from the instrument.

        :param keys: a string or list of strings with the properties to refresh
                     Default None, meaning all properties.
                     If keys is a string, the result is the value.
                     If keys is a list/tuple, the result is a tuple.
                     If keys is a dict, the result is a dict.
        :type keys: str or list or tuple or dict
        :param callback: Called when the refresh finishes.
        :type callback: callable.

        :return type: concurrent.future.
        """
        fut = self._submit(self.refresh, keys=keys)
        if callback is not None:
            fut.add_done_callback(callback)
        return fut

    def recall(self, keys=None):
        """Return the last value seen for a feat or a collection of feats.

        :param keys: a string or list of strings with the properties to recall.
                     Default None, meaning all properties.
                     If keys is a string, returns the value.
                     If keys is a list, tuple, set or dict, returns a
                     dictionary mapping each key to its cached value.
        :type keys: str, list, tuple, set, dict.
        :raises: KeyError if a key is not a registered feat.
        """
        if keys:
            if isinstance(keys, (list, tuple, set, dict)):
                return {key: self._lantz_features[key].get_cache(self)
                        for key in keys}
            return self._lantz_features[keys].get_cache(self)
        # items(), not keys(): both the name and the feat object are needed.
        return {key: feat.get_cache(self)
                for key, feat in self._lantz_features.items()}

    @property
    def feats(self):
        """Read-only view mapping feat names to FeatProxy objects."""
        return Proxy(self, self._lantz_features, FeatProxy)

    @property
    def actions(self):
        """Read-only view mapping action names to ActionProxy objects."""
        return Proxy(self, self._lantz_actions, ActionProxy)
def _solve_dependencies(dependencies, all_members=None):
    """Solve a dependency graph.

    :param dependencies: dependency dictionary. For each key, the value is
                         an iterable indicating its dependencies.
    :param all_members: optional iterable with the names of every member.
                        Names missing from `dependencies` are added as
                        members without dependencies.
    :return: list of sets, each containing independent task only dependent of the
             previous set in the list.
    :raises: ValueError if the dependencies are circular.
    """
    d = {key: set(value) for key, value in dependencies.items()}
    if all_members:
        d.update({key: set() for key in all_members if key not in d})
    r = []
    while d:
        # values not in keys (items without dep)
        t = set(i for v in d.values() for i in v) - set(d.keys())
        # and keys without value (items without dep)
        t.update(k for k, v in d.items() if not v)
        if not t:
            # Every remaining item waits for another remaining item, so no
            # further progress is possible (the loop would never end).
            raise ValueError('Circular dependency detected among: {}'.format(
                ', '.join(sorted(str(k) for k in d))))
        # can be done right away
        r.append(t)
        # and cleaned up
        d = {k: v - t for k, v in d.items() if v}
    return r


def _group_by_dependencies(drivers, dependencies):
    """Return the drivers as a tuple of groups ordered by dependency level."""
    names = {driver.name: driver for driver in drivers}
    groups = _solve_dependencies(dependencies, set(names.keys()))
    return tuple(tuple(names[name] for name in group) for group in groups)


def _raise_first_exception(futs):
    """Re-raise in the calling thread the first exception stored in `futs`."""
    for fut in futs:
        if fut.cancelled():
            continue
        ex = fut.exception()
        if ex is not None:
            raise ex


def initialize_many(drivers, register_finalizer=True,
                    on_initializing=None, on_initialized=None, on_exception=None,
                    concurrent=False, dependencies=None):
    """Initialize a group of drivers.

    :param drivers: an iterable of drivers.
    :param register_finalizer: register driver.finalize method to be called at python exit.
    :param on_initializing: a callable to be executed BEFORE initialization.
                            It takes the driver as the first argument.
    :param on_initialized: a callable to be executed AFTER initialization.
                           It takes the driver as the first argument.
    :param on_exception: a callable to be executed in case an exception occurs.
                         It takes the offending driver as the first argument and the
                         exception as the second one. When not given, the
                         exception is raised to the caller.
    :param concurrent: indicates that drivers with satisfied dependencies
                       should be initialized concurrently.
    :param dependencies: indicates which drivers depend on others to be initialized.
                         each key is a driver name, and the corresponding
                         value is an iterable with its dependencies.
    :raises: ValueError if `dependencies` is circular.
    """
    if dependencies:
        # Each dependency level is initialized as an independent group.
        for subset in _group_by_dependencies(drivers, dependencies):
            initialize_many(subset, register_finalizer,
                            on_initializing, on_initialized, on_exception,
                            concurrent)
        return

    if concurrent:
        def _finalize(d):
            def _inner_finalize(f):
                atexit.register(d.finalize)
            return _inner_finalize

        def _done(d):
            def _inner(f):
                ex = f.exception()
                if ex is not None:
                    # Exceptions raised inside a done-callback are logged and
                    # ignored by concurrent.futures, so without a handler the
                    # error is re-raised after the wait below instead.
                    if on_exception:
                        on_exception(d, ex)
                elif on_initialized:
                    on_initialized(d)
            return _inner

        futs = []
        for driver in drivers:
            if on_initializing:
                on_initializing(driver)
            fut = driver.initialize_async()
            if register_finalizer:
                fut.add_done_callback(_finalize(driver))
            fut.add_done_callback(_done(driver))
            futs.append(fut)

        futures.wait(futs)
        if not on_exception:
            _raise_first_exception(futs)
    else:
        for driver in drivers:
            if on_initializing:
                on_initializing(driver)
            try:
                driver.initialize()
            except Exception as ex:
                if not on_exception:
                    raise
                on_exception(driver, ex)
            else:
                if on_initialized:
                    on_initialized(driver)
            # NOTE(review): placement reconstructed from collapsed source;
            # registered whether or not initialization succeeded, matching
            # the concurrent branch -- confirm.
            if register_finalizer:
                atexit.register(driver.finalize)


def finalize_many(drivers,
                  on_finalizing=None, on_finalized=None, on_exception=None,
                  concurrent=False, dependencies=None):
    """Finalize a group of drivers.

    :param drivers: an iterable of drivers.
    :param on_finalizing: a callable to be executed BEFORE finalization.
                          It takes the driver as the first argument.
    :param on_finalized: a callable to be executed AFTER finalization.
                         It takes the driver as the first argument.
    :param on_exception: a callable to be executed in case an exception occurs.
                         It takes the offending driver as the first argument and the
                         exception as the second one. When not given, the
                         exception is raised to the caller.
    :param concurrent: indicates that drivers with satisfied dependencies
                       are finalized concurrently.
    :param dependencies: indicates which drivers depend on others to be initialized.
                         each key is a driver name, and the corresponding
                         value is an iterable with its dependencies.
                         The dependencies are used in reverse.
    :raises: ValueError if `dependencies` is circular.
    """
    if dependencies:
        # Reverse order: dependents are finalized before what they rely on.
        for subset in reversed(_group_by_dependencies(drivers, dependencies)):
            finalize_many(subset, on_finalizing, on_finalized, on_exception,
                          concurrent)
        return

    if concurrent:
        def _done(d):
            def _inner(f):
                ex = f.exception()
                if ex is not None:
                    # Raising here would be swallowed by concurrent.futures;
                    # without a handler the error is re-raised after the wait.
                    if on_exception:
                        on_exception(d, ex)
                elif on_finalized:
                    on_finalized(d)
            return _inner

        futs = []
        for driver in drivers:
            if on_finalizing:
                on_finalizing(driver)
            fut = driver.finalize_async()
            fut.add_done_callback(_done(driver))
            futs.append(fut)

        futures.wait(futs)
        if not on_exception:
            _raise_first_exception(futs)
    else:
        for driver in drivers:
            if on_finalizing:
                on_finalizing(driver)
            try:
                driver.finalize()
            except Exception as ex:
                if not on_exception:
                    raise
                on_exception(driver, ex)
            else:
                if on_finalized:
                    on_finalized(driver)