
Optimistic and Pessimistic Locking in PostgreSQL with Python Demo

The article explains how concurrent updates can cause data loss in high‑traffic scenarios and demonstrates using optimistic and pessimistic locking in PostgreSQL with Python, including EFCore‑style token checks and a retry mechanism via the tenacity library.

Python Programming Learning Circle

In high‑concurrency environments, simultaneous updates to the same row can cause lost updates. For example, if two users both read a count of 1 and each writes back count + 1, the final value is 2 instead of 3, and one increment is silently lost.
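The lost update can be reproduced without a database. The sketch below is only an illustration: the dictionary `row` and the helpers `read` and `write` are hypothetical stand-ins for a table row and its SELECT and UPDATE statements.

```python
# In-memory stand-in for one row of a table (hypothetical, for illustration only)
row = {"count": 1}

def read():
    """Plays the role of SELECT count FROM ..."""
    return row["count"]

def write(value):
    """Plays the role of UPDATE ... SET count = value"""
    row["count"] = value

# Both users read before either one writes
seen_by_a = read()
seen_by_b = read()

# Each user writes back "what I saw + 1"
write(seen_by_a + 1)
write(seen_by_b + 1)  # overwrites A's result with the same value

final_count = row["count"]
print(final_count)  # 2, although two increments were applied to 1
```

Neither write fails, which is what makes the problem hard to notice: the second write simply replaces the first.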

Optimistic Lock – This is the approach EFCore takes with a concurrency token: the UPDATE statement carries an extra condition, count = oldcount, so the row is updated only if count has not changed since it was read.

<code>update testlock set count = 2 where count = oldcount  -- oldcount is the value read earlier</code>

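With the optimistic lock in place, only one of the concurrent updates succeeds. Once the first transaction commits, the condition count = oldcount no longer holds for the second, so its UPDATE matches zero rows. PostgreSQL does not report this as an error; the application must check the affected-row count and then retry or give up.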
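A minimal sketch of the conditional UPDATE and the affected-row check follows. So that it runs without a server, it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, which is an assumption made for this example only. The helper `optimistic_update` is hypothetical, and the table layout (`id`, `count`) is inferred from the demo queries.

```python
import sqlite3

# sqlite3 stands in for PostgreSQL here so the sketch is self-contained
conn = sqlite3.connect(":memory:")
conn.execute("create table testlock (id integer primary key, count integer)")
conn.execute("insert into testlock (id, count) values (1, 1)")
conn.commit()

def optimistic_update(conn, row_id, old_count):
    """Write old_count + 1 only if count still equals old_count.

    Returns True when exactly one row was updated, False on a conflict.
    """
    cur = conn.execute(
        "update testlock set count = :new where id = :id and count = :old",
        {"new": old_count + 1, "id": row_id, "old": old_count},
    )
    conn.commit()
    return cur.rowcount == 1

def read_count(conn, row_id):
    return conn.execute(
        "select count from testlock where id = :id", {"id": row_id}
    ).fetchone()[0]

# Both users read count = 1 before either one writes
old_a = read_count(conn, 1)
old_b = read_count(conn, 1)

first_ok = optimistic_update(conn, 1, old_a)   # condition holds, row updated
second_ok = optimistic_update(conn, 1, old_b)  # count is now 2, no row matches

final_count = read_count(conn, 1)
print(first_ok, second_ok, final_count)
conn.close()
```

The second caller learns about the conflict only from the return value, so it has to re-read the row and try again if the increment must not be lost.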

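Pessimistic Lock – The transaction explicitly locks the row when reading it (e.g., SELECT ... FROM testlock FOR UPDATE), so other sessions that try to lock or update the same row must wait until the lock is released at commit or rollback. This serializes the updates, but it reduces concurrency and can lead to deadlocks when transactions lock rows in different orders.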

<code>select … from testlock for update</code>
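The blocking behaviour can be pictured with a pure-Python analogy. This is not PostgreSQL code: the `threading.Lock` named `row_lock` is a hypothetical stand-in for the row lock that FOR UPDATE takes, and leaving the `with` block plays the role of the commit that releases it.

```python
import threading
import time

row = {"count": 1}            # stand-in for the table row
row_lock = threading.Lock()   # stand-in for the row lock taken by FOR UPDATE

def increment():
    # Like SELECT ... FOR UPDATE: wait here until no one else holds the row
    with row_lock:
        current = row["count"]
        time.sleep(0.05)      # simulate work done while the lock is held
        row["count"] = current + 1
    # Leaving the with-block is the analogue of COMMIT: the lock is released

threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

final_count = row["count"]
print(final_count)  # 3: the second thread waited and read the updated value
```

Because the read and the write happen under the same lock, the second thread cannot read a stale value. The cost is that it sits idle for as long as the first one holds the lock.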

The demo code uses the tenacity library to retry a failed update, giving up after seven attempts. Two classes are provided: Test (using a pessimistic lock) and TestOptimistic (using an optimistic lock with a retry decorator).

<code>from PGTool.PGTools import PGServer
from threading import Thread
import time
from tenacity import retry, stop_after_attempt, RetryError

class Test:
    def __init__(self) -> None:
        self.server = PGServer('postgres', 'postgres', '127.0.0.1', 5432, 'postgres')
        self.server.connect()
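    def update(self, sql):
        # FOR UPDATE locks the row until this transaction commits or rolls back
        id, count = self.server.fetchOne('select id,count from testlock where id=%(id)s for update', {'id': 1})
        print(id)
        time.sleep(10)  # hold the lock so the second thread visibly blocks
        # execute() commits, which releases the row lock
        self.server.execute(sql, {"count": int(count)+1, "id": id})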

class TestOptimistic:
    def __init__(self) -> None:
        self.server = PGServer('postgres', 'postgres', '127.0.0.1', 5432, 'postgres')
        self.server.connect()
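    @retry(stop=stop_after_attempt(7))
    def _try_update(self, sql):
        # One attempt: read the current value, then update only if it is unchanged.
        # PGServer.execute raises when no row matched, which makes tenacity retry.
        id, oldcount = self.server.fetchOne('select id,count from testlock where id=%(id)s', {'id': 1})
        print(oldcount)
        self.server.execute(sql, {"count": int(oldcount)+1, "id": id, 'oldcount': oldcount})
    def update(self, sql):
        try:
            self._try_update(sql)
        except RetryError as e:
            # tenacity raises RetryError at the call site once all 7 attempts have failed,
            # so it must be caught outside the decorated function
            print(e)
        finally:
            self.disconnetDB()
    def disconnetDB(self):
        self.server.closeConnect()
        print('after retry')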

# Pessimistic lock example (commented out)
# t1 = Thread(target=Test().update, args=('update testlock set count=%(count)s where id=%(id)s',))
# t2 = Thread(target=Test().update, args=('update testlock set count=%(count)s where id=%(id)s',))
# t1.start(); t2.start()

# Optimistic lock example
t1 = Thread(target=TestOptimistic().update, args=('update testlock set count=%(count)s where id=%(id)s and count=%(oldcount)s',))
t2 = Thread(target=TestOptimistic().update, args=('update testlock set count=%(count)s where id=%(id)s and count=%(oldcount)s',))
t1.start(); t2.start()</code>

The supporting PGTools.py module defines a PGServer class that wraps psycopg2 connections, providing methods to connect, execute statements, fetch one or all rows, and close the connection, raising an exception if an UPDATE affects no rows.

<code>import psycopg2

class PGServer:
    def __init__(self, user, password, host, port, db) -> None:
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.db = db
    def connect(self):
        self.conn = psycopg2.connect(host=self.host, database=self.db, user=self.user, password=self.password, port=self.port)
        self.cour = self.conn.cursor()
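    def execute(self, sql, data):
        self.cour.execute(sql, data)
        rowcount = self.cour.rowcount  # number of rows the statement affected
        self.conn.commit()
        # Zero affected rows means the optimistic WHERE condition no longer matched
        if rowcount <= 0:
            raise Exception('another has changed')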
    def fetchOne(self, sql, data):
        self.cour.execute(sql, data)
        return self.cour.fetchone()
    def fetchAll(self, sql, data):
        self.cour.execute(sql, data)
        return self.cour.fetchall()
    def closeConnect(self):
        self.cour.close()
        self.conn.close()
</code>

*Disclaimer: This article is compiled from online sources; copyright belongs to the original author. Contact us for removal or licensing requests.
