Observer Pattern and Publish‑Subscribe Pattern in Python with Practical Examples
This article explains the Observer and Publish‑Subscribe design patterns, compares their concepts and implementations, and provides ten practical Python code examples ranging from simple event notification to distributed messaging with libraries such as pubsub, redis, asyncio, Flask, threading, ZeroMQ, and PySide2.
The Observer pattern and the publish‑subscribe pattern are common software design patterns for communication and event handling between objects. Both involve a Subject (publisher) and one or more Observers (subscribers).
Observer pattern: defines a one‑to‑many dependency; when the subject’s state changes, all observers are notified. Implementation: the subject maintains a list of observers and calls their update methods. Typical Python use cases include GUI updates, network data changes, and game event handling.
Publish‑subscribe pattern: also a one‑to‑many relationship but decouples publisher and subscriber via a middleware such as a message queue. Implementation: publishers send messages to the middleware, subscribers receive messages of interest. Python scenarios include distributed systems, asynchronous task processing, and logging.
Below are ten practical Python examples demonstrating both patterns:
1. Simple observer notification mechanism:
class Subject:
    """Keep a list of observers and broadcast messages to each of them.

    ``observers`` holds the attached observer objects in attachment order.
    Each observer must expose an ``update(message)`` method.
    """

    def __init__(self):
        self.observers = []

    def attach(self, observer):
        """Register *observer* so it receives future notifications."""
        self.observers.append(observer)

    def detach(self, observer):
        """Stop notifying *observer*.

        Raises:
            ValueError: If *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, message):
        """Call ``update(message)`` on every attached observer.

        Iterates over a snapshot of the list so that an observer may attach
        or detach (itself or others) from inside ``update`` without causing
        another observer to be skipped during this round.
        """
        for observer in list(self.observers):
            observer.update(message)
class Observer:
    """Observer that reports each notification on standard output."""

    def update(self, message):
        """Print *message* after the ``Received message:`` label."""
        print("Received message:", message)
# Wire two observers to one subject; both receive every notification.
subject = Subject()
observer1, observer2 = Observer(), Observer()
subject.attach(observer1)
subject.attach(observer2)
subject.notify("Hello, observers!")

2. Simple event handler using observer pattern:
class Event:
    """A named event handed from the dispatcher to its handlers."""

    def __init__(self, name):
        # The name is the only payload; handlers read it back as ``event.name``.
        self.name = name
class EventHandler:
    """Handler that reports the name of each event it is given."""

    def handle(self, event):
        """Print ``event.name`` after the ``Handling event:`` label."""
        print("Handling event:", event.name)
class EventDispatcher:
    """Fan each dispatched event out to every registered handler.

    ``handlers`` holds the registered handler objects in registration order.
    """

    def __init__(self):
        self.handlers = []

    def add_handler(self, handler):
        """Register *handler*; it must expose a ``handle(event)`` method."""
        self.handlers.append(handler)

    def dispatch(self, event):
        """Pass *event* to every handler that was registered when dispatch began.

        Iterates over a snapshot so that a handler which registers further
        handlers while handling the event cannot extend (or endlessly grow)
        the current round; new handlers take part from the next dispatch on.
        """
        for handler in list(self.handlers):
            handler.handle(event)
# Two events and two handlers: every handler sees every dispatched event.
event1, event2 = Event("Event 1"), Event("Event 2")
handler1, handler2 = EventHandler(), EventHandler()
dispatcher = EventDispatcher()
dispatcher.add_handler(handler1)
dispatcher.add_handler(handler2)
dispatcher.dispatch(event1)
dispatcher.dispatch(event2)

3. Publish‑subscribe using the pubsub library:
from pubsub import pub
def subscriber(message):
    """Print the delivered *message* after the ``Received message:`` label."""
    print("Received message:", message)
# Register ``subscriber`` as a listener for the "topic" topic.
pub.subscribe(subscriber, "topic")
pub.sendMessage("topic", message="Hello, subscribers!")

4. Distributed messaging with redis as a message queue:
import redis
class Publisher:
    """Thin wrapper that publishes messages through a Redis client."""

    def __init__(self):
        # The client is created with the library's default arguments.
        self.redis = redis.Redis()

    def publish(self, channel, message):
        """Publish *message* on *channel*; the client's reply is discarded."""
        client = self.redis
        client.publish(channel, message)
class Subscriber:
    """Listen on a Redis channel and print every message published on it."""

    def __init__(self):
        # The client is created with the library's default arguments.
        self.redis = redis.Redis()

    def subscribe(self, channel):
        """Subscribe to *channel* and print the payload of each message.

        Blocks for as long as the ``listen()`` iterator keeps yielding items.
        """
        channel_listener = self.redis.pubsub()
        channel_listener.subscribe(channel)
        for item in channel_listener.listen():
            # listen() also yields subscribe/unsubscribe confirmations whose
            # "data" is a subscription count rather than a published payload.
            if item["type"] != "message":
                continue
            print("Received message:", item["data"])
publisher = Publisher()
subscriber = Subscriber()
# NOTE(review): this publish runs before anything in this script has
# subscribed to "channel". Presumably Redis Pub/Sub does not queue messages
# for later subscribers, so this greeting may never be printed — confirm, and
# consider starting the subscriber first (e.g. in another thread or process).
publisher.publish("channel", "Hello, subscribers!")
subscriber.subscribe("channel")

5. Observer pattern in a simple GUI using tkinter:
import tkinter as tk
class Observable:
    """Keep a list of observers and broadcast events to each of them.

    ``observers`` holds the attached observer objects in attachment order.
    Each observer must expose an ``update(event)`` method.
    """

    def __init__(self):
        self.observers = []

    def attach(self, observer):
        """Register *observer* so it receives future events."""
        self.observers.append(observer)

    def detach(self, observer):
        """Stop notifying *observer*.

        Raises:
            ValueError: If *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, event):
        """Call ``update(event)`` on every attached observer.

        Iterates over a snapshot of the list so that an observer may attach
        or detach (itself or others) from inside ``update`` without causing
        another observer to be skipped during this round.
        """
        for observer in list(self.observers):
            observer.update(event)
class Observer:
    """Observer that reports each event on standard output."""

    def update(self, event):
        """Print *event* after the ``Received event:`` label."""
        print("Received event:", event)
class App(tk.Tk):
    """Main window with one button whose clicks are forwarded to an observer."""

    def __init__(self):
        super().__init__()
        # Wire the observable to its single observer before building widgets.
        self.observer = Observer()
        self.observable = Observable()
        self.observable.attach(self.observer)
        # The button's command is the bound click handler defined below.
        self.button = tk.Button(self, text="Click me", command=self.handle_click)
        self.button.pack()

    def handle_click(self):
        """Notify the observable's observers that the button was clicked."""
        self.observable.notify("Button clicked")
# Build the main window; App.__init__ attaches the observer and creates the button.
app = App()
app.mainloop()

6. Asynchronous event handling with asyncio:
import asyncio
class Subject:
    """Keep a list of observers and broadcast messages to each of them.

    ``observers`` holds the attached observer objects in attachment order.
    Each observer must expose an ``update(message)`` method.
    """

    def __init__(self):
        self.observers = []

    def attach(self, observer):
        """Register *observer* so it receives future notifications."""
        self.observers.append(observer)

    def detach(self, observer):
        """Stop notifying *observer*.

        Raises:
            ValueError: If *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, message):
        """Call ``update(message)`` on every attached observer.

        Iterates over a snapshot of the list so that an observer may attach
        or detach (itself or others) from inside ``update`` without causing
        another observer to be skipped during this round.
        """
        for observer in list(self.observers):
            observer.update(message)
class Observer:
    """Observer that reports each notification on standard output."""

    def update(self, message):
        """Print *message* after the ``Received message:`` label."""
        print("Received message:", message)
async def main():
    """Create a subject with two observers and send them one greeting.

    The coroutine contains no ``await``; the observers are notified
    synchronously, one after the other.
    """
    subject = Subject()
    observer1, observer2 = Observer(), Observer()
    for watcher in (observer1, observer2):
        subject.attach(watcher)
    subject.notify("Hello, observers!")
asyncio.run(main())

7. Simple web application using flask to demonstrate publish‑subscribe:
from flask import Flask, request
from flask.signals import Namespace
# The Flask application object that the route below is registered on.
app = Flask(__name__)
# A signal namespace, and the "event" signal created in it; the route sends
# on this signal and receivers are connected to it.
my_signals = Namespace()
event_signal = my_signals.signal("event")
@app.route("/publish", methods=["POST"])
def publish():
    """Publish the ``message`` field of a JSON POST body on ``event_signal``.

    Returns:
        ``"Message published"`` on success, or an error string with HTTP
        status 400 when the body is not a JSON object containing ``message``.
    """
    # The body is client-controlled: parse leniently and validate it instead
    # of letting a missing key surface as an unhandled KeyError (HTTP 500).
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or "message" not in payload:
        return "Request body must be a JSON object with a 'message' field", 400
    event_signal.send(payload["message"])
    return "Message published"
def subscriber(message):
    """Print the received *message* after the ``Received message:`` label."""
    print("Received message:", message)
# Connect ``subscriber`` as a receiver of ``event_signal``.
event_signal.connect(subscriber)
if __name__ == "__main__":
    app.run()

8. Multi‑threaded event handling with threading:
import threading
class Subject:
    """Keep a list of observers and broadcast messages to each of them.

    ``observers`` holds the attached observer objects in attachment order.
    Each observer must expose an ``update(message)`` method.
    """

    def __init__(self):
        self.observers = []

    def attach(self, observer):
        """Register *observer* so it receives future notifications."""
        self.observers.append(observer)

    def detach(self, observer):
        """Stop notifying *observer*.

        Raises:
            ValueError: If *observer* is not currently attached.
        """
        self.observers.remove(observer)

    def notify(self, message):
        """Call ``update(message)`` on every attached observer.

        Iterates over a snapshot of the list so that attaching or detaching
        an observer while a notification round is in progress cannot cause
        another observer to be skipped during that round.
        """
        for observer in list(self.observers):
            observer.update(message)
class Observer:
    """Observer that reports each notification on standard output."""

    def update(self, message):
        """Print *message* after the ``Received message:`` label."""
        print("Received message:", message)
def worker(subject):
    """Thread target: send one fixed greeting through ``subject.notify``."""
    subject.notify("Hello, observers!")
subject = Subject()
observer1, observer2 = Observer(), Observer()
subject.attach(observer1)
subject.attach(observer2)
# Deliver the notification from a separate thread rather than the main one.
thread = threading.Thread(target=worker, args=(subject,))
thread.start()
thread.join()

9. Distributed messaging with zeromq:
import zmq
import threading
# Module-level ZeroMQ context from which both functions below create their sockets.
context = zmq.Context()
def publisher(settle_delay=0.5):
    """Bind a PUB socket on TCP port 5555 and send one greeting.

    Args:
        settle_delay: Seconds to wait between binding and sending. A PUB
            socket drops messages for which no subscription has arrived yet,
            so sending immediately after ``bind`` usually loses the only
            message and leaves the subscriber blocked in ``recv``. The pause
            is a best-effort remedy, not a delivery guarantee.
    """
    import time  # local import keeps the snippet's import list unchanged

    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")
    try:
        # Give subscribers time to connect and register their subscription.
        time.sleep(settle_delay)
        socket.send_string("Hello, subscribers!")
    finally:
        # Close explicitly instead of relying on garbage collection.
        socket.close()
def subscriber():
    """Connect a SUB socket to localhost:5555 and print one received message."""
    sub_socket = context.socket(zmq.SUB)
    sub_socket.connect("tcp://localhost:5555")
    # Empty subscription prefix, i.e. no topic filtering is requested.
    sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
    # recv_string() blocks until a message arrives.
    print("Received message:", sub_socket.recv_string())
# Run the publisher and the subscriber concurrently, one thread each.
publisher_thread = threading.Thread(target=publisher)
subscriber_thread = threading.Thread(target=subscriber)
# The publisher thread is started first, then the subscriber thread.
publisher_thread.start()
subscriber_thread.start()
# Wait for the publisher thread to finish.
publisher_thread.join()
subscriber_thread.join()

10. Observer pattern using PySide2 for a GUI application:
from PySide2.QtCore import QObject, Signal
from PySide2.QtWidgets import QApplication, QPushButton
class Subject(QObject):
    """Qt object exposing a single ``event`` signal."""

    # Class-level signal declared with one ``str`` argument.
    event = Signal(str)
class Observer(QObject):
    """Named receiver whose ``update`` method can be connected to a signal."""

    def __init__(self, name):
        super().__init__()
        # The name is used as the prefix of every printed line.
        self.name = name

    def update(self, message):
        """Print *message* prefixed with this observer's name."""
        print(f"{self.name} received message:", message)
app = QApplication([])
subject = Subject()
observer1, observer2 = Observer("Observer 1"), Observer("Observer 2")
# Both observers are connected to the subject's signal.
subject.event.connect(observer1.update)
subject.event.connect(observer2.update)
# Clicking the button emits the subject's signal with a fixed greeting.
button = QPushButton("Click me")
button.clicked.connect(lambda: subject.event.emit("Hello, observers!"))
button.show()
app.exec_()

These examples illustrate how the Observer and Publish‑Subscribe patterns can be applied across different domains to build flexible, loosely‑coupled systems; they can be adapted and extended according to specific project requirements.
Test Development Learning Exchange
Test Development Learning Exchange
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.