# -*- coding: utf-8 -*-
"""
    lantz.drivers.kentech.hri
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    Implements the driver for Kentech High Repetition Rate Image Intensifier
    revisions 1 and 2.

    Implementation Notes
    --------------------

    The set of commands is cumbersome and inconsistent. Moreover, each revision
    introduces backward-incompatible changes. The Lantz driver abstracts those
    differences.

    Sources::

        - LaVision PicoStar HR12
        - HRI Commands obtained from Kentech
        - HRI.cl from DaVis
        - Lantz reverse engineering team

    :copyright: 2015 by Lantz Authors, see AUTHORS for more details.
    :license: BSD, see LICENSE for more details.
"""
from lantz import Feat, Action
from lantz.errors import InstrumentError
from lantz.messagebased import MessageBasedDriver
def between(s, before, after):
    """Return the part of *s* enclosed by *before* and *after*.

    The search for *after* starts right behind the first occurrence of
    *before*, so an earlier occurrence of *after* (for example a stray
    ``ok`` preceding the echoed command) does not truncate the result.

    :param s: string to search in.
    :param before: marker preceding the wanted text.
    :param after: marker following the wanted text.
    :return: the text strictly between the two markers.
    :raises ValueError: if *before* is not in *s*, or *after* does not occur
                        after it.
    """
    start = s.index(before) + len(before)
    stop = s.index(after, start)
    return s[start:stop]
class HRI(MessageBasedDriver):
    """Kentech High Repetition Rate Image Intensifier.

    Commands are written with a carriage return as termination and answers
    are read up to a line feed (see ``DEFAULTS``). Several commands differ
    between hardware revisions; the methods below pick the right one based
    on the value of :attr:`revision`.
    """

    DEFAULTS = {'COMMON': {'write_termination': '\r',
                           'read_termination': '\n'}}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _ensure_remote(self, command):
        """Switch to remote mode before sending *command*, if needed.

        Nothing happens for an empty or whitespace-only command. The
        ``remote`` setter itself sends a bare carriage return through
        :meth:`query_expect`; skipping blank commands keeps that call from
        re-entering the setter.
        NOTE(review): this assumes the cached value returned by
        ``recall('remote')`` is only updated once the setter has returned --
        confirm against the lantz Feat implementation.
        """
        if command and command.strip() and not self.recall('remote'):
            self.log_info('Setting Remote.')
            self.remote = True

    def _revision_number(self):
        """Return :attr:`revision` as a float so it can be compared.

        :raises InstrumentError: if the revision cannot be read as a number.
        """
        rev = self.revision
        try:
            return float(rev)
        except (TypeError, ValueError) as exc:
            raise InstrumentError(
                "Cannot interpret revision '{}' as a number".format(rev)) from exc

    @staticmethod
    def _value_after_dot(ans):
        """Parse the number following the first ``.`` in *ans*.

        The five characters starting two positions after the dot are
        converted to float.

        :raises InstrumentError: if *ans* contains no ``.``.
        """
        try:
            pos = ans.index('.')
        except ValueError:
            raise InstrumentError('Unsupported operation.')
        return float(ans[pos + 2:pos + 7])

    @staticmethod
    def _as_volts(value):
        """Return *value* as a plain float number of volts.

        Objects exposing ``to(units)`` and ``magnitude`` (presumably pint
        quantities coming from the Feat cache -- TODO confirm) are converted
        to volt first; anything else is passed to ``float``.

        :raises InstrumentError: if *value* cannot be converted.
        """
        convert = getattr(value, 'to', None)
        try:
            if callable(convert):
                return float(convert('volt').magnitude)
            return float(value)
        except (TypeError, ValueError) as exc:
            raise InstrumentError(
                'Cannot use {!r} as a voltage'.format(value)) from exc

    # ------------------------------------------------------------------
    # Communication
    # ------------------------------------------------------------------

    def query(self, command, *, send_args=(None, None), recv_args=(None, None)):
        """Send query to the instrument and return the answer.

        Set remote mode if needed.

        :param command: command to send.
        :param send_args: forwarded unchanged to the parent ``query``.
        :param recv_args: forwarded unchanged to the parent ``query``.
        :return: whatever the parent ``query`` returns.
        """
        self._ensure_remote(command)
        return super().query(command, send_args=send_args, recv_args=recv_args)

    def query_expect(self, command, read_termination=None, expected='ok'):
        """Send a query and check that the answer contains the string.

        Set remote mode if needed.

        :type command: str
        :type read_termination: str | None
        :type expected: str | None
        :param expected: substring that must be present in the answer; a
                         falsy value disables the check.
        :return: the answer as read from the instrument.
        :raises InstrumentError: if *expected* is not found in the answer.
        """
        self._ensure_remote(command)
        self.resource.write(command)
        ans = self.read(read_termination)
        if expected and expected not in ans:
            raise InstrumentError("'{}' not in '{}'".format(expected, ans))
        return ans

    @Action()
    def clear(self):
        """Clear the buffer.
        """
        self.write('\r\r')

    @Feat(None, values={True, False})
    def remote(self, value):
        """Remote or local.

        True sends a bare carriage return and reads the two resulting
        answers; False sends ``LOCAL`` and reads up to a NUL character.
        """
        if value:
            self.query_expect('\r', expected=None)
            self.read()
        else:
            return self.query_expect('LOCAL', chr(0), None)

    @Feat(read_once=True)
    def revision(self):
        """Revision.

        Returned as a string. When the answer to ``.REV`` contains
        ``UNDEFINED`` the revision is reported as ``'1.0'``; otherwise a
        second line is read and its second whitespace-separated token is
        returned.
        """
        ans = self.query_expect('.REV', expected=None)
        if 'UNDEFINED' in ans:
            return '1.0'
        return self.read().split()[1]

    # ------------------------------------------------------------------
    # Trigger
    # ------------------------------------------------------------------

    @Feat(None, values={'ecl': 'ECLTRIG', 'ttl': 'TTLTRIG'})
    def trigger_logic(self, value):
        """Trigger logic.
        """
        self.query_expect(value)

    @Feat(None, values={'high': 'HITRIG', '50ohm': '50TRIG'})
    def trigger_ttl_termination(self, value):
        """Trigger termination for TTL logic (for ECL is fixed to 50 ohm).

        :raises InstrumentError: if the last trigger logic set was ECL.
        """
        if self.recall('trigger_logic') == 'ecl':
            raise InstrumentError('Trigger termination only with TTL')
        self.query_expect(value)

    @Feat(None, values={'rising': '+VETRIG', 'falling': '-VETRIG'})
    def trigger_edge(self, value):
        """Trigger on rising or falling edge.
        """
        self.query_expect(value)

    @Feat(None, values={'level': 'LVLTRIG', 'log': 'LOGTRIG'})
    def trigger_ecl_mode(self, value):
        """Trigger mode for ECL logic.

        :raises InstrumentError: if the last trigger logic set was TTL.
        """
        if self.recall('trigger_logic') == 'ttl':
            raise InstrumentError('Level triggering only with ECL')
        self.query_expect(value)

    @Feat(units='centivolt', limits=(-40, 40, 1))
    def trigger_ecl_level(self):
        """Trigger level for ECL logic, mode level.
        """
        if self._revision_number() >= 2.0:
            ans = self.query_expect('THRESHV ?')
            ans = between(ans, 'THRESHV ?', 'ok')
            return float(ans.strip())
        # Drop the first 8 characters of the answer before looking for the dot.
        ans = self.query_expect('THRESHV @ .')[8:]
        return self._value_after_dot(ans)

    @trigger_ecl_level.setter
    def trigger_ecl_level(self, value):
        # The commands are formatted with '{:d}', which rejects floats, so
        # the number is rounded to an integer first.
        if self._revision_number() >= 2.0:
            self.query_expect('{:d} !THRESH'.format(int(round(value))))
        else:
            raw = int(round(40 * value + 2000.0))
            self.query_expect('{:d} THRESH ! TRIG+RF>HW'.format(raw))

    # ------------------------------------------------------------------
    # Voltages
    # ------------------------------------------------------------------

    @Feat(units='volt', limits=(-50, 50))
    def clamp_voltage(self):
        """Most negative value of the gate pulse.
        """
        if self._revision_number() >= 2.0:
            # NOTE(review): unlike the revision 1 branch and unlike
            # average_voltage, this answer is not divided by 10 -- confirm
            # the scaling against the HRI command documentation.
            ans = self.query_expect('CLAMP ?')
            ans = between(ans, 'CLAMP ?', 'ok').strip()
            return float(ans)
        ans = self.query_expect('CLAMP @ .')
        return self._value_after_dot(ans) / 10.0

    @clamp_voltage.setter
    def clamp_voltage(self, value):
        """Set the clamp voltage.

        The value must lie strictly between (average voltage - 60) and the
        average voltage, where the average voltage is the cached one.

        :raises ValueError: if *value* is outside that range.
        :raises InstrumentError: if the cached average voltage is unusable.
        """
        average = self._as_volts(self.recall('average_voltage'))
        mn, mx = average - 60.0, average
        if not mn < value < mx:
            raise ValueError('Invalid clamp voltage. Not in range {}-{}'.format(mn, mx))
        self.query_expect('{:d} CLAMP ! CLAMP>HW'.format(int(round(value * 10))))

    @Feat(units='volt', limits=(-50, 50))
    def average_voltage(self):
        """Cathode potential bias with respect of MCP.
        """
        if self._revision_number() >= 2.0:
            ans = self.query_expect('AVE ?')
            ans = between(ans, 'AVE ?', 'ok')
            return float(ans.strip()) / 10.
        # NOTE(review): this queries THRESHV, exactly like trigger_ecl_level,
        # while the setter writes AVE. It looks like a copy-paste and
        # 'AVE @ .' may be intended -- confirm against the HRI command list.
        ans = self.query_expect('THRESHV @ .')[8:]
        return self._value_after_dot(ans) / 10.

    @average_voltage.setter
    def average_voltage(self, value):
        # '{:d}' rejects floats, so send the value in tenths as an integer.
        self.query_expect('{:d} AVE ! AVE>HW'.format(int(round(value * 10))))

    # ------------------------------------------------------------------
    # Status, gain and mode
    # ------------------------------------------------------------------

    @Feat()
    def status(self):
        """Get status.
        """
        return self.query_expect(".STATUS", chr(0))

    @Feat(None, units='volt', limits=(0, 1700))
    def mcp(self, value):
        """MCP Voltage.
        """
        if self._revision_number() >= 2.0:
            return self.query_expect('{} !MCP'.format(value))
        return self.query_expect('{} !MCPVOLTS'.format(value))

    @Feat(None, values={'inhibit': 0, 'rf': 21, 'ldc': 22, 'hdc': 23, 'dc': 24,
                        'user1': 25, 'user2': 26, 'user3': 27, 'user4': 28})
    def mode(self, mode):
        """Gain modulation mode.

        HRI Machine Modes and Mode Indices

        =====  =============================================================
        Index  Mode
        =====  =============================================================
        0      INHIBIT
        2-10   COMB modes 200 ps to 1 ns inclusive (High rate operation)
        11-20  COMB modes 100 ps to 3 ns inclusive (Low rate (+GOI) operation)
        21     RF
        22     logic low duty cycle (LDC)
        23     logic high duty cycle
        24     DC
        25-28  user modes 1 to 4
        =====  =============================================================
        """
        #TODO: Modes [11-20] not available in rev < 2.0
        return self.query_expect("{} !MODE".format(mode))

    @Feat(None)
    def rfgain(self, value):
        """RF Gain.
        """
        return self.query("{} !RFGAIN".format(value))

    @Feat()
    def temperature(self):
        """Temperature.

        Only queried when the revision is 2.0; otherwise 0 is returned.
        """
        if self._revision_number() == 2.0:
            return self.query("@TEMP .")
        return 0

    @Feat(None, values={True, False})
    def enabled(self, value):
        """MCP Enabled

        Revisions below 2 use the ``+M`` / ``-M`` commands. Later revisions
        are disabled by switching to the ``'inhibit'`` mode (remembering the
        cached mode) and enabled by restoring the remembered mode, ``'rf'``
        if none was stored.
        """
        if self._revision_number() < 2:
            self.query_expect('+M' if value else '-M')
        elif value:
            # 'mode' accepts the keys of its values mapping, not raw indices.
            self.mode = getattr(self, '_last_mode', 'rf')
        else:
            self._last_mode = self.recall('mode')
            self.mode = 'inhibit'
if __name__ == '__main__':
    # Small command-line smoke test: either open the interactive test GUI
    # or run a fixed sequence of settings against the instrument.
    import argparse
    import lantz.log

    cli = argparse.ArgumentParser(description='Test Kentech HRI')
    cli.add_argument('-i', '--interactive', action='store_true',
                     default=False, help='Show interactive GUI')
    cli.add_argument('-p', '--port', type=str, default='17',
                     help='Serial port to connect to')
    options = cli.parse_args()

    lantz.log.log_to_socket(lantz.log.DEBUG)

    with HRI.from_serial_port(options.port, baudrate=9600) as hri:
        if not options.interactive:
            # Scripted run: go remote, apply the settings, go back to local.
            hri.remote = True
            print(hri.revision)
            hri.mode = "inhibit"
            hri.mcp = 350
            hri.rfgain = 99
            hri.mode = "rf"
            hri.remote = False
        else:
            from lantz.ui.app import start_test_app
            start_test_app(hri)