Source code for lantz.foreign

# -*- coding: utf-8 -*-
"""
    lantz.foreign
    ~~~~~~~~~~~~~

    Implements classes and methods to interface to foreign functions.

    :copyright: 2015 by Lantz Authors, see AUTHORS for more details.
    :license: BSD, see LICENSE for more details.
"""

import os
import ctypes
import inspect
from ctypes.util import find_library
from itertools import chain

from lantz import Driver


class Wrapper(object):

    def __init__(self, name, wrapped, wrapper):
        self.name = name
        self.wrapped = wrapped
        self.wrapper = wrapper

    def __call__(self, *args):
        return self.wrapper(self.name, self.wrapped, *args)

    @property
    def argtypes(self):
        return self.wrapped.argtypes

    @argtypes.setter
    def argtypes(self, value):
        self.wrapped.argtypes = value

    @property
    def restype(self):
        return self.wrapped.restype

    @restype.setter
    def restype(self, value):
        self.wrapped.restype = value


[docs]class Library(object): """Library wrapper :param library: ctypes library :param wrapper: callable that takes two arguments the name of the function and the function itself. It should return a callable. """ def __init__(self, library, prefix='', wrapper=None): if isinstance(library, str): self.library_name = library if os.name == 'nt': library = ctypes.WinDLL(library) else: library = ctypes.CDLL(library) self.wrapper = wrapper self.prefix = prefix self.internal = library def __get_func(self, name): if self.prefix: try: return getattr(self.internal, self.prefix + name) except: pass try: return getattr(self.internal, name) except Exception: if self.prefix: raise AttributeError('Could not find ({}){} in {}'.format(self.prefix, name, self.internal)) raise AttributeError('Could not find {} in {}'.format(name, self.internal)) def __getattr__(self, name): if name.startswith('__') and name.endswith('__'): raise AttributeError(name) func = self.__get_func(name) if self.wrapper: func = Wrapper(name, func, self.wrapper) setattr(self, name, func) return func
TYPES = {'c': ctypes.c_char, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, '?': ctypes.c_bool, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong, 'f': ctypes.c_float, 'd': ctypes.c_double, 'u32': ctypes.c_uint32, 'i32': ctypes.c_int32, 'f32': ctypes.c_float, 'f64': ctypes.c_double} class RetStr(object): def __init__(self, length, encoding='ascii'): self.length = length self.buffer = ctypes.create_string_buffer(b'', length) self.encoding = encoding def __iter__(self): yield self yield self.length @property def value(self): if self.encoding: return self.buffer.value.decode(self.encoding) else: return self.buffer.value class RetValue(object): def __init__(self, type): try: self.buffer = (TYPES[type] * 1)() except KeyError: raise KeyError('The type {} is not defined ()'.format(type, TYPES.keys())) def __iter__(self): yield self @property def value(self): return self.buffer[0] class RetTuple(object): def __init__(self, type, length=1): try: self.buffer = (TYPES[type] * length)() except KeyError: raise KeyError('The type {} is not defined ()'.format(type, TYPES.keys())) self.length = length def __iter__(self): yield self yield self.length @property def value(self): return tuple(self.buffer[:])
[docs]class LibraryDriver(Driver): """Base class for drivers that communicate with instruments calling a library (dll or others) To use this class you must override LIBRARY_NAME """ #: Name of the library LIBRARY_NAME = '' LIBRARY_PREFIX = '' def __init__(self, *args, **kwargs): library_name = kwargs.pop('library_name', None) super().__init__(*args, **kwargs) folder = os.path.dirname(inspect.getfile(self.__class__)) for name in chain(iter_lib(library_name, folder), iter_lib(self.LIBRARY_NAME, folder)): if name is None: continue self.log_debug('Trying to open library: {}'.format(name)) try: self.lib = Library(name, self.LIBRARY_PREFIX, self._wrapper) break except OSError: pass else: raise OSError('While instantiating {}: library not found'.format(self.__class__.__name__)) self.log_info('LibraryDriver created with {}', name) self._add_types() def _add_types(self): pass def _return_handler(self, func_name, ret_value): return ret_value def _preprocess_args(self, name, *args): new_args = [] collect = [] for arg in args: if isinstance(arg, (RetStr, RetTuple, RetValue)): collect.append(arg) new_args.append(arg.buffer) elif isinstance(arg, str): new_args.append(bytes(arg, 'ascii')) else: new_args.append(arg) return new_args, collect def _wrapper(self, name, func, *args): new_args, collect = self._preprocess_args(name, *args) try: ret = func(*new_args) except Exception as e: raise Exception('While calling {} with {} (was {}): {}'.format(name, new_args, args, e)) ret = self._return_handler(name, ret) return self._postprocess(name, ret, collect) def _postprocess(self, name, ret, collect): if collect: values = [item.value for item in collect] values.insert(0, ret) self.log_debug('Function call {} returned {}. Collected: {}', name, ret, collect) return tuple(values) self.log_debug('Function call {} returned {}.', name, ret) return ret
def iter_lib(library_name, folder=''): if not library_name: raise StopIteration if isinstance(library_name, str): if folder: yield os.path.join(folder, library_name) yield library_name yield find_library(library_name.split('.')[0]) else: for name in library_name: if folder: yield os.path.join(folder, name) for name in library_name: yield name yield find_library(name.split('.')[0])