Using RQ (Redis Queue) in Python for Asynchronous Task Processing
This guide introduces the Python RQ library, shows how to install it with Redis, and provides multiple code examples covering queue creation, task execution, priority handling, timeouts, batch processing, scheduling, retries, and status tracking for building robust asynchronous back‑end systems.
RQ (Redis Queue) is a simple Python library that uses Redis as a backend to store task queues, offering an easy‑to‑use API for enqueuing tasks and executing them asynchronously.
Installation
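First install RQ and the Redis client library for Python:
pip install rq redis
This installs only the Python packages. RQ also needs a running Redis server; install Redis from the official website if needed and make sure the service is running.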
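Before enqueuing anything, it can help to confirm that a Redis server is actually listening. The following is a minimal sketch using only the standard library; the function name redis_is_reachable and the default host and port are assumptions for illustration. It sends a PING and expects +PONG, so a server that requires authentication will be reported as unreachable here.

```python
import socket

def redis_is_reachable(host="localhost", port=6379, timeout=1.0):
    """Return True if something at host:port answers a Redis PING with +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"PING\r\n")  # Redis accepts simple inline commands
            reply = sock.recv(16)
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False
    return reply.startswith(b"+PONG")

if __name__ == "__main__":
    print("Redis reachable:", redis_is_reachable())
```

With the redis package installed, calling ping() on a Redis client object serves the same purpose.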
Example 1: Creating a queue and a task
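import time
from rq import Queue
from worker import conn  # worker.py (see Example 3) defines the Redis connection

# In a real project, put this function in an importable module (for example a
# tasks.py, a name assumed here): RQ refuses to enqueue functions defined in __main__.
def count_words_at_url(url):
    time.sleep(5)  # simulate a time-consuming operation
    return len(url)  # stand-in for real work: the length of the URL string

q = Queue(connection=conn)  # create the default queue
job = q.enqueue(count_words_at_url, 'http://example.com')  # enqueue the task
print(job.result)  # prints None because the job has not run yet

This example defines a simple task that simulates a delay and enqueues it. enqueue returns a Job object immediately, so the result stays None until a worker has run the job.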
Example 2: Retrieving task results
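import time
from rq import Queue
from worker import conn

def count_words_at_url(url):
    time.sleep(5)
    return len(url)

q = Queue(connection=conn)
job = q.enqueue(count_words_at_url, 'http://example.com')
# Poll the job status rather than the result, so a failed job cannot loop forever
while not (job.is_finished or job.is_failed):
    print("Waiting for result...")
    time.sleep(2)
print(job.result)  # prints 18, the length of 'http://example.com', if the job succeeded

The loop polls the job until it finishes or fails and then prints the result. A worker must be running (see Example 3) for the job to complete.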
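A polling loop like this is usually safer with an upper time limit. The helper below is a sketch, not part of RQ: wait_for_job and its parameters are hypothetical names, and it relies only on the is_finished and is_failed attributes that RQ job objects expose.

```python
import time

def wait_for_job(job, timeout=30.0, poll_interval=0.5):
    """Poll a job until it finishes, fails, or the deadline passes.

    Returns "finished", "failed", or "timeout".
    """
    deadline = time.monotonic() + timeout
    while True:
        if job.is_finished:
            return "finished"
        if job.is_failed:
            return "failed"
        if time.monotonic() >= deadline:
            return "timeout"
        time.sleep(poll_interval)
```

With a real job, wait_for_job(job, timeout=60) could be called right after enqueue, reading job.result only when the return value is "finished".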
Example 3: Defining a worker process
Create worker.py to define the Redis connection and start the worker:
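# worker.py
from redis import Redis
from rq import Queue, Worker

listen = ['high', 'default', 'low']  # queue names, highest priority first
redis_url = 'redis://localhost:6379'  # your Redis address
conn = Redis.from_url(redis_url)

if __name__ == '__main__':
    # Pass the connection explicitly; the Connection context manager seen in
    # older tutorials is deprecated in newer RQ releases
    queues = [Queue(name, connection=conn) for name in listen]
    worker = Worker(queues, connection=conn)
    worker.work()

Run the worker with python worker.py.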
Example 4: Using different queue priorities
import time
from rq import Queue
from worker import conn

def count_words_at_url(url):
    time.sleep(5)
    return len(url)

high_q = Queue('high', connection=conn)
default_q = Queue('default', connection=conn)
low_q = Queue('low', connection=conn)
high_job = high_q.enqueue(count_words_at_url, 'http://example.com')
default_job = default_q.enqueue(count_words_at_url, 'http://example.com')
low_job = low_q.enqueue(count_words_at_url, 'http://example.com')

Three queues are defined here. The priority comes from the worker, which checks the queues in the order they were listed when it was created ('high', 'default', 'low' in worker.py), so jobs in 'high' are picked up before jobs in the other queues.
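The ordering rule can be modelled without Redis. The sketch below is an in-memory illustration of the idea, not RQ's implementation; drain_in_priority_order and the job labels are made up for the example.

```python
from collections import deque

def drain_in_priority_order(queues):
    """Yield jobs the way a single worker would pick them up.

    `queues` is a list of (name, deque) pairs, highest priority first.
    Before every pick the scan restarts at the first queue, so a lower
    queue is only served while all higher ones are empty.
    """
    while True:
        for name, pending in queues:
            if pending:
                yield name, pending.popleft()
                break
        else:
            return  # every queue is empty

queues = [
    ("high", deque(["h1", "h2"])),
    ("default", deque(["d1"])),
    ("low", deque(["l1"])),
]
order = [job for _, job in drain_in_priority_order(queues)]
print(order)  # ['h1', 'h2', 'd1', 'l1']
```

One consequence of this rule is that a steady stream of 'high' jobs can keep 'low' jobs waiting indefinitely.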
Example 5: Task timeout handling
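import time
from rq import Queue
from worker import conn

def long_running_task():
    time.sleep(10)
    return "Task completed"

q = Queue(connection=conn)
job = q.enqueue(long_running_task, job_timeout=5)  # 5-second timeout
# The timeout is enforced inside the worker, so no exception reaches this
# process; wait for the job to settle and inspect its status instead
while not (job.is_finished or job.is_failed):
    time.sleep(1)
if job.is_failed:
    print("Task timed out or failed")
else:
    print(job.result)

If the task runs longer than the 5-second limit, the worker aborts it and marks the job as failed. The timeout exception is raised in the worker process, not in the code that enqueued the job, so the caller checks the job status instead of catching an exception.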
Example 6: Bulk task processing
import time
from rq import Queue
from worker import conn

def process_item(item):
    time.sleep(2)
    return f"Processed {item}"

q = Queue(connection=conn)
items = ['item1', 'item2', 'item3']
jobs = [q.enqueue(process_item, item) for item in items]
for job in jobs:
    print(job.result)  # prints None because the jobs are still pending

Multiple tasks are enqueued one after another and their Job objects are collected, so each result can be checked later.
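Once a batch has been enqueued, a quick tally of job states shows how far the workers have got. This is a sketch under assumptions: summarize_jobs is a hypothetical helper, and it reads only the is_finished and is_failed attributes of each job object.

```python
from collections import Counter

def summarize_jobs(jobs):
    """Count how many jobs are finished, failed, or still pending."""
    counts = Counter()
    for job in jobs:
        if job.is_finished:
            counts["finished"] += 1
        elif job.is_failed:
            counts["failed"] += 1
        else:
            counts["pending"] += 1
    return dict(counts)
```

Called on the jobs list from the example, it returns a dictionary such as {'finished': 2, 'pending': 1}, depending on progress at that moment.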
Example 7: Scheduled (periodic) tasks
Install rq-scheduler first:
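pip install rq-scheduler
Then add a scheduler to worker.py:
# worker.py
from datetime import datetime
from redis import Redis
from rq import Queue, Worker
from rq_scheduler import Scheduler
from tasks import count_words_at_url  # assumed module holding the task function

listen = ['high', 'default', 'low']
redis_url = 'redis://localhost:6379'
conn = Redis.from_url(redis_url)

if __name__ == '__main__':
    scheduler = Scheduler(queue_name='default', connection=conn)
    scheduler.schedule(
        scheduled_time=datetime.utcnow(),  # first run: now, expressed in UTC
        func=count_words_at_url,
        args=['http://example.com'],
        interval=60,  # seconds between runs
        repeat=None,  # repeat indefinitely
    )
    worker = Worker([Queue(name, connection=conn) for name in listen], connection=conn)
    worker.work()
# start with: python worker.py

The scheduler registers count_words_at_url to run every minute. Scheduled jobs are moved onto the queue by a separate scheduler process, which rq-scheduler provides as the rqscheduler command, so that process must run alongside the worker. Because the schedule call runs on every start of worker.py, restarting the worker registers the periodic job again.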
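The effect of scheduled_time and interval can be worked out with plain datetime arithmetic. The sketch below only illustrates that arithmetic; upcoming_runs and the fixed start time are assumptions for the example, and actual execution times also depend on when the scheduler process checks for due jobs and when a worker is free.

```python
from datetime import datetime, timedelta, timezone

def upcoming_runs(first_run, interval_seconds, count):
    """Return the first `count` planned run times of an interval schedule."""
    step = timedelta(seconds=interval_seconds)
    return [first_run + i * step for i in range(count)]

start = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
for run in upcoming_runs(start, 60, 3):
    print(run.isoformat())
# 2024-01-01T12:00:00+00:00
# 2024-01-01T12:01:00+00:00
# 2024-01-01T12:02:00+00:00
```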
Example 8: Task retry mechanism
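import time
from rq import Queue, Retry
from worker import conn

def failing_task():
    raise Exception("Task failed")

q = Queue(connection=conn)
job = q.enqueue(failing_task, retry=Retry(max=3, interval=5))
# The exception is raised inside the worker, so there is nothing to catch here;
# give the worker time to run the retries, then inspect the job status
time.sleep(30)
print(job.is_failed)  # expected to be True once every attempt has failed

The job is retried up to three times, 5 seconds apart. Retries with an interval rely on RQ's scheduler, so start the worker with the scheduler enabled (rq worker --with-scheduler, or worker.work(with_scheduler=True)). Because the exception is raised in the worker, the enqueuing code cannot catch it; it can only observe that the job ended up failed.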
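The retry policy itself can be pictured as a small loop. The following is an in-process model of the idea, not RQ's implementation: run_with_retries is a hypothetical helper, and it assumes that max counts retries after the first attempt, so a job with max=3 runs at most four times.

```python
import time

def run_with_retries(func, max_retries=3, interval=5, sleep=time.sleep):
    """Call func; on an exception, wait `interval` seconds and try again.

    Makes at most 1 + max_retries attempts and returns (result, attempts).
    Re-raises the last exception when every attempt fails.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return func(), attempts
        except Exception:
            if attempts > max_retries:
                raise  # retries exhausted: surface the last error
            sleep(interval)
```

The sleep parameter is injectable so the loop can be exercised without real waiting. In RQ the waiting happens in the scheduler rather than in the job, so the worker is not blocked between attempts.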
Example 9: Tracking task status and notifications
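from rq import Queue
from worker import conn
from tasks import count_words_at_url  # assumed module holding the task function

# Like the task itself, the callback must be importable by the worker
def notify_on_completion(job, connection, result, *args, **kwargs):
    print(f"Task completed with result: {result}")

q = Queue(connection=conn)
# on_success registers a callback that the worker calls after the job succeeds
job = q.enqueue(count_words_at_url, 'http://example.com', on_success=notify_on_completion)
print(job.result)  # None while pending

The worker calls the callback when the task finishes successfully, so the message appears in the worker's output rather than in the script that enqueued the job.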
Summary
Through these examples you can see how RQ helps implement asynchronous task queues in Python and provides advanced features such as retries, scheduled jobs, and status tracking.
Test Development Learning Exchange