Source code for ScopeFoundry.hardware

from __future__ import absolute_import, print_function
from qtpy import QtCore, QtWidgets, QtGui
from ScopeFoundry.logged_quantity import LQCollection#, LoggedQuantity
from collections import OrderedDict
import pyqtgraph as pg
import warnings
from ScopeFoundry.helper_funcs import get_logger_from_class, QLock
import time

[docs]class HardwareComponent(QtCore.QObject): """ :class:`HardwareComponent` Base class for ScopeFoundry Hardware objects to subclass, implement :meth:`setup`, :meth:`connect` and :meth:`disconnect` """ connection_succeeded = QtCore.Signal() connection_failed = QtCore.Signal()
[docs] def add_logged_quantity(self, name, **kwargs): #lq = LoggedQuantity(name=name, **kwargs) #self.logged_quantities[name] = lq #return lq return self.settings.New(name, **kwargs)
[docs] def add_operation(self, name, op_func): """ Create an operation for the HardwareComponent. *op_func* is a function that will be called upon operation activation operations are typically exposed in the default ScopeFoundry gui via a pushButton :type name: str :type op_func: QtCore.Slot """ self.operations[name] = op_func
def __init__(self, app, debug=False, name=None): """ create new HardwareComponent attached to *app* """ QtCore.QObject.__init__(self) if not hasattr(self, 'name'): self.name = self.__class__.__name__ if name is not None: self.name = name self.log = get_logger_from_class(self) # threading lock #self.lock = threading.Lock() #self.lock = DummyLock() self.lock = QLock(mode=1) # mode 0 is non-reentrant lock self.app = app #self.logged_quantities = OrderedDict() self.settings = LQCollection() self.operations = OrderedDict() self.connected = self.add_logged_quantity("connected", dtype=bool) self.connected.updated_value[bool].connect(self.enable_connection) self.connect_success = False self.debug_mode = self.add_logged_quantity("debug_mode", dtype=bool, initial=debug) self.auto_thread_lock = True self.setup() if self.auto_thread_lock: self.thread_lock_all_lq() self.has_been_connected_once = False self.is_connected = False self.connection_failed.connect(self.on_connection_failed) self.connection_succeeded.connect(self.on_connection_succeeded)
[docs] def setup(self): """ Runs during __init__, before the hardware connection is established Should generate desired LoggedQuantities, operations """ raise NotImplementedError()
[docs] def new_control_widgets(self): self.controls_groupBox = QtWidgets.QGroupBox(self.name) self.controls_formLayout = QtWidgets.QFormLayout() self.controls_groupBox.setLayout(self.controls_formLayout) #self.connect_hardware_checkBox = QtWidgets.QCheckBox("Connect to Hardware") #self.controls_formLayout.addRow("Connect", self.connect_hardware_checkBox) #self.connect_hardware_checkBox.stateChanged.connect(self.enable_connection) self.control_widgets = OrderedDict() for lqname, lq in self.settings.as_dict().items(): #: :type lq: LoggedQuantity if lq.choices is not None: widget = QtWidgets.QComboBox() elif lq.dtype in [int, float]: if lq.si: widget = pg.SpinBox() else: widget = QtWidgets.QDoubleSpinBox() elif lq.dtype in [bool]: widget = QtWidgets.QCheckBox() elif lq.dtype in [str]: widget = QtWidgets.QLineEdit() lq.connect_bidir_to_widget(widget) # Add to formlayout self.controls_formLayout.addRow(lqname, widget) self.control_widgets[lqname] = widget self.op_buttons = OrderedDict() for op_name, op_func in self.operations.items(): op_button = QtWidgets.QPushButton(op_name) op_button.clicked.connect(lambda checked, f=op_func: f()) self.controls_formLayout.addRow(op_name, op_button) self.read_from_hardware_button = QtWidgets.QPushButton("Read From Hardware") self.read_from_hardware_button.clicked.connect(self.read_from_hardware) self.controls_formLayout.addRow("Logged Quantities:", self.read_from_hardware_button) return self.controls_groupBox
[docs] def add_widgets_to_tree(self, tree): #tree = self.app.ui.hardware_treeWidget #tree = QTreeWidget() tree.setColumnCount(2) tree.setHeaderLabels(["Hardware", "Value"]) self.tree_item = QtWidgets.QTreeWidgetItem(tree, [self.name, "o"]) tree.insertTopLevelItem(0, self.tree_item) self.tree_item.setFirstColumnSpanned(False) self.tree_item.setForeground(1, QtGui.QColor('red')) # Add logged quantities to tree self.settings.add_widgets_to_subtree(self.tree_item) # Add oepration buttons to tree self.op_buttons = OrderedDict() for op_name, op_func in self.operations.items(): op_button = QtWidgets.QPushButton(op_name) op_button.clicked.connect(lambda checked, f=op_func: f()) self.op_buttons[op_name] = op_button #self.controls_formLayout.addRow(op_name, op_button) op_tree_item = QtWidgets.QTreeWidgetItem(self.tree_item, [op_name, ""]) tree.setItemWidget(op_tree_item, 1, op_button) self.tree_read_from_hardware_button = QtWidgets.QPushButton("Read From\nHardware") self.tree_read_from_hardware_button.clicked.connect(self.read_from_hardware) #self.controls_formLayout.addRow("Logged Quantities:", self.read_from_hardware_button) self.read_from_hardware_button_tree_item = QtWidgets.QTreeWidgetItem(self.tree_item, ["Logged Quantities:", ""]) self.tree_item.addChild(self.read_from_hardware_button_tree_item) tree.setItemWidget(self.read_from_hardware_button_tree_item, 1, self.tree_read_from_hardware_button)
[docs] @QtCore.Slot() def read_from_hardware(self): """ Read all settings (:class:`LoggedQuantity`) connected to hardware states """ for name, lq in self.settings.as_dict().items(): if lq.has_hardware_read(): if self.debug_mode.val: self.log.debug("read_from_hardware {}".format(name) ) lq.read_from_hardware()
[docs] def connect(self): """ Opens a connection to hardware and connects :class:`LoggedQuantity` settings to related hardware functions and parameters """ raise NotImplementedError()
[docs] def disconnect(self): """ Disconnects the hardware and severs hardware--:class:`LoggedQuantity` links """ raise NotImplementedError()
[docs] @QtCore.Slot(bool) def enable_connection(self, enable=True): if enable: try: self.connect() self.connection_succeeded.emit() except Exception as err: self.connection_failed.emit() raise err else: print("disabling connection") try: self.disconnect() self.tree_item.setText(1,'X') self.tree_item.setForeground(1, QtGui.QColor('red')) except Exception as err: # disconnect failed self.tree_item.setText(1,'?') self.tree_item.setForeground(1, QtGui.QColor('red')) raise err
[docs] def on_connection_succeeded(self): print(self.name, "connection succeeded!") self.tree_item.setText(1,'O') self.tree_item.setForeground(1, QtGui.QColor('green'))
[docs] def on_connection_failed(self): print(self.name, "connection failed!") self.settings.connected.update_value(False) self.tree_item.setText(1,'!') self.tree_item.setForeground(1, QtGui.QColor('red'))
@property def gui(self): warnings.warn("Hardware.gui is deprecated, use Hardware.app", DeprecationWarning) return self.app
[docs] def web_ui(self): return "Hardware {}".format(self.name)
[docs] def thread_lock_lq(self, lq): lq.old_lock = lq.lock lq.lock = self.lock
[docs] def thread_lock_all_lq(self): for lq in self.settings.as_list(): lq.old_lock = lq.lock lq.lock = self.lock