from __future__ import absolute_import, print_function
from qtpy import QtCore, QtWidgets
import pyqtgraph
import numpy as np
from collections import OrderedDict
import json
import sys
from ScopeFoundry.helper_funcs import get_logger_from_class, str2bool, QLock
from ScopeFoundry.ndarray_interactive import ArrayLQ_QTableModel
import pyqtgraph as pg
from inspect import signature
#import threading
# python 2/3 compatibility
if sys.version_info[0] == 3:
unicode = str
[docs]class DummyLock(object):
[docs] def acquire(self):
[docs] def release(self):
def __enter__(self):
return self
def __exit__(self, *args):
[docs]class LoggedQuantity(QtCore.QObject):
**LoggedQuantity** objects are containers that wrap settings. These settings
may be a number (integer or float) or a string and occasionally small arrays
of them.
These objects emit signals when changed and can be connected bidirectionally
to Qt Widgets.
In ScopeFoundry we represent the values in an object called a
`LoggedQuantity`. A :class:`LoggedQuantity` is a class that contains a
value, a `bool`, `float`, `int`, `str` etc that is part of an application's
state. In the case of microscope and equipment control, these also can
represent the state of a piece of hardware. These are very useful objects
because the are the central location of the value contained within. All
graphical interface views will be guaranteed to be consistent with the `LQ`
state. The data of these quantities will also be saved in datafiles created
by ScopeFoundry.
# signal sent when value has been updated
updated_value = QtCore.Signal((float,),(int,),(bool,), (), (str,),)
# signal sent when value has been updated, sends text representation
updated_text_value = QtCore.Signal(str)
# emits the index of the value in self.choices
updated_choice_index_value = QtCore.Signal(int)
# signal sent when min max range updated
updated_min_max = QtCore.Signal((float,float),(int,int), (),)
# signal sent when read only (ro) status has changed
updated_readonly = QtCore.Signal((bool,), (),)
def __init__(self, name, dtype=float,
hardware_read_func=None, hardware_set_func=None,
initial=0, fmt="%g", si=False,
ro = False, # read only flag
unit = None,
spinbox_decimals = 2,
vmin=-1e12, vmax=+1e12, choices=None,
reread_from_hardware_after_write = False,
description = None
QtCore.QObject.__init__(self) = name
self.dtype = dtype
self.val = dtype(initial)
self.hardware_read_func = hardware_read_func
self.hardware_set_func = hardware_set_func
self.fmt = fmt # string formatting string. This is ignored if dtype==str
if self.dtype == str:
self.fmt = "%s" = si # will use pyqtgraph SI Spinbox if True
self.unit = unit
self.vmin = vmin
self.vmax = vmax
# choices should be tuple [ ('name', val) ... ] or simple list [val, val, ...]
self.choices = self._expand_choices(choices) = ro # Read-Only
self.is_array = False
self.description = description
self.log = get_logger_from_class(self)
if self.dtype == int:
self.spinbox_decimals = 0
self.spinbox_decimals = spinbox_decimals
self.reread_from_hardware_after_write = reread_from_hardware_after_write
if self.dtype == int:
self.spinbox_step = 1
self.spinbox_step = spinbox_step
self.oldval = None
self._in_reread_loop = False # flag to prevent reread from hardware loops
self.widget_list = []
self.listeners = []
# threading lock
#self.lock = threading.Lock()
#self.lock = DummyLock()
self.lock = QLock(mode=1) # mode 0 is non-reentrant lock
[docs] def coerce_to_type(self, x):
Force x to dtype of the LQ
============= ==================================
**Arguments** **Description**
*x* value of type str, bool, int, etc.
============= ==================================
:returns: Same value, *x* of the same type as its respective logged
if self.dtype==bool and isinstance(x, str):
return str2bool(x)
return self.dtype(x)
def _expand_choices(self, choices):
if choices is None:
return None
expanded_choices = []
for c in choices:
if isinstance(c, tuple):
name, val = c
expanded_choices.append( ( str(name), self.dtype(val) ) )
expanded_choices.append( ( str(c), self.dtype(c) ) )
return expanded_choices
def __str__(self):
return "{} = {}".format(, self.val)
def __repr__(self):
return "LQ: {} = {}".format(, self.val)
[docs] def read_from_hardware(self, send_signal=True):
self.log.debug("{}: read_from_hardware send_signal={}".format(, send_signal))
if self.hardware_read_func:
with self.lock:
self.oldval = self.val
val = self.hardware_read_func()
self.update_value(new_val=val, update_hardware=False, send_signal=send_signal)
self.log.warn("{} read_from_hardware called when not connected to hardware".format(
return self.val
[docs] def write_to_hardware(self, reread_hardware=None):
if reread_hardware is None:
# if undefined, default to stored reread_from_hardware_after_write bool
reread_hardware = self.reread_from_hardware_after_write
# Read from Hardware
if self.has_hardware_write():
with self.lock:
if reread_hardware:
def value(self):
"return stored value"
return self.val
[docs] @QtCore.Slot(str)
def update_value(self, new_val=None, update_hardware=True, send_signal=True, reread_hardware=None):
Update stored value with new_val
Change value of LQ and emit signals to inform listeners of change
if *update_hardware* is true: call connected hardware write function
=============== =================================================================================================================
**Arguments:** **Description:**
new_val New value for the LoggedQuantity to store
update_hardware calls hardware_set_func if defined (default True)
send_signal sends out QT signals on upon change (default True)
reread_hardware read from hardware after writing to hardware to ensure change (defaults to self.reread_from_hardware_after_write)
=============== =================================================================================================================
:returns: None
# use a thread lock during update_value to avoid another thread
# calling update_value during the update_value
if reread_hardware is None:
# if undefined, default to stored reread_from_hardware_after_write bool
reread_hardware = self.reread_from_hardware_after_write
with self.lock:
# sometimes a the sender is a textbox that does not send its new value,
# grab the text() from it instead
if new_val is None:
new_val = self.sender().text()
self.oldval = self.coerce_to_type(self.val)
new_val = self.coerce_to_type(new_val)
self.log.debug("{}: update_value {} --> {} sender={}".format(, repr(self.oldval), repr(new_val), repr(self.sender())))
# check for equality of new vs old, do not proceed if they are same
if self.same_values(self.oldval, new_val):
self.log.debug("{}: same_value so returning {} {}".format(, self.oldval, new_val))
self.log.debug("{}: different values {} {}".format(, self.oldval, new_val))
# actually change internal state value
self.val = new_val
# Read from Hardware
if update_hardware and self.hardware_set_func:
if reread_hardware:
# Send Qt Signals
if send_signal:
[docs] def send_display_updates(self, force=False):
Emit updated_value signals if value has changed.
============= =============================================
**Arguments** **Description**
*force* will emit signals regardless of value change.
============= =============================================
:returns: None
#self.log.debug("{}:send_display_updates: force={}. From {} to {}".format(, force, self.oldval, self.val))
if (not self.same_values(self.oldval, self.val)) or (force):
str_val = self.string_value()
if self.dtype in [float, int]:
if self.choices is not None:
choice_vals = [c[1] for c in self.choices]
if self.val in choice_vals:
self.updated_choice_index_value.emit(choice_vals.index(self.val) )
self.oldval = self.val
# no updates sent
[docs] def same_values(self, v1, v2):
Compares two values of the LQ type, used in update_value
============= ====================
**Arguments** **Description**
v1 value 1
v2 value 2
============= ====================
:returns: Boolean value (True or False)
return v1 == v2
[docs] def string_value(self):
if self.dtype == str:
return self.val
return self.fmt % self.val
[docs] def ini_string_value(self):
:returns: A string showing the logged quantity value.
return str(self.val)
[docs] def update_choice_index_value(self, new_choice_index, **kwargs):
self.update_value(self.choices[new_choice_index][1], **kwargs)
[docs] def add_listener(self, func, argtype=(), **kwargs):
Connect 'func' as a listener (Qt Slot) for the
updated_value signal.
By default 'func' should take no arguments,
but argtype can define the data type that it should accept.
but should be limited to those supported by LoggedQuantity
(i.e. int, float, str)
\*\*kwargs are passed to the connect function
appends the 'func' to the 'listeners' list
# --> This is now handled by redefining sys.excepthook handle in ScopeFoundry.base_app
# Wraps func in a try block to absorb the Exception to avoid crashing PyQt5 >5.5
# see
# def wrapped_func(func):
# def f(*args):
# try:
# func(*args)
# except Exception as err:
# print "Exception on listener:"
self.updated_value[argtype].connect(func, **kwargs)
[docs] def connect_to_lq(self, lq):
[docs] def change_choice_list(self, choices):
#widget = self.widget
with self.lock:
self.choices = self._expand_choices(choices)
for widget in self.widget_list:
if type(widget) == QtWidgets.QComboBox:
# need to have a choice list to connect to a QComboBox
assert self.choices is not None
widget.clear() # removes all old choices
for choice_name, choice_value in self.choices:
widget.addItem(choice_name, choice_value)
raise RuntimeError("Invalid widget type.")
[docs] def change_min_max(self, vmin=-1e12, vmax=+1e12):
# TODO setRange should be a slot for the updated_min_max signal
with self.lock:
self.vmin = vmin
self.vmax = vmax
for widget in self.widget_list: # may not work for certain widget types
widget.setRange(vmin, vmax)
[docs] def change_readonly(self, ro=True):
with self.lock: = ro
for widget in self.widget_list:
if type(widget) in [QtWidgets.QDoubleSpinBox, pyqtgraph.widgets.SpinBox.SpinBox]:
#TODO other widget types
[docs] def change_unit(self, unit):
with self.lock:
self.unit = unit
for widget in self.widget_list:
if type(widget) == QtWidgets.QDoubleSpinBox:
if self.unit is not None:
widget.setSuffix(" "+self.unit)
elif type(widget) == pyqtgraph.widgets.SpinBox.SpinBox:
suffix = self.unit
if self.unit is None:
suffix = ""
opts = dict(
[docs] def is_connected_to_hardware(self):
:returns: True if either self.hardware_read_func or
self.hardware_set_func are defined. False if None.
return (self.hardware_read_func is not None) or (self.hardware_set_func is not None)
[docs] def has_hardware_read(self):
return self.hardware_read_func is not None
[docs] def has_hardware_write(self):
return self.hardware_set_func is not None
[docs] def connect_to_hardware(self, read_func=None, write_func=None):
if read_func is not None:
assert callable(read_func)
self.hardware_read_func = read_func
if write_func is not None:
assert callable(write_func)
self.hardware_set_func = write_func
[docs] def disconnect_from_hardware(self, dis_read=True, dis_write=True):
if dis_read:
self.hardware_read_func = None
if dis_write:
self.hardware_set_func = None
[docs] def connect_lq_math(self, lqs, func, reverse_func=None):
Links LQ to other LQs using math functions
takes a func that takes a set of logged quantities
new_val = f(lq1, lq2, ...)
when any of the lqs change, the value of this derived LQ
will be updated based on func
reverse_func allows changes to this LQ to update lqs via reverse func
new_lq1_val, new_lq2_val, ... = g(new_val, old_lqs_values)
new_lq1_val, new_lq2_val, ... = g(new_val)
self.math_lqs = tuple(lqs)
except TypeError: # if not iterable, assume its a single LQ
self.math_lqs = (lqs,)
self.math_func = func
self.reverse_math_func = reverse_func
if reverse_func is not None:
rev_sig = signature(reverse_func)
self.reverse_func_num_params = len(rev_sig.parameters)
def update_math():
lq_vals = [lq.value for lq in self.math_lqs]
new_val = self.math_func(*lq_vals)
#print(, "update_math", lq_vals, "-->", new_val, )
if reverse_func:
def update_math_reverse():
lq_vals = [lq.value for lq in self.math_lqs]
if self.reverse_func_num_params > 1:
new_vals = self.reverse_math_func(self.val, lq_vals)
new_vals = self.reverse_math_func(self.val)
new_vals = tuple(new_vals)
except TypeError: # if not iterable, assume its a single value
new_vals = (new_vals,)
for lq, new_val in zip(self.math_lqs, new_vals):
for lq in self.math_lqs:
if reverse_func:
[docs] def read_from_lq_math(self):
lq_vals = [lq.value for lq in self.math_lqs]
new_val = self.math_func(*lq_vals)
#print("read_from_lq_math", lq_vals, "-->", new_val, )
[docs] def connect_lq_scale(self, lq, scale):
self.lq_scale = scale
self.connect_lq_math((lq,), func=lambda x: scale*x,
reverse_func=lambda y, old_vals: [y * 1.0/scale,])
[docs]class FileLQ(LoggedQuantity):
Specialized str type :class:`LoggedQuantity` that handles
a filename (or directory) and associated file.
def __init__(self, name, default_dir=None, is_dir=False, **kwargs):
kwargs.pop('dtype', None)
LoggedQuantity.__init__(self, name, dtype=str, **kwargs)
self.default_dir = default_dir
self.is_dir = is_dir
[docs] def file_browser(self):
# TODO add default directory, etc
if self.is_dir:
fname = QtWidgets.QFileDialog.getExistingDirectory(None)
fname, _ = QtWidgets.QFileDialog.getOpenFileName(None)
if fname:
[docs]class ArrayLQ(LoggedQuantity):
updated_shape = QtCore.Signal(str)
def __init__(self, name, dtype=float,
hardware_read_func=None, hardware_set_func=None,
initial=[], fmt="%g", si=True,
ro = False,
unit = None,
vmin=-1e12, vmax=+1e12, choices=None):
QtCore.QObject.__init__(self) = name
self.dtype = dtype
if self.dtype == str:
self.val = np.array(initial, dtype=object)
self.val = np.array(initial, dtype=dtype)
self.hardware_read_func = hardware_read_func
self.hardware_set_func = hardware_set_func
self.fmt = fmt # % string formatting string. This is ignored if dtype==str
if self.dtype == str:
self.fmt = "%s"
self.unit = unit
self.vmin = vmin
self.vmax = vmax = ro # Read-Only
self.log = get_logger_from_class(self)
if self.dtype == int:
self.spinbox_decimals = 0
self.spinbox_decimals = 2
self.reread_from_hardware_after_write = False
self.oldval = None
self._in_reread_loop = False # flag to prevent reread from hardware loops
self.widget_list = []
self.listeners = []
# threading lock
self.lock = QLock(mode=0) # mode 0 is non-reentrant lock
self.is_array = True
self._tableView = None
[docs] def same_values(self, v1, v2):
if v1.shape == v2.shape:
return np.all(v1 == v2)
self.log.debug("same_values %s %s" % (v2-v1, np.all(v1 == v2)))
return False
[docs] def change_shape(self, newshape):
[docs] def string_value (self):
return json.dumps(self.val.tolist())
[docs] def ini_string_value(self):
return json.dumps(self.val.tolist())
[docs] def coerce_to_type(self, x):
#print type(x)
if type(x) in (unicode, str):
x = json.loads(x)
#print repr(x)
return np.array(x, dtype=self.dtype)
[docs] def send_display_updates(self, force=False):
with self.lock:
self.log.debug( + ' send_display_updates')
#print "send_display_updates: {} force={}".format(, force)
if force or np.any(self.oldval != self.val):
#print "send display updates",, self.val, self.oldval
str_val = self.string_value()
#if self.dtype != float:
# self.updated_value[int].emit(self.val)
self.oldval = self.val
self.log.debug( + ' send_display_updates skipped (olval!=self.val)={} force={} oldval={} val={}'.format(
(self.oldval != self.val) , (force), self.oldval, self.val))
#print "\t no updates sent", (self.oldval != self.val) , (force), self.oldval, self.val
def array_tableView(self):
if self._tableView == None:
self._tableView = self.create_tableView()
return self._tableView
[docs] def create_tableView(self, **kwargs):
widget = QtWidgets.QTableView()
model = ArrayLQ_QTableModel(self, transpose=(len(self.val.shape) == 1), **kwargs)
return widget
[docs] def connect_element_follower_lq(self, lq, index, bidir=True):
connects an LQ to follow the changes of the array and display a specific element
# when self (array_lq) is update, update follower lq
func=lambda arr, index=index: arr[index])
if bidir:
# when LQ is updated, update element in self
def on_element_follower_lq(lq=lq, arr_lq=self, index=index):
#print("on_element_follower_lq", arr_lq.value, lq.value, index)
old_val = arr_lq.value[index]
new_val = lq.value
if new_val == old_val:
new_arr = arr_lq.value.copy()
new_arr[index] = new_val
[docs]class LQCircularNetwork(QtCore.QObject):
LQCircularNetwork is collection of logged quantities
Helper class if a network of lqs with circular structures are present
Use update_values_synchronously method if the specified lqs should
be updated at once. The internal lock-flag prevents infinite loops.
updated_values = QtCore.Signal((),)
def __init__(self, lq_dict):
self.lq_dict = lq_dict # {lq_key:lq}
self.locked = False # some lock that does NOT allow blocked routines to be executed after release()
# a flag (as it is now) works well.
[docs] def update_values_synchronously(self,**kwargs):
kwargs is dict containing lq_key and new_vals
Note: lq_key is not necessarily the name of the lq but key of lq_dict
as specified at initialization
if self.locked == False:
self.locked = True
for kev,val in kwargs.items():
self.locked = False
[docs]class LQRange(LQCircularNetwork):
LQRange is a collection of logged quantities that describe a
numpy.linspace array inputs.
Four (or six) LQ's are defined, min, max, num, step (center, span)
and are connected by signals/slots that keep the quantities
in sync.
LQRange.array is the linspace array and is kept upto date
with changes to the 4 (or 6) LQ's
def __init__(self, min_lq, max_lq, step_lq, num_lq, center_lq=None, span_lq=None):
self.log = get_logger_from_class(self)
self.min = min_lq
self.max = max_lq
self.num = num_lq
self.step = step_lq = center_lq
self.span = span_lq
lq_dict = {'min':self.min, 'max':self.max, 'num':self.num, 'step':self.step}
if == None:
assert(self.span == None, 'Invalid initialization of LQRange')
if self.span == None:
assert( == None, 'Invalid initialization of LQRange')
LQCircularNetwork.__init__(self, lq_dict)
Note: {step, num} and {min,max,span,center} form each 2 circular subnetworks
The listener functions update the subnetworks, a connect_lq_math connects
{min,max,span,center} to {step, num} unidirectional
if and self.span:
self.step.connect_lq_math((self.min,self.max,self.num), self.calc_step)
[docs] def calc_num(self, min_, max_, step):
enforces num to be a positive integer and adjust step accordingly
returns num,step
span = max_-min_
if step==0:
n = 10
n = span/step #num = n+1
if n < 0:
n = -n
n = np.ceil(n)
step = span/n
num = n+1
return int(num),step
[docs] def calc_step(self, min_, max_, num):
excludes num=1 to prevent division by zero,
returns step
if num==1: #prevent division by zero
num = 2
return step
[docs] def calc_span(self, min_, max_):
return (max_-min_)
[docs] def calc_center(self, min_, max_):
return (max_-min_)/2+min_
[docs] def calc_min(self, center, span):
return center-span/2.0
[docs] def calc_max(self, center, span):
return center+span/2.0
[docs] def on_change_step(self):
step = self.step.val
num,step = self.calc_num(self.min.val, self.max.val, step)
self.update_values_synchronously(num=num, step=step)
[docs] def on_change_num(self):
num = self.num.val
if num ==1:
num = 2
step = self.calc_step(self.min.val, self.max.val, self.num.val)
[docs] def on_change_min_max(self):
min_ = self.min.val
max_ = self.max.val
span = self.calc_span(min_, max_)
center = self.calc_center(min_, max_)
self.update_values_synchronously(span=span, center=center)
[docs] def on_change_center_span(self):
span = self.span.val
center =
min_ = self.calc_min(center, span)
max_ = self.calc_max(center, span)
self.update_values_synchronously(min=min_, max=max_)
def array(self):
return np.linspace(self.min.val, self.max.val, self.num.val)
[docs]class LQCollection(object):
LQCollection is a smart dictionary of LoggedQuantity objects.
attribute access such as lqcoll.x1 will return full LoggedQuantity object
dictionary-style access lqcoll['x1'] allows direct reading and writing of
the LQ's value, handling the signals automatically
New LQ's can be created with :meth:`New`
LQRange objects can be created with :meth:`New_Range` and will be stored
in :attr:ranges
def __init__(self):
self._logged_quantities = OrderedDict()
self.ranges = OrderedDict()
self.log = get_logger_from_class(self)
[docs] def New(self, name, dtype=float, **kwargs):
Create a new LoggedQuantity with name and dtype
is_array = kwargs.pop('array', False)
#self.log.debug("{} is_array? {}".format(name, is_array))
if is_array:
lq = ArrayLQ(name=name, dtype=dtype, **kwargs)
if dtype == 'file':
lq = FileLQ(name=name, **kwargs)
lq = LoggedQuantity(name=name, dtype=dtype, **kwargs)
return self.Add(lq)
[docs] def Add(self, lq):
"""Add an existing LoggedQuantity to the Collection
Examples of usefulness: add hardware lq to measurement settings
name =
assert not (name in self._logged_quantities)
assert not (name in self.__dict__)
self._logged_quantities[name] = lq
self.__dict__[name] = lq # allow attribute access
return lq
[docs] def get_lq(self, key):
return self._logged_quantities[key]
[docs] def get_val(self, key):
return self._logged_quantities[key].val
[docs] def as_list(self):
return self._logged_quantities.values()
[docs] def as_dict(self):
return self._logged_quantities
# def items(self):
# return self._logged_quantities.items()
[docs] def keys(self):
return self._logged_quantities.keys()
[docs] def remove(self, name):
del self._logged_quantities[name]
del self.__dict__[name]
def __delitem__(self, key):
def __getitem__(self, key):
"Dictionary-like access reads and sets value of LQ's"
return self._logged_quantities[key].val
def __setitem__(self, key, item):
"Dictionary-like access reads and sets value of LQ's"
def __contains__(self, key):
return self._logged_quantities.__contains__(key)
def __getattribute__(self,name):
if name in self.logged_quantities.keys():
return self.logged_quantities[name]
return object.__getattribute__(self, name)
[docs] def New_Range(self, name, include_center_span=False, **kwargs):
min_lq = self.New( name + "_min" , initial=0., **kwargs )
max_lq = self.New( name + "_max" , initial=1., **kwargs )
step_lq = self.New( name + "_step", initial=0.1, **kwargs)
num_lq = self.New( name + "_num", dtype=int, vmin=1, initial=11)
if include_center_span:
center_lq = self.New(name + "_center", **kwargs, initial=0.5)
span_lq = self.New( name + "_span", **kwargs, initial=1.0)
lqrange = LQRange(min_lq, max_lq, step_lq, num_lq, center_lq, span_lq)
lqrange = LQRange(min_lq, max_lq, step_lq, num_lq)
self.ranges[name] = lqrange
return lqrange
[docs] def New_UI(self, include = None, exclude = []):
"""create a default Qt Widget that contains
widgets for all settings in the LQCollection
ui_widget = QtWidgets.QWidget()
formLayout = QtWidgets.QFormLayout()
if include is None:
lqnames = self.as_dict().keys()
lqnames = include
for lqname in lqnames:
if lqname in exclude:
lq = self.get_lq(lqname)
#: :type lq: LoggedQuantity
widget = lq.new_default_widget()
# Add to formlayout
formLayout.addRow(lqname, widget)
#lq_tree_item = QtWidgets.QTreeWidgetItem(self.tree_item, [lqname, ""])
#lq.hardware_tree_widget = widget
#tree.setItemWidget(lq_tree_item, 1, lq.hardware_tree_widget)
#self.control_widgets[lqname] = widget
return ui_widget
[docs] def disconnect_all_from_hardware(self):
for lq in self.as_list():