Observer Pattern and Publish‑Subscribe Pattern in Python with Practical Code Examples
This article explains the concepts, differences, and implementation approaches of the Observer and Publish‑Subscribe design patterns, and provides ten practical Python code examples covering GUI, networking, asynchronous, and distributed scenarios.
The Observer pattern and the publish‑subscribe pattern are common software design patterns used to manage communication and event handling between objects.
The Observer pattern defines a one‑to‑many dependency: when the subject’s state changes, all registered observers are notified. It is typically implemented by having the subject maintain a list of observers and invoke each observer’s update method.
The publish‑subscribe pattern also creates a one‑to‑many relationship, but it introduces a message broker (or queue) that decouples publishers from subscribers, allowing flexible distribution of events.
Both patterns have many Python application scenarios, such as GUI updates, network data changes, game event handling, distributed systems, asynchronous processing, and logging.
The article provides ten practical Python examples demonstrating these patterns:
1. Observer pattern – simple event notification
class Subject:
    """Observable object that pushes messages to every attached observer."""

    def __init__(self):
        # Observers in attach order; the same object may be attached twice.
        self.observers = []

    def attach(self, observer):
        """Register *observer*; it must expose an ``update(message)`` method."""
        self.observers.append(observer)

    def detach(self, observer):
        """Unregister *observer*.

        Raises:
            ValueError: if *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, message):
        """Call ``update(message)`` on each observer attached when the call starts."""
        # Iterate over a snapshot so an observer may detach itself (or attach
        # others) from inside ``update`` without the loop skipping a neighbour.
        for observer in list(self.observers):
            observer.update(message)
class Observer:
    """Minimal observer that prints every message it is sent."""

    def update(self, message):
        """Print *message* to standard output."""
        print("Received message:", message)
# Demo wiring: one subject with two printing observers attached in order.
subject = Subject()
observer1, observer2 = Observer(), Observer()
for listener in (observer1, observer2):
    subject.attach(listener)
subject.notify("Hello, observers!")

2. Observer pattern – event dispatcher
class Event:
    """Named event handed from the dispatcher to its handlers."""

    def __init__(self, name):
        # Label that handlers read back through ``event.name``.
        self.name = name
class EventHandler:
    """Handler that prints the name of every event it receives."""

    def handle(self, event):
        """Print ``event.name`` to standard output."""
        print("Handling event:", event.name)
class EventDispatcher:
    """Fan an event out to every registered handler, in registration order."""

    def __init__(self):
        # Handlers in registration order; each must expose ``handle(event)``.
        self.handlers = []

    def add_handler(self, handler):
        """Register *handler* to receive all subsequently dispatched events."""
        self.handlers.append(handler)

    def dispatch(self, event):
        """Call ``handle(event)`` on each handler registered when the call starts."""
        # Snapshot the list so a handler that adds or removes handlers while
        # running cannot make this loop skip or repeat entries.
        for handler in list(self.handlers):
            handler.handle(event)
# Demo wiring: two handlers on one dispatcher, then the first event is sent.
event1, event2 = Event("Event 1"), Event("Event 2")
handler1, handler2 = EventHandler(), EventHandler()
dispatcher = EventDispatcher()
for registered in (handler1, handler2):
    dispatcher.add_handler(registered)
dispatcher.dispatch(event1)
dispatcher.dispatch(event2)

3. Publish‑subscribe using the pubsub library
from pubsub import pub
def subscriber(message):
    """Listener callback: print the ``message`` keyword argument it is sent."""
    print("Received message:", message)
# Register ``subscriber`` as a listener for the "topic" topic.
pub.subscribe(subscriber, "topic")
pub.sendMessage("topic", message="Hello, subscribers!")

4. Publish‑subscribe using redis as a message broker
import redis
class Publisher:
    """Thin wrapper that publishes messages through a Redis client."""

    def __init__(self):
        # Client created with the library defaults (no host/port given here).
        self.redis = redis.Redis()

    def publish(self, channel, message):
        """Publish *message* on *channel*.

        Returns:
            Whatever the client's ``publish`` returns (for Redis, the number
            of subscribers that received the message), so callers can detect
            a message that reached nobody.
        """
        return self.redis.publish(channel, message)
class Subscriber:
    """Thin wrapper that listens on a Redis channel and prints what arrives."""

    def __init__(self):
        # Client created with the library defaults (no host/port given here).
        self.redis = redis.Redis()

    def subscribe(self, channel):
        """Subscribe to *channel* and print each published payload.

        Runs for as long as ``listen()`` keeps yielding, so with a real
        connection this call does not return on its own.
        """
        pubsub = self.redis.pubsub()
        pubsub.subscribe(channel)
        for message in pubsub.listen():
            # listen() also yields control frames (e.g. the subscribe
            # confirmation, whose "data" is a count); only "message" frames
            # carry a published payload.
            if message.get("type") != "message":
                continue
            print("Received message:", message["data"])
# Demo wiring: one publisher and one subscriber, then a single publish.
# NOTE(review): this publish runs before subscribe() is called on the next
# line, so the greeting is presumably never delivered to this subscriber —
# confirm the intended ordering (e.g. subscribe first, or in another process).
publisher = Publisher()
subscriber = Subscriber()
publisher.publish("channel", "Hello, subscribers!")
subscriber.subscribe("channel")

5. Observer pattern with tkinter GUI
import tkinter as tk
class Observable:
    """Source of events that forwards each one to its attached observers."""

    def __init__(self):
        # Observers in attach order; each must expose ``update(event)``.
        self.observers = []

    def attach(self, observer):
        """Register *observer* to receive future events."""
        self.observers.append(observer)

    def detach(self, observer):
        """Unregister *observer*.

        Raises:
            ValueError: if *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, event):
        """Call ``update(event)`` on each observer attached when the call starts."""
        # Snapshot first: an observer detaching itself inside ``update``
        # would otherwise shift the list and make the loop skip the next one.
        for observer in list(self.observers):
            observer.update(event)
class Observer:
    """Observer that prints every event it is notified about."""

    def update(self, event):
        """Print *event* to standard output."""
        print("Received event:", event)
class App(tk.Tk):
    """Tk root window with one button whose clicks are sent to an observer."""

    def __init__(self):
        super().__init__()
        # Wire a single printing observer to the window's own observable.
        self.observable = Observable()
        self.observer = Observer()
        self.observable.attach(self.observer)
        # The button's command is the bound click handler below.
        self.button = tk.Button(self, text="Click me", command=self.handle_click)
        self.button.pack()

    def handle_click(self):
        """Button callback: notify the observers with a fixed event string."""
        event = "Button clicked"
        self.observable.notify(event)
# Build the window; the event loop is started by the mainloop() call that follows.
app = App()
app.mainloop()

6. Observer pattern with asyncio for asynchronous handling
import asyncio
class Subject:
    """Observable object that pushes messages to every attached observer."""

    def __init__(self):
        # Observers in attach order; each must expose ``update(message)``.
        self.observers = []

    def attach(self, observer):
        """Register *observer* to receive future messages."""
        self.observers.append(observer)

    def detach(self, observer):
        """Unregister *observer*.

        Raises:
            ValueError: if *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, message):
        """Synchronously call ``update(message)`` on each attached observer."""
        # Work on a copy so observers may attach/detach from inside
        # ``update`` without disturbing this iteration.
        for observer in list(self.observers):
            observer.update(message)
class Observer:
    """Observer whose ``update`` is a plain (non-async) method that prints."""

    def update(self, message):
        """Print *message* to standard output."""
        print("Received message:", message)
async def main():
    """Attach two observers to a fresh subject and send them one message.

    The body contains no ``await``: the notification runs synchronously
    inside this coroutine, calling each observer in attach order.
    """
    subject = Subject()
    observer1 = Observer()
    observer2 = Observer()
    subject.attach(observer1)
    subject.attach(observer2)
    subject.notify("Hello, observers!")
asyncio.run(main())

7. Publish‑subscribe using flask web framework
from flask import Flask, request
from flask.signals import Namespace
# Web application object that the route below is registered on.
app = Flask(__name__)
# Private signal namespace holding the single "event" signal used as the pub/sub channel.
my_signals = Namespace()
event_signal = my_signals.signal("event")
@app.route("/publish", methods=["POST"])
def publish():
    """Handle ``POST /publish``: forward the body's ``message`` to ``event_signal``.

    Returns:
        ``"Message published"`` on success, or a 400 response when the body
        is not a JSON object containing a ``"message"`` key.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON body.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or "message" not in payload:
        return "Request body must be JSON with a 'message' field", 400
    # The message is passed as the signal's first positional argument,
    # which is what connected receivers are called with.
    event_signal.send(payload["message"])
    return "Message published"
def subscriber(message):
    """Signal receiver: print the value it is called with."""
    print("Received message:", message)
# Connect ``subscriber`` so it is called whenever ``event_signal`` is sent.
event_signal.connect(subscriber)
if __name__ == "__main__":
    app.run()

8. Observer pattern with threading for multithreaded events
import threading
class Subject:
    """Observable object that pushes messages to every attached observer."""

    def __init__(self):
        # Observers in attach order; each must expose ``update(message)``.
        self.observers = []

    def attach(self, observer):
        """Register *observer* to receive future messages."""
        self.observers.append(observer)

    def detach(self, observer):
        """Unregister *observer*.

        Raises:
            ValueError: if *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, message):
        """Call ``update(message)`` on each observer attached when the call starts."""
        # Iterate a snapshot so changes to ``self.observers`` made while
        # notifying (e.g. an observer detaching itself) cannot skip entries.
        for observer in list(self.observers):
            observer.update(message)
class Observer:
    """Observer that prints every message it is sent."""

    def update(self, message):
        """Print *message* to standard output."""
        print("Received message:", message)
def worker(subject):
    """Thread target: send one fixed greeting through ``subject.notify``."""
    subject.notify("Hello, observers!")
# Demo wiring: two observers on one subject, notified from a worker thread.
subject = Subject()
observer1, observer2 = Observer(), Observer()
for listener in (observer1, observer2):
    subject.attach(listener)
thread = threading.Thread(target=worker, args=(subject,))
thread.start()
thread.join()

9. Publish‑subscribe using zeromq for distributed messaging
import zmq
import threading
# Single context from which both publisher() and subscriber() create their sockets.
context = zmq.Context()
def publisher(settle_seconds=0.5):
    """Bind a PUB socket on TCP port 5555 and publish one greeting.

    Args:
        settle_seconds: Pause between binding and sending. A PUB socket
            silently drops messages while no subscriber is connected, so
            sending straight after ``bind`` would normally lose the greeting
            and leave the subscriber blocked in ``recv``. A fixed pause is
            the simplest workaround; it is not a delivery guarantee.
    """
    import time  # local import: only this function needs it

    socket = context.socket(zmq.PUB)
    try:
        socket.bind("tcp://*:5555")
        time.sleep(settle_seconds)
        socket.send_string("Hello, subscribers!")
    finally:
        # Release the socket (and the bound port) even if bind/send fails.
        socket.close()
def subscriber():
    """Connect a SUB socket to localhost:5555, receive one message, print it."""
    socket = context.socket(zmq.SUB)
    try:
        socket.connect("tcp://localhost:5555")
        # An empty prefix subscribes to every message.
        socket.setsockopt(zmq.SUBSCRIBE, b"")
        message = socket.recv_string()
    finally:
        # Release the socket whether or not a message was received.
        socket.close()
    print("Received message:", message)
# Demo wiring: run publisher and subscriber concurrently, publisher started first.
publisher_thread = threading.Thread(target=publisher)
subscriber_thread = threading.Thread(target=subscriber)
for runner in (publisher_thread, subscriber_thread):
    runner.start()
publisher_thread.join()
subscriber_thread.join()

10. Observer pattern with PySide2 Qt signals
from PySide2.QtCore import QObject, Signal
from PySide2.QtWidgets import QApplication, QPushButton
class Subject(QObject):
    """QObject exposing one signal that observers connect their slots to."""

    # Signal carrying a single ``str`` argument.
    event = Signal(str)
class Observer(QObject):
    """Named observer whose ``update`` method prints what it receives."""

    def __init__(self, name):
        super().__init__()
        # Label prefixed to every printed line.
        self.name = name

    def update(self, message):
        """Print *message* prefixed with this observer's name."""
        print(f"{self.name} received message:", message)
# Demo wiring: a button click emits the subject's signal, which reaches both observers.
app = QApplication([])
subject = Subject()
observer1, observer2 = Observer("Observer 1"), Observer("Observer 2")
for listener in (observer1, observer2):
    subject.event.connect(listener.update)
button = QPushButton("Click me")
button.clicked.connect(lambda: subject.event.emit("Hello, observers!"))
button.show()
app.exec_()

These examples illustrate how the Observer and Publish‑Subscribe patterns can be applied across different domains to build scalable, loosely‑coupled systems.
Test Development Learning Exchange
Test Development Learning Exchange
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.