QDeviceIO

class dvg_qdeviceio.QDeviceIO(dev=None, **kwargs)[source]

Bases: QObject

This class provides the framework for multithreaded data acquisition (DAQ) and communication with an I/O device.

All device I/O operations are offloaded to workers, each running in its own dedicated thread. The following workers can be created:

  • Worker_DAQ :

    Acquires data from the device, either periodically or aperiodically.

    Created by calling create_worker_DAQ().

  • Worker_jobs :

    Maintains a thread-safe queue onto which desired device I/O operations, called jobs, can be put. It sends the queued jobs to the device in first-in, first-out (FIFO) order.

    Created by calling create_worker_jobs().

Tip

You can inherit from QDeviceIO to build your own subclass that hides the specifics of creating Worker_DAQ and Worker_jobs from the user and modifies the default parameter values. E.g., when making a QDeviceIO subclass specific to your Arduino project:

from dvg_qdeviceio import QDeviceIO, DAQ_TRIGGER

class Arduino_qdev(QDeviceIO):
    def __init__(
        self, dev=None, DAQ_function=None, debug=False, **kwargs,
    ):
        # Pass `dev` onto QDeviceIO() and pass `**kwargs` onto QtCore.QObject()
        super().__init__(dev, **kwargs)

        # Set the DAQ to 10 Hz internal timer
        self.create_worker_DAQ(
            DAQ_trigger                = DAQ_TRIGGER.INTERNAL_TIMER,
            DAQ_function               = DAQ_function,
            DAQ_interval_ms            = 100,  # 100 ms -> 10 Hz
            critical_not_alive_count   = 3,
            debug                      = debug,
        )

        # Standard jobs handling
        self.create_worker_jobs(debug=debug)

Now, the user only has to call the following to get up and running:

ard_qdev = Arduino_qdev(
    dev=my_Arduino_device,
    DAQ_function=my_DAQ_function
)
ard_qdev.start()

Parameters:
  • dev (object | None, optional) – Reference to a user-supplied device class instance containing I/O methods. In addition, dev should have the following members. If any are missing, they will be injected into the dev instance for you:

    • dev.name (str) – Short display name for the device. Default: “myDevice”.
    • dev.mutex (QMutex) – To allow for properly multithreaded device I/O operations. It will be used by Worker_DAQ and Worker_jobs.
    • dev.is_alive (bool) – Is the device up and able to communicate? Default: True.

    Default: None

  • **kwargs – All remaining keyword arguments will be passed onto inherited class QObject.
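
As a rough illustration of the member injection described for dev above, the following sketch fills in any missing members on a device object. It is not the library's implementation: ensure_device_members and FakeDevice are hypothetical names, and threading.Lock stands in for QMutex so that the snippet runs without Qt.

```python
import threading


class FakeDevice:
    """Hypothetical device class that only defines an I/O method."""

    def write(self, msg):
        return f"wrote {msg}"


def ensure_device_members(dev):
    """Add `name`, `mutex` and `is_alive` to `dev` when they are missing."""
    if not hasattr(dev, "name"):
        dev.name = "myDevice"
    if not hasattr(dev, "mutex"):
        dev.mutex = threading.Lock()  # stand-in for QMutex (assumption)
    if not hasattr(dev, "is_alive"):
        dev.is_alive = True
    return dev


dev = ensure_device_members(FakeDevice())

# Device I/O is guarded by the shared mutex, as the workers would do
with dev.mutex:
    reply = dev.write("hello")
```

A device class that already defines its own name, mutex or is_alive keeps those values under this scheme.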

Attributes:

dev

Reference to a user-supplied device class instance containing I/O methods.

Type:object | None
worker_DAQ

Instance of Worker_DAQ as created by create_worker_DAQ(). This worker runs in a dedicated thread.

Type:Worker_DAQ | None
worker_jobs

Instance of Worker_jobs as created by create_worker_jobs(). This worker runs in a dedicated thread.

Type:Worker_jobs | None
update_counter_DAQ

Increments every time worker_DAQ tries to update.

Type:int
update_counter_jobs

Increments every time worker_jobs tries to update.

Type:int
obtained_DAQ_interval_ms

Obtained time interval in milliseconds since the previous worker_DAQ update.

Type:int | numpy.nan
obtained_DAQ_rate_Hz

Obtained acquisition rate of worker_DAQ in hertz. Several DAQ updates are needed before the value can be calculated properly; until then it is numpy.nan.

Type:float | numpy.nan
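
To make the relation between update timing and the obtained rate concrete, here is a minimal sketch that derives a rate in hertz from a series of update timestamps. How QDeviceIO averages internally is not stated on this page; the window of 5 intervals and the helper name rate_from_timestamps_ms are assumptions.

```python
import math


def rate_from_timestamps_ms(timestamps_ms, n_intervals=5):
    """Return the mean update rate in Hz over the last `n_intervals`
    intervals, or NaN while too few updates have occurred."""
    if len(timestamps_ms) < n_intervals + 1:
        return math.nan
    window = timestamps_ms[-(n_intervals + 1):]
    elapsed_ms = window[-1] - window[0]
    if elapsed_ms <= 0:
        return math.nan
    return n_intervals * 1000.0 / elapsed_ms


# Too few updates so far: the rate is still undefined
early = rate_from_timestamps_ms([0, 100, 200])

# Six updates spaced 100 ms apart give five intervals over 500 ms
steady = rate_from_timestamps_ms([0, 100, 200, 300, 400, 500])
```

Because the value can be NaN, code that displays or logs the rate should check for NaN before formatting it.
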
not_alive_counter_DAQ

Number of consecutive failed attempts to update worker_DAQ, presumably due to device I/O errors. Will be reset to 0 once a successful DAQ update occurs. See the signal_connection_lost() mechanism.

Type:int

Signals

QDeviceIO.signal_DAQ_updated

Emitted by Worker_DAQ when its DAQ_function has run and finished, either successfully or not.

Tip

It can be useful to connect this signal to a slot containing, e.g., your GUI redraw routine:

from PyQt5 import QtCore

@QtCore.pyqtSlot()
def my_GUI_redraw_routine():
    ...

qdev.signal_DAQ_updated.connect(my_GUI_redraw_routine)

where qdev is your instance of QDeviceIO. Don’t forget to decorate the function definition with a PyQt5.QtCore.pyqtSlot() decorator.

Type:PyQt5.QtCore.pyqtSignal
QDeviceIO.signal_jobs_updated

Emitted by Worker_jobs when all pending jobs in the queue have been sent out to the device in response to send() or process_jobs_queue(). See also the tip at signal_DAQ_updated().

Type:PyQt5.QtCore.pyqtSignal
QDeviceIO.signal_DAQ_paused

Emitted by Worker_DAQ to confirm the worker has entered the paused state in response to Worker_DAQ.pause(). See also the tip at signal_DAQ_updated().

Type:PyQt5.QtCore.pyqtSignal
QDeviceIO.signal_connection_lost

Emitted by Worker_DAQ to indicate that we have lost connection to the device. This happens when N consecutive device I/O operations have failed, where N equals the argument critical_not_alive_count as passed to method create_worker_DAQ(). See also the tip at signal_DAQ_updated().

Type:PyQt5.QtCore.pyqtSignal
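
The interplay between not_alive_counter_DAQ, critical_not_alive_count and signal_connection_lost can be modelled in a few lines. This is a sketch of the counting rule as described above, not the library's code; ConnectionMonitor is a hypothetical name, and a plain boolean flag takes the place of the Qt signal.

```python
class ConnectionMonitor:
    """Hypothetical model of the consecutive-failure bookkeeping."""

    def __init__(self, critical_not_alive_count=3):
        self.critical_not_alive_count = critical_not_alive_count
        self.not_alive_counter = 0
        self.connection_lost = False

    def register_update(self, success):
        if success:
            # A successful DAQ update resets the counter to 0
            self.not_alive_counter = 0
            return
        self.not_alive_counter += 1
        if self.not_alive_counter >= self.critical_not_alive_count:
            # This is the point where the signal would be emitted
            self.connection_lost = True


monitor = ConnectionMonitor(critical_not_alive_count=3)

# Two failures, one success, two failures: never three in a row
for ok in (False, False, True, False, False):
    monitor.register_update(ok)
lost_before = monitor.connection_lost

# A third consecutive failure crosses the threshold
monitor.register_update(False)
lost_after = monitor.connection_lost
```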

Methods

QDeviceIO.attach_device(dev)[source]

Attach a reference to a user-supplied device class instance containing I/O methods. In addition, dev should have the following members. If any are missing, they will be injected into the dev instance for you:

  • dev.name (str) – Short display name for the device. Default: “myDevice”.
  • dev.mutex (QMutex) – To allow for properly multithreaded device I/O operations. It will be used by Worker_DAQ and Worker_jobs.
  • dev.is_alive (bool) – Is the device up and able to communicate? Default: True.
Parameters:dev (object) – Reference to a user-supplied device class instance containing I/O methods.
QDeviceIO.create_worker_DAQ(**kwargs)[source]

Create and configure an instance of Worker_DAQ and transfer it to a new QThread.

Parameters:**kwargs – Will be passed directly to Worker_DAQ as initialization parameters, see Worker_DAQ.
QDeviceIO.create_worker_jobs(**kwargs)[source]

Create and configure an instance of Worker_jobs and transfer it to a new QThread.

Parameters:**kwargs – Will be passed directly to Worker_jobs as initialization parameters, see Worker_jobs.
QDeviceIO.start(DAQ_priority=7, jobs_priority=7) → bool[source]

Start the event loop of all created workers.

Parameters:
  • DAQ_priority (PyQt5.QtCore.QThread.Priority, optional) – By default, the worker threads run in the operating system at the same thread priority as the main/GUI thread. You can raise the priority by setting DAQ_priority to, e.g., PyQt5.QtCore.QThread.TimeCriticalPriority. Be aware that this is resource heavy, so use it sparingly.

    Default: PyQt5.QtCore.QThread.InheritPriority (the integer value 7 shown in the signature).

  • jobs_priority (PyQt5.QtCore.QThread.Priority, optional) – Like DAQ_priority.

    Default: PyQt5.QtCore.QThread.InheritPriority (the integer value 7 shown in the signature).

Returns:

True if successful, False otherwise.

QDeviceIO.start_worker_DAQ(priority=7) → bool[source]

Start the data acquisition with the device by starting the event loop of the worker_DAQ thread.

Parameters:priority (PyQt5.QtCore.QThread.Priority, optional) – See start() for details.
Returns:True if successful, False otherwise.
QDeviceIO.start_worker_jobs(priority=7) → bool[source]

Start maintaining the jobs queue by starting the event loop of the worker_jobs thread.

Parameters:priority (PyQt5.QtCore.QThread.Priority, optional) – See start() for details.
Returns:True if successful, False otherwise.
QDeviceIO.quit() → bool[source]

Stop all running workers and close their respective threads.

Returns:True if successful, False otherwise.
QDeviceIO.quit_worker_DAQ() → bool[source]

Stop worker_DAQ and close its thread.

Returns:True if successful, False otherwise.
QDeviceIO.quit_worker_jobs() → bool[source]

Stop worker_jobs and close its thread.

Returns:True if successful, False otherwise.
QDeviceIO.pause_DAQ()[source]

Only useful in mode DAQ_TRIGGER.CONTINUOUS. Request worker_DAQ to pause and stop listening for data. Once worker_DAQ has entered the paused state, it will emit signal_DAQ_paused().

QDeviceIO.unpause_DAQ()[source]

Only useful in mode DAQ_TRIGGER.CONTINUOUS. Request worker_DAQ to resume listening for data. Once worker_DAQ has successfully resumed, it will emit signal_DAQ_updated() for every DAQ update.

QDeviceIO.wake_up_DAQ()[source]

Only useful in mode DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP. Request worker_DAQ to wake up and perform a single update, i.e. run DAQ_function once. It will emit signal_DAQ_updated() after DAQ_function has run, whether successfully or not.
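
The single-shot wake-up pattern can be pictured as a worker thread that sleeps until it is asked to run once. The sketch below models this with the standard library only; it is not how Worker_DAQ is implemented (which builds on Qt threading), SingleShotWorkerModel is a hypothetical name, and the choice to run DAQ_function once per wake-up request is an assumption.

```python
import threading


class SingleShotWorkerModel:
    """Hypothetical worker that runs `daq_function` once per wake-up."""

    def __init__(self, daq_function):
        self._daq_function = daq_function
        self._cond = threading.Condition()
        self._pending = 0
        self._quit = False
        self.update_counter = 0
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self):
        while True:
            with self._cond:
                # Sleep until a wake-up or a quit request arrives
                while self._pending == 0 and not self._quit:
                    self._cond.wait()
                if self._pending == 0:
                    return  # quit requested and nothing left to do
                self._pending -= 1
            self._daq_function()
            self.update_counter += 1  # an 'updated' signal would go here

    def wake_up(self):
        with self._cond:
            self._pending += 1
            self._cond.notify()

    def quit(self):
        with self._cond:
            self._quit = True
            self._cond.notify()
        self._thread.join()


readings = []
worker = SingleShotWorkerModel(lambda: readings.append(len(readings)))
worker.wake_up()
worker.wake_up()
worker.quit()  # pending wake-ups are handled before the thread exits
```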

QDeviceIO.send(instruction, pass_args=())[source]

Put a job on the worker_jobs queue and send out the full queue first-in, first-out to the device until empty. Once finished, it will emit signal_jobs_updated().

Parameters:
  • instruction (function | other) – Intended to be a reference to a device I/O method such as dev.write(). Any arguments to be passed to the I/O method need to be set in the pass_args parameter.

    You are free to put other objects on the queue as well, e.g., strings that encode special instructions. Handling such special cases requires supplying your own job-processing routine via the jobs_function argument when Worker_jobs is initialized. See Worker_jobs.

  • pass_args (tuple | other, optional) – Arguments to be passed to the instruction. Must be given as a tuple, but for convenience any other type will also be accepted when only a single argument is passed.

    Default: ().

Example:

qdev.send(dev.write, "toggle LED")

where qdev is your QDeviceIO class instance and dev is your device class instance containing I/O methods.
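
The pass_args convenience rule (a tuple is unpacked, anything else is treated as one argument) can be sketched as follows. This is an illustration of the documented behaviour rather than the library's code; normalize_pass_args, run_job, write and move are hypothetical names.

```python
def normalize_pass_args(pass_args=()):
    """Wrap a non-tuple value into a one-element tuple."""
    if isinstance(pass_args, tuple):
        return pass_args
    return (pass_args,)


def run_job(instruction, pass_args=()):
    """Call `instruction` with the normalized arguments."""
    return instruction(*normalize_pass_args(pass_args))


def write(msg):
    return f"write: {msg}"


def move(x, y):
    return f"move: {x}, {y}"


single = run_job(write, "toggle LED")  # bare value, one argument
multi = run_job(move, (3, 4))          # tuple, unpacked into two arguments
no_args = normalize_pass_args()        # default: no arguments at all
```

Under this rule, an instruction that expects a single tuple as its argument needs that tuple wrapped in another tuple, e.g. pass_args=((3, 4),).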

QDeviceIO.add_to_jobs_queue(instruction, pass_args=())[source]

Put a job on the worker_jobs queue.

See send() for details on the parameters.

QDeviceIO.process_jobs_queue()[source]

Send out the full worker_jobs queue first-in, first-out to the device until empty. Once finished, it will emit signal_jobs_updated().
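
The relation between send(), add_to_jobs_queue() and process_jobs_queue() is easiest to see in a small model: queue several jobs first, then flush them in one go. The sketch below is a simplified stand-in, not the library's implementation. JobsQueueModel is a hypothetical name, it runs the jobs synchronously in the calling thread instead of in the worker_jobs thread, and a counter takes the place of signal_jobs_updated().

```python
import queue


class JobsQueueModel:
    """Hypothetical single-threaded model of the jobs queue."""

    def __init__(self):
        self._queue = queue.Queue()  # thread-safe FIFO queue
        self.update_counter = 0

    def add_to_jobs_queue(self, instruction, pass_args=()):
        if not isinstance(pass_args, tuple):
            pass_args = (pass_args,)
        self._queue.put((instruction, pass_args))

    def process_jobs_queue(self):
        # Run every queued job in FIFO order until the queue is empty
        while True:
            try:
                instruction, pass_args = self._queue.get_nowait()
            except queue.Empty:
                break
            instruction(*pass_args)
        self.update_counter += 1  # a 'jobs updated' signal would go here

    def send(self, instruction, pass_args=()):
        # Equivalent to queueing one job and flushing the whole queue
        self.add_to_jobs_queue(instruction, pass_args)
        self.process_jobs_queue()


log = []
jobs = JobsQueueModel()
jobs.add_to_jobs_queue(log.append, "set voltage")
jobs.add_to_jobs_queue(log.append, "set current")
jobs.send(log.append, "enable output")  # flushes all three jobs in order
```

Queueing related jobs with add_to_jobs_queue() and flushing once means a single completion notification for the whole batch, instead of one per job as with repeated send() calls.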