Applicaties verbinden met ZMQ in Python

Misschien heb je weleens van message queues (MQ) of berichtwachtrijen gehoord. Ze bestaan in allerlei soorten en maten. Om het concept van message queues uit te leggen, focus ik me hier op ZeroMQ (ZMQ). Met een zelfgemaakte chatapplicatie probeer ik je meer inzicht te geven in hoe deze berichtwachtrijen en -patronen werken.

Applicaties verbinden met ZMQ in Python.

Waarom een message queue gebruiken?

Message queues bieden ontwikkelaars een hoge mate van abstractie voor de implementatie van communicatie tussen twee applicaties. Het stelt je in staat om een tussen twee softwarecomponenten een robuuste communicatie te realiseren, zonder dat je een eigen network stack hoeft te ontwikkelen en onderhouden. Kies je ervoor om een message queue te gebruiken, dan levert dat de volgende faciliteiten op.

  • Gedetailleerde logging
  • Robuuste communicatie
  • Herkansingen bij mislukte afleverpogingen
  • Geen omkijken naar multithreading
  • Het routeren van berichten
  • Standaardisatie van communicatie
  • Eenvoudigere opslag van berichten in een database

Wat is ZMQ?

ZeroMQ of ZMQ is een prestatiegerichte berichtenwachtrij-implementatie die op sockets is gebaseerd. Het is te vergelijken met implementaties, zoals Kafka, RabbitMQ, AMQP en ActiveMQ. Het grootste verschil hiermee is dat ZMQ geen extra service is die je aan je stack moet toevoegen, maar in je huidige stack kan worden geïntegreerd. Het komt in de vorm van een softwarebibliotheek die je eenvoudig aan je applicatie kunt toevoegen. Dit scheelt onderhoud en reduceert de complexiteit van je stack.

Een ander kenmerk van ZMQ is dat het zich platformonafhankelijk opstelt. Er zijn language bindings voor de vrijwel alle populaire talen. Dit in contrast tot andere message queues, die  zich vooral richten op een bepaald ecosysteem of protocol. Het voordeel van ZMQ is dat je vrij bent hoe je de message queue in jouw applicatie inzet. Je zit niet vast aan JSON of aan een specifiek message queue-protocol.

ZMQ en Python

ZMQ integreert prima met een grote hoeveelheid talen. Graag wil ik de library PyZMQ toelichten en illustreren dat werken met de ZMQ library met Python redelijk laagdrempelig is. Om dit uit te beelden, heb ik een rudimentaire chatapplicatie gemaakt en probeer ik stap voor stap uit te leggen welke rol ZMQ hierin speelt. Ik focus me hierbij vooral op de werking van ZMQ en niet zozeer op Python en specifieke implementatiedetails van de chatapplicatie. Het doel is ook niet om een superfraaie chatapp te maken, maar vooral om een aantal concepten uit te beelden van een berichtenwachtrij en ZMQ.

ZMQ messaging patterns

Om te weten hoe we met ZMQ moeten werken, dienen we ons eerst bekend te maken met ZMQ messaging patterns. Dit zijn netwerkgerichte, bouwkundige patronen die de communicatiestroom tussen onderling verbonden systemen beschrijven. Je hebt redelijk veel vrijheid in hoe je kiest berichten te versturen en ontvangen. Uiteindelijk beslist de ontwikkelaar dit. Er zijn wel een aantal richtlijnen waar jij je aan kunt houden. De documentatie van PyZMQ beschrijft goed hoe je deze message patterns kunt toepassen. Graag bespreek ik twee van deze patronen, namelijk Client/Server en Publish/Subscribe.


Ben jij ontwikkelaar?

Is dit voor jou allemaal gesneden koek en heb jij deze uitleg niet nodig? Keurig. Heb je daarnaast ervaring met Python en Django? Nog beter! We zoeken jou.

Bekijk de vacature →


Client/Server-patroon

Deze variant van het berichtpatroon is het meest centralistisch en herkenbaar voor veel mensen. Je hebt een server en je hebt clients die ermee verbinden. Heel vergelijkbaar met bijvoorbeeld een webserver. Een centrale eenheid die verbindingen accepteert van verschillende clients. Om te illustreren hoe dit werkt, heb ik een implementatie gemaakt van een chatapp met ZMQ. Deze chatapplicatie heeft een server- en een clientcomponent.

Client/Server-patroon in ZMQ.

De server

De server moet in staat zijn om meerdere clients te accepteren en hier is het Client/Server-patroon uitermate geschikt voor. Hieronder leg ik uit hoe je dit in Python doet. Hiermee wordt de ZMQ-server geïnitialiseerd en luistert het voor connecties op poort 5556.

port = “5556”
context = zmq.Context()
socket = self.context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)

Om de berichten te kunnen verwerken, moeten we een stapje toevoegen. De socket.recv() is blocking en unblock wanneer er een bericht binnenkomt. De client verwacht een response die met socket.send_string verstuurd wordt.

whlle True:
	message = self.socket.recv()
	socket.send_string(“Ok”)

Dit is wat je van ZMQ nodig hebt om een server te draaien die berichten ontvangt. Het prettige aan ZMQ is dat het message queue is, dus je hoeft je niet druk te maken over threading. Alle berichten worden intern gebufferd. Als er tien berichten op de server door verschillende clients worden afgevuurd, dan worden ze door ZMQ sequentieel met recv() afgehandeld. Deze code samengevat in een Python Class ziet er dan als volgt uit.

def __init__(self, port):
	self.context = zmq.Context()
	self.socket = self.context.socket(zmq.REP)
	self.socket.bind("tcp://*:%s" % port)

def start(self,cls)
	while True:
		print("Waiting for incoming messages..")
		# Wait for next request from client
		message = self.socket.recv()
		print ("Received request: ", message)
		message_type = message.decode().split(':')[0]
		message_value = message.decode().split(':')[1]
		self.socket.send_string(cls.handle_incoming_message(message_type, message_value))

De client

De initialisatiecode voor de client lijkt veel op die van de server. Eigenlijk het grootste verschil is dat we de socket mode naar zmq.REQ zetten voor de client en voor de server is het zmq.REP.

context = zmq.Context()
socket = self.context.socket(zmq.REQ)

Om te verbinden met server, voeren we socket.connect uit.

socket.connect("tcp://localhost:5556")

Om berichten te verzenden, voeren we socket.send_string uit.

self.socket.send_string(‘hello world’)

Gezien in ZMQ het Client/Server-patroon van een request/response-structuur gebruikmaakt, moeten we nadat het bericht verzonden is ook luisteren naar een response. Dat gebeurt hiermee.

message = self.socket.recv()

Alles in een Python Class samengevat, ziet dit er als volgt uit.

class ZMQClient:
	def __init__(self, port):
		self.context = zmq.Context()
		self.socket = self.context.socket(zmq.REQ)
		self.port = port
	def connect(self):
		print("Connecting to server...")
		self.socket.connect("tcp://localhost:%s" % self.port)
def send_string(self,string):
		self.socket.send_string(string)
		message = self.socket.recv()
		return str(message) == "Ok"

Nu we de bouwstenen voor het versturen van berichten hebben, kunnen we de chatapp bouwen. De code die ik geschreven heb om alles te laten werken, heeft verder niet zoveel met ZMQ te maken. De details daarvan laat ik daarom even zitten. De code voor de chatapp is beschikbaar op GitHub.

Publish/Subscribe-patroon

De chatapp is nog niet compleet met alleen een Server/Client-patroon. Om het bruikbaar te maken, moeten we ook in staat zijn om berichten te lezen. Op dit moment kunnen we alleen berichten naar de server sturen. De kunnen we in het Client/Server-patroon prima oplossen. Echter, omdat we graag willen laten zien hoe ZMQ werkt, gaan we met het zogenaamde Publish/Subscribe-patroon deze functionaliteit bouwen.

In vergelijking met het Client/Server-patroon ligt voor berichten versturen het initiatief bij de server. Een abonnee (subscriber) schrijft zich in bij een uitgever (publisher). Als de publisher een bericht publiceert, dan wordt dit bericht naar alle abonnees gepropageerd. We gaan dus Publish/Subscribe aan de chatapp toevoegen om alle clients van nieuwe berichten op de hoogte te stellen.

Publish/Subscribe-patroon in ZMQ.

De publisher

De publisher is verantwoordelijk om de berichten naar alle subscribers te versturen. De initialisatiecode voor de publisher lijkt ook veel op de server. Met de onderstaande code maak je een ZMQ-context met het type zmq.PUB om de publisher te initialiseren.

self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)

We moeten nu de publisher op een andere poort starten, want de server draait immers al op 5556. Dat gebeurt met deze code.

socket.bind("tcp://*:%s" % 5557)

Het publiceren van berichten bestaat uit twee componenten, het berichttopic en een payload (de inhoud van een bericht). Het belang van het topic wordt straks duidelijk bij de subscriber. Een bericht via de publisher versturen, gaat als volgt. Hierbij is het belangrijk dat het topic en de payload met een spatie gescheiden worden.

self.socket.send_string(f"{topic} {payload} ")

De code in zijn geheel ziet er dan als volgt uit.

class ZMQPublisher:
def __init__(self, port):
		self.context = zmq.Context()
		self.socket = self.context.socket(zmq.PUB)
		self.port = port
def start(self):
		self.socket.bind("tcp://*:%s" % self.port)
def publish(self,username, channel_name, messagedata):
		topic = channel_name
		self.socket.send_string(f"{topic} {username}:{messagedata} ")
		print(f"sending update to subscriber: {topic} {messagedata}")

De subscriber

De subscriber ontvangt de berichten van de publisher. Net zoals alle ZMQ-initialisatiecode stelt de initialisatie niet veel voor.

self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)

Waarin het Publisher/Subscriber-patroon vooral in verschilt, is dat je kunt kiezen op welke berichten je wilt aboneren. De publisher verstuurt berichtem met een bepaald topic en je kunt de client dus abonneren op een bericht met een bepaald topic.

self.socket.connect ("tcp://localhost:%s" % self.port)
self.socket.setsockopt_string(zmq.SUBSCRIBE, channel)

We hebben ons nu op nieuwe berichten geabonneerd, maar kunnen geen berichten ontvangen. Hiervoor moeten we nog wat code gebruiken. Het ontvangen van berichten gaat in Publish/Subscribe exact hetzelfde als we gezien hebben in Client/Server door middel van een blokkerende socket.recv().

message = self.socket.recv()

De uiteindelijke subscriber-code ziet er dan als volgt uit.

class ZMQSubscriber:
def __init__(self, port):
		self.context = zmq.Context()
		self.socket = self.context.socket(zmq.SUB)
		self.port = port
def subscribe(self, channel):
		self.socket.connect ("tcp://localhost:%s" % self.port)
		self.socket.setsockopt_string(zmq.SUBSCRIBE, channel)
		worker = threading.Thread(target=self.fetch_updates)
		worker.start()
def fetch_updates(self):
		while True:
			message = self.socket.recv()
			messagedata = message.decode().split()
			topic = messagedata[0]
			del messagedata[0
			message_value = " ".join(messagedata)
			print(message_value)

We gebruiken het Publisher/Subscriber-patroon om de verschillende clients van updates op de hoogte te stellen. Als client1 een bericht verstuurt, dan willen we client2 daarvan op de hoogte stellen. Publish/Subscribe is hier erg geschikt voor, omdat we de updates vanuit de server willen versturen. Om te zien hoe dit uiteindelijk werkt, kun je op GitHub kijken.

Tot slot

Met deze voorbeelden heb ik proberen te illustreren hoe een message queue zoals ZMQ toegepast kan worden voor een chatapp. Op GitHub staat een README hoe je de chatapp kunt draaien. Het is wellicht niet de beste use case en ik zou de app ook verre van ideaal willen noemen, maar het geeft wel inzicht in hoe berichten tussen twee apps uitgewisseld worden.

Dit artikel is natuurlijk nog maar het tipje van de ijsberg. Het concept message queues is veel breder. Alleen voor ZMQ is er nog genoeg te bespreken. Wil jij je er verder in verdiepen, dan zijn de onderstaande bronnen het waard om eens te raadplegen. Veel succes als je ermee aan de slag gaat!

P.S. Blijf op de hoogte en volg ons via Facebook, Twitter, Instagram, e-mail en RSS. Heb je vragen, tips of opmerkingen? Laat het achter als reactie. Vond je het artikel nuttig? Deel het dan met anderen!

Deel App Tweet Mail Deel

Geef een reactie

Het e-mailadres wordt niet gepubliceerd. Vereiste velden zijn gemarkeerd met *