
Using RQ (Redis Queue) in Python for Asynchronous Task Processing

This guide introduces the Python RQ library, shows how to install it with Redis, and provides multiple code examples covering queue creation, task execution, priority handling, timeouts, batch processing, scheduling, retries, and status tracking for building robust asynchronous back‑end systems.

Test Development Learning Exchange

RQ (Redis Queue) is a simple Python library that uses Redis as a backend to store task queues, offering an easy‑to‑use API for enqueuing tasks and executing them asynchronously.

Installation

First install RQ and Redis:

pip install rq redis

Make sure the Redis service is running; install Redis from the official website if needed.
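Before enqueuing anything, it is worth confirming that Redis is actually reachable. A minimal sketch using only the standard library (it speaks Redis's inline command protocol; the host and port are assumptions for a default local install):

```python
import socket

def redis_is_up(host: str = "localhost", port: int = 6379, timeout: float = 2.0) -> bool:
    """Return True if a Redis server answers PING on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"PING\r\n")          # Redis inline command
            reply = sock.recv(16)
            return reply.startswith(b"+PONG")  # RESP simple-string reply
    except OSError:
        return False

if __name__ == "__main__":
    print("Redis reachable:", redis_is_up())
```

If this prints False, start Redis before running any of the examples below.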

Example 1: Creating a queue and a task

import time
from rq import Queue
from worker import conn  # assume worker.py defines the Redis connection

def count_words_at_url(url):
    time.sleep(5)  # simulate a time‑consuming operation
    return len(url)

q = Queue(connection=conn)  # create queue
job = q.enqueue(count_words_at_url, 'http://example.com')  # enqueue task
print(job.result)  # prints None because the job is pending

This example defines a simple task that simulates a delay and enqueues it; the result is initially None.

Example 2: Retrieving task results

import time
from rq import Queue
from worker import conn

def count_words_at_url(url):
    time.sleep(5)
    return len(url)

q = Queue(connection=conn)
job = q.enqueue(count_words_at_url, 'http://example.com')
while job.result is None:
    print("Waiting for result...")
    time.sleep(2)
print(job.result)  # prints 18, the length of 'http://example.com'

The loop polls the job until the result is available and then prints it. (In production code, also check job.is_failed: a failed job's result stays None, so this loop would otherwise spin forever.)
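That polling pattern can be factored into a reusable helper with a deadline. A sketch (the FakeJob stub is a stand-in so the example runs without a worker; with RQ you would pass a real Job object):

```python
import time

def wait_for_result(job, timeout: float = 30.0, poll: float = 0.5):
    """Poll job.result until it is set, or raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while job.result is None:
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not finish in time")
        time.sleep(poll)
    return job.result

class FakeJob:
    """Stand-in for rq.job.Job: result becomes available after a delay."""
    def __init__(self, value, ready_at):
        self._value, self._ready_at = value, ready_at
    @property
    def result(self):
        return self._value if time.monotonic() >= self._ready_at else None

job = FakeJob(18, time.monotonic() + 1.0)
print(wait_for_result(job))  # prints 18 after roughly one second
```

The deadline keeps a lost or failed job from blocking the caller forever.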

Example 3: Defining a worker process

Create worker.py to define the Redis connection and start the worker:

# worker.py
from rq import Worker, Queue, Connection
from redis import Redis

listen = ['high', 'default', 'low']
redis_url = 'redis://localhost:6379'  # your Redis address
conn = Redis.from_url(redis_url)

if __name__ == '__main__':
    # Note: the Connection context manager was removed in RQ 2.0; on newer
    # versions, pass connection=conn to Queue and Worker explicitly instead.
    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()

Run the worker with python worker.py, or equivalently with RQ's CLI: rq worker high default low.

Example 4: Using different queue priorities

import time
from rq import Queue
from worker import conn

def count_words_at_url(url):
    time.sleep(5)
    return len(url)

high_q = Queue('high', connection=conn)
default_q = Queue('default', connection=conn)
low_q = Queue('low', connection=conn)

high_job = high_q.enqueue(count_words_at_url, 'http://example.com')
default_job = default_q.enqueue(count_words_at_url, 'http://example.com')
low_job = low_q.enqueue(count_words_at_url, 'http://example.com')

Three queues with different priorities are defined. A worker started with the listen order ['high', 'default', 'low'] always takes its next job from 'high' before touching 'default', and from 'default' before 'low'.
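That ordering can be pictured with a pure-Python sketch (no Redis involved; this is a simplified model of the worker's behavior, not RQ internals): the worker always takes the next job from the first non-empty queue in its listen list.

```python
from collections import deque

def next_job(queues):
    """Return the next (queue_name, job) a worker would pick, or None if all empty."""
    for name, q in queues.items():  # dict order = priority order
        if q:
            return name, q.popleft()
    return None

queues = {
    "high": deque(["h1"]),
    "default": deque(["d1", "d2"]),
    "low": deque(["l1"]),
}

order = []
while (picked := next_job(queues)) is not None:
    order.append(picked)
print(order)
# [('high', 'h1'), ('default', 'd1'), ('default', 'd2'), ('low', 'l1')]
```

Note that this is priority by queue order, not preemption: a long job already running on 'low' is not interrupted when a 'high' job arrives.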

Example 5: Task timeout handling

import time
from rq import Queue
from worker import conn

def long_running_task():
    time.sleep(10)
    return "Task completed"

q = Queue(connection=conn)
job = q.enqueue(long_running_task, job_timeout=5)  # 5-second timeout
while not (job.is_finished or job.is_failed):
    time.sleep(1)
if job.is_failed:
    # job.exc_info holds the JobTimeoutException traceback from the worker
    print("Task timed out")

If the task exceeds the 5-second limit, the worker kills it and marks the job as failed. There is no Timeout exception to catch on the client side: job.result simply stays None, so the job's status has to be checked instead.

Example 6: Bulk task processing

import time
from rq import Queue
from worker import conn

def process_item(item):
    time.sleep(2)
    return f"Processed {item}"

q = Queue(connection=conn)
items = ['item1', 'item2', 'item3']
jobs = [q.enqueue(process_item, item) for item in items]
for job in jobs:
    print(job.result)  # prints None because jobs are pending

Multiple tasks are enqueued at once and their Job objects are collected.

Example 7: Scheduled (periodic) tasks

Install rq-scheduler first:

pip install rq-scheduler

Then add a scheduler to worker.py :

# worker.py
from rq import Worker, Queue, Connection
from redis import Redis
from rq_scheduler import Scheduler
from datetime import datetime

from tasks import count_words_at_url  # assume tasks.py defines the task

listen = ['high', 'default', 'low']
redis_url = 'redis://localhost:6379'
conn = Redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        scheduler = Scheduler(queue_name='default', connection=conn)
        scheduler.schedule(
            scheduled_time=datetime.utcnow(),  # first run: now (UTC)
            func=count_words_at_url,
            args=['http://example.com'],
            interval=60,  # repeat every 60 seconds
            repeat=None   # None means repeat forever
        )
        worker = Worker(map(Queue, listen))
        worker.work()
# start with: python worker.py
# a separate scheduler process must also be running: rqscheduler

The scheduler enqueues count_words_at_url every minute.
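The interval=60 schedule corresponds to run times you can pre-compute, which is handy for sanity-checking a schedule before registering it. A standard-library sketch of the first few occurrences:

```python
from datetime import datetime, timedelta, timezone

def run_times(start: datetime, interval_seconds: int, count: int):
    """First `count` run times of a job that repeats every `interval_seconds`."""
    step = timedelta(seconds=interval_seconds)
    return [start + i * step for i in range(count)]

start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
for t in run_times(start, 60, 3):
    print(t.isoformat())
# 2024-01-01T12:00:00+00:00
# 2024-01-01T12:01:00+00:00
# 2024-01-01T12:02:00+00:00
```

Like scheduled_time above, the times are in UTC; convert at the display layer if local time matters.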

Example 8: Task retry mechanism

import time
from rq import Queue, Retry
from worker import conn

def failing_task():
    raise Exception("Task failed")

q = Queue(connection=conn)
job = q.enqueue(failing_task, retry=Retry(max=3, interval=5))
while not (job.is_finished or job.is_failed):
    time.sleep(1)
if job.is_failed:
    print(f"Task failed after retries:\n{job.exc_info}")

The job is retried up to three times with a 5-second pause between attempts. Reading job.result does not re-raise the task's exception; failure is detected via job.is_failed, with the traceback available in job.exc_info. Retries with an interval require the worker to be started with rq worker --with-scheduler.
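Retry also accepts a list of intervals, one per attempt, which allows backoff instead of a fixed pause. A sketch of building an exponential backoff schedule (pure Python; the resulting list goes straight into Retry):

```python
def backoff_intervals(base: int, retries: int, cap: int = 300):
    """Exponential backoff: base, 2*base, 4*base, ... capped at `cap` seconds."""
    return [min(base * 2 ** i, cap) for i in range(retries)]

intervals = backoff_intervals(5, 4)
print(intervals)  # [5, 10, 20, 40]

# usage with RQ:
# job = q.enqueue(failing_task, retry=Retry(max=4, interval=intervals))
```

Backoff avoids hammering a struggling downstream service with immediate retries; the cap keeps the wait from growing unboundedly.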

Example 9: Tracking task status and notifications

import time
from rq import Queue
from worker import conn

def count_words_at_url(url):
    time.sleep(5)
    return len(url)

def notify_on_completion(job, connection, result, *args, **kwargs):
    print(f"Task completed with result: {result}")

q = Queue(connection=conn)
job = q.enqueue(count_words_at_url, 'http://example.com', on_success=notify_on_completion)
print(job.result)  # None while pending

When the task finishes, the worker invokes the on_success callback with the job, the Redis connection, and the return value. An on_failure callback (signature: job, connection, type, value, traceback) can be registered the same way.
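The callback contract can be exercised without a running worker. A pure-Python sketch that mimics how a worker dispatches on_success/on_failure (simplified, not RQ internals; a real worker passes the actual Job and connection objects where this passes None):

```python
def run_with_callbacks(func, args=(), on_success=None, on_failure=None):
    """Run func and dispatch callbacks the way an RQ worker would (simplified)."""
    try:
        result = func(*args)
    except Exception as exc:
        if on_failure:
            # on_failure receives the exception triple after job and connection
            on_failure(None, None, type(exc), exc, None)
        return None
    if on_success:
        # on_success receives the return value after job and connection
        on_success(None, None, result)
    return result

events = []
run_with_callbacks(len, ("http://example.com",),
                   on_success=lambda job, conn, result: events.append(("ok", result)))
run_with_callbacks(lambda: 1 / 0,
                   on_failure=lambda job, conn, t, v, tb: events.append(("failed", t.__name__)))
print(events)  # [('ok', 18), ('failed', 'ZeroDivisionError')]
```

Keeping callbacks to this exact signature means they can be unit-tested like any other function before being handed to enqueue.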

Summary

Through these examples you can see how RQ helps implement asynchronous task queues in Python and provides advanced features such as retries, scheduled jobs, and status tracking.

Tags: Python, Backend Development, Redis, Asynchronous, Task Queue, RQ