Source code for dvg_qdeviceio

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""PyQt/PySide framework for multithreaded data acquisition and communication
with an I/O device.
"""
__author__ = "Dennis van Gils"
__authoremail__ = "vangils.dennis@gmail.com"
__url__ = "https://github.com/Dennis-van-Gils/python-dvg-qdeviceio"
__date__ = "02-04-2024"
__version__ = "1.3.0"
# pylint: disable=protected-access, wrong-import-position, too-many-lines

import os
import sys
import queue
import time
from enum import IntEnum, unique

# Code coverage tools 'coverage' and 'pytest-cov' don't seem to correctly trace
# code which is inside methods called from within QThreads, see
# https://github.com/nedbat/coveragepy/issues/686
# To mitigate this problem, I use a custom decorator '@_coverage_resolve_trace'
# to be hung onto those method definitions. This will prepend the decorated
# method code with 'sys.settrace(threading._trace_hook)' when a code
# coverage test is detected. When no coverage test is detected, it will just
# pass the original method untouched.
import threading
from functools import wraps

# Mechanism to support both PyQt and PySide
# -----------------------------------------

PYQT5 = "PyQt5"
PYQT6 = "PyQt6"
PYSIDE2 = "PySide2"
PYSIDE6 = "PySide6"
QT_LIB_ORDER = [PYQT5, PYSIDE2, PYSIDE6, PYQT6]
QT_LIB = None

if QT_LIB is None:
    for lib in QT_LIB_ORDER:
        if lib in sys.modules:
            QT_LIB = lib
            break

if QT_LIB is None:
    for lib in QT_LIB_ORDER:
        try:
            __import__(lib)
            QT_LIB = lib
            break
        except ImportError:
            pass

if QT_LIB is None:
    this_file = __file__.rsplit(os.sep, maxsplit=1)[-1]
    raise ImportError(
        f"{this_file} requires PyQt5, PyQt6, PySide2 or PySide6; "
        "none of these packages could be imported."
    )

# fmt: off
# pylint: disable=import-error, no-name-in-module
if QT_LIB == PYQT5:
    from PyQt5 import QtCore                               # type: ignore
    from PyQt5.QtCore import pyqtSlot as Slot              # type: ignore
    from PyQt5.QtCore import pyqtSignal as Signal          # type: ignore
elif QT_LIB == PYQT6:
    from PyQt6 import QtCore                               # type: ignore
    from PyQt6.QtCore import pyqtSlot as Slot              # type: ignore
    from PyQt6.QtCore import pyqtSignal as Signal          # type: ignore
elif QT_LIB == PYSIDE2:
    from PySide2 import QtCore                             # type: ignore
    from PySide2.QtCore import Slot                        # type: ignore
    from PySide2.QtCore import Signal                      # type: ignore
elif QT_LIB == PYSIDE6:
    from PySide6 import QtCore                             # type: ignore
    from PySide6.QtCore import Slot                        # type: ignore
    from PySide6.QtCore import Signal                      # type: ignore
# pylint: enable=import-error, no-name-in-module
# fmt: on

# \end[Mechanism to support both PyQt and PySide]
# -----------------------------------------------

import numpy as np

from dvg_debug_functions import (
    print_fancy_traceback as pft,
    dprint,
    tprint,
    ANSI,
)

running_coverage = "coverage" in sys.modules
if running_coverage:
    print("\nCode coverage test detected\n")


def _coverage_resolve_trace(fn):
    @wraps(fn)
    def wrapped(*args, **kwargs):
        if running_coverage:
            sys.settrace(threading._trace_hook)  # type: ignore
        fn(*args, **kwargs)

    return wrapped


# Short-hand alias for DEBUG information
def _cur_thread_name():
    return QtCore.QThread.currentThread().objectName()


[docs]@unique class DAQ_TRIGGER(IntEnum): """An enumeration decoding different modes of operation for :class:`Worker_DAQ` to perform a data-acquisition (DAQ) update. """ # fmt: off INTERNAL_TIMER = 0 #: :ref:`Link to background information <INTERNAL_TIMER>`. SINGLE_SHOT_WAKE_UP = 1 #: :ref:`Link to background information <SINGLE_SHOT_WAKE_UP>`. CONTINUOUS = 2 #: :ref:`Link to background information <CONTINUOUS>`.
# fmt: on # ------------------------------------------------------------------------------ # QDeviceIO # ------------------------------------------------------------------------------
[docs]class QDeviceIO(QtCore.QObject): """This class provides the framework for multithreaded data acquisition (DAQ) and communication with an I/O device. All device I/O operations will be offloaded to *workers*, each running in their dedicated thread. The following workers can be created: * :class:`Worker_DAQ` : Acquires data from the device, either periodically or aperiodically. Created by calling :meth:`create_worker_DAQ`. * :class:`Worker_jobs` : Maintains a thread-safe queue where desired device I/O operations, called *jobs*, can be put onto. It will send out the queued jobs first-in, first-out (FIFO) to the device. Created by calling :meth:`create_worker_jobs`. Tip: You can inherit from `QDeviceIO` to build your own subclass that hides the specifics of creating :class:`Worker_DAQ` and :class:`Worker_jobs` from the user and modifies the default parameter values. E.g., when making a `QDeviceIO` subclass specific to your Arduino project:: from dvg_qdeviceio import QDeviceIO, DAQ_TRIGGER class Arduino_qdev(QDeviceIO): def __init__( self, dev=None, DAQ_function=None, debug=False, **kwargs, ): # Pass `dev` onto QDeviceIO() and pass `**kwargs` onto QtCore.QObject() super().__init__(dev, **kwargs) # Set the DAQ to 10 Hz internal timer self.create_worker_DAQ( DAQ_trigger = DAQ_TRIGGER.INTERNAL_TIMER, DAQ_function = DAQ_function, DAQ_interval_ms = 100, # 100 ms -> 10 Hz critical_not_alive_count = 3, debug = debug, ) # Standard jobs handling self.create_worker_jobs(debug=debug) Now, the user only has to call the following to get up and running:: ard_qdev = Arduino_qdev( dev=my_Arduino_device, DAQ_function=my_DAQ_function ) ard_qdev.start() .. _`QDeviceIO_args`: Args: dev (:obj:`object` | :obj:`None`, optional): Reference to a user-supplied *device* class instance containing I/O methods. In addition, `dev` should also have the following members. If not, they will be injected into the `dev` instance for you: * **dev.name** (:obj:`str`) -- Short display name for the \ device. Default: "myDevice". * **dev.mutex** (:class:`PyQt5.QtCore.QMutex`) -- To allow \ for properly multithreaded device I/O operations. It will \ be used by :class:`Worker_DAQ` and :class:`Worker_jobs`. * **dev.is_alive** (:obj:`bool`) -- Device is up and \ communicatable? Default: :const:`True`. Default: :obj:`None` **kwargs: All remaining keyword arguments will be passed onto inherited class :class:`PyQt5.QtCore.QObject`. .. _`QDeviceIO_attributes`: .. rubric:: Attributes: Attributes: dev (:obj:`object` | :obj:`None`): Reference to a user-supplied *device* class instance containing I/O methods. worker_DAQ (:class:`Worker_DAQ` | :obj:`None`): Instance of :class:`Worker_DAQ` as created by :meth:`create_worker_DAQ`. This worker runs in a dedicated thread. worker_jobs (:class:`Worker_jobs` | :obj:`None`): Instance of :class:`Worker_jobs` as created by :meth:`create_worker_jobs`. This worker runs in a dedicated thread. update_counter_DAQ (:obj:`int`): Increments every time :attr:`worker_DAQ` tries to update. update_counter_jobs (:obj:`int`): Increments every time :attr:`worker_jobs` tries to update. obtained_DAQ_interval_ms (:obj:`int` | :obj:`numpy.nan`): Obtained time interval in milliseconds since the previous :attr:`worker_DAQ` update. obtained_DAQ_rate_Hz (:obj:`float` | :obj:`numpy.nan`): Obtained acquisition rate of :attr:`worker_DAQ` in hertz. It will take several DAQ updates for the value to be properly calculated, and till that time it will be :obj:`numpy.nan`. not_alive_counter_DAQ (:obj:`int`): Number of consecutive failed attempts to update :attr:`worker_DAQ`, presumably due to device I/O errors. Will be reset to 0 once a successful DAQ update occurs. See the :obj:`signal_connection_lost()` mechanism. """ signal_DAQ_updated = Signal() """:obj:`PyQt5.QtCore.pyqtSignal`: Emitted by :class:`Worker_DAQ` when its :attr:`~Worker_DAQ.DAQ_function` has run and finished, either succesfully or not. Tip: It can be useful to connect this signal to a slot containing, e.g., your GUI redraw routine:: from PyQt5 import QtCore @QtCore.pyqtSlot() def my_GUI_redraw_routine(): ... qdev.signal_DAQ_updated.connect(my_GUI_redraw_routine) where ``qdev`` is your instance of :class:`QDeviceIO`. Don't forget to decorate the function definition with a :func:`PyQt5.QtCore.pyqtSlot` decorator. """ signal_jobs_updated = Signal() """:obj:`PyQt5.QtCore.pyqtSignal`: Emitted by :class:`Worker_jobs` when all pending jobs in the queue have been sent out to the device in a response to :meth:`send` or :meth:`process_jobs_queue`. See also the tip at :obj:`signal_DAQ_updated()`. """ signal_DAQ_paused = Signal() """:obj:`PyQt5.QtCore.pyqtSignal`: Emitted by :class:`Worker_DAQ` to confirm the worker has entered the `paused` state in a response to :meth:`Worker_DAQ.pause`. See also the tip at :obj:`signal_DAQ_updated()`. """ signal_connection_lost = Signal() """:obj:`PyQt5.QtCore.pyqtSignal`: Emitted by :class:`Worker_DAQ` to indicate that we have lost connection to the device. This happens when `N` consecutive device I/O operations have failed, where `N` equals the argument :obj:`critical_not_alive_count` as passed to method :meth:`create_worker_DAQ`. See also the tip at :obj:`signal_DAQ_updated()`. """ # Necessary for INTERNAL_TIMER _request_worker_DAQ_stop = Signal() # Necessary for CONTINUOUS _request_worker_DAQ_pause = Signal() _request_worker_DAQ_unpause = Signal() def __init__(self, dev=None, **kwargs): super().__init__(**kwargs) # Pass **kwargs onto QtCore.QObject() self.dev = self._NoDevice() if dev is not None: self.attach_device(dev) self._thread_DAQ = None self._thread_jobs = None self.worker_DAQ = None self.worker_jobs = None self.update_counter_DAQ = 0 self.update_counter_jobs = 0 self.not_alive_counter_DAQ = 0 self.obtained_DAQ_interval_ms = np.nan self.obtained_DAQ_rate_Hz = np.nan self._qwc_worker_DAQ_started = QtCore.QWaitCondition() self._qwc_worker_jobs_started = QtCore.QWaitCondition() self._qwc_worker_DAQ_stopped = QtCore.QWaitCondition() self._qwc_worker_jobs_stopped = QtCore.QWaitCondition() self._mutex_wait_worker_DAQ = QtCore.QMutex() self._mutex_wait_worker_jobs = QtCore.QMutex() class _NoDevice: name = "NoDevice" # -------------------------------------------------------------------------- # attach_device # --------------------------------------------------------------------------
[docs] def attach_device(self, dev): """Attach a reference to a user-supplied *device* class instance containing I/O methods. In addition, `dev` should also have the following members. If not, they will be injected into the `dev` instance for you: * **dev.name** (:obj:`str`) -- Short display name for the \ device. Default: "myDevice". * **dev.mutex** (:class:`PyQt5.QtCore.QMutex`) -- To allow \ for properly multithreaded device I/O operations. It will \ be used by :class:`Worker_DAQ` and :class:`Worker_jobs`. * **dev.is_alive** (:obj:`bool`) -- Device is up and \ communicatable? Default: :const:`True`. Args: dev (:obj:`object`): Reference to a user-supplied *device* class instance containing I/O methods. """ if not hasattr(dev, "name"): dev.name = "myDevice" if not hasattr(dev, "mutex"): dev.mutex = QtCore.QMutex() if not hasattr(dev, "is_alive"): dev.is_alive = True # Assume the device is alive from the start if isinstance(self.dev, self._NoDevice): self.dev = dev else: pft( "Device can be attached only once. Already attached to " f"'{self.dev.name}'." ) sys.exit(22)
# -------------------------------------------------------------------------- # Create workers # --------------------------------------------------------------------------
[docs] def create_worker_DAQ(self, **kwargs): """Create and configure an instance of :class:`Worker_DAQ` and transfer it to a new :class:`PyQt5.QtCore.QThread`. Args: **kwargs Will be passed directly to :class:`Worker_DAQ` as initialization parameters, :ref:`see here <Worker_DAQ_args>`. """ if isinstance(self.dev, self._NoDevice): pft( "Can't create worker_DAQ, because there is no device attached." " Did you forget to call 'attach_device()' first?" ) sys.exit(99) self.worker_DAQ = Worker_DAQ(qdev=self, **kwargs) self._request_worker_DAQ_stop.connect(self.worker_DAQ._stop) self._request_worker_DAQ_pause.connect(self.worker_DAQ.pause) self._request_worker_DAQ_unpause.connect(self.worker_DAQ.unpause) self._thread_DAQ = QtCore.QThread() self._thread_DAQ.setObjectName(f"{self.dev.name}_DAQ") self._thread_DAQ.started.connect(self.worker_DAQ._do_work) self.worker_DAQ.moveToThread(self._thread_DAQ) if hasattr(self.worker_DAQ, "_timer"): self.worker_DAQ._timer.moveToThread(self._thread_DAQ)
[docs] def create_worker_jobs(self, **kwargs): """Create and configure an instance of :class:`Worker_jobs` and transfer it to a new :class:`PyQt5.QtCore.QThread`. Args: **kwargs Will be passed directly to :class:`Worker_jobs` as initialization parameters, :ref:`see here <Worker_jobs_args>`. """ if isinstance(self.dev, self._NoDevice): pft( "Can't create worker_jobs, because there is no device attached." " Did you forget to call 'attach_device()' first?" ) sys.exit(99) self.worker_jobs = Worker_jobs(qdev=self, **kwargs) self._thread_jobs = QtCore.QThread() self._thread_jobs.setObjectName(f"{self.dev.name}_jobs") self._thread_jobs.started.connect(self.worker_jobs._do_work) self.worker_jobs.moveToThread(self._thread_jobs)
# -------------------------------------------------------------------------- # Start workers # --------------------------------------------------------------------------
[docs] def start( self, DAQ_priority=QtCore.QThread.Priority.InheritPriority, jobs_priority=QtCore.QThread.Priority.InheritPriority, ) -> bool: """Start the event loop of all of any created workers. Args: DAQ_priority (:obj:`PyQt5.QtCore.QThread.Priority`, optional): By default, the *worker* threads run in the operating system at the same thread priority as the *main/GUI* thread. You can change to higher priority by setting `priority` to, e.g., :const:`PyQt5.QtCore.QThread.TimeCriticalPriority`. Be aware that this is resource heavy, so use sparingly. Default: :const:`PyQt5.QtCore.QThread.Priority.InheritPriority`. jobs_priority (:obj:`PyQt5.QtCore.QThread.Priority`, optional): Like `DAQ_priority`. Default: :const:`PyQt5.QtCore.QThread.Priority.InheritPriority`. Returns: True if successful, False otherwise. """ success = True if self._thread_jobs is not None: success &= self.start_worker_jobs(priority=jobs_priority) if self._thread_DAQ is not None: success &= self.start_worker_DAQ(priority=DAQ_priority) return success
[docs] def start_worker_DAQ( self, priority=QtCore.QThread.Priority.InheritPriority ) -> bool: """Start the data acquisition with the device by starting the event loop of the :attr:`worker_DAQ` thread. Args: priority (:const:`PyQt5.QtCore.QThread.Priority.Priority`, optional): See :meth:`start` for details. Returns: True if successful, False otherwise. """ if self._thread_DAQ is None: pft( f"Worker_DAQ {self.dev.name}: Can't start thread, because it " "does not exist. Did you forget to call 'create_worker_DAQ()' " "first?" ) sys.exit(404) # --> leaving elif not self.dev.is_alive: dprint( f"Worker_DAQ {self.dev.name}: " "WARNING - Device is not alive.\n", ANSI.RED, ) return False # --> leaving if self.worker_DAQ.debug: tprint( f"Worker_DAQ {self.dev.name}: start requested...", ANSI.WHITE, ) self._thread_DAQ.start(priority) # Wait for worker_DAQ to confirm having started locker_wait = QtCore.QMutexLocker(self._mutex_wait_worker_DAQ) self._qwc_worker_DAQ_started.wait(self._mutex_wait_worker_DAQ) locker_wait.unlock() if self.worker_DAQ._DAQ_trigger == DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP: # Wait a tiny amount of extra time for the worker to have entered # 'self._qwc.wait(self._mutex_wait)' of method '_do_work()'. # Unfortunately, we can't use # 'QTimer.singleShot(500, confirm_has_started(self))' # inside the '_do_work()' routine, because it won't never resolve # due to the upcoming blocking 'self._qwc.wait(self._mutex_wait)'. # Hence, we use a blocking 'time.sleep()' here. Also note we can't # use 'QtCore.QCoreApplication.processEvents()' instead of # 'time.sleep()', because it involves a QWaitCondition and not an # signal event. time.sleep(0.05) if self.worker_DAQ._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: # We expect a 'signal_DAQ_paused' being emitted at start-up by this # worker. Make sure this signal gets processed as soon as possible, # and prior to any other subsequent actions the user might request # from this worker after having returned back from the user's call # to 'start_worker_DAQ()'. QtCore.QCoreApplication.processEvents() return True
[docs] def start_worker_jobs( self, priority=QtCore.QThread.Priority.InheritPriority ) -> bool: """Start maintaining the jobs queue by starting the event loop of the :attr:`worker_jobs` thread. Args: priority (:obj:`PyQt5.QtCore.QThread.Priority`, optional): See :meth:`start` for details. Returns: True if successful, False otherwise. """ if self._thread_jobs is None: pft( f"Worker_jobs {self.dev.name}: Can't start thread because it " "does not exist. Did you forget to call 'create_worker_jobs()' " "first?" ) sys.exit(404) # --> leaving elif not self.dev.is_alive: dprint( f"Worker_jobs {self.dev.name}: " "WARNING - Device is not alive.\n", ANSI.RED, ) return False # --> leaving if self.worker_jobs.debug: tprint( f"Worker_jobs {self.dev.name}: start requested...", ANSI.WHITE, ) self._thread_jobs.start(priority) # Wait for worker_jobs to confirm having started locker_wait = QtCore.QMutexLocker(self._mutex_wait_worker_jobs) self._qwc_worker_jobs_started.wait(self._mutex_wait_worker_jobs) locker_wait.unlock() # Wait a tiny amount of extra time for the worker to have entered # 'self._qwc.wait(self._mutex_wait)' of method '_do_work()'. # Unfortunately, we can't use # 'QTimer.singleShot(500, confirm_has_started(self))' # inside the '_do_work()' routine, because it won't never resolve # due to the upcoming blocking 'self._qwc.wait(self._mutex_wait)'. # Hence, we use a blocking 'time.sleep()' here. Also note we can't # use 'QtCore.QCoreApplication.processEvents()' instead of # 'time.sleep()', because it involves a QWaitCondition and not an # signal event. time.sleep(0.05) return True
# -------------------------------------------------------------------------- # Quit workers # --------------------------------------------------------------------------
[docs] def quit(self) -> bool: """Stop all of any running workers and close their respective threads. Returns: True if successful, False otherwise. """ return self.quit_worker_DAQ() & self.quit_worker_jobs()
[docs] def quit_worker_DAQ(self) -> bool: """Stop :attr:`worker_DAQ` and close its thread. Returns: True if successful, False otherwise. """ if self._thread_DAQ is None or not self.worker_DAQ._has_started: return True if self._thread_DAQ.isFinished(): # CASE: Device has had a 'connection_lost' event during run-time, # which already stopped and closed the thread. print( "Closing thread " f"{self._thread_DAQ.objectName():.<16} already closed." ) return True if not self.worker_DAQ._has_stopped: if self.worker_DAQ.debug: tprint( f"Worker_DAQ {self.dev.name}: stop requested...", ANSI.WHITE, ) if self.worker_DAQ._DAQ_trigger == DAQ_TRIGGER.INTERNAL_TIMER: # The QTimer inside the INTERNAL_TIMER '_do_work()'-routine # /must/ be stopped from within the worker_DAQ thread. Hence, # we must use a signal from out of this different thread. self._request_worker_DAQ_stop.emit() elif ( self.worker_DAQ._DAQ_trigger == DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP ): # The QWaitCondition inside the SINGLE_SHOT_WAKE_UP '_do_work()' # routine will likely have locked worker_DAQ. Hence, a # '_request_worker_DAQ_stop' signal as above might not get # handled by worker_DAQ when emitted from out of this thread. # Instead, we directly call '_stop()' from out of this different # thread, which is perfectly fine for SINGLE_SHOT_WAKE_UP as per # my design. self.worker_DAQ._stop() elif self.worker_DAQ._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: # We directly call '_stop()' from out of this different thread, # which is perfectly fine for CONTINUOUS as per my design. self.worker_DAQ._stop() # Wait for worker_DAQ to confirm having stopped locker_wait = QtCore.QMutexLocker(self._mutex_wait_worker_DAQ) self._qwc_worker_DAQ_stopped.wait(self._mutex_wait_worker_DAQ) locker_wait.unlock() self._thread_DAQ.quit() print(f"Closing thread {self._thread_DAQ.objectName():.<16} ", end="") if self._thread_DAQ.wait(2000): print("done.\n", end="") return True print("FAILED.\n", end="") # pragma: no cover return False # pragma: no cover
[docs] def quit_worker_jobs(self) -> bool: """Stop :attr:`worker_jobs` and close its thread. Returns: True if successful, False otherwise. """ if self._thread_jobs is None or not self.worker_jobs._has_started: return True if self._thread_jobs.isFinished(): # CASE: Device has had a 'connection_lost' event during run-time, # which already stopped the worker and closed the thread. print( "Closing thread " f"{self._thread_jobs.objectName():.<16} already closed." ) return True if not self.worker_jobs._has_stopped: if self.worker_jobs.debug: tprint( f"Worker_jobs {self.dev.name}: stop requested...", ANSI.WHITE, ) # The QWaitCondition inside the SINGLE_SHOT_WAKE_UP '_do_work()'- # routine will likely have locked worker_DAQ. Hence, a # '_request_worker_DAQ_stop' signal might not get handled by # worker_DAQ when emitted from out of this thread. Instead, # we directly call '_stop()' from out of this different thread, # which is perfectly fine as per my design. self.worker_jobs._stop() # Wait for worker_jobs to confirm having stopped locker_wait = QtCore.QMutexLocker(self._mutex_wait_worker_jobs) self._qwc_worker_jobs_stopped.wait(self._mutex_wait_worker_jobs) locker_wait.unlock() self._thread_jobs.quit() print(f"Closing thread {self._thread_jobs.objectName():.<16} ", end="") if self._thread_jobs.wait(2000): print("done.\n", end="") return True print("FAILED.\n", end="") # pragma: no cover return False # pragma: no cover
# -------------------------------------------------------------------------- # worker_DAQ related # --------------------------------------------------------------------------
[docs] @Slot() def pause_DAQ(self): """Only useful in mode :const:`DAQ_TRIGGER.CONTINUOUS`. Request :attr:`worker_DAQ` to pause and stop listening for data. After :attr:`worker_DAQ` has achieved the `paused` state, it will emit :obj:`signal_DAQ_paused()`. """ if self.worker_DAQ is not None: self._request_worker_DAQ_pause.emit()
[docs] @Slot() def unpause_DAQ(self): """Only useful in mode :const:`DAQ_TRIGGER.CONTINUOUS`. Request :attr:`worker_DAQ` to resume listening for data. Once :attr:`worker_DAQ` has successfully resumed, it will emit :obj:`signal_DAQ_updated()` for every DAQ update. """ if self.worker_DAQ is not None: self._request_worker_DAQ_unpause.emit()
[docs] @Slot() def wake_up_DAQ(self): """Only useful in mode :const:`DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP`. Request :attr:`worker_DAQ` to wake up and perform a single update, i.e. run :attr:`~Worker_DAQ.DAQ_function` once. It will emit :obj:`signal_DAQ_updated()` after :attr:`~Worker_DAQ.DAQ_function` has run, either successful or not. """ if self.worker_DAQ is not None: self.worker_DAQ.wake_up()
# -------------------------------------------------------------------------- # worker_jobs related # --------------------------------------------------------------------------
[docs] @Slot() def send(self, instruction, pass_args=()): """Put a job on the :attr:`worker_jobs` queue and send out the full queue first-in, first-out to the device until empty. Once finished, it will emit :obj:`signal_jobs_updated()`. Args: instruction (:obj:`function` | *other*): Intended to be a reference to a device I/O method such as ``dev.write()``. Any arguments to be passed to the I/O method need to be set in the :attr:`pass_args` parameter. You have the freedom to be creative and put, e.g., strings decoding special instructions on the queue as well. Handling such special cases must be programmed by supplying the argument :obj:`jobs_function` with your own alternative job-processing-routine function during the initialization of :class:`Worker_jobs`. :ref:`See here <Worker_jobs_args>`. pass_args (:obj:`tuple` | *other*, optional): Arguments to be passed to the instruction. Must be given as a :obj:`tuple`, but for convenience any other type will also be accepted if it just concerns a single argument. Default: :obj:`()`. Example:: qdev.send(dev.write, "toggle LED") where ``qdev`` is your :class:`QDeviceIO` class instance and ``dev`` is your *device* class instance containing I/O methods. """ if self.worker_jobs is not None: self.worker_jobs.send(instruction, pass_args)
[docs] @Slot() def add_to_jobs_queue(self, instruction, pass_args=()): """Put a job on the :attr:`worker_jobs` queue. See :meth:`send` for details on the parameters. """ if self.worker_jobs is not None: self.worker_jobs.add_to_queue(instruction, pass_args)
[docs] @Slot() def process_jobs_queue(self): """Send out the full :attr:`worker_jobs` queue first-in, first-out to the device until empty. Once finished, it will emit :obj:`signal_jobs_updated()`. """ if self.worker_jobs is not None: self.worker_jobs.process_queue()
# ------------------------------------------------------------------------------ # Worker_DAQ # ------------------------------------------------------------------------------
[docs]class Worker_DAQ(QtCore.QObject): """This worker acquires data from the I/O device, either periodically or aperiodically. It does so by calling a user-supplied function, passed as initialization parameter :ref:`DAQ_function <arg_DAQ_function>`, containing device I/O operations and subsequent data processing, every time the worker *updates*. There are different modes of operation for this worker to perform an *update*. This is set by initialization parameter :ref:`DAQ_trigger <arg_DAQ_trigger>`. An instance of this worker will be created and placed inside a new thread by a call to :meth:`QDeviceIO.create_worker_DAQ`. The *Worker_DAQ* routine is robust in the following sense. It can be set to quit as soon as a communication error appears, or it could be set to allow a certain number of communication errors before it quits. The latter can be useful in non-critical implementations where continuity of the program is of more importance than preventing drops in data transmission. This, obviously, is a work-around for not having to tackle the source of the communication error, but sometimes you just need to struggle on. E.g., when your Arduino is out in the field and picks up occasional unwanted interference/ground noise that messes with your data transmission. See initialization parameter :obj:`critical_not_alive_count`. .. _`Worker_DAQ_args`: Args: qdev (:class:`QDeviceIO`): Reference to the parent :class:`QDeviceIO` class instance, automatically set when being initialized by :meth:`QDeviceIO.create_worker_DAQ`. .. _`arg_DAQ_trigger`: DAQ_trigger (:obj:`int`, optional): Mode of operation. See :class:`DAQ_TRIGGER`. Default: :const:`DAQ_TRIGGER.INTERNAL_TIMER`. .. _`arg_DAQ_function`: DAQ_function (:obj:`function` | :obj:`None`, optional): Reference to a user-supplied function containing the device I/O operations and subsequent data processing, to be invoked every DAQ update. Default: :obj:`None`. Important: The function must return :const:`True` when the communication with the device was successful, and :const:`False` otherwise. Warning: **Neither directly change the GUI, nor print to the terminal from out of this function.** Doing so might temporarily suspend the function and could mess with the timing stability of the worker. (You're basically undermining the reason to have multithreading in the first place). That could be acceptable, though, when you need to print debug or critical error information to the terminal, but be aware about the possible negative effects. Instead, connect to :meth:`QDeviceIO.signal_DAQ_updated` from out of the *main/GUI* thread to instigate changes to the terminal/GUI when needed. Example: Pseudo-code, where ``time`` and ``temperature`` are variables that live at a higher scope, presumably at the *main* scope level. The function ``dev.query_temperature()`` contains the device I/O operations, e.g., sending out a query over RS232 and collecting the device reply. In addition, the function notifies if the communication was successful. Hence, the return values of ``dev.query_temperature()`` are ``success`` as boolean and ``reply`` as a tuple containing a time stamp and a temperature reading. :: def my_DAQ_function(): [success, reply] = dev.query_temperature() if not(success): print("Device IOerror") return False # Return failure # Parse readings into separate variables and store them try: [time, temperature] = reply except Exception as err: print(err) return False # Return failure return True # Return success DAQ_interval_ms (:obj:`int`, optional): Only useful in mode :const:`DAQ_TRIGGER.INTERNAL_TIMER`. Desired data-acquisition update interval in milliseconds. Default: :const:`100`. DAQ_timer_type (:obj:`PyQt5.QtCore.Qt.TimerType`, optional): Only useful in mode :const:`DAQ_TRIGGER.INTERNAL_TIMER`. The update interval is timed to a :class:`PyQt5.QtCore.QTimer` running inside :class:`Worker_DAQ`. The default value :const:`PyQt5.QtCore.Qt.TimerType.PreciseTimer` tries to ensure the best possible timer accuracy, usually ~1 ms granularity depending on the OS, but it is resource heavy so use sparingly. One can reduce the CPU load by setting it to less precise timer types :const:`PyQt5.QtCore.Qt.TimerType.CoarseTimer` or :const:`PyQt5.QtCore.Qt.TimerType.VeryCoarseTimer`. Default: :const:`PyQt5.QtCore.Qt.TimerType.PreciseTimer`. .. _`arg_critical_not_alive_count`: critical_not_alive_count (:obj:`int`, optional): The worker will allow for up to a certain number of consecutive communication failures with the device, before hope is given up and a :meth:`QDeviceIO.signal_connection_lost` is emitted. Use at your own discretion. Default: :const:`1`. debug (:obj:`bool`, optional): Print debug info to the terminal? Warning: Slow! Do not leave on unintentionally. Default: :const:`False`. **kwargs: All remaining keyword arguments will be passed onto inherited class :class:`PyQt5.QtCore.QObject`. .. rubric:: Attributes: Attributes: qdev (:class:`QDeviceIO`): Reference to the parent :class:`QDeviceIO` class instance. dev (:obj:`object` | :obj:`None`): Reference to the user-supplied *device* class instance containing I/O methods, automatically set when calling :meth:`QDeviceIO.create_worker_DAQ`. It is a shorthand for :obj:`self.qdev.dev`. DAQ_function (:obj:`function` | :obj:`None`): See the similarly named :ref:`initialization parameter <arg_DAQ_function>`. critical_not_alive_count (:obj:`int`): See the similarly named :ref:`initialization parameter <arg_critical_not_alive_count>`. """ def __init__( self, qdev, DAQ_trigger=DAQ_TRIGGER.INTERNAL_TIMER, DAQ_function=None, DAQ_interval_ms=100, DAQ_timer_type=QtCore.Qt.TimerType.PreciseTimer, critical_not_alive_count=1, debug=False, **kwargs, ): super().__init__(**kwargs) # Pass **kwargs onto QtCore.QObject() self.debug = debug self.debug_color = ANSI.CYAN self.qdev = qdev self.dev = None if qdev is None else qdev.dev self._DAQ_trigger = DAQ_trigger self.DAQ_function = DAQ_function self._DAQ_interval_ms = DAQ_interval_ms self._DAQ_timer_type = DAQ_timer_type self.critical_not_alive_count = critical_not_alive_count self._has_started = False self._has_stopped = False # Keep track of the obtained DAQ interval and DAQ rate using # QElapsedTimer (QET) self._QET_interval = QtCore.QElapsedTimer() self._QET_rate = QtCore.QElapsedTimer() # Accumulates the number of DAQ updates passed since the previous DAQ # rate evaluation self._rate_accumulator = 0 # Members specifically for INTERNAL_TIMER if self._DAQ_trigger == DAQ_TRIGGER.INTERNAL_TIMER: self._timer = QtCore.QTimer() self._timer.setInterval(DAQ_interval_ms) self._timer.setTimerType(DAQ_timer_type) self._timer.timeout.connect(self._perform_DAQ) # Members specifically for SINGLE_SHOT_WAKE_UP elif self._DAQ_trigger == DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP: self._running = True self._qwc = QtCore.QWaitCondition() self._mutex_wait = QtCore.QMutex() # Members specifically for CONTINUOUS # Note: At start-up, the worker will directly go into a paused state # and trigger a 'signal_DAQ_paused' PyQt signal elif self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: self._running = True self._pause = None """Will be set at init of '_do_work()' when 'start_worker_DAQ()' is called""" self._paused = None """Will be set at init of '_do_work()' when 'start_worker_DAQ()' is called""" if self.debug: tprint( f"Worker_DAQ {self.dev.name}: " f"init @ thread {_cur_thread_name()}", self.debug_color, ) @_coverage_resolve_trace @Slot() def _do_work(self): # fmt: off # Uncomment block to enable Visual Studio Code debugger to have access # to this thread. DO NOT LEAVE BLOCK UNCOMMENTED: Running it outside of # the debugger causes crashes. """ if self.debug: import pydevd pydevd.settrace(suspend=False) """ # fmt: on init = True def confirm_has_started(self): # Wait a tiny amount of extra time for QDeviceIO to have entered # 'self._qwc_worker_###_started.wait(self._mutex_wait_worker_###)' # of method 'start_worker_###()'. time.sleep(0.05) if self.debug: tprint( f"Worker_DAQ {self.dev.name}: has started", self.debug_color, ) # Send confirmation self.qdev._qwc_worker_DAQ_started.wakeAll() self._has_started = True if self.debug: tprint( f"Worker_DAQ {self.dev.name}: " f"starting @ thread {_cur_thread_name()}", self.debug_color, ) # INTERNAL_TIMER if self._DAQ_trigger == DAQ_TRIGGER.INTERNAL_TIMER: self._timer.start() confirm_has_started(self) # SINGLE_SHOT_WAKE_UP elif self._DAQ_trigger == DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP: locker_wait = QtCore.QMutexLocker(self._mutex_wait) locker_wait.unlock() while self._running: locker_wait.relock() if self.debug: tprint( f"Worker_DAQ {self.dev.name}: " "waiting for wake-up trigger", self.debug_color, ) if init: confirm_has_started(self) init = False self._qwc.wait(self._mutex_wait) if self.debug: tprint( f"Worker_DAQ {self.dev.name}: has woken up", self.debug_color, ) # Needed check to prevent _perform_DAQ() at final wake up # when _stop() has been called if self._running: self._perform_DAQ() locker_wait.unlock() if self.debug: tprint( f"Worker_DAQ {self.dev.name}: has stopped", self.debug_color, ) # Wait a tiny amount for the other thread to have entered the # QWaitCondition lock, before giving a wakingAll(). QtCore.QTimer.singleShot( 100, self.qdev._qwc_worker_DAQ_stopped.wakeAll, ) self._has_stopped = True # CONTINUOUS elif self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: while self._running: QtCore.QCoreApplication.processEvents() # Essential to fire and process signals if init: self._pause = True self._paused = True if self.debug: tprint( f"Worker_DAQ {self.dev.name}: starting up paused", self.debug_color, ) self.qdev.signal_DAQ_paused.emit() confirm_has_started(self) init = False if self._pause: # == True if self._pause != self._paused: if self.debug and not init: tprint( f"Worker_DAQ {self.dev.name}: has paused", self.debug_color, ) self.qdev.signal_DAQ_paused.emit() self._paused = True time.sleep(0.01) # Do not hog the CPU while paused else: # == False if self._pause != self._paused: if self.debug: tprint( f"Worker_DAQ {self.dev.name}: has unpaused", self.debug_color, ) self._paused = False self._perform_DAQ() if self.debug: tprint( f"Worker_DAQ {self.dev.name}: has stopped", self.debug_color, ) # Wait a tiny amount for 'create_worker_DAQ()', which is running # in a different thread than this one, to have entered the # QWaitCondition lock, before giving a wakingAll(). QtCore.QTimer.singleShot( 100, self.qdev._qwc_worker_DAQ_stopped.wakeAll, ) self._has_stopped = True @_coverage_resolve_trace @Slot() def _perform_DAQ(self): locker = QtCore.QMutexLocker(self.dev.mutex) self.qdev.update_counter_DAQ += 1 if self.debug: tprint( f"Worker_DAQ {self.dev.name}: " f"lock # {self.qdev.update_counter_DAQ}", self.debug_color, ) # Keep track of the obtained DAQ interval and DAQ rate if not self._QET_interval.isValid(): self._QET_interval.start() self._QET_rate.start() else: # Obtained DAQ interval self.qdev.obtained_DAQ_interval_ms = self._QET_interval.restart() # Obtained DAQ rate self._rate_accumulator += 1 dT = self._QET_rate.elapsed() if dT >= 1000: # Evaluate every N elapsed milliseconds. Hard-coded. self._QET_rate.restart() try: self.qdev.obtained_DAQ_rate_Hz = ( self._rate_accumulator / dT * 1e3 ) except ZeroDivisionError: # pragma: no cover self.qdev.obtained_DAQ_rate_Hz = np.nan # pragma: no cover self._rate_accumulator = 0 # ---------------------------------- # User-supplied DAQ function # ---------------------------------- if self.DAQ_function is not None: try: success = self.DAQ_function() except Exception as err: # pylint: disable=broad-except pft(err) dprint( f"@ Worker_DAQ {self.dev.name}\n", ANSI.RED, ) else: if success: # Did return True, hence was successfull # --> Reset the 'not alive' counter self.qdev.not_alive_counter_DAQ = 0 else: # Did return False, hence was unsuccessfull self.qdev.not_alive_counter_DAQ += 1 # ---------------------------------- # End user-supplied DAQ function # ---------------------------------- if self.debug: tprint( f"Worker_DAQ {self.dev.name}: " f"unlock # {self.qdev.update_counter_DAQ}", self.debug_color, ) locker.unlock() # Check the not alive counter if self.qdev.not_alive_counter_DAQ >= self.critical_not_alive_count: dprint( f"Worker_DAQ {self.dev.name}: " f"Lost connection to device.", ANSI.RED, ) self.dev.is_alive = False self._stop() self.qdev.signal_connection_lost.emit() return self.qdev.signal_DAQ_updated.emit() @Slot() def _stop(self): """Stop the worker to prepare for quitting the worker thread.""" if self.debug: tprint(f"Worker_DAQ {self.dev.name}: stopping", self.debug_color) if self._DAQ_trigger == DAQ_TRIGGER.INTERNAL_TIMER: # NOTE: The timer /must/ be stopped from the worker_DAQ thread! self._timer.stop() if self.debug: tprint( f"Worker_DAQ {self.dev.name}: has stopped", self.debug_color, ) # Wait a tiny amount for the other thread to have entered the # QWaitCondition lock, before giving a wakingAll(). QtCore.QTimer.singleShot( 100, self.qdev._qwc_worker_DAQ_stopped.wakeAll, ) self._has_stopped = True elif self._DAQ_trigger == DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP: self._running = False self._qwc.wakeAll() # Wake up for the final time elif self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: self._running = False # ---------------------------------------------------------------------- # pause / unpause # ----------------------------------------------------------------------
[docs] @Slot() def pause(self): """Only useful in mode :const:`DAQ_TRIGGER.CONTINUOUS`. Pause the worker to stop listening for data. After :attr:`worker_DAQ` has achieved the `paused` state, it will emit :obj:`signal_DAQ_paused()`. This method should not be called from another thread. Connect this slot to a signal instead. """ if self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: if self.debug: tprint( f"Worker_DAQ {self.dev.name}: pause requested...", ANSI.WHITE, ) # The possible undefined behavior of changing variable '_pause' # from out of another thread gets handled acceptably correct in # '_do_work()' as per my design. self._pause = True
[docs] @Slot() def unpause(self): """Only useful in mode :const:`DAQ_TRIGGER.CONTINUOUS`. Unpause the worker to resume listening for data. Once :attr:`worker_DAQ` has successfully resumed, it will emit :obj:`signal_DAQ_updated()` for every DAQ update. This method should not be called from another thread. Connect this slot to a signal instead. """ if self._DAQ_trigger == DAQ_TRIGGER.CONTINUOUS: if self.debug: tprint( f"Worker_DAQ {self.dev.name}: unpause requested...", ANSI.WHITE, ) # The possible undefined behavior of changing variable '_pause' # from out of another thread gets handled acceptably correct in # '_do_work()' as per my design. self._pause = False
# ---------------------------------------------------------------------- # wake_up # ----------------------------------------------------------------------
[docs] @Slot() def wake_up(self): """Only useful in mode :const:`DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP`. See the description at :meth:`QDeviceIO.wake_up_DAQ`. This method can be called from another thread. """ if self._DAQ_trigger == DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP: if self.debug: tprint( f"Worker_DAQ {self.dev.name}: wake-up requested...", ANSI.WHITE, ) self._qwc.wakeAll()
# ------------------------------------------------------------------------------ # Worker_jobs # ------------------------------------------------------------------------------
[docs]class Worker_jobs(QtCore.QObject): """This worker maintains a thread-safe queue where desired device I/O operations, called *jobs*, can be put onto. The worker will send out the operations to the device, first-in, first-out (FIFO), until the queue is empty again. The manner in which each job gets handled is explained by initialization parameter :ref:`jobs_function <arg_jobs_function>`. An instance of this worker will be created and placed inside a new thread by a call to :meth:`QDeviceIO.create_worker_jobs`. This worker uses the :class:`PyQt5.QtCore.QWaitCondition` mechanism. Hence, it will only send out all pending jobs on the queue, whenever the thread is woken up by a call to :meth:`Worker_jobs.process_queue()`. When it has emptied the queue, the thread will go back to sleep again. .. _`Worker_jobs_args`: Args: qdev (:class:`QDeviceIO`): Reference to the parent :class:`QDeviceIO` class instance, automatically set when being initialized by :meth:`QDeviceIO.create_worker_jobs`. .. _`arg_jobs_function`: jobs_function (:obj:`function` | :obj:`None`, optional): Routine to be performed per job. Default: :obj:`None`. When omitted and, hence, left set to the default value :obj:`None`, it will perform the default job handling routine, which goes as follows: ``func`` and ``args`` will be retrieved from the jobs queue and their combination ``func(*args)`` will get executed. Respectively, *func* and *args* correspond to *instruction* and *pass_args* of methods :meth:`send` and :meth:`add_to_queue`. The default is sufficient when ``func`` corresponds to an I/O operation that is an one-way send, i.e. a write operation with optionally passed arguments, but without a reply from the device. Alternatively, you can pass it a reference to a user-supplied function performing an alternative job handling routine. This allows you to get creative and put, e.g., special string messages on the queue that decode into, e.g., - multiple write operations to be executed as one block, - query operations whose return values can be acted upon accordingly, - extra data processing in between I/O operations. The function you supply must take two arguments, where the first argument is to be ``func`` and the second argument is to be ``args`` of type :obj:`tuple`. Both ``func`` and ``args`` will be retrieved from the jobs queue and passed onto your supplied function. Warning: **Neither directly change the GUI, nor print to the terminal from out of this function.** Doing so might temporarily suspend the function and could mess with the timing stability of the worker. (You're basically undermining the reason to have multithreading in the first place). That could be acceptable, though, when you need to print debug or critical error information to the terminal, but be aware about this warning. Instead, connect to :meth:`QDeviceIO.signal_jobs_updated` from out of the *main/GUI* thread to instigate changes to the terminal/GUI when needed. Example:: def my_jobs_function(func, args): if func == "query_id?": # Query the device for its identity string [success, ans_str] = dev.query("id?") # And store the reply 'ans_str' in another variable # at a higher scope or do stuff with it here. else: # Default job handling where, e.g. # func = dev.write # args = ("toggle LED",) func(*args) debug (:obj:`bool`, optional): Print debug info to the terminal? Warning: Slow! Do not leave on unintentionally. Default: :const:`False`. **kwargs: All remaining keyword arguments will be passed onto inherited class :class:`PyQt5.QtCore.QObject`. .. rubric:: Attributes: Attributes: qdev (:class:`QDeviceIO`): Reference to the parent :class:`QDeviceIO` class instance. dev (:obj:`object` | :obj:`None`): Reference to the user-supplied *device* class instance containing I/O methods, automatically set when calling :meth:`QDeviceIO.create_worker_jobs`. It is a shorthand for :obj:`self.qdev.dev`. jobs_function (:obj:`function` | :obj:`None`): See the similarly named :ref:`initialization parameter <arg_jobs_function>`. """ def __init__( self, qdev, jobs_function=None, debug=False, **kwargs, ): super().__init__(**kwargs) # Pass **kwargs onto QtCore.QObject() self.debug = debug self.debug_color = ANSI.YELLOW self.qdev = qdev self.dev = None if qdev is None else qdev.dev self.jobs_function = jobs_function self._has_started = False self._has_stopped = False self._running = True self._qwc = QtCore.QWaitCondition() self._mutex_wait = QtCore.QMutex() # Use a 'sentinel' value to signal the start and end of the queue # to ensure proper multithreaded operation. self._sentinel = None self._queue = queue.Queue() self._queue.put(self._sentinel) if self.debug: tprint( f"Worker_jobs {self.dev.name}: " f"init @ thread {_cur_thread_name()}", self.debug_color, ) @_coverage_resolve_trace @Slot() def _do_work(self): # fmt: off # Uncomment block to enable Visual Studio Code debugger to have access # to this thread. DO NOT LEAVE BLOCK UNCOMMENTED: Running it outside of # the debugger causes crashes. """ if self.debug: import pydevd pydevd.settrace(suspend=False) """ # fmt: on init = True def confirm_has_started(self): # Wait a tiny amount of extra time for QDeviceIO to have entered # 'self._qwc_worker_###_started.wait(self._mutex_wait_worker_###)' # of method 'start_worker_###()'. time.sleep(0.05) if self.debug: tprint( f"Worker_jobs {self.dev.name}: has started", self.debug_color, ) # Send confirmation self.qdev._qwc_worker_jobs_started.wakeAll() self._has_started = True if self.debug: tprint( f"Worker_jobs {self.dev.name}: " f"starting @ thread {_cur_thread_name()}", self.debug_color, ) locker_wait = QtCore.QMutexLocker(self._mutex_wait) locker_wait.unlock() while self._running: locker_wait.relock() if self.debug: tprint( f"Worker_jobs {self.dev.name}: waiting for wake-up trigger", self.debug_color, ) if init: confirm_has_started(self) init = False self._qwc.wait(self._mutex_wait) if self.debug: tprint( f"Worker_jobs {self.dev.name}: has woken up", self.debug_color, ) # Needed check to prevent _perform_jobs() at final wake up # when _stop() has been called if self._running: self._perform_jobs() locker_wait.unlock() if self.debug: tprint( f"Worker_jobs {self.dev.name}: has stopped", self.debug_color, ) # Wait a tiny amount for the other thread to have entered the # QWaitCondition lock, before giving a wakingAll(). QtCore.QTimer.singleShot( 100, self.qdev._qwc_worker_jobs_stopped.wakeAll, ) self._has_stopped = True @_coverage_resolve_trace @Slot() def _perform_jobs(self): locker = QtCore.QMutexLocker(self.dev.mutex) self.qdev.update_counter_jobs += 1 if self.debug: tprint( f"Worker_jobs {self.dev.name}: " f"lock # {self.qdev.update_counter_jobs}", self.debug_color, ) # Process all jobs until the queue is empty. We must iterate 2 times # because we use a sentinel in a FIFO queue. First iter removes the old # sentinel. Second iter processes the remaining queue items and will put # back a new sentinel again. for _i in range(2): for job in iter(self._queue.get_nowait, self._sentinel): func = job[0] args = job[1:] if self.debug: if isinstance(func, str): tprint( f"Worker_jobs {self.dev.name}: " f"{func} " f"{args}", self.debug_color, ) else: tprint( f"Worker_jobs {self.dev.name}: " f"{func.__name__} " f"{args}", self.debug_color, ) if self.jobs_function is None: # Default job processing: # Send I/O operation to the device try: func(*args) except Exception as err: # pylint: disable=broad-except pft(err) dprint( f"@ Worker_jobs {self.dev.name}\n", ANSI.RED, ) else: # User-supplied job processing self.jobs_function(func, args) # Put sentinel back in self._queue.put(self._sentinel) if self.debug: tprint( f"Worker_jobs {self.dev.name}: " f"unlock # {self.qdev.update_counter_jobs}", self.debug_color, ) locker.unlock() self.qdev.signal_jobs_updated.emit() @Slot() def _stop(self): """Stop the worker to prepare for quitting the worker thread""" if self.debug: tprint( f"Worker_jobs {self.dev.name}: stopping", self.debug_color, ) self._running = False self._qwc.wakeAll() # Wake up for the final time # ---------------------------------------------------------------------- # send # ----------------------------------------------------------------------
[docs] def send(self, instruction, pass_args=()): """See the description at :meth:`QDeviceIO.send`. This method can be called from another thread. """ self.add_to_queue(instruction, pass_args) self.process_queue()
# ---------------------------------------------------------------------- # add_to_queue # ----------------------------------------------------------------------
[docs] def add_to_queue(self, instruction, pass_args=()): """See the description at :meth:`QDeviceIO.add_to_jobs_queue`. This method can be called from another thread. """ if not isinstance(pass_args, tuple): pass_args = (pass_args,) self._queue.put((instruction, *pass_args))
# ---------------------------------------------------------------------- # process_queue # ----------------------------------------------------------------------
[docs] def process_queue(self): """See the description at :meth:`QDeviceIO.process_jobs_queue`. This method can be called from another thread. """ if self.debug: tprint( f"Worker_jobs {self.dev.name}: wake-up requested...", ANSI.WHITE, ) self._qwc.wakeAll()