
Observer Pattern and Publish‑Subscribe Pattern in Python with Practical Examples

This article explains the Observer and Publish‑Subscribe design patterns, compares their concepts and implementations, and provides ten practical Python code examples ranging from simple event notification to distributed messaging with libraries such as pubsub, redis, asyncio, Flask, threading, ZeroMQ, and PySide2.

Test Development Learning Exchange

The Observer pattern and the Publish‑Subscribe pattern are common software design patterns for communication and event handling between objects. Both involve a party that produces events (the subject or publisher) and parties that react to them (the observers or subscribers).

Observer pattern: defines a one‑to‑many dependency between objects; when the subject’s state changes, all registered observers are notified. Implementation: the subject maintains a list of observers and calls each observer’s update method directly, so the subject holds a reference to every observer. Typical Python use cases include GUI updates, network data changes, and game event handling.

Publish‑subscribe pattern: also delivers one message to many receivers, but decouples publishers from subscribers through an intermediary (a broker or event bus, such as a message queue), so neither side holds a reference to the other. Implementation: publishers send messages to the intermediary under a topic or channel, and subscribers receive only the messages for the topics they registered for. Python scenarios include distributed systems, asynchronous task processing, and logging.
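
To make the role of the intermediary concrete, here is a minimal in-process sketch. The `Broker` class and the topic names are hypothetical and exist only for illustration; a real system would more likely use a library or an external message broker.

```python
from collections import defaultdict


class Broker:
    """Hypothetical in-process broker: maps topic names to subscriber callbacks."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, message):
        # Copy the list so a callback may subscribe or unsubscribe during delivery.
        for callback in list(self._subscribers.get(topic, [])):
            callback(message)


broker = Broker()
received = []
broker.subscribe("orders", received.append)
broker.subscribe("orders", lambda m: received.append(m.upper()))
broker.publish("orders", "created")   # delivered to both subscribers
broker.publish("billing", "ignored")  # no subscribers for this topic, so it is dropped
print(received)  # ['created', 'CREATED']
```

The publisher only knows the broker and a topic name, which is the decoupling that distinguishes this pattern from a subject calling its observers directly.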

Below are ten practical Python examples demonstrating both patterns:

1. Simple observer notification mechanism:

class Subject:
    def __init__(self):
        self.observers = []
    def attach(self, observer):
        self.observers.append(observer)
    def detach(self, observer):
        self.observers.remove(observer)
    def notify(self, message):
        for observer in self.observers:
            observer.update(message)

class Observer:
    def update(self, message):
        print("Received message:", message)

subject = Subject()
observer1 = Observer()
observer2 = Observer()
subject.attach(observer1)
subject.attach(observer2)
subject.notify("Hello, observers!")
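
The example above defines detach but never calls it. The following sketch, under the assumption that an observer may want to unregister itself from inside update, shows one way to keep that safe: notify iterates over a copy of the list. `OneShotObserver` and `RecordingObserver` are hypothetical names introduced for this illustration.

```python
class Subject:
    def __init__(self):
        self.observers = []

    def attach(self, observer):
        self.observers.append(observer)

    def detach(self, observer):
        self.observers.remove(observer)

    def notify(self, message):
        # Iterate over a copy so observers can detach themselves safely.
        for observer in list(self.observers):
            observer.update(message)


class OneShotObserver:
    """Hypothetical observer that detaches itself after its first message."""

    def __init__(self, subject):
        self.subject = subject
        self.messages = []

    def update(self, message):
        self.messages.append(message)
        self.subject.detach(self)


class RecordingObserver:
    """Hypothetical observer that records every message it receives."""

    def __init__(self):
        self.messages = []

    def update(self, message):
        self.messages.append(message)


subject = Subject()
one_shot = OneShotObserver(subject)
regular = RecordingObserver()
subject.attach(one_shot)
subject.attach(regular)
subject.notify("first")
subject.notify("second")
print(one_shot.messages)  # ['first']
print(regular.messages)   # ['first', 'second']
```

If notify looped over self.observers directly, removing an entry during the loop would shift the remaining items and the next observer would miss the message.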

2. Simple event handler using observer pattern:

class Event:
    def __init__(self, name):
        self.name = name

class EventHandler:
    def handle(self, event):
        print("Handling event:", event.name)

class EventDispatcher:
    def __init__(self):
        self.handlers = []
    def add_handler(self, handler):
        self.handlers.append(handler)
    def dispatch(self, event):
        for handler in self.handlers:
            handler.handle(event)

event1 = Event("Event 1")
event2 = Event("Event 2")
handler1 = EventHandler()
handler2 = EventHandler()
dispatcher = EventDispatcher()
dispatcher.add_handler(handler1)
dispatcher.add_handler(handler2)
dispatcher.dispatch(event1)
dispatcher.dispatch(event2)

3. Publish‑subscribe using the PyPubSub library (imported as pubsub):

from pubsub import pub

def subscriber(message):
    print("Received message:", message)

pub.subscribe(subscriber, "topic")
pub.sendMessage("topic", message="Hello, subscribers!")

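4. Distributed messaging with Redis Pub/Sub (requires a running Redis server; messages are not queued, so a subscriber only receives what is published after it has subscribed):

import threading
import time

import redis

class Publisher:
    def __init__(self):
        self.redis = redis.Redis()
    def publish(self, channel, message):
        self.redis.publish(channel, message)

class Subscriber:
    def __init__(self):
        # decode_responses=True returns str instead of bytes
        self.redis = redis.Redis(decode_responses=True)
    def subscribe(self, channel):
        pubsub = self.redis.pubsub()
        pubsub.subscribe(channel)
        for message in pubsub.listen():  # blocks while waiting for messages
            # Skip the subscribe confirmation; only print published messages.
            if message["type"] == "message":
                print("Received message:", message["data"])

publisher = Publisher()
subscriber = Subscriber()
# Subscribe first, in a background thread, because listen() blocks indefinitely.
threading.Thread(target=subscriber.subscribe, args=("channel",), daemon=True).start()
time.sleep(0.5)  # crude wait for the subscription to be registered
publisher.publish("channel", "Hello, subscribers!")
time.sleep(0.5)  # give the subscriber time to print before the program exits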

5. Observer pattern in a simple GUI using tkinter:

import tkinter as tk

class Observable:
    def __init__(self):
        self.observers = []
    def attach(self, observer):
        self.observers.append(observer)
    def detach(self, observer):
        self.observers.remove(observer)
    def notify(self, event):
        for observer in self.observers:
            observer.update(event)

class Observer:
    def update(self, event):
        print("Received event:", event)

class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.observable = Observable()
        self.observer = Observer()
        self.observable.attach(self.observer)
        self.button = tk.Button(self, text="Click me", command=self.handle_click)
        self.button.pack()
    def handle_click(self):
        event = "Button clicked"
        self.observable.notify(event)

app = App()
app.mainloop()

6. Asynchronous event handling with asyncio:

import asyncio

class Subject:
    def __init__(self):
        self.observers = []
    def attach(self, observer):
        self.observers.append(observer)
    def detach(self, observer):
        self.observers.remove(observer)
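    async def notify(self, message):
        # Run all observer coroutines concurrently and wait for them to finish.
        await asyncio.gather(*(observer.update(message) for observer in self.observers))

class Observer:
    async def update(self, message):
        await asyncio.sleep(0)  # stand-in for real asynchronous work (I/O, timers)
        print("Received message:", message)

async def main():
    subject = Subject()
    observer1 = Observer()
    observer2 = Observer()
    subject.attach(observer1)
    subject.attach(observer2)
    await subject.notify("Hello, observers!")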

asyncio.run(main())
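
When the publisher should not wait for subscribers at all, one possible variation is to place an asyncio.Queue between them. The sketch below is an assumption about how that could look, not part of the original example: each subscriber gets its own queue, and a None sentinel (a convention chosen here) tells it to stop.

```python
import asyncio


async def subscriber(name, queue, received):
    # Consume messages until the publisher sends the None sentinel.
    while True:
        message = await queue.get()
        if message is None:
            break
        received.append((name, message))


async def main():
    received = []
    # One queue per subscriber so that each one sees every message.
    queues = [asyncio.Queue(), asyncio.Queue()]
    tasks = [
        asyncio.create_task(subscriber(f"subscriber-{i}", q, received))
        for i, q in enumerate(queues, start=1)
    ]
    for message in ("hello", "world", None):
        for q in queues:
            await q.put(message)
    await asyncio.gather(*tasks)
    return received


received = asyncio.run(main())
print(sorted(received))
```

Each subscriber drains its queue at its own pace, so a slow consumer delays only itself rather than the publisher or the other subscribers.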

7. Simple web application using Flask to demonstrate publish‑subscribe:

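from blinker import Namespace  # the signal library that Flask's own signals are built on
from flask import Flask, request

app = Flask(__name__)
my_signals = Namespace()
event_signal = my_signals.signal("event")

@app.route("/publish", methods=["POST"])
def publish():
    message = request.json["message"]  # expects a JSON body such as {"message": "hi"}
    # The first argument is the sender; the payload is passed as a keyword argument.
    event_signal.send(app, message=message)
    return "Message published"

def subscriber(sender, message):
    print("Received message:", message)

event_signal.connect(subscriber)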

if __name__ == "__main__":
    app.run()

8. Multi‑threaded event handling with threading:

import threading

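class Subject:
    def __init__(self):
        self.observers = []
        self.lock = threading.Lock()  # guards the observer list across threads
    def attach(self, observer):
        with self.lock:
            self.observers.append(observer)
    def detach(self, observer):
        with self.lock:
            self.observers.remove(observer)
    def notify(self, message):
        with self.lock:
            observers = list(self.observers)  # snapshot, so callbacks run without the lock
        for observer in observers:
            observer.update(message)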

class Observer:
    def update(self, message):
        print("Received message:", message)

def worker(subject):
    subject.notify("Hello, observers!")

subject = Subject()
observer1 = Observer()
observer2 = Observer()
subject.attach(observer1)
subject.attach(observer2)
thread = threading.Thread(target=worker, args=(subject,))
thread.start()
thread.join()

9. Distributed messaging with ZeroMQ (pyzmq):

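import threading
import time

import zmq

context = zmq.Context()

def publisher():
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")
    # A PUB socket drops messages sent before a subscriber has connected
    # (the "slow joiner" problem), so wait briefly before sending.
    # The delay is a simple heuristic, not a delivery guarantee.
    time.sleep(0.5)
    socket.send_string("Hello, subscribers!")

def subscriber():
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix subscribes to every message
    message = socket.recv_string()
    print("Received message:", message)

publisher_thread = threading.Thread(target=publisher)
subscriber_thread = threading.Thread(target=subscriber)
subscriber_thread.start()  # start the subscriber first so it connects in time
publisher_thread.start()
publisher_thread.join()
subscriber_thread.join()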

10. Observer pattern using PySide2 for a GUI application:

from PySide2.QtCore import QObject, Signal
from PySide2.QtWidgets import QApplication, QPushButton

class Subject(QObject):
    event = Signal(str)

class Observer(QObject):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def update(self, message):
        print(f"{self.name} received message:", message)

app = QApplication([])
subject = Subject()
observer1 = Observer("Observer 1")
observer2 = Observer("Observer 2")
subject.event.connect(observer1.update)
subject.event.connect(observer2.update)
button = QPushButton("Click me")
button.clicked.connect(lambda: subject.event.emit("Hello, observers!"))
button.show()
app.exec_()

These examples illustrate how the Observer and Publish‑Subscribe patterns can be applied across different domains to build flexible, loosely‑coupled systems; they can be adapted and extended according to specific project requirements.

Written by

Test Development Learning Exchange