# -*- coding: utf-8 -*-
"""
lantz.messagebased
~~~~~~~~~~~~~~~~~~
Implementes base class for message based drivers using PyVISA under the hood.
:copyright: 2015 by Lantz Authors, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
"""
from collections import ChainMap
import types
import visa
from .errors import NotSupportedError
from .driver import Driver
from .log import LOGGER
from .processors import ParseProcessor
#: Cache of parsing functions.
#: :type: dict[str, ParseProcessor]
_PARSERS_CACHE = {}
#: PyVISA Resource Manager used in Lantz
#: :type: visa.ResourceManager
_resource_manager = None
[docs]def get_resource_manager():
"""Return the PyVISA Resource Manager, creating an instance if necessary.
:rtype: visa.ResourceManager
"""
global _resource_manager
if _resource_manager is None:
_resource_manager = visa.ResourceManager()
return _resource_manager
[docs]class MessageBasedDriver(Driver):
"""Base class for message based drivers using PyVISA as underlying library.
Notice that PyVISA can communicate using different backends. For example:
- @ni: Using NI-VISA for communication. Backend bundled with PyVISA.
- @py: Using PySerial, PyUSB and linux-gpib for communication. Available with PyVISA-py package.
- @sim: Simulated devices. Available with PyVISA-sim package.
"""
#: Default arguments passed to the Resource constructor on initialize.
#: It should be specified in two layers, the first indicating the
#: interface type and the second the corresponding arguments.
#: The key COMMON is used to indicate keywords for all interfaces.
#: For example::
#:
#: {'ASRL': {'read_termination': '\n',
#: 'baud_rate': 9600},
#: 'USB': {'read_termination': \r'},
#: 'COMMON': {'write_termination': '\n'}
#: }
#:
#: :type: dict[str, dict[str, str]]
DEFAULTS = None
#: The identification number of the manufacturer as hex code.
#: :type: str | None
MANUFACTURER_ID = None
#: The code number of the model as hex code.
#: Can provide a tuple/list to indicate multiple models.
#: :type: str | list | tuple | None
MODEL_CODE = None
#: Stores a reference to a PyVISA ResourceManager.
#: :type: visa.ResourceManager
__resource_manager = None
@classmethod
def _get_defaults_kwargs(cls, instrument_type, resource_type, **user_kwargs):
"""Compute the default keyword arguments combining:
- user provided keyword arguments.
- (instrument_type, resource_type) keyword arguments.
- instrument_type keyword arguments.
- resource_type keyword arguments.
- common keyword arguments.
(the first ones have precedence)
:param instrument_type: ASRL, USB, TCPIP, GPIB
:type instrument_type: str
:param resource_type: INSTR, SOCKET, RAW
:type resource_type: str
:rtype: dict
"""
if cls.DEFAULTS:
maps = [user_kwargs] if user_kwargs else []
for key in ((instrument_type, resource_type), instrument_type, resource_type, 'COMMON'):
if key not in cls.DEFAULTS:
continue
value = cls.DEFAULTS[key]
if value is None:
raise NotSupportedError('An %s instrument is not supported by the driver %s',
key, cls.__name__)
if value:
maps.append(value)
return dict(ChainMap(*maps))
else:
return user_kwargs
@classmethod
def _via_usb(cls, resource_type='INSTR', serial_number=None, manufacturer_id=None,
model_code=None, name=None, board=0, **kwargs):
"""Return a Driver with an underlying USB resource.
A connected USBTMC instrument with the specified serial_number, manufacturer_id,
and model_code is returned. If any of these is missing, the first USBTMC driver
matching any of the provided values is returned.
To specify the manufacturer id and/or the model code override the following class attributes::
class RigolDS1052E(MessageBasedDriver):
MANUFACTURER_ID = '0x1AB1'
MODEL_CODE = '0x0588'
:param serial_number: The serial number of the instrument.
:param manufacturer_id: The unique identification number of the manufacturer.
:param model_code: The unique identification number of the product.
:param name: Unique name given within Lantz to the instrument for logging purposes.
Defaults to one generated based on the class name if not provided.
:param board: USB Board to use
:param kwargs: keyword arguments passed to the Resource constructor on initialize.
:rtype: MessageBasedDriver
"""
manufacturer_id = manufacturer_id or cls.MANUFACTURER_ID
model_code = model_code or cls.MODEL_CODE
if isinstance(model_code, (list, tuple)):
_models = model_code
model_code = '?*'
else:
_models = None
query = 'USB%d::%s::%s::%s::%s' % (board, manufacturer_id or '?*',
model_code or '?*',
serial_number or '?*',
resource_type)
rm = get_resource_manager()
try:
resource_names = rm.list_resources(query)
except:
raise ValueError('No USBTMC devices found for %s' % query)
if _models:
# There are more than 1 model compatible with
resource_names = [r for r in resource_names
if r.split('::')[2] in _models]
if not resource_names:
raise ValueError('No USBTMC devices found for %s '
'with model in %s' % (query, _models))
if len(resource_names) > 1:
raise ValueError('%d USBTMC devices found for %s. '
'Please specify the serial number' % (len(resource_names), query))
return cls(resource_names[0], name, **kwargs)
@classmethod
[docs] def via_usb(cls, serial_number=None, manufacturer_id=None,
model_code=None, name=None, board=0, **kwargs):
"""Return a Driver with an underlying USB Instrument resource.
A connected USBTMC instrument with the specified serial_number, manufacturer_id,
and model_code is returned. If any of these is missing, the first USBTMC driver
matching any of the provided values is returned.
To specify the manufacturer id and/or the model code override the following class attributes::
class RigolDS1052E(MessageBasedDriver):
MANUFACTURER_ID = '0x1AB1'
MODEL_CODE = '0x0588'
:param serial_number: The serial number of the instrument.
:param manufacturer_id: The unique identification number of the manufacturer.
:param model_code: The unique identification number of the product.
:param name: Unique name given within Lantz to the instrument for logging purposes.
Defaults to one generated based on the class name if not provided.
:param board: USB Board to use
:param kwargs: keyword arguments passed to the Resource constructor on initialize.
:rtype: MessageBasedDriver
"""
return cls._via_usb('INSTR', serial_number, manufacturer_id, model_code, name, board, **kwargs)
@classmethod
[docs] def via_usb_raw(cls, serial_number=None, manufacturer_id=None, model_code=None, name=None, board=0, **kwargs):
"""Return a Driver with an underlying USB RAW resource.
:param serial_number: The serial number of the instrument.
:param manufacturer_id: The unique identification number of the manufacturer.
:param model_code: The unique identification number of the product.
:param name: Unique name given within Lantz to the instrument for logging purposes.
Defaults to one generated based on the class name if not provided.
:param board: USB Board to use
:param kwargs: keyword arguments passed to the Resource constructor on initialize.
:rtype: MessageBasedDriver
"""
return cls._via_usb('RAW', serial_number, manufacturer_id, model_code, name, board, **kwargs)
@classmethod
[docs] def via_serial(cls, port, name=None, **kwargs):
"""Return a Driver with an underlying ASRL (Serial) Instrument resource.
:param port: The serial port to which the instrument is connected.
:param name: Unique name given within Lantz to the instrument for logging purposes.
Defaults to one generated based on the class name if not provided.
:param kwargs: keyword arguments passed to the Resource constructor on initialize.
:rtype: MessageBasedDriver
"""
resource_name = 'ASRL%s::INSTR' % port
return cls(resource_name, name, **kwargs)
@classmethod
[docs] def via_tcpip(cls, hostname, port, name=None, **kwargs):
"""Return a Driver with an underlying TCP Instrument resource.
:param hostname: The ip address or hostname of the instrument.
:param port: the port of the instrument.
:param name: Unique name given within Lantz to the instrument for logging purposes.
Defaults to one generated based on the class name if not provided.
:param kwargs: keyword arguments passed to the Resource constructor on initialize.
:rtype: MessageBasedDriver
"""
resource_name = 'TCPIP::%s::%s::INSTR' % (hostname, port)
return cls(resource_name, name, **kwargs)
@classmethod
[docs] def via_tcpip_socket(cls, hostname, port, name=None, **kwargs):
"""Return a Driver with an underlying TCP Socket resource.
:param hostname: The ip address or hostname of the instrument.
:param port: the port of the instrument.
:param name: Unique name given within Lantz to the instrument for logging purposes.
Defaults to one generated based on the class name if not provided.
:param kwargs: keyword arguments passed to the Resource constructor on initialize.
:rtype: MessageBasedDriver
"""
resource_name = 'TCPIP::%s::%s::SOCKET' % (hostname, port)
return cls(resource_name, name, **kwargs)
@classmethod
[docs] def via_gpib(cls, address, name=None, **kwargs):
"""Return a Driver with an underlying GPIB Instrument resource.
:param address: The gpib address of the instrument.
:param name: Unique name given within Lantz to the instrument for logging purposes.
Defaults to one generated based on the class name if not provided.
:param kwargs: keyword arguments passed to the Resource constructor on initialize.
:rtype: MessageBasedDriver
"""
resource_name = 'GPIB::%s::INSTR' % address
return cls(resource_name, name, **kwargs)
def __init__(self, resource_name, name=None, **kwargs):
"""
:param resource_name: The resource name
:type resource_name: str
:params name: easy to remember identifier given to the instance for logging
purposes.
:param kwargs: keyword arguments passed to the resource during initialization.
"""
self.__resource_manager = get_resource_manager()
try:
resource_info = self.__resource_manager.resource_info(resource_name)
except visa.VisaIOError:
raise ValueError('The resource name is invalid')
super().__init__(name=name)
# This is to avoid accidental modifications of the class value by an instance.
self.DEFAULTS = types.MappingProxyType(self.DEFAULTS or {})
#: The resource name
#: :type: str
self.resource_name = resource_name
#: keyword arguments passed to the resource during initialization.
#: :type: dict
self.resource_kwargs = self._get_defaults_kwargs(resource_info.interface_type.name.upper(),
resource_info.resource_class,
**kwargs)
# The resource will be created when the driver is initialized.
#: :type: pyvisa.resources.MessageBasedResource
self.resource = None
self.log_debug('Using MessageBasedDriver for {}', self.resource_name)
def initialize(self):
super().initialize()
self.log_debug('Opening resource {}', self.resource_name)
self.log_debug('Setting {}', list(self.resource_kwargs.items()))
self.resource = get_resource_manager().open_resource(self.resource_name, **self.resource_kwargs)
def finalize(self):
self.log_debug('Closing resource {}', self.resource_name)
self.resource.close()
super().finalize()
[docs] def query(self, command, *, send_args=(None, None), recv_args=(None, None)):
"""Send query to the instrument and return the answer
:param command: command to be sent to the instrument
:type command: string
:param send_args: (termination, encoding) to override class defaults
:param recv_args: (termination, encoding) to override class defaults
"""
self.write(command, *send_args)
return self.read(*recv_args)
[docs] def parse_query(self, command, *,
send_args=(None, None), recv_args=(None, None),
format=None):
"""Send query to the instrument, parse the output using format
and return the answer.
.. seealso:: TextualMixin.query and stringparser
"""
ans = self.query(command, send_args=send_args, recv_args=recv_args)
if format:
parser = _PARSERS_CACHE.setdefault(format, ParseProcessor(format))
ans = parser(ans)
return ans
[docs] def write(self, command, termination=None, encoding=None):
"""Send command to the instrument.
:param command: command to be sent to the instrument.
:type command: string.
:param termination: termination character to override class defined
default.
:param encoding: encoding to transform string to bytes to override class
defined default.
:return: number of bytes sent.
"""
self.log_debug('Writing {!r}', command)
return self.resource.write(command, termination, encoding)
[docs] def read(self, termination=None, encoding=None):
"""Receive string from instrument.
:param termination: termination character (overrides class default)
:type termination: str
:param encoding: encoding to transform bytes to string (overrides class default)
:return: string encoded from received bytes
"""
ret = self.resource.read(termination, encoding)
self.log_debug('Read {!r}', ret)
return ret