Using Locust to Load Test WebSocket and Redis with Heartbeat Simulation
This guide explains how to set up and run Locust load tests for WebSocket connections and Redis interactions in Python, including client implementations, heartbeat simulation, and practical tips for resource management and concurrency control.
1. Analyze Requirements
First, identify the key aspects to test:
WebSocket connection: simulate a client maintaining a long‑lived connection with the server.
Redis interaction: integrate the redis-py library in the Locust script to check queue status or other data.
Heartbeat mechanism: simulate the client periodically sending heartbeat messages to keep the session alive.
2. Preparation
Make sure the necessary Python packages are installed:
pip install locust websockets redis3. Write Locust Script
The following example demonstrates how to integrate WebSocket, Redis, and a heartbeat mechanism in a Locust script.
WebSocket client handling
import websocket
import time
from locust import User, task, between, events
class WebSocketClient:
def __init__(self, host):
self.host = host
self.ws = None
def connect(self):
self.ws = websocket.create_connection(self.host)
return "Connected"
def send(self, message):
start_time = time.time()
try:
self.ws.send(message)
response = self.ws.recv()
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="WebSocket", name="send", response_time=total_time, response_length=len(response))
return response
except Exception as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="WebSocket", name="send", response_time=total_time, exception=e)
def heartbeat(self):
"""Simulate sending heartbeat"""
while True:
time.sleep(30) # heartbeat interval
self.send("heartbeat")
def disconnect(self):
if self.ws:
self.ws.close()Redis client handling
import redis
class RedisClient:
def __init__(self, host='localhost', port=6379, db=0):
self.client = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
def check_queue_status(self, queue_name):
return self.client.llen(queue_name)User class definition
class HospitalQueueUser(User):
abstract = True
def __init__(self, *args, **kwargs):
super(HospitalQueueUser, self).__init__(*args, **kwargs)
self.ws_client = WebSocketClient("ws://your_websocket_server")
self.redis_client = RedisClient()
@task(1)
def ws_task(self):
response = self.ws_client.send("register_patient")
print(f"Received: {response}")
@task(2)
def check_queue(self):
status = self.redis_client.check_queue_status("patient_queue")
print(f"Queue status: {status}")
def on_start(self):
"""Called when a virtual user starts"""
result = self.ws_client.connect()
print(result)
gevent.spawn(self.ws_client.heartbeat) # start heartbeat thread
def on_stop(self):
"""Called when a virtual user stops"""
self.ws_client.disconnect()4. Run Test
Save the script and execute Locust from the command line:
locust -f your_locustfile.pyThen open a browser at http://localhost:8089 , specify the number of users to simulate and the spawn rate, and click “Start swarming” to begin the test.
5. Notes
Resource management: ensure WebSocket and Redis connections are properly closed to avoid leaks.
Concurrency control: adjust the number of concurrent users and heartbeat frequency according to your environment to prevent excessive load.
Data consistency: monitor Redis data changes during the test to verify that the system behaves as expected.
Test Development Learning Exchange
Test Development Learning Exchange
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.