QDeviceIO

class dvg_qdeviceio.QDeviceIO(dev=<dvg_qdeviceio._NoDevice object>, **kwargs)[source]

Bases: QObject

This class provides the framework for multithreaded data acquisition (DAQ) and communication with an I/O device.

All device I/O operations will be offloaded to workers, each running in their dedicated thread. The following workers can be created:

  • Worker_DAQ :

    Acquires data from the device, either periodically or aperiodically.

    Created by calling create_worker_DAQ().

  • Worker_jobs :

    Maintains a thread-safe queue where desired device I/O operations, called jobs, can be put onto. It will send out the queued jobs first-in, first-out (FIFO) to the device.

    Created by calling create_worker_jobs().

Tip

You can inherit from QDeviceIO to build your own subclass that hides the specifics of creating Worker_DAQ and Worker_jobs from the user and modifies the default parameter values. E.g., when making a QDeviceIO subclass specific to your Arduino project:

from dvg_qdeviceio import QDeviceIO, DAQ_TRIGGER

class Arduino_qdev(QDeviceIO):
    def __init__(
        self, dev=None, DAQ_function=None, debug=False, **kwargs,
    ):
        # Pass `dev` onto QDeviceIO() and pass `**kwargs` onto QtCore.QObject()
        super().__init__(dev, **kwargs)

        # Set the DAQ to 10 Hz internal timer
        self.create_worker_DAQ(
            DAQ_trigger                = DAQ_TRIGGER.INTERNAL_TIMER,
            DAQ_function               = DAQ_function,
            DAQ_interval_ms            = 100,  # 100 ms -> 10 Hz
            critical_not_alive_count   = 3,
            debug                      = debug,
        )

        # Standard jobs handling
        self.create_worker_jobs(debug=debug)

Now, the user only has to call the following to get up and running:

ard_qdev = Arduino_qdev(
    dev=my_Arduino_device,
    DAQ_function=my_DAQ_function
)
ard_qdev.start()
Parameters:
  • dev (object | _NoDevice, optional) – Reference to a user-supplied device class instance containing I/O methods. In addition, dev should also have the following members. If not, they will be injected into the dev instance for you:

    • dev.name (str) – Short display name for the device. Default: “myDevice”.
    • dev.mutex (PySide6.QtCore.QMutex) – To allow for properly multithreaded device I/O operations. It will be used by Worker_DAQ and Worker_jobs.
    • dev.is_alive (bool) – Device is up and communicatable? Default: True.

    Default: _NoDevice

  • **kwargs – All remaining keyword arguments will be passed onto inherited class PySide6.QtCore.QObject.

Attributes:

dev

Reference to a user-supplied device class instance containing I/O methods.

Type:object | _NoDevice
worker_DAQ

Instance of Worker_DAQ as created by create_worker_DAQ(). This worker runs in a dedicated thread.

Type:Worker_DAQ
worker_jobs

Instance of Worker_jobs as created by create_worker_jobs(). This worker runs in a dedicated thread.

Type:Worker_jobs
update_counter_DAQ

Increments every time worker_DAQ tries to update.

Type:int
update_counter_jobs

Increments every time worker_jobs tries to update.

Type:int
obtained_DAQ_interval_ms

Obtained time interval in milliseconds since the previous worker_DAQ update.

Type:int | numpy.nan
obtained_DAQ_rate_Hz

Obtained acquisition rate of worker_DAQ in hertz. It will take several DAQ updates for the value to be properly calculated, and till that time it will be numpy.nan.

Type:float | numpy.nan
not_alive_counter_DAQ

Number of consecutive failed attempts to update worker_DAQ, presumably due to device I/O errors. Will be reset to 0 once a successful DAQ update occurs. See the signal_connection_lost() mechanism.

Type:int

Signals

QDeviceIO.signal_DAQ_updated

Emitted by Worker_DAQ when its DAQ_function has run and finished, either succesfully or not.

Tip

It can be useful to connect this signal to a slot containing, e.g., your GUI redraw routine:

from PySide6 import QtCore

@QtCore.pyqtSlot()
def my_GUI_redraw_routine():
    ...

qdev.signal_DAQ_updated.connect(my_GUI_redraw_routine)

where qdev is your instance of QDeviceIO. Don’t forget to decorate the function definition with a Slot decorator.

Type:Signal
QDeviceIO.signal_jobs_updated

Emitted by Worker_jobs when all pending jobs in the queue have been sent out to the device in a response to send() or process_jobs_queue(). See also the tip at signal_DAQ_updated().

Type:Signal
QDeviceIO.signal_DAQ_paused

Emitted by Worker_DAQ to confirm the worker has entered the paused state in a response to pause_DAQ(). See also the tip at signal_DAQ_updated().

Type:Signal
QDeviceIO.signal_connection_lost

Emitted by Worker_DAQ to indicate that we have lost connection to the device. This happens when N consecutive device I/O operations have failed, where N equals the argument critical_not_alive_count as passed to method create_worker_DAQ(). See also the tip at signal_DAQ_updated().

Type:Signal

Methods

QDeviceIO.create_worker_DAQ(DAQ_trigger: DAQ_TRIGGER = DAQ_TRIGGER.INTERNAL_TIMER, DAQ_function: Optional[Callable] = None, DAQ_interval_ms: int = 100, DAQ_timer_type: TimerType = 0, critical_not_alive_count: int = 1, debug: bool = False, **kwargs)[source]

Create and configure an instance of Worker_DAQ and transfer it to a new PySide6.QtCore.QThread.

This worker acquires data from the I/O device, either periodically or aperiodically. It does so by calling a user-supplied function, passed as parameter DAQ_function, containing device I/O operations and subsequent data processing, every time the worker updates. There are different modes of operation for this worker to perform an update. This is set by parameter DAQ_trigger.

The Worker_DAQ routine is robust in the following sense. It can be set to quit as soon as a communication error appears, or it could be set to allow a certain number of communication errors before it quits. The latter can be useful in non-critical implementations where continuity of the program is of more importance than preventing drops in data transmission. This, obviously, is a work-around for not having to tackle the source of the communication error, but sometimes you just need to struggle on. E.g., when your Arduino is out in the field and picks up occasional unwanted interference/ground noise that messes with your data transmission. See parameter critical_not_alive_count.

Parameters:
  • DAQ_trigger (int, optional) – Mode of operation. See DAQ_TRIGGER.

    Default: DAQ_TRIGGER.INTERNAL_TIMER.

  • DAQ_function (Callable | None, optional) – Reference to a user-supplied function containing the device I/O operations and subsequent data processing, to be invoked every DAQ update.

    Default: None.

    Important

    The function must return True when the communication with the device was successful, and False otherwise.

    Warning

    Neither directly change the GUI, nor print to the terminal from out of this function. Doing so might temporarily suspend the function and could mess with the timing stability of the worker. (You’re basically undermining the reason to have multithreading in the first place). That could be acceptable, though, when you need to print debug or critical error information to the terminal, but be aware about the possible negative effects.

    Instead, connect to QDeviceIO.signal_DAQ_updated() from out of the main/GUI thread to instigate changes to the terminal/GUI when needed.

    Example

    Pseudo-code, where time and temperature are variables that live at a higher scope, presumably at the main scope level. The function dev.query_temperature() contains the device I/O operations, e.g., sending out a query over RS232 and collecting the device reply. In addition, the function notifies if the communication was successful. Hence, the return values of dev.query_temperature() are success as boolean and reply as a tuple containing a time stamp and a temperature reading.

    def my_DAQ_function():
        success, reply = dev.query_temperature()
        if not(success):
            print("Device IOerror")
            return False    # Return failure
    
        # Parse readings into separate variables and store them
        try:
            time, temperature = reply
        except Exception as err:
            print(err)
            return False    # Return failure
    
        return True         # Return success
    
  • DAQ_interval_ms (int, optional) – Only useful in mode DAQ_TRIGGER.INTERNAL_TIMER. Desired data-acquisition update interval in milliseconds.

    Default: 100.

  • DAQ_timer_type (PySide6.QtCore.Qt.TimerType, optional) – Only useful in mode DAQ_TRIGGER.INTERNAL_TIMER. The update interval is timed to a PySide6.QtCore.QTimer running inside Worker_DAQ. The default value PySide6.QtCore.Qt.TimerType.PreciseTimer tries to ensure the best possible timer accuracy, usually ~1 ms granularity depending on the OS, but it is resource heavy so use sparingly. One can reduce the CPU load by setting it to less precise timer types PySide6.QtCore.Qt.TimerType.CoarseTimer or PySide6.QtCore.Qt.TimerType.VeryCoarseTimer.

    Default: PySide6.QtCore.Qt.TimerType.PreciseTimer.

  • critical_not_alive_count (int, optional) – The worker will allow for up to a certain number of consecutive communication failures with the device, before hope is given up and a QDeviceIO.signal_connection_lost() is emitted. Use at your own discretion. Setting the value to 0 will never give up on communication failures.

    Default: 1.

  • debug (bool, optional) – Print debug info to the terminal? Warning: Slow! Do not leave on unintentionally.

    Default: False.

  • **kwargs – All remaining keyword arguments will be passed onto inherited class PySide6.QtCore.QObject.

QDeviceIO.create_worker_jobs(jobs_function: Optional[Callable] = None, debug: bool = False, **kwargs)[source]

Create and configure an instance of Worker_jobs and transfer it to a new PySide6.QtCore.QThread.

This worker maintains a thread-safe queue where desired device I/O operations, called jobs, can be put onto. The worker will send out the operations to the device, first-in, first-out (FIFO), until the queue is empty again. The manner in which each job gets handled is explained by parameter jobs_function.

This worker uses the PySide6.QtCore.QWaitCondition mechanism. Hence, it will only send out all pending jobs on the queue, whenever the thread is woken up by a call to Worker_jobs.process_queue(). When it has emptied the queue, the thread will go back to sleep again.

Parameters:
  • jobs_function (Callable | None, optional) – Routine to be performed per job.

    Default: None.

    When omitted and, hence, left set to the default value None, it will perform the default job handling routine, which goes as follows:

    func and args will be retrieved from the jobs queue and their combination func(*args) will get executed. Respectively, func and args correspond to instruction and pass_args of methods send() and add_to_queue().

    The default is sufficient when func corresponds to an I/O operation that is an one-way send, i.e. a write operation with optionally passed arguments, but without a reply from the device.

    Alternatively, you can pass it a reference to a user-supplied function performing an alternative job handling routine. This allows you to get creative and put, e.g., special string messages on the queue that decode into, e.g.,

    • multiple write operations to be executed as one block,
    • query operations whose return values can be acted upon accordingly,
    • extra data processing in between I/O operations.

    The function you supply must take two arguments, where the first argument is to be func and the second argument is to be args of type tuple. Both func and args will be retrieved from the jobs queue and passed onto your supplied function.

    Warning

    Neither directly change the GUI, nor print to the terminal from out of this function. Doing so might temporarily suspend the function and could mess with the timing stability of the worker. (You’re basically undermining the reason to have multithreading in the first place). That could be acceptable, though, when you need to print debug or critical error information to the terminal, but be aware about this warning.

    Instead, connect to QDeviceIO.signal_jobs_updated() from out of the main/GUI thread to instigate changes to the terminal/GUI when needed.

    Example:

    def my_jobs_function(func, args):
        if func == "query_id?":
            # Query the device for its identity string
            success, ans_str = dev.query("id?")
            # And store the reply 'ans_str' in another variable
            # at a higher scope or do stuff with it here.
        else:
            # Default job handling where, e.g.
            # func = dev.write
            # args = ("toggle LED",)
            func(*args)
    
  • debug (bool, optional) – Print debug info to the terminal? Warning: Slow! Do not leave on unintentionally.

    Default: False.

  • **kwargs – All remaining keyword arguments will be passed onto inherited class PySide6.QtCore.QObject.

QDeviceIO.start(DAQ_priority=7, jobs_priority=7) bool[source]

Start the event loop of all of any created workers.

Parameters:
  • DAQ_priority (PySide6.QtCore.QThread.Priority, optional) – By default, the worker threads run in the operating system at the same thread priority as the main/GUI thread. You can change to higher priority by setting priority to, e.g., PySide6.QtCore.QThread.TimeCriticalPriority. Be aware that this is resource heavy, so use sparingly.

    Default: PySide6.QtCore.QThread.Priority.InheritPriority.

  • jobs_priority (PySide6.QtCore.QThread.Priority, optional) – Like DAQ_priority.

    Default: PySide6.QtCore.QThread.Priority.InheritPriority.

Returns:

True if successful, False otherwise.

QDeviceIO.start_worker_DAQ(priority=7) bool[source]

Start the data acquisition with the device by starting the event loop of the worker_DAQ thread.

Parameters:priority (PySide6.QtCore.QThread.Priority, optional) – See start() for details.
Returns:True if successful, False otherwise.
QDeviceIO.start_worker_jobs(priority=7) bool[source]

Start maintaining the jobs queue by starting the event loop of the worker_jobs thread.

Parameters:priority (PySide6.QtCore.QThread.Priority, optional) – See start() for details.
Returns:True if successful, False otherwise.
QDeviceIO.quit() bool[source]

Stop all of any running workers and close their respective threads.

Returns:True if successful, False otherwise.
QDeviceIO.quit_worker_DAQ() bool[source]

Stop worker_DAQ and close its thread.

Returns:True if successful, False otherwise.
QDeviceIO.quit_worker_jobs() bool[source]

Stop worker_jobs and close its thread.

Returns:True if successful, False otherwise.
QDeviceIO.pause_DAQ()[source]

Only useful in mode DAQ_TRIGGER.CONTINUOUS. Request worker_DAQ to pause and stop listening for data. After worker_DAQ has achieved the paused state, it will emit signal_DAQ_paused().

QDeviceIO.unpause_DAQ()[source]

Only useful in mode DAQ_TRIGGER.CONTINUOUS. Request worker_DAQ to resume listening for data. Once worker_DAQ has successfully resumed, it will emit signal_DAQ_updated() for every DAQ update.

QDeviceIO.wake_up_DAQ()[source]

Only useful in mode DAQ_TRIGGER.SINGLE_SHOT_WAKE_UP. Request worker_DAQ to wake up and perform a single update, i.e. run its DAQ_function once. It will emit signal_DAQ_updated() after the DAQ_function has run, either successful or not.

QDeviceIO.send(instruction, pass_args=())[source]

Put a job on the worker_jobs queue and send out the full queue first-in, first-out to the device until empty. Once finished, it will emit signal_jobs_updated().

Parameters:
  • instruction (Callable | other) – Intended to be a reference to a device I/O method such as dev.write(). Any arguments to be passed to the I/O method need to be set in the pass_args parameter.

    You have the freedom to be creative and put, e.g., strings decoding special instructions on the queue as well. Handling such special cases must be programmed by supplying the parameter jobs_function when calling QDeviceIO.create_worker_jobs() with your own alternative job-processing-routine function.

  • pass_args (tuple | other, optional) – Arguments to be passed to the instruction. Must be given as a tuple, but for convenience any other type will also be accepted if it just concerns a single argument.

    Default: ().

Example:

qdev.send(dev.write, "toggle LED")

where qdev is your QDeviceIO class instance and dev is your device class instance containing I/O methods.

QDeviceIO.add_to_jobs_queue(instruction, pass_args=())[source]

Put a job on the worker_jobs queue.

See send() for details on the parameters.

QDeviceIO.process_jobs_queue()[source]

Send out the full worker_jobs queue first-in, first-out to the device until empty. Once finished, it will emit signal_jobs_updated().