Qt’s signals are a very powerful system that allows for a great flow of data and events between objects and threads. This architecture allows everything from the value of a spinbox changing to a process finishing to be connected and acted on.
One limitation is that signals are confined to a single QCoreApplication or QGuiApplication instance, which scopes them to one process. What happens if we want one tool to emit a signal and another tool to respond to it? What if we want to work across computers, or even networks?
The reality of bringing this concept to life is actually a lot simpler than you might think!
The general concept is to use a database as a middleman. An emitter adds the signal and its args to the database, and listeners check that database for new signals they care about, then load them and emit them locally.
A great database choice for this would be MongoDB, as it’s free, fast to set up, quick to search, and document-based, which makes it ideal. Installing it on a machine and going through its 3-minute setup is all that is needed for the database side of the work! Sadly, Python does not come packaged with the ability to talk to MongoDB, so it’s necessary to download the required 3rd party lib, which can be done with the following pip command:
$ python -m pip install pymongo
With all the setup done and out of the way, we can start coding!
It makes sense to have a single class for interacting with MongoDB, and it might as well contain the methods for both emitting and listening for signals, so all the logic is in one place. One thing we don’t want is for this listener class to get out of sync, so making everything a class method or static method means that all operations happen on the same object in memory and we won’t run into issues with multiple instances of the main class.
As the database is document driven, there are no given keys and no column names. It’s important to define the keys as constants so we can refer back to them easily in the code. These could live in their own class of keys if multiple things were trying to access the raw data.
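import datetime
import pickle

import pymongo
from PySide import QtCore, QtGui  # binding assumed from the QtCore.Signal / QtGui.QApplication usage below


class SignalBeacons(QtCore.QObject):
    _SIGNAL_KEY = "SignalName"
    _ARG_LIST_KEY = "ArgList"
    _TIMESTAMP_KEY = "TimeStamp"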
One of the great things with MongoDB is that queries to get databases and collections will also create them if they don’t exist. To ensure that we don’t end up with different databases or collections with slight variations in spelling, a single static method to get the connection seems like the best way forward.
    @staticmethod
    def _getDatabaseConnection():
        """
        Get the database connection
        """
        client = pymongo.MongoClient("mongodb://localhost:27017/")
        database = client["blogCode"]
        return database["signals"]
The easiest side to add first would be the emitting side, as it’s fairly straightforward. The process takes in a signal name and a list of args. Qt doesn’t allow keyword args for its signal emits, and whilst it’s technically possible to implement in this system, I’ve opted to respect Qt’s design choice here.
We serialize the list of signal args with pickle so we can easily load them back up on the other side. This approach is very barebones: standard types are pickleable, but a lot of Qt classes are not. Creating a pickle that’s Qt aware is a topic for another blog!
To complete the signal emitting code, we add the UTC time at which the signal was emitted, so listeners can make an informed choice about whether to emit on their side based on that time.
    @classmethod
    def _addSignalToDatabase(cls, signalName, listOfArgs):
        """
        Emitter Side!
        Add the signal to the database with the given name and list of signal args.
        The args are pickled!

        args:
            signalName (str): The name of the signal to emit
            listOfArgs (list of objects): List of objects to emit with the signal.
        """
        data = {
            cls._SIGNAL_KEY: signalName,
            cls._ARG_LIST_KEY: pickle.dumps(listOfArgs),
            cls._TIMESTAMP_KEY: datetime.datetime.utcnow(),
        }
        cls._getDatabaseConnection().insert_one(data)
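The pickling step can be tried on its own, without a database. The following is a minimal sketch rather than part of the original code: `pack_args`, `unpack_args` and `can_pickle` are hypothetical helper names, and it assumes Python 3. It shows that args gathered through `*args` come back as a tuple on the other side, and that something like a lambda is rejected when it is pickled.

```python
import pickle


def pack_args(*args):
    """Serialize positional args the way the emitter does before storing them."""
    return pickle.dumps(args)


def unpack_args(payload):
    """Listener side: turn the stored bytes back into the original args."""
    return pickle.loads(payload)


def can_pickle(value):
    """Return True if value survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    payload = pack_args("myArg1", 2, [3.0, None])
    print(unpack_args(payload))      # the args come back as a tuple
    print(can_pickle(lambda: None))  # lambdas cannot be pickled
```

A check like `can_pickle` could be run before emitting to give a clearer error than a failed insert. Since unpickling can execute arbitrary code, this approach assumes every process writing to the collection is trusted.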
So that developers don’t have to convert their args into a list before emitting them, we’re going to expose a public-facing method which uses *args to package them up.
    @classmethod
    def emit(cls, signalName, *args):
        """
        Emitter Side!
        Emit the signal with the given name and the given args.
        The args are pickled!

        args:
            signalName (str): The name of the signal to emit
        """
        cls._addSignalToDatabase(signalName, args)
And that’s it for the emitting side. I told you it was the easiest part!!
Now that leaves the more complex side, the listener’s side. The complexity comes from all the moving parts, and from ensuring that functions connected to signals are executed on the main thread, as they would be with regular Qt signals.
On the emitting side we had the “_addSignalToDatabase” method. On the listener side we need an equivalent, which takes in a list of signal names to look out for as well as the UTC time at which the listener last checked.
To make sure we don’t miss emitted signals because of gaps between reading the current UTC time and running the query, we take the latest timestamp from the new signal data and return that. If there is no new data, we return the originally given timestamp, so the next query can use it. With this in place, each query picks up where the previous one left off, so signals shouldn’t slip through the cracks between checks.
    @classmethod
    def getEmittedSignals(cls, fromTimeStamp, signalsNames):
        """
        Listener Side!
        Get all the emitted signal data for the given signal names since the given timestamp

        args:
            fromTimeStamp (datetime): The DateTime to use to find new signals since
            signalsNames (list of strings): The names of the signals to check for

        return:
            signal data (tuple of str (signalName) and list of objects (args)) and UTC time of the last signal
        """
        # Set up the search query
        query = {
            cls._TIMESTAMP_KEY: {"$gt": fromTimeStamp},
            cls._SIGNAL_KEY: {"$in": signalsNames},
        }
        dbValues = list(cls._getDatabaseConnection().find(query).sort([(cls._TIMESTAMP_KEY, pymongo.ASCENDING)]))

        # If there are no values, just exit out and return an empty list and the given last timestamp
        if len(dbValues) == 0:
            return [], fromTimeStamp

        # Unpickle the data into (signalName, signalData) tuples, and extract the timestamp of the last signal emitted
        signalData = [(data[cls._SIGNAL_KEY], pickle.loads(data[cls._ARG_LIST_KEY])) for data in dbValues]
        lastSignalTime = max([data[cls._TIMESTAMP_KEY] for data in dbValues])
        return signalData, lastSignalTime
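The timestamp handling is easier to see without MongoDB in the way. Below is a rough sketch that applies the same rule to a plain list of dicts: `get_new_signals` is a hypothetical stand-in for the database query, the args are stored unpickled for brevity, and the fixed datetimes are made up for the demo.

```python
import datetime


def get_new_signals(records, from_timestamp, signal_names):
    """Return ([(name, args), ...], newest_timestamp) for records newer than from_timestamp."""
    matches = [
        record for record in records
        if record["TimeStamp"] > from_timestamp and record["SignalName"] in signal_names
    ]
    if not matches:
        # Nothing new: hand back the caller's timestamp so the next poll reuses it
        return [], from_timestamp
    matches.sort(key=lambda record: record["TimeStamp"])
    signal_data = [(record["SignalName"], record["ArgList"]) for record in matches]
    return signal_data, matches[-1]["TimeStamp"]


if __name__ == "__main__":
    start = datetime.datetime(2020, 1, 1, 12, 0, 0)
    records = [
        {"SignalName": "Ping", "ArgList": (1,), "TimeStamp": start + datetime.timedelta(seconds=1)},
        {"SignalName": "Other", "ArgList": (), "TimeStamp": start + datetime.timedelta(seconds=2)},
    ]
    data, last_time = get_new_signals(records, start, ["Ping"])
    print(data, last_time)
    # Polling again with the returned timestamp yields nothing new
    print(get_new_signals(records, last_time, ["Ping"]))
```

Because the next poll starts from the timestamp of the last signal it actually saw, rather than from "now", time spent inside the query itself is never skipped over.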
The “getEmittedSignals” method works great for getting data, but we need to set up a system to periodically call that method to pick up any new data. My initial approach was to use a QTimer, but when I introduced an artificial slowdown on getting data back from the database, the main thread blocked and the UI froze. This is down to how signals are emitted: their connected methods are run as part of the main thread, which is something we will use to our advantage later.
To work around this, a simple QThread looked to be the best option as the run method within it can just keep checking for new data and sleeping, rather than having to spin up a new QRunnable each time one finishes.
The implementation of this is nothing noteworthy, but one cool thing I did do was to connect the QApplication’s aboutToQuit signal to a handler which stops the thread. This prevents the thread from lingering after the application has closed.
class _SignalBeaconsThread(QtCore.QThread):
    """
    Signal Beacons Thread to handle the task of getting the data so it's not blocking the main thread

    signals:
        signalEmitted (str, list of objects): A signal has been emitted. Sends the name and args of the signal
    """
    signalEmitted = QtCore.Signal(object, object)

    def __init__(self):
        """
        Constructor
        """
        super(_SignalBeaconsThread, self).__init__()
        self._signalNames = []
        self._lastTime = None
        self._isRunning = False
        self._app = None

    def run(self):
        """
        ReImplemented method.
        Main loop logic
        """
        # Hook into the QApplication's close event to make sure the thread is stopped with the process
        if self._app is None:
            self._app = QtCore.QCoreApplication.instance() or QtGui.QApplication.instance()
            self._app.aboutToQuit.connect(self._handleAppClosing)

        # Mark the process as running
        self._isRunning = True
        self._lastTime = datetime.datetime.utcnow()

        # Just keep running until we tell it to stop
        while self._isRunning is True:
            # Get the new data
            newData, self._lastTime = SignalBeacons.getEmittedSignals(self._lastTime, self._signalNames)

            # Check the thread is still running, as the database call might have taken some time
            if self._isRunning is False:
                break

            # Emit the signals and their data
            for sigName, sigData in newData:
                self.signalEmitted.emit(sigName, sigData)

            # Sleep for a second-ish
            self.msleep(1 * 1000)  # 1 second

    def setSignals(self, newSignalList):
        """
        Set the signals that the listener should be looking out for.

        args:
            newSignalList (list of str): List of string names for the signals
        """
        self._signalNames = list(newSignalList)

    def stop(self):
        """
        Stop the listener thread. As it's all just read data, we aren't too precious about how we stop it
        """
        self._isRunning = False
        self.terminate()

    def _handleAppClosing(self):
        """
        Handle the QApplication closing, making sure that the thread is stopped before the process stops
        """
        self.stop()
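The same poll-and-sleep loop can be sketched without Qt, which makes it easy to experiment with outside of an application. This is a hypothetical stand-in built on Python’s own `threading` module rather than QThread, assuming Python 3: `PollingThread`, `fetch` and `on_signal` are made-up names, and it swaps `terminate()` for a `threading.Event` so the loop exits on its own. Unlike the Qt version, the callback here runs on the polling thread, not the main thread.

```python
import threading


class PollingThread(threading.Thread):
    """Hypothetical stand-in for the listener thread, built on threading.Thread."""

    def __init__(self, fetch, on_signal, interval=1.0):
        # daemon=True so a forgotten thread never blocks interpreter exit
        super().__init__(daemon=True)
        self._fetch = fetch          # callable returning a list of (name, args) tuples
        self._on_signal = on_signal  # callable invoked once per new signal
        self._interval = interval    # seconds to wait between polls
        self._stop_event = threading.Event()

    def run(self):
        # Keep polling until stop() is called
        while not self._stop_event.is_set():
            for name, args in self._fetch():
                self._on_signal(name, args)
            # wait() doubles as the sleep and returns early once stop() is called
            self._stop_event.wait(self._interval)

    def stop(self):
        """Ask the loop to finish; call join() afterwards to wait for it."""
        self._stop_event.set()
```

Waiting on the event instead of sleeping means a stop request is noticed straight away rather than after the full interval.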
To know which signals to listen for and which functions to call when those signals are emitted, simple connect and disconnect methods are needed. Whilst they might look like they are doing a lot, they are not. Both rely on two class attributes: “_SIGNALS”, a dict mapping each signal name to its list of connected functions, and “_LISTENER_THREAD”, the shared listener thread instance. The general idea is that we add the function to the list for that signalName, or remove it for the disconnect, and then either start the thread if it’s the first connected signal, or stop it if there are no more signals connected. Either way, we pass the new list of signalNames to the thread so it knows what to check for.
    @classmethod
    def connectSignal(cls, signalName, funcPointer):
        """
        Listener Side!
        Connect the given function to the given signal
        Will raise ValueError if the function is already connected to the signal

        args:
            signalName (str): The name of the signal to connect to
            funcPointer (function): The function to connect to that signal
        """
        # Get the current list of functions for that signal name
        currentFuncs = cls._SIGNALS.get(signalName, [])

        # Don't re-add it if it's already connected
        if funcPointer in currentFuncs:
            raise ValueError("Unable to connect signal")

        # Add the function
        currentFuncs.append(funcPointer)
        cls._SIGNALS[signalName] = currentFuncs

        # Update the thread with the new signal name list
        cls._LISTENER_THREAD.setSignals(cls._SIGNALS.keys())

        # If the listener thread isn't running, start it and connect to it
        if cls._LISTENER_THREAD.isRunning() is False:
            cls._LISTENER_THREAD.start()
            cls._LISTENER_THREAD.signalEmitted.connect(cls._handleSignalEmitted)

    @classmethod
    def disconnectSignal(cls, signalName, funcPointer):
        """
        Listener Side!
        Disconnect the given function from the given signal
        Will raise ValueError if the signal is not connected

        args:
            signalName (str): The name of the signal to disconnect
            funcPointer (function): The function that was assigned to that signal to disconnect
        """
        # Try and get the list of functions for that signal name
        try:
            signalList = cls._SIGNALS[signalName]
        except KeyError:
            raise ValueError("Unable to disconnect signal")

        # Try removing that function from the list of functions for that signal name
        try:
            signalList.remove(funcPointer)
        except ValueError:
            raise ValueError("Unable to disconnect signal")

        # If the list of functions for that signal is empty, then remove it from the dict of signals
        if len(signalList) == 0:
            cls._SIGNALS.pop(signalName)
            # And if there are no more signals to listen for, stop the thread
            if len(cls._SIGNALS) == 0:
                cls._LISTENER_THREAD.stop()
                cls._LISTENER_THREAD.signalEmitted.disconnect(cls._handleSignalEmitted)
        else:
            # Set the reduced list of functions for that signal name
            cls._SIGNALS[signalName] = signalList

        # Update the thread with the new signal name list
        cls._LISTENER_THREAD.setSignals(cls._SIGNALS.keys())
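Stripped of the thread handling, the bookkeeping in these two methods is a dict of lists. Here is a small sketch of just that part, with a dispatch step added so it can be exercised without Qt or a database. `SignalRegistry` and its method names are hypothetical and only mirror the behaviour described above.

```python
class SignalRegistry(object):
    """Hypothetical, Qt-free version of the connect/disconnect bookkeeping."""

    def __init__(self):
        self._handlers = {}  # signal name -> list of connected functions

    def connect(self, name, func):
        """Register func for name; raise ValueError if it is already connected."""
        funcs = self._handlers.setdefault(name, [])
        if func in funcs:
            raise ValueError("Unable to connect signal")
        funcs.append(func)

    def disconnect(self, name, func):
        """Remove func from name; raise ValueError if it was not connected."""
        try:
            self._handlers[name].remove(func)
        except (KeyError, ValueError):
            raise ValueError("Unable to disconnect signal")
        # Drop the name entirely once nothing is connected to it
        if not self._handlers[name]:
            del self._handlers[name]

    def names(self):
        """The signal names a listener thread would need to poll for."""
        return sorted(self._handlers)

    def dispatch(self, name, args):
        """Call every function connected to name with the unpacked args."""
        for func in list(self._handlers.get(name, [])):
            func(*args)
```

An empty `names()` result corresponds to the point where the real class stops its listener thread.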
The last method in the whole logic calls the functions connected to a signal. As I mentioned earlier, functions connected to signals are executed in the main thread, which means that by simply having a method connected to the “signalEmitted” signal on the thread, we can go through the list of functions connected to that signal name and just call them. It’s as simple as that!
    @classmethod
    def _handleSignalEmitted(cls, signalName, signalData):
        """
        Listener Side!
        Handle a new signal being emitted from the listener thread.
        """
        # Go through each of the functions with that signal, and trigger them with the given data
        for fntPrt in cls._SIGNALS.get(signalName, []):
            fntPrt(*signalData)
This makes the public interface look something like this for emitting signals:
import beaconsLib

beaconsLib.SignalBeacons.emit("Ping", 1)
beaconsLib.SignalBeacons.emit("SomeSignal", "myArg1", "myArg2", "myArg3")
And the interface for connecting to them as:
import beaconsLib


def _handlePing(num):
    print(num)


def _handleSomeSignal(arg1, arg2, arg3):
    print("{}, {}, {}".format(arg1, arg2, arg3))


beaconsLib.SignalBeacons.connectSignal("Ping", _handlePing)
beaconsLib.SignalBeacons.connectSignal("SomeSignal", _handleSomeSignal)
As Qt signals are all async, they require either a QCoreApplication or a QApplication to work, so this implementation also expects one of these to be running.
I’ve written a Python listener demo and a signal generator demo to show it working, and as per its design, you can connect multiple listeners with no issues, as well as multiple generators.

As a result of my article a few weeks ago about porting Python to C++, I did look into how this could work in C++, but it seemed to be an uphill battle due to the dynamic and weakly typed nature of Python that this system takes advantage of, such as not knowing the types of the objects in the list of args. I would love to find a way to implement this one day!
I hope this has given you some ideas about how Qt signals and MongoDB can be used to make processes more globally aware of what’s going on, making everything more responsive and current as a result.
– Geoff