# -*- coding: utf-8 -*-
"""
Created on Tue Apr 1 09:25:48 2014
@author: esbarnard
"""
from __future__ import absolute_import, print_function
from qtpy import QtCore, QtWidgets
import threading
import time
from .logged_quantity import LQCollection
from .helper_funcs import load_qt_ui_file
from collections import OrderedDict
import pyqtgraph as pg
from ScopeFoundry.helper_funcs import get_logger_from_class
import traceback
import sys
class MeasurementQThread(QtCore.QThread):
    """
    QThread whose body is the acquisition loop of a single measurement.

    The thread holds a reference to *measurement* and, once started,
    hands control to that object's ``_thread_run`` method.
    """

    def __init__(self, measurement, parent=None):
        """
        :param measurement: object providing a ``_thread_run()`` method
        :param parent: optional Qt parent forwarded to ``QThread``
        """
        QtCore.QThread.__init__(self, parent)
        self.measurement = measurement

    def run(self):
        """Thread entry point: delegate to the measurement's ``_thread_run``."""
        target = self.measurement
        target._thread_run()
class Measurement(QtCore.QObject):
    """
    Base class for ScopeFoundry Measurement objects

    to subclass, implement :meth:`setup`, :meth:`run`

    for measurements with graphical interfaces,
    subclass and additionally implement :meth:`setup_figure`, :meth:`update_display`
    """

    measurement_sucessfully_completed = QtCore.Signal(())
    """signal sent when full measurement is complete"""
    measurement_interrupted = QtCore.Signal(())
    """signal sent when measurement is complete due to an interruption"""
    #measurement_state_changed = QtCore.Signal(bool) # signal sent when measurement started or stopped

    def __init__(self, app, name=None):
        """
        Build the settings, operations and display timer, optionally load
        the UI file, then call :meth:`setup`.

        :param app: application object this measurement belongs to
        :type app: BaseMicroscopeApp
        :param name: optional name; overrides a class-level ``name``
                     attribute, which in turn overrides the class name
        """
        QtCore.QObject.__init__(self)

        self.log = get_logger_from_class(self)

        # Name precedence: explicit argument > class attribute > class name
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__
        if name is not None:
            self.name = name

        self.app = app

        self.display_update_period = 0.1 # seconds
        self.display_update_timer = QtCore.QTimer(self)
        self.display_update_timer.timeout.connect(self._on_display_update_timer)
        self.acq_thread = None

        self.interrupt_measurement_called = False

        #self.logged_quantities = OrderedDict()
        self.settings = LQCollection()
        self.operations = OrderedDict()

        self.activation = self.settings.New('activation', dtype=bool, ro=False) # does the user want to the thread to be running
        self.running = self.settings.New('running', dtype=bool, ro=True) # is the thread actually running?
        self.progress = self.settings.New('progress', dtype=float, unit="%", si=False, ro=True)
        self.settings.New('profile', dtype=bool, initial=False) # Run a profile on the run to find performance problems

        # Toggling the 'activation' setting starts / interrupts the measurement
        self.activation.updated_value[bool].connect(self.start_stop)

        self.add_operation("start", self.start)
        self.add_operation("interrupt", self.interrupt)
        #self.add_operation('terminate', self.terminate)
        #self.add_operation("setup", self.setup)
        #self.add_operation("setup_figure", self.setup_figure)
        self.add_operation("update_display", self.update_display)
        self.add_operation('show_ui', self.show_ui)

        if hasattr(self, 'ui_filename'):
            self.load_ui()

        self.setup()

    def setup(self):
        """Override this to set up logged quantities and gui connections
        Runs during __init__, before the hardware connection is established
        Should generate desired LoggedQuantities"""
        pass
        #raise NotImplementedError()

    @QtCore.Slot()
    def start(self):
        """
        Starts the measurement

        Sets the *activation* setting to True; the connection made in
        ``__init__`` routes that change to :meth:`start_stop`, which
        calls :meth:`_start`.
        """
        #self.start_stop(True)
        self.activation.update_value(True)

    def _start(self):
        """
        Starts the measurement

        calls *pre_run*
        creates acquisition thread
        runs thread
        starts the display update timer

        :raises RuntimeError: if an acquisition thread is still running
        """
        self.log.info("measurement {} start".format(self.name))
        self.interrupt_measurement_called = False
        if (self.acq_thread is not None) and self.is_measuring():
            raise RuntimeError("Cannot start a new measurement while still measuring")
        #self.acq_thread = threading.Thread(target=self._thread_run)
        self.acq_thread = MeasurementQThread(self)
        self.acq_thread.finished.connect(self.post_run)
        #self.measurement_state_changed.emit(True)
        self.running.update_value(True)
        self.pre_run()
        self.acq_thread.start()
        self.t_start = time.time()
        # QTimer.start takes an integer number of milliseconds; passing the
        # float product raises TypeError with current Qt bindings.
        self.display_update_timer.start(int(self.display_update_period*1000))

    def pre_run(self):
        """Override this method to enable main-thread initialization prior to measurement thread start"""
        pass

    def run(self):
        """
        *run* method runs in an separate thread and is used for data acquisition

        No GUI updates should occur within the *run* function, any Qt related GUI work
        should occur in :meth:`update_display`

        :raises NotImplementedError: if the subclass defines neither
            ``run`` nor the deprecated ``_run``
        """
        if hasattr(self, '_run'):
            self.log.warning("warning _run is deprecated, use run")
            self._run()
        else:
            raise NotImplementedError("Measurement {}.run() not defined".format(self.name))

    def post_run(self):
        """Override this method to enable main-thread finalization after to measurement thread completes"""
        pass

    def _thread_run(self):
        """
        This function governs the behavior of the measurement thread.

        Calls :meth:`run` (optionally under cProfile when the *profile*
        setting is on), then always resets *running*, *activation* and
        *progress* and emits either :attr:`measurement_interrupted` or
        :attr:`measurement_sucessfully_completed`.
        """
        self.set_progress(50.) # set progress bars to default run position at 50%
        # Hold the profiler in a local so the cleanup below depends on what
        # was actually started, not on the current value of the 'profile'
        # setting (which may be toggled while the measurement runs).
        profile = None
        try:
            if self.settings['profile']:
                import cProfile
                profile = cProfile.Profile()
                profile.enable()
            self.run()
        #except Exception as err:
        #    self.interrupt_measurement_called = True
        #    raise err
        finally:
            self.running.update_value(False)
            self.activation.update_value(False)
            self.set_progress(0.) # set progress bars back to zero
            #self.measurement_state_changed.emit(False)
            if self.interrupt_measurement_called:
                self.measurement_interrupted.emit()
                self.interrupt_measurement_called = False
            else:
                self.measurement_sucessfully_completed.emit()
            if profile is not None:
                profile.disable()
                profile.print_stats(sort='time')

    @property
    def gui(self):
        """Deprecated alias for :attr:`app`; logs a warning on every access."""
        self.log.warning("Measurement.gui is deprecated, use Measurement.app " + repr(DeprecationWarning))
        return self.app

    def set_progress(self, pct):
        """
        This function updates the logged quantity progress which is used for the display of progress bars in the UI.

        ==============  ==============================================================================================
        **Arguments:**
        pct             The percentage of progress given by a measurement module
        ==============  ==============================================================================================
        """
        self.progress.update_value(pct)

    @QtCore.Slot()
    def interrupt(self):
        """
        Kindly ask the measurement to stop.

        This raises the :attr:`interrupt_measurement_called` flag
        To actually stop, the threaded :meth:`run` method must check
        for this flag and exit
        """
        self.log.info("measurement {} interrupt".format(self.name))
        self.interrupt_measurement_called = True
        self.activation.update_value(False)
        #Make sure display is up to date
        #self._on_display_update_timer()

    def terminate(self):
        """
        Terminate MeasurementQThread.

        Does nothing if no acquisition thread has been created yet.
        This calls ``QThread.terminate``; see the Qt documentation for
        the hazards of terminating a thread.
        """
        if self.acq_thread is not None:
            self.acq_thread.terminate()

    def start_stop(self, start):
        """
        Use boolean *start* to either start (True) or
        interrupt (False) measurement. Test.
        """
        self.log.info("{} start_stop {}".format(self.name, start))
        if start:
            self._start()
        else:
            self.interrupt()

    def is_measuring(self):
        """
        Returns whether the acquisition thread is running

        As a side effect the *running* setting is synchronized with the
        thread state; when no thread exists, *activation* and *progress*
        are reset as well.
        """
        if self.acq_thread is None:
            self.running.update_value(False)
            self.activation.update_value(False)
            self.settings['progress'] = 0.0
            return False
        else:
            #resp = self.acq_thread.is_alive()
            resp = self.acq_thread.isRunning()
            self.running.update_value(resp)
            return resp

    def update_display(self):
        "Override this function to provide figure updates when the display timer runs"
        pass

    @QtCore.Slot()
    def _on_display_update_timer(self):
        """
        Display timer callback: call :meth:`update_display`, log (rather
        than propagate) any exception it raises, and stop the timer once
        the measurement is no longer running.
        """
        try:
            self.update_display()
        except Exception as err:
            # format_exc() yields one readable string; formatting the list
            # returned by format_exception() would log an escaped list repr.
            self.log.error("{} Failed to update figure1: {}. {}".format(self.name, err, traceback.format_exc()))
        finally:
            if not self.is_measuring():
                self.display_update_timer.stop()

    def add_logged_quantity(self, name, **kwargs):
        """
        Create a new :class:`LoggedQuantity` and adds it to the measurement's
        :attr:`settings` (:class:`LQCollection`)

        :returns: the object returned by ``settings.New``
        """
        lq = self.settings.New(name=name, **kwargs)
        return lq

    def add_operation(self, name, op_func):
        """
        Used to create a logged quantity connection between a button in the Measurement tree
        and a function.

        ==============  =================
        **type name:**  **type op_func:**
        str             QtCore.Slot
        ==============  =================
        """
        self.operations[name] = op_func

    def start_nested_measure_and_wait(self, measure, nested_interrupt = True,
                                      polling_func=None, polling_time=0.1):
        """
        Start another nested measurement *measure* and wait until completion.

        Optionally it can call a polling function *polling_func* with no arguments
        at an interval *polling_time* in seconds.

        if *nested_interrupt* is True then interrupting the nested *measure* will
        also interrupt the outer measurement. *nested_interrupt* defaults to True
        """
        measure.settings['activation'] = True
        time.sleep(0.010)
        last_polling = time.time()
        # Blocking wait: poll the nested measurement every 10 ms
        while measure.is_measuring():
            if self.interrupt_measurement_called:
                measure.interrupt()
            if measure.interrupt_measurement_called and nested_interrupt:
                #print("nested interrupt bubbling up")
                self.interrupt()
            time.sleep(0.010)
            # polling
            if polling_func:
                t = time.time()
                if t - last_polling > polling_time:
                    #print("poll", t - last_polling )
                    polling_func()
                    last_polling = t

    def load_ui(self, ui_fname=None):
        """
        Loads the user interface from a Qt ``.ui`` file into :attr:`ui`.

        ==============  ===============================================================
        **Arguments:**
        ui_fname        filename of user interface file (usually made with Qt Designer)
        ==============  ===============================================================
        """
        # TODO destroy and rebuild UI if it already exists
        if ui_fname is not None:
            self.ui_filename = ui_fname
        # Load Qt UI from .ui file
        self.ui = load_qt_ui_file(self.ui_filename)
        #self.show_ui()

    def show_ui(self):
        """
        Shows the graphical user interface of this measurement. :attr:`ui`
        """
        self.app.bring_measure_ui_to_front(self)
        # if self.app.mdi and self.ui.parent():
        #     self.ui.parent().raise_()
        #     return
        # self.ui.show()
        # self.ui.activateWindow()
        # self.ui.raise_() #just to be sure it's on top
        # if self.app.mdi and self.ui.parent():
        #     self.ui.parent().raise_()

    def web_ui(self):
        """Return a short text description of this measurement."""
        # NOTE(review): the "Hardware" label looks copied from the hardware
        # base class; left unchanged because callers may rely on the text.
        return "Hardware {}".format(self.name)