How to Plot Real-Time Serial Data in Python with PySerial & PyQtGraph
Learn step-by-step how to install pyserial, read and write serial data in Python, and use pyqtgraph with threading to create a real-time waveform visualization, including code examples and tips for handling baud rates, timeouts, and data queues.
1. Install pyserial and basic usage
Install the serial communication library with pip install pyserial . Common methods include serial.Serial(0) to open the first port, ser.portstr to view the port name, ser.write() to send data, ser.read() or ser.readline() to receive data, and ser.close() to close the port. You can set parameters such as baudrate , bytesize , parity , stopbits , and timeout .
2. Basic serial code
<code>import serial
portx = "COM5"
bps = 9600
timeout = 1
ser = serial.Serial(portx, int(bps), timeout=timeout, parity=serial.PARITY_NONE, stopbits=1)
if ser.isOpen():
print("open success")
ser.write("hello".encode())
while True:
line = ser.readline()
if line:
print(line)
else:
print("open failed")
ser.close()
</code>3. Using pyqtgraph for real-time plotting
Install the visualization libraries with pip install pyqtgraph and pip install PyQt5 . Pyqtgraph leverages NumPy and Qt's GraphicsView for fast 2D/3D plotting, making it suitable for large‑data and interactive applications.
<code>import pyqtgraph as pg
import numpy as np
import array
app = pg.mkQApp()
win = pg.GraphicsWindow()
win.setWindowTitle('pyqtgraph real‑time plot')
win.resize(800, 500)
historyLength = 100
p = win.addPlot()
p.showGrid(x=True, y=True)
p.setRange(xRange=[0, historyLength], yRange=[-1.2, 1.2], padding=0)
p.setLabel('left', 'y / V')
p.setLabel('bottom', 'x / point')
curve = p.plot()
idx = 0
def plotData():
global idx
tmp = np.sin(np.pi / 50 * idx)
if len(data) < historyLength:
data.append(tmp)
else:
data[:-1] = data[1:]
data[-1] = tmp
curve.setData(data)
idx += 1
timer = pg.QtCore.QTimer()
timer.timeout.connect(plotData)
timer.start(50)
app.exec_()
</code>4. Real‑time plotting with multithreading
Run serial reading in a separate thread and update the plot from the main thread.
<code>import array, serial, threading, numpy as np, pyqtgraph as pg
i = 0
def Serial():
while True:
n = mSerial.inWaiting()
if n:
dat = int.from_bytes(mSerial.readline(1), byteorder='little')
if i < historyLength:
data[i] = dat
i += 1
else:
data[:-1] = data[1:]
data[-1] = dat
def plotData():
curve.setData(data)
if __name__ == "__main__":
app = pg.mkQApp()
win = pg.GraphicsWindow()
win.setWindowTitle('pyqtgraph multithreaded plot')
win.resize(800, 500)
historyLength = 200
data = np.zeros(historyLength, dtype='d')
p = win.addPlot()
p.showGrid(x=True, y=True)
p.setRange(xRange=[0, historyLength], yRange=[0, 255], padding=0)
p.setLabel('left', 'y / V')
p.setLabel('bottom', 'x / point')
curve = p.plot()
mSerial = serial.Serial('COM24', 19200)
if mSerial.isOpen():
print('open success')
mSerial.write('hello'.encode())
else:
print('open failed')
th1 = threading.Thread(target=Serial)
th1.start()
timer = pg.QtCore.QTimer()
timer.timeout.connect(plotData)
timer.start(50)
app.exec_()
</code>5. Communicating with a microcontroller for continuous monitoring
Use a queue to buffer incoming bytes, convert signed values, and update the plot at a fixed interval.
<code>import pyqtgraph as pg, array, serial, threading, numpy as np, time
from queue import Queue
q = Queue()
def Serial():
while True:
n = mSerial.inWaiting()
if n:
dat = int.from_bytes(mSerial.readline(1), byteorder='little')
if dat >> 7:
dat = 256 - dat
dat = -dat
q.put(dat)
def plotData():
global i
if i < historyLength:
data[i] = q.get()
i += 1
else:
data[:-1] = data[1:]
data[-1] = q.get()
curve.setData(data)
if __name__ == "__main__":
app = pg.mkQApp()
win = pg.GraphicsWindow()
win.setWindowTitle('pyqtgraph microcontroller plot')
win.resize(800, 500)
historyLength = 100
data = np.zeros(historyLength, dtype='d')
p = win.addPlot()
p.showGrid(x=True, y=True)
p.setRange(xRange=[0, historyLength], yRange=[-50, 50], padding=0)
curve = p.plot()
mSerial = serial.Serial('COM25', 19200)
if mSerial.isOpen():
print('open success')
mSerial.write('hello'.encode())
else:
print('open failed')
th1 = threading.Thread(target=Serial)
th1.start()
timer = pg.QtCore.QTimer()
timer.timeout.connect(plotData)
timer.start(1)
app.exec_()
</code>Python Programming Learning Circle
A global community of Chinese Python developers offering technical articles, columns, original video tutorials, and problem sets. Topics include web full‑stack development, web scraping, data analysis, natural language processing, image processing, machine learning, automated testing, DevOps automation, and big data.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.