Lighting the cross-application signal beacons

Qt’s signals are a very powerful system that allows a great flow of data and events between objects and threads. This architecture allows everything from the value of a spinbox changing to a process finishing to be connected to and acted on.

One limitation is that signals are scoped to a single QCoreApplication or QGuiApplication instance, which in practice means a single process. What happens if we want one tool to emit a signal and have another tool respond to it? What if we want to work across computers or even networks?

The reality of bringing this concept to life is actually a lot simpler than you might think!

The general concept is to use a database as a middleman. An emitter adds the signal name and args to the database, and listeners poll that database for new signals they care about, then load and emit them locally.
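The idea can be sketched without Qt or MongoDB at all. In this minimal illustration a plain Python list stands in for the database and a list index stands in for the "last checked" marker; `fakeDatabase`, `emitToStore`, and `pollStore` are hypothetical names used only for this sketch.

```python
# A plain list stands in for the shared database (assumption for illustration).
fakeDatabase = []


def emitToStore(signalName, *args):
    """Emitter side: record the signal name and its args in the shared store."""
    fakeDatabase.append({"SignalName": signalName, "ArgList": list(args)})


def pollStore(lastIndex, wantedNames):
    """Listener side: return entries added since lastIndex that we care about."""
    newEntries = fakeDatabase[lastIndex:]
    matches = [(entry["SignalName"], entry["ArgList"])
               for entry in newEntries
               if entry["SignalName"] in wantedNames]
    # The new marker is the current size of the store
    return matches, len(fakeDatabase)


emitToStore("Ping", 1)
emitToStore("Ignored", "x")
found, cursor = pollStore(0, ["Ping"])
print(found)   # [('Ping', [1])]
print(cursor)  # 2
```

The rest of the post follows the same shape, with a MongoDB collection in place of the list and a UTC timestamp in place of the index.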

A great database choice for this is MongoDB, as it’s free, fast to set up, quick to search, and document-based, which makes it ideal. Installing it on a machine and going through its 3-minute setup is all that is needed for the database side of the work! Sadly, Python does not come packaged with the ability to talk to MongoDB, so it’s necessary to download the required third-party library, which can be done with the following pip command:

$ python -m pip install pymongo

With all the setup done and out of the way, we can start coding!

It makes sense to have a single class for interacting with MongoDB, and it might as well contain the methods for both emitting and listening for signals, so all the logic is in one place. One thing we don’t want is for this listener class to get out of sync, so every method is a class method or static method. That way all state lives on the class itself, and we won’t run into issues with multiple instances of the main class.

As the database is document-driven, there are no predefined keys or column names. It’s important to define these as constants in the code so we can refer back to them easily. They could live in their own class of keys if multiple things needed to access the raw data.

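import datetime
import pickle
import pymongo
from PySide import QtCore, QtGui  # assumed binding; the post does not show its imports

class SignalBeacons(QtCore.QObject):

    _SIGNAL_KEY = "SignalName"
    _ARG_LIST_KEY = "ArgList"
    _TIMESTAMP_KEY = "TimeStamp"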

One of the great things about MongoDB is that asking for a database or collection that doesn’t exist is not an error; it gets created as soon as data is first written to it. To ensure that we don’t end up with different databases or collections due to slight variations in spelling, a single static method to get the connection seems like the best way forward.

    @staticmethod
    def _getDatabaseConnection():
        """
        Get the database connection
        """
        client = pymongo.MongoClient("mongodb://localhost:27017/")
        database = client["blogCode"]
        return database["signals"]

The easiest side to add first is the emitting side, as it’s fairly straightforward. The process takes in a signal name and a list of args. Qt doesn’t allow keyword args in signal emits, and whilst it’s technically possible to implement them in this system, I’ve opted to respect Qt’s design choice here.

We serialize the list of signal args with pickle so we can easily load them back up on the other side. This approach is very barebones: standard Python types pickle fine, but a lot of Qt classes are not picklable. Creating a pickle that’s Qt-aware is a topic for another blog!
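As a quick illustration of what does and doesn’t survive the trip, here is a small sketch using only the standard library. The lambda is just a stand-in for an object pickle can’t handle (an assumption for the example, not an actual Qt class).

```python
import pickle

# Standard Python types survive a pickle round trip unchanged.
args = ("myArg1", 2, [3.0, None])
payload = pickle.dumps(args)
restored = pickle.loads(payload)
print(restored == args)  # True

# Objects that pickle cannot handle raise an error at emit time.
# A lambda is used here as a stand-in for an unpicklable object.
try:
    pickle.dumps((lambda: None,))
    picklable = True
except (pickle.PicklingError, AttributeError, TypeError):
    picklable = False
print(picklable)  # False
```

The useful part is that the failure happens on the emitter’s side, before anything is written to the database.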

To complete the signal emitting code, we add the UTC time at which the signal was emitted, so listeners can make an informed choice about whether to emit on their side based on the time.

    @classmethod
    def _addSignalToDatabase(cls, signalName, listOfArgs):
        """
        Emitter Side!
        
        Add the signal to the database with the given name and list of signal args.
        The args are pickled!
        
        args:
            signalName (str): The name of the signal to emit
            listOfArgs (list of objects): List of objects to emit with the signal.
        """
        data = {cls._SIGNAL_KEY: signalName, 
                cls._ARG_LIST_KEY: pickle.dumps(listOfArgs),
                cls._TIMESTAMP_KEY: datetime.datetime.utcnow(),}
        cls._getDatabaseConnection().insert_one(data)

So that developers don’t have to convert their args into a list before emitting them, we’re going to expose a public-facing method which uses *args to package them up.

    @classmethod
    def emit(cls, signalName, *args):
        """
        Emitter Side!
        
        Emit the signal with the given name and the given args
        The args are pickled!
        
        args:
            signalName (str): The name of the signal to emit
        """
        cls._addSignalToDatabase(signalName, args)
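To see what the two emitter methods produce together, here is a rough sketch of the document shape that ends up in the collection, built without touching MongoDB. `buildSignalDocument` is a hypothetical helper for illustration only, and it uses a timezone-aware UTC time rather than the utcnow() call above.

```python
import datetime
import pickle


def buildSignalDocument(signalName, *args):
    """Hypothetical helper: build the document without touching MongoDB."""
    return {
        "SignalName": signalName,
        # *args arrives as a tuple, so it is pickled as a tuple
        "ArgList": pickle.dumps(args),
        # timezone-aware UTC time; the post's code uses utcnow() instead
        "TimeStamp": datetime.datetime.now(datetime.timezone.utc),
    }


document = buildSignalDocument("SomeSignal", "myArg1", "myArg2")
print(sorted(document))                   # ['ArgList', 'SignalName', 'TimeStamp']
print(pickle.loads(document["ArgList"]))  # ('myArg1', 'myArg2')
```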

And that’s it for the emitting side. I told you it was the easiest part!!

That leaves the more complex side, the listener’s side. The complexity comes from all the moving parts, and from ensuring that functions connected to signals are executed on the main thread, as they would be with regular Qt signals.

On the emitting side we had the “_addSignalToDatabase” method. On the listener side we need an equivalent, which takes a list of signal names to look out for as well as the UTC time at which the listener last checked.

To make sure we don’t miss emitted signals because of gaps between reading the current UTC time and running the query, we take the latest timestamp from the new signal data and return that. If there is no new data, we return the originally given timestamp, so the next query can use it. With this in place, each query picks up where the previous one left off, so signals shouldn’t slip through the cracks between polls.

    @classmethod
    def getEmittedSignals(cls, fromTimeStamp, signalsNames):
        """
        Listener Side!
        
        Get all the emitted signal data for the given signal names since the given timestamp
        
        args:
            fromTimeStamp (datetime): The datetime to find new signals since
            signalsNames (list of strings): The names of the signals to check for
            
        return:
            list of (signalName, list of args) tuples, and the UTC time of the last signal
        """
        # Setup the search query
        query = {
            cls._TIMESTAMP_KEY: {"$gt":fromTimeStamp},
            cls._SIGNAL_KEY: {"$in":signalsNames},
        }
        dbValues = list(cls._getDatabaseConnection().find(query).sort([(cls._TIMESTAMP_KEY, pymongo.ASCENDING)]))
        
        # If there are no values, just exit out and return an empty list and the given last timestamp
        if len(dbValues) == 0:
            return [], fromTimeStamp
        
        # Unpickle the data into (signalName, signalData) tuples and extract the timestamp of the last signal emitted
        signalData = [(data[cls._SIGNAL_KEY], pickle.loads(data[cls._ARG_LIST_KEY])) for data in dbValues]
        lastSignalTime = max([data[cls._TIMESTAMP_KEY] for data in dbValues])
        return signalData, lastSignalTime
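The timestamp hand-off is easier to see with the database taken out of the picture. The sketch below mimics the query with a list of dicts; `getNewSignals` and the sample data are hypothetical and only illustrate the "strictly newer than, then return the newest timestamp" logic.

```python
import datetime


def getNewSignals(storedSignals, fromTimeStamp, signalNames):
    """In-memory stand-in for the query: strictly newer and name matches."""
    matches = sorted(
        (entry for entry in storedSignals
         if entry["TimeStamp"] > fromTimeStamp
         and entry["SignalName"] in signalNames),
        key=lambda entry: entry["TimeStamp"],
    )
    if not matches:
        # Nothing new: hand back the same timestamp for the next poll
        return [], fromTimeStamp
    return [entry["SignalName"] for entry in matches], matches[-1]["TimeStamp"]


start = datetime.datetime(2020, 1, 1, 12, 0, 0)
stored = [
    {"SignalName": "Ping", "TimeStamp": start + datetime.timedelta(seconds=1)},
    {"SignalName": "Other", "TimeStamp": start + datetime.timedelta(seconds=2)},
    {"SignalName": "Ping", "TimeStamp": start + datetime.timedelta(seconds=3)},
]

names, cursor = getNewSignals(stored, start, ["Ping"])
print(names)                                            # ['Ping', 'Ping']
print(cursor == start + datetime.timedelta(seconds=3))  # True

# Polling again with the returned cursor finds nothing and keeps the cursor
namesAgain, cursorAgain = getNewSignals(stored, cursor, ["Ping"])
print(namesAgain, cursorAgain == cursor)                # [] True
```

One edge this sketch shares with a strict "greater than" comparison: an entry whose timestamp is exactly equal to the cursor is skipped, so signals stamped with the same time but read in separate polls could be affected.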

The “getEmittedSignals” method works great for getting data, but we need a system to call it periodically to pick up any new data. My initial approach was to use a QTimer, but when I introduced an artificial slowdown on the database side, the main thread blocked and the UI froze. This is down to how signals are delivered: the timer’s connected methods run as part of the main thread, which is something we will use to our advantage later.

To work around this, a simple QThread looked to be the best option as the run method within it can just keep checking for new data and sleeping, rather than having to spin up a new QRunnable each time one finishes.

The implementation of this is nothing noteworthy, but one cool thing I did do was connect the QApplication’s aboutToQuit signal to a handler which stops the thread. This prevents the thread from still existing when the application is closed.

class _SignalBeaconsThread(QtCore.QThread):
    """
    Signal Beacons Thread to handle the task of getting the data so it's not blocking the main thread
    
    signals:
        signalEmitted (str, list of objects): A signal has been emitted. sends the name and args of the signal
    """
    signalEmitted = QtCore.Signal(object, object)
    
    def __init__(self):
        """
        Constructor
        """
        super(_SignalBeaconsThread, self).__init__()
        self._signalNames = []
        self._lastTime = None
        self._isRunning = False
        self._app = None

    def run(self):
        """
        ReImplemented method.

        Main loop logic
        """
        # Hook into the QApplication's aboutToQuit signal to make sure the thread is stopped with the process
        if self._app is None:
            self._app = QtCore.QCoreApplication.instance() or QtGui.QApplication.instance()
            self._app.aboutToQuit.connect(self._handleAppClosing)
        
        # Mark the process as running
        self._isRunning = True
        self._lastTime = datetime.datetime.utcnow()
        
        # Just keep running until we tell it to stop
        while self._isRunning is True:
            # Get the new data
            newData, self._lastTime = SignalBeacons.getEmittedSignals(self._lastTime, self._signalNames)
            
            # Check the thread is still running as the Database call might have taken some time
            if self._isRunning is False:
                break
            
            # Emit the signals and their data
            for sigName, sigData in newData:
                self.signalEmitted.emit(sigName, sigData)
            
            # sleep for a second-ish
            self.msleep(1*1000) # 1 second
    
    def setSignals(self, newSignalList):
        """
        Set the signals that the listener should be looking out for.
        
        args:
            newSignalList (list of str): List of string names for the signals
        """
        self._signalNames = list(newSignalList)

    def stop(self):
        """
        Stop the listener thread. As it only reads data, we aren't too precious about how we stop it
        """
        self._isRunning = False
        self.terminate()
        
    def _handleAppClosing(self):
        """
        Handle the QApplication closing, making sure that the thread is stopped before the process stops
        """
        self.stop()
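The poll-then-sleep loop and its stop flag can also be illustrated with the standard library alone. This sketch swaps QThread for `threading.Thread` and uses a `threading.Event` so the sleep can be interrupted, which lets the loop exit on its own instead of being terminated. `PollingThread`, `pollFunction`, and `recordPoll` are hypothetical names; this is a variation on the idea, not the implementation above.

```python
import threading


class PollingThread(threading.Thread):
    """Stand-in for the listener thread: poll, then sleep until told to stop."""

    def __init__(self, pollFunction, interval=0.01):
        super(PollingThread, self).__init__()
        self.daemon = True
        self._pollFunction = pollFunction
        self._interval = interval
        self._stopEvent = threading.Event()

    def run(self):
        while not self._stopEvent.is_set():
            self._pollFunction()
            # wait() returns early as soon as stop() sets the event
            self._stopEvent.wait(self._interval)

    def stop(self):
        self._stopEvent.set()
        self.join()


polled = threading.Event()
calls = []


def recordPoll():
    calls.append(1)
    polled.set()


thread = PollingThread(recordPoll)
thread.start()
polled.wait(timeout=5)    # block until at least one poll has happened
thread.stop()
print(len(calls) >= 1)    # True
print(thread.is_alive())  # False
```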

To know which signals to listen for and which functions to call when they are emitted, simple connect and disconnect methods are needed. Both rely on two class attributes on SignalBeacons that the snippets here don’t show being defined: “_SIGNALS”, a dict mapping each signal name to its list of connected functions, and “_LISTENER_THREAD”, a single instance of the thread above. Whilst the methods might look like they are doing a lot, they are not. The general idea is that we add the function to the list for that signalName, or remove it for the disconnect, and then either start the thread if it’s the first connected signal, or stop it if there are no more signals connected. Either way, we pass the new list of signalNames to the thread so it knows what to check for.

    @classmethod
    def connectSignal(cls, signalName, funcPointer):
        """
        Listener Side!
        
        Connect the given function to the given signal 
        Will raise ValueError if the function is already connected to that signal
        
        args:
            signalName (str): The name of the signal to connect to
            funcPointer (function): The function to call when that signal is emitted
        """
        # Get the current list of functions for that signal name
        currentFuncs = cls._SIGNALS.get(signalName, [])

        # Don't re-add it if it's already connected
        if funcPointer in currentFuncs:
            raise ValueError("Unable to connect signal")
        
        # Add the function to the list for that signal name
        currentFuncs.append(funcPointer)
        cls._SIGNALS[signalName] = currentFuncs
        
        # update the thread with the new signal name list
        cls._LISTENER_THREAD.setSignals(cls._SIGNALS.keys())
        
        # If the listener thread isn't running, connect its signal first, then start it
        if cls._LISTENER_THREAD.isRunning() is False:
            cls._LISTENER_THREAD.signalEmitted.connect(cls._handleSignalEmitted)
            cls._LISTENER_THREAD.start()

    @classmethod
    def disconnectSignal(cls, signalName, funcPointer):
        """
        Listener Side!
        
        Disconnect the given function from the given signal 
        Will raise ValueError if the signal is not connected
        
        args:
            signalName (str): The name of the signal to disconnect
            funcPointer (function): The function that was assigned to that signal to disconnect
        """
        # Try and get the list of functions for that signal name
        try:
            signalList = cls._SIGNALS[signalName]
        except KeyError:
            raise ValueError("Unable to disconnect signal")
        
        # Try removing that function from list of functions for that signal name
        try:
            signalList.remove(funcPointer)
        except ValueError:
            raise ValueError("Unable to disconnect signal")
        
        # If the list of functions for that signal is empty, then remove it from the list of signals
        if len(signalList) == 0:
            cls._SIGNALS.pop(signalName)
            # and if there are no more signals to listen for, stop the thread
            if len(cls._SIGNALS) == 0:
                cls._LISTENER_THREAD.stop()
                cls._LISTENER_THREAD.signalEmitted.disconnect(cls._handleSignalEmitted)

        else:
            # set the reduced list of functions to that signal name
            cls._SIGNALS[signalName] = signalList
        
        # update the thread with the new signal name list
        cls._LISTENER_THREAD.setSignals(cls._SIGNALS.keys())
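Stripped of the thread handling, the bookkeeping in these two methods comes down to a dict of lists. The sketch below is a hypothetical `SignalRegistry` class that mirrors that logic and reports when listening should start or stop; it is an illustration, not part of the class above.

```python
class SignalRegistry(object):
    """Hypothetical stand-in for the _SIGNALS bookkeeping."""

    def __init__(self):
        self._signals = {}

    def connect(self, signalName, func):
        funcs = self._signals.setdefault(signalName, [])
        if func in funcs:
            raise ValueError("Unable to connect signal")
        funcs.append(func)
        # True when this is the first connection overall: start listening
        return sum(len(f) for f in self._signals.values()) == 1

    def disconnect(self, signalName, func):
        try:
            self._signals[signalName].remove(func)
        except (KeyError, ValueError):
            raise ValueError("Unable to disconnect signal")
        if not self._signals[signalName]:
            del self._signals[signalName]
        # True when nothing is connected any more: stop listening
        return not self._signals

    def names(self):
        return sorted(self._signals)


def onPing(num):
    pass


registry = SignalRegistry()
print(registry.connect("Ping", onPing))     # True
print(registry.names())                     # ['Ping']
print(registry.disconnect("Ping", onPing))  # True
print(registry.names())                     # []
```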

The last method in the whole logic calls the functions connected to a signal. As I mentioned earlier, functions connected to signals are executed in the main thread, which means that by simply having a method connected to the “signalEmitted” signal on the thread, we can go through the list of functions connected to that signal name and just call them. It’s as simple as that!

    @classmethod
    def _handleSignalEmitted(cls, signalName, signalData):
        """
        Listener Side!
         
        Handle a new signal being emitted from the listener thread.
        """
        # Go through each of the functions with that signal, and trigger them with the given data
        for fntPrt in cls._SIGNALS.get(signalName, []):
            fntPrt(*signalData)
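The `*signalData` unpacking is what turns the stored tuple back into positional arguments. A small standalone sketch of that dispatch step follows; `dispatch`, `connected`, and `handleSomeSignal` are hypothetical names for illustration.

```python
def dispatch(connected, signalName, signalData):
    """Call every function registered under signalName with unpacked args."""
    results = []
    for func in connected.get(signalName, []):
        results.append(func(*signalData))
    return results


def handleSomeSignal(arg1, arg2, arg3):
    return "{}, {}, {}".format(arg1, arg2, arg3)


connected = {"SomeSignal": [handleSomeSignal]}
print(dispatch(connected, "SomeSignal", ("a", "b", "c")))  # ['a, b, c']
print(dispatch(connected, "Unknown", ()))                  # []
```

One consequence worth keeping in mind: if an emitter sends a different number of args than a connected function accepts, the call raises a TypeError on the listener’s side rather than the emitter’s.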

This makes the public interface look something like this for emitting signals:

import beaconsLib
beaconsLib.SignalBeacons.emit("Ping", 1)
beaconsLib.SignalBeacons.emit("SomeSignal", "myArg1", "myArg2", "myArg3")

And the interface for connecting to them as:

import beaconsLib

def _handlePing(num):
    print(num)

def _handleSomeSignal(arg1, arg2, arg3):
    print("{}, {}, {}".format(arg1, arg2, arg3))
   
beaconsLib.SignalBeacons.connectSignal("Ping", _handlePing)
beaconsLib.SignalBeacons.connectSignal("SomeSignal", _handleSomeSignal)

Signals that cross threads, like the one used here, are delivered through Qt’s event loop, so this implementation expects either a QCoreApplication or a QApplication to be running in order to work.

I’ve written a Python listener demo and a signal generator demo to show it working, and as per its design, you can connect multiple listeners with no issues, as well as multiple generators.

As a result of my article a few weeks ago about porting Python to C++, I did look into how this could work in C++, but it seemed to be an uphill battle, as this system takes advantage of Python’s dynamic typing, such as not needing to know the types of the objects in the list of args. I would love to find a way to implement this one day!

I hope this has given you some ideas about how Qt signals and MongoDB can be used to make processes more globally aware of what’s going on, making everything more responsive and current as a result.

– Geoff
