Service-Based Remote Pi Recording

In the last 3 posts, I’ve gone over building a remote Pi video capture tool, allowing users to view and control the PiCamera on a Raspberry Pi. If you haven’t checked them out yet, you can find them:

Now that we have this great platform, we’re going to extend it by building the start of a service type architect into it where we can search the local network to discover these Remote Raspberry Pi’s and then allow for an easy connection to them.

The reason I’m calling it a service-based architect is that this is the base that can be extended out to contain multiple services, one of which being Pi Camera service.

How this will work is by having the client broadcast out a signal via UDP seeking what devices are out there. This signal is read by the Raspberry Pi’s and a response is generated with the IP, Address and sent back to the requesting IP Address. 

We’re going to use the existing code infrastructure to do most of the heavy lifting, the first stage is to add the new broadcast port and some new broadcast commands to the NetworkCommands in consts.py.

class NetworkCommands(object):
	"""
	Enums for Network commands
	"""
	BROADCAST_PORT = 9897
	DATA_PORT = 9898
	...
	# Broadcast Commands
	SERVICES_BROADCAST = 98
	SERVICES_BROADCAST_RESPONSE = 99

At the core of this architecture is a Qt drive UDP socket, which we can wrap in a new Services Connector class which ultimately can be used for both the server and client sides. 

There are a couple of interesting points about using Qt driven UDP broadcasts which need to be accounted for in the design. 

The first is that a UDP broadcast will get picked up by itself if the socket is listening to all broadcasts. We can work around this by getting, and caching, all the known IP addresses for the local computer when we start up the socket and filter against any incoming data packet from one of these addresses.

class ServicesConnector(QtCore.QObject):
    """
    Services Connector, able to search for other services and send data to allow for them to be connceted
    
    Signals:
        serviceFound (str, str): Emits when a service is found, sending its IP address and hostname
        serviceRemoved (str, str): Emits when a service is no longer connectable, sending its IP address and hostname
        searchingForServicesStarted (): Emits when the searching for services has started
        searchingForServicesFinished (): Emits when the searching for services has finished
    """
    serviceFound = QtCore.Signal(object, object)
    serviceRemoved = QtCore.Signal(object, object)
    searchingForServicesStarted = QtCore.Signal()
    searchingForServicesFinished = QtCore.Signal()

    def __init__(self):
        """
        Constructor
        """
        self._hostName = socket.gethostname()

        self._searchForServicesMutex = QtCore.QMutex()
        self._searchForServicesFoundList = []

        self._connectableClients = []
        self._readData = {}

        # Get the local IP's of the machine so we can ignore these broadcasts from itself
        self._localIps = []
        for address in QtNetwork.QNetworkInterface.allAddresses():
            if address.protocol() == QtNetwork.QAbstractSocket.IPv4Protocol:
                self._localIps.append(address)

        if len(self._localIps) == 0:
            raise ValueError("Unable to detect any local IP Address")

        super(ServicesConnector, self).__init__()
        self._serviceSocket = QtNetwork.QUdpSocket()
        self._serviceSocket.bind(consts.NetworkCommands.BROADCAST_PORT, QtNetwork.QUdpSocket.ShareAddress);
        self._serviceSocket.readyRead.connect(self._handleServiceFinderRead)
    def _handleServiceFinderRead(self):
        """
        Handle reading data off the UDP socket when data is available.
        """
        while (self._serviceSocket.hasPendingDatagrams() is True):
            datagram, address, _ = self._serviceSocket.readDatagram(999)
            
            # Disreguard if its from its self
            if address in self._localIps:
                continue
            
            # Append to the currently read data
            currentData = self._readData.get(address, QtCore.QByteArray())
            currentData.append(datagram)
            while True:
                # Decode and parse
                result = consts.NetworkParser.decodeMessage(currentData)
                if result is None:
                    break
                self._parseData(result, address)
            self._readData[address] = currentData

    def _parseData(self, message, address):
        """
        Parse the messages sent from the services
        
        args:
            data (consts.NetworkCommands, list of objects): Tuple of consts.NetworkCommands and list 
                                                            of args
        """
        commandIdx, args = message
        if commandIdx == consts.NetworkCommands.SERVICES_BROADCAST_RESPONSE:
            self._searchForServicesFoundList.append((address.toString(), args[0],))
        elif commandIdx == consts.NetworkCommands.SERVICES_BROADCAST:
            self._sendBroadcastResponse(address)

    def _sendBroadcastResponse(self, address):
        """
        Sends the correct broadcast response 
        """
        data = consts.NetworkParser.encodeMessage(consts.NetworkCommands.SERVICES_BROADCAST_RESPONSE, [self._hostName])
        self._serviceSocket.writeDatagram(data, address, consts.NetworkCommands.BROADCAST_PORT)

The second is the async nature of sending and receiving data using Qt sockets means that we need to be aware that there needs to be a window for services to reply before working out when to send signals out to either add or remove services. Using a QMutex to lock the operation and utilize the QEventLoop to block the main loop whilst still processing QEvents (See my article Holding for the Applause (Signal) for more information on that), we can create this window for services to reply back. Bookending the search functionality with a start and stop signal can let any GUI know that the search operation has started and disabled the triggering UI and when the operation has finished re-enabling the UI.

    def searchForServices(self):
        """
        Start searching for services. 
        This method will emit the searchingForServicesStarted at the stand and searchingForServicesFinished
        at the end.
        Found services are emitted using the serviceFound signal, and services no longer found are
        emitted with the serviceRemoved signal.
        """
        self.searchingForServicesStarted.emit()

        # Lock the mutex so this isnt being run more than once at a time
        locker = QtCore.QMutexLocker(self._searchForServicesMutex)
        locker.relock()
        self._searchForServicesFoundList = []

        # Send out the broadcast to see whos out there
        data = consts.NetworkParser.encodeMessage(98, [])
        self._serviceSocket.writeDatagram(data, QtNetwork.QHostAddress.Broadcast, consts.NetworkCommands.BROADCAST_PORT)

        # Start a 5 second timer and block, giving the services time to respond back
        eventLoop = QtCore.QEventLoop()
        timer = QtCore.QTimer()
        timer.start(1000 * 5)  # 5 Seconds
        timer.timeout.connect(eventLoop.quit)
        eventLoop.exec_()

        # Check what we got and send out removed or added signals
        for item in self._searchForServicesFoundList:
            ipAddress, niceHostName = item
            if item not in self._connectableClients:
                self._connectableClients.append(item)
                self.serviceFound.emit(ipAddress, niceHostName)
                print "Found {}, {}".format(ipAddress, niceHostName)

        for item in self._connectableClients:
            ipAddress, niceHostName = item
            if item not in self._searchForServicesFoundList:
                self._connectableClients.remove(item)
                self.serviceRemoved.emit(ipAddress, niceHostName)
                print "Lost {}, {}".format(ipAddress, niceHostName)

        # Unlock the search for services mutex
        locker.unlock()
        self.searchingForServicesFinished.emit()

Adding in the service functionality to the remoteCaptureServer, is nothing more complex than adding the services import and adding an instance of the ServicesConnector class to the PiCameraWrapperServer constructor

    def __init__(self, captureInfo=consts.CaptureInfo.VIDEO_1080p_30):
    	"""
    	Constructor
    	"""
    	super(PiCameraWrapperServer, self).__init__()
    	self._servicesConnector = services.ServicesConnector()

The PiCameraWidget needs some small tweaking to make it service based, firstly removing the UI ability to specify the hostname and adding the IP address and hostname into the constructor so they can be passed in by the new service widget.

	def __init__(self, ipAddress, hostname, parent=None):
    	"""
    	Constructor
   	 
    	args:
        	ipAddress (str): The IP Address to connect to
        	hostname (str): A nice name to use to for the connection, e.g. a hostname
    	"""
    	super(PiCameraWidget, self).__init__(parent=parent)
    	self._hostname = hostname
    	self._camera = PiCameraWrapperClient(ipAddress)

    	self.setupUi()

	def setupUi(self):
	...
    	# Layouts to Widgets
    	configLayout.addWidget(QtGui.QLabel("Camera to connected to:"), 0, 0)
    	configLayout.addWidget(QtGui.QLabel(self._hostname), 0, 1)
This is the old UI compared to the new UI

Lastly, we want to make a new UI that will populate a list of services to connect to, the UI would be a simple ListWidget with a refresh button, and be event-driven from a ServicesConnector instance.

class RemoteClientLauncherWidget(QtGui.QWidget):
    """
    Simple Client Interface to find and display all the found services.
    """
    def __init__(self):
        """
        Constructor
        """
        super(RemoteClientLauncherWidget, self).__init__()
        self._cameraWins = {}
        self.setupUi()
        self._servicesConnector = services.ServicesConnector()
        self._servicesConnector.serviceFound.connect(self._handleServicesClientAdded)
        self._servicesConnector.serviceRemoved.connect(self._handleServicesClientRemoved)
        self._servicesConnector.searchingForServicesStarted.connect(self._handleServicesSearchStarted)
        self._servicesConnector.searchingForServicesFinished.connect(self._handleServicesSearchFinished)
        self._servicesConnector.searchForServices()
        
        self.setWindowTitle("Remote Pi Camera Launcher")

    def setupUi(self):
        """
        Set up the UI
        """
        # Layouts
        layout = QtGui.QVBoxLayout()

        # Widgets
        self._foundServicesWidget = QtGui.QListWidget()
        self._refreshServices = QtGui.QPushButton("Search for services")

        # Layouts to Widgets
        layout.addWidget(self._foundServicesWidget)
        layout.addWidget(self._refreshServices)

        self.setLayout(layout)

        # Signals
        self._refreshServices.pressed.connect(self._handleRefreshServicesButton)
        self._foundServicesWidget.itemDoubleClicked.connect(self._handleServiesWidgetDoubleClick)

    def _handleRefreshServicesButton(self):
        """ Handle the refresh services button"""
        self._servicesConnector.searchForServices()

    def _handleServicesClientAdded(self, ipAddress, niceHostName):
        """ Handle services client finding a new service"""
        newItem = QtGui.QListWidgetItem()
        newItem.setText("{} ({})".format(niceHostName, ipAddress))
        newItem.setData(QtCore.Qt.UserRole, (ipAddress, niceHostName))
        self._foundServicesWidget.addItem(newItem)

    def _handleServicesClientRemoved(self, ipAddress, niceHostName):
        """ Handle services client removing a service"""
        for idx in xrange(self._foundServicesWidget.count()):
            item = self._foundServicesWidget.item(idx)
            if item.data(QtCore.Qt.UserRole) == [ipAddress, niceHostName]:
                self._foundServicesWidget.takeItem(idx)
                del item
                return

    def _handleServicesSearchStarted(self):
        """ Handle services starting to search for services"""
        for wid in [self._refreshServices, self._foundServicesWidget]:
            wid.setEnabled(False)

    def _handleServicesSearchFinished(self):
        """ Handle services finishing the search for services"""
        for wid in [self._refreshServices, self._foundServicesWidget]:
            wid.setEnabled(True)

Hooking into the double click event on the listWidget, we can pass in the IP address and hostname to a new instance of the PiCameraWidget and show it.

    def _handleServiesWidgetDoubleClick(self, item):
    	"""Handle a double click on a service item, will launch and connect to the service"""
    	ipAddress, niceHostName = item.data(QtCore.Qt.UserRole)
    	newWin = PiCameraWidget(ipAddress, niceHostName)
    	self._cameraWins[ipAddress] = newWin
    	newWin.show()

With that complete, we now have the base of a service-based PiCamera tool which will search for camera’s to connect to with an interface to connect to them. By adding code to start the Server on startup of the Pi via the .bashrc file, we can simply plug the Pi in and then search and connect to it via the controlling computer, rather than having to find its IP address to connect to it.

The whole UI with the services and connected Pi Cameras

I hope this shows the potential of service-based architecture and how easy it would be to add new cameras to any application utilizing it.

– Geoff