Python Script for Bulk Ping Testing of Multiple Hosts
This article demonstrates how to use a Python script to perform batch ICMP ping tests across dozens or hundreds of network devices, providing a reusable Pinger class, detailed code, and usage examples for efficient network health monitoring.
The ping command is a fundamental tool for network troubleshooting, but manually pinging dozens or hundreds of devices is impractical; this article shows how to automate bulk ping testing with Python.
The core of the solution is the Pinger class, which implements ICMP checksum calculation, packet construction, sending, and receiving logic. It provides the methods do_checksum, send_ping, receive_pong, ping_once, and ping to manage the entire ping lifecycle.
Usage is illustrated by iterating over an IP range (e.g., 192.168.242.1 to 192.168.242.254), creating a Pinger instance for each host, invoking ping_once(), and printing the result; responsive hosts are collected in an alive list.
Because the script uses raw sockets to send ICMP packets, it must be run with root (or administrator) privileges; otherwise a permission error is raised.
Full source code:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import argparse
import socket
import struct
import select
import time
ICMP_ECHO_REQUEST = 8  # ICMP message type of an echo request (RFC 792)
ICMP_ECHO_REPLY = 0  # ICMP message type of an echo reply (RFC 792)
DEFAULT_TIMEOUT = 0.1  # seconds to wait for a reply to one echo request
DEFAULT_COUNT = 4  # number of echo requests sent by Pinger.ping()


class Pinger(object):
    """Ping a host with ICMP echo requests sent over a raw socket.

    ``ping_once()`` performs a single request/reply exchange and returns the
    measured delay; ``ping()`` repeats it ``count`` times and prints the
    outcome of each attempt.
    """

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        """Store the target and the ping parameters.

        target_host -- host name or IPv4 address to ping.
        count       -- number of echo requests sent by ``ping()``.
        timeout     -- seconds to wait for each reply.
        """
        self.target_host = target_host
        self.count = count
        self.timeout = timeout

    def do_checksum(self, source_string):
        """Return the 16-bit one's-complement checksum of a bytes-like packet.

        Bytes are summed as 16-bit words with the low byte first; a trailing
        odd byte is added on its own. The complemented sum is byte-swapped
        before it is returned, which is why ``send_ping`` passes the result
        through ``socket.htons`` before packing it into the header.
        """
        total = 0
        # Floor division: "/" yields a float in Python 3 and made the loop
        # run one word past the end of odd-length input.
        max_count = (len(source_string) // 2) * 2
        count = 0
        while count < max_count:
            val = source_string[count + 1] * 256 + source_string[count]
            total = (total + val) & 0xffffffff
            count += 2
        if max_count < len(source_string):
            # Indexing bytes already yields an int in Python 3; no ord().
            total = (total + source_string[-1]) & 0xffffffff
        # Fold the carries back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total += total >> 16
        answer = ~total & 0xffff
        return answer >> 8 | (answer << 8 & 0xff00)

    def receive_pong(self, sock, ID, timeout):
        """Wait for the echo reply carrying ``ID`` and return the round trip.

        sock    -- raw ICMP socket the request was sent on.
        ID      -- 16-bit identifier placed in the request header.
        timeout -- total number of seconds to wait.

        Returns the delay in seconds (receive time minus the timestamp
        embedded in the payload), or None when no matching reply arrives
        before the timeout expires.
        """
        bytes_in_double = struct.calcsize("d")
        time_remaining = timeout
        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = time.time() - start_time
            if not readable[0]:  # Timeout
                return None
            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)
            # The ICMP header is read at offset 20, i.e. this assumes a
            # 20-byte IPv4 header without options in front of it.
            if len(recv_packet) >= 28 + bytes_in_double:
                icmp_type, _code, _checksum, packet_ID, _sequence = struct.unpack(
                    "bbHHh", recv_packet[20:28]
                )
                # Require an echo *reply*: a looped-back echo request carries
                # the same ID and must not be mistaken for the answer.
                if icmp_type == ICMP_ECHO_REPLY and packet_ID == ID:
                    time_sent = struct.unpack(
                        "d", recv_packet[28:28 + bytes_in_double]
                    )[0]
                    return time_received - time_sent
            time_remaining -= time_spent
            if time_remaining <= 0:
                return None

    def send_ping(self, sock, ID):
        """Build one ICMP echo request and send it to the target host.

        The payload starts with the current time packed as a double, which
        ``receive_pong`` reads back to compute the delay, and is padded with
        ``Q`` bytes to 192 bytes in total.

        Raises socket.gaierror when the target host name cannot be resolved.
        """
        target_addr = socket.gethostbyname(self.target_host)
        # Create a dummy header with a 0 checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, ID, 1)
        padding = b"Q" * (192 - struct.calcsize("d"))
        data = struct.pack("d", time.time()) + padding
        # Get the checksum on the data and the dummy header, then rebuild
        # the header with the real value.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
        )
        # The port number is a placeholder; it is not part of an ICMP packet.
        sock.sendto(header + data, (target_addr, 1))

    def ping_once(self):
        """Send one echo request and return the delay in seconds.

        Returns None on timeout.

        Raises socket.error (OSError) when the raw socket cannot be created,
        with a hint about missing privileges when permission is denied, and
        socket.gaierror when the target host cannot be resolved.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except PermissionError as e:
            raise socket.error(
                e.errno,
                "%s - ICMP messages can only be sent from root user processes"
                % e.strerror,
            ) from e
        try:
            my_ID = os.getpid() & 0xFFFF
            self.send_ping(sock, my_ID)
            return self.receive_pong(sock, my_ID, self.timeout)
        finally:
            # Close the socket even when sending or receiving raises.
            sock.close()

    def ping(self):
        """Ping the target ``count`` times, printing each result.

        Stops early when the target host name cannot be resolved.
        """
        for _ in range(self.count):
            print("Ping to %s..." % self.target_host)
            try:
                delay = self.ping_once()
            except socket.gaierror as e:
                reason = e.strerror or e
                print("Ping failed. (socket error: '%s')" % (reason,))
                break
            if delay is None:
                print("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                print("Get pong in %0.4fms" % (delay * 1000))
if __name__ == '__main__':
    # Sweep every host address of the /24 network and remember the hosts
    # that answered an ICMP echo request.
    alive = []
    host_prefix = '192.168.242.'
    for i in range(1, 255):
        host = host_prefix + str(i)
        pinger = Pinger(target_host=host)
        delay = pinger.ping_once()
        if delay is None:
            # Report the timeout actually in effect rather than a fixed
            # "2 seconds" that does not match the configured value.
            print("Ping %s 失败,超时%s秒" % (host, pinger.timeout))
        else:
            print("ping %s = %s ms" % (host, round(delay * 1000, 4)))
            alive.append(host)
    # Summarise the reachable hosts collected during the sweep.
    print("alive hosts: %s" % alive)
# time.sleep(0.5)
The script can be run directly (with root privileges); it iterates through the specified subnet, reports which hosts respond to ICMP echo requests, and collects the reachable hosts in the alive list.
Original article link: www.yjsec.com/2020/11/07
Python Programming Learning Circle
A global community of Chinese Python developers offering technical articles, columns, original video tutorials, and problem sets. Topics include web full‑stack development, web scraping, data analysis, natural language processing, image processing, machine learning, automated testing, DevOps automation, and big data.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.