Shell vs Python for System Automation: Which One Should You Use?
This article compares Shell and Python for system automation. It presents performance benchmarks across file processing, log analysis, and bulk server operations, then offers practical guidance on when to choose each language, along with migration strategies, code templates, common pitfalls, and best-practice recommendations for ops engineers.
Overview
A recent task, collecting system information from 200 servers with a simple Bash loop, took over three hours. Re-implementing it in Python with concurrency cut the runtime to eight minutes, prompting a closer look at when to use Shell versus Python.
Shell vs Python Positioning
Conclusion: Shell excels at simple command orchestration, while Python is better for complex data processing and logic control.
Shell Advantages
Seamless integration with Linux commands
Concise pipeline operations
Available on any Unix system without extra installation
Fast to write for simple tasks
Shell Disadvantages
Weak data structures (Bash has indexed arrays, and associative arrays since 4.0, but both are clumsy compared with Python's lists and dicts)
String manipulation is cumbersome
Weak error handling
Poor concurrency support
Maintainability suffers for complex scripts
Python Advantages
Rich data structures and extensive standard library
Powerful string and regex handling
Native concurrency (threading, asyncio)
Robust exception handling
Highly readable and maintainable code
Python Disadvantages
Requires a Python runtime
System commands must be invoked explicitly (see the sketch after this list)
Simple tasks may need more lines of code
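To make the trade-off concrete, below is a minimal, hypothetical sketch (not part of the benchmarks that follow): it shells out for raw data, then aggregates with a native dictionary type. It assumes the ss tool is available on the host.

#!/usr/bin/env python3
# tcp_states.py – hypothetical example, not from the benchmarks below
import subprocess
from collections import Counter

# The system command must be invoked explicitly (the Python disadvantage)...
result = subprocess.run(["ss", "-tn"], capture_output=True, text=True, check=True)

# ...but the aggregation uses a real dictionary type (the Python advantage)
states = Counter(
    line.split()[0]                             # first column is the TCP state
    for line in result.stdout.splitlines()[1:]  # skip the header row
    if line.strip()
)
for state, count in states.most_common():
    print(f"{count:>6} {state}")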
Performance Comparison
Test 1 – Batch File Processing
Task: Scan a log directory, find files larger than 100 MB from the past 7 days, and list them sorted by size.
#!/bin/bash
# find_large_logs.sh
LOG_DIR="/var/log"
DAYS=7
SIZE_MB=100
find "$LOG_DIR" -type f -name "*.log" -mtime -$DAYS -size +${SIZE_MB}M \
-exec ls -lh {} \; 2>/dev/null | \
    awk '{print $5, $9}' | sort -rh

#!/usr/bin/env python3
# find_large_logs.py
from pathlib import Path
from datetime import datetime, timedelta
LOG_DIR = "/var/log"
DAYS = 7
SIZE_MB = 100
def find_large_logs():
    cutoff = datetime.now() - timedelta(days=DAYS)
    size_bytes = SIZE_MB * 1024 * 1024
    results = []
    for log_file in Path(LOG_DIR).rglob("*.log"):
        try:
            stat = log_file.stat()
            mtime = datetime.fromtimestamp(stat.st_mtime)
            if mtime > cutoff and stat.st_size > size_bytes:
                results.append((stat.st_size, str(log_file)))
        except (PermissionError, FileNotFoundError):
            continue
    results.sort(reverse=True)
    for size, path in results:
        print(f"{size/1024/1024:.1f}MB\t{path}")

if __name__ == "__main__":
    find_large_logs()

Result (100 k files): Shell 12.3 s, ~5 MB memory; Python 8.7 s, ~45 MB memory.
Test 2 – Log Analysis Statistics
Task: Analyse an Nginx access log, report top 10 IPs, status‑code distribution, and the 99th‑percentile response time.
#!/bin/bash
# analyze_nginx_log.sh
LOG_FILE="$1"
echo "=== Top 10 IPs ==="
awk '{print $1}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -10
echo ""
echo "=== Status Code Distribution ==="
awk '{print $9}' "$LOG_FILE" | sort | uniq -c | sort -rn
echo ""
echo "=== Response Time P99 ==="
# assume response time is the last column
awk '{print $NF}' "$LOG_FILE" | sort -n | awk '
{a[NR]=$1}
END {
p99_idx = int(NR*0.99)
print "P99: " a[p99_idx] "ms"
}'

#!/usr/bin/env python3
# analyze_nginx_log.py
import re, sys
from collections import Counter
from statistics import quantiles
LOG_PATTERN = re.compile(
    r'(?P<ip>\d+\.\d+\.\d+\.\d+)'            # IP
    r'.*?"(?P<method>\w+) (?P<path>[^ ]+)'   # Method and Path
    r'.*?" (?P<status>\d+)'                  # Status
    r'.*?(?P<time>\d+\.?\d*)$'               # Response time
)
def analyze_log(filename):
    ip_counter = Counter()
    status_counter = Counter()
    response_times = []
    with open(filename) as f:
        for line in f:
            m = LOG_PATTERN.search(line)
            if m:
                ip_counter[m.group('ip')] += 1
                status_counter[m.group('status')] += 1
                try:
                    response_times.append(float(m.group('time')))
                except ValueError:
                    pass
    print("=== Top 10 IPs ===")
    for ip, cnt in ip_counter.most_common(10):
        print(f"{cnt:>8} {ip}")
    print("\n=== Status Code Distribution ===")
    for status, cnt in status_counter.most_common():
        print(f"{cnt:>8} {status}")
    print("\n=== Response Time Percentiles ===")
    if response_times:
        q = quantiles(response_times, n=100)
        print(f"P50: {q[49]:.2f}ms")
        print(f"P90: {q[89]:.2f}ms")
        print(f"P99: {q[98]:.2f}ms")

if __name__ == "__main__":
    analyze_log(sys.argv[1])

Result (1 GB log, ~5 M lines): Shell 4 m 32 s, ~50 MB; Python 1 m 15 s, ~200 MB.
Test 3 – Batch Server Operations
Task: Collect hostname, uptime, memory and disk usage from 200 CentOS 7 servers.
#!/bin/bash
# collect_info.sh
SERVERS_FILE="servers.txt"
OUTPUT_FILE="report.csv"
echo "hostname,uptime,mem_used_pct,disk_used_pct" > "$OUTPUT_FILE"
while read -r server; do
    echo "Collecting from $server..."
    # -n stops ssh from swallowing the rest of servers.txt on stdin;
    # \$ keeps the remote shell from expanding awk's field variables
    info=$(ssh -n -o ConnectTimeout=5 -o StrictHostKeyChecking=no "$server" '
        hostname=$(hostname)
        uptime=$(uptime -p)
        mem_used=$(free | awk "/Mem:/ {printf \"%.1f\", \$3/\$2*100}")
        disk_used=$(df -h / | awk "NR==2 {print \$5}" | tr -d "%")
        echo "$hostname,$uptime,$mem_used,$disk_used"
    ' 2>/dev/null)
    # note: uptime -p output may itself contain commas; quote it if strict CSV is needed
    if [ -n "$info" ]; then
        echo "$info" >> "$OUTPUT_FILE"
    else
        echo "$server,FAILED,N/A,N/A" >> "$OUTPUT_FILE"
    fi
done < "$SERVERS_FILE"

#!/bin/bash
# collect_info_parallel.sh
SERVERS_FILE="servers.txt"
OUTPUT_FILE="report.csv"
echo "hostname,uptime,mem_used_pct,disk_used_pct" > "$OUTPUT_FILE"
collect_server_info() {
    server=$1
    info=$(ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "$server" '
        hostname=$(hostname)
        uptime=$(uptime -p)
        mem_used=$(free | awk "/Mem:/ {printf \"%.1f\", \$3/\$2*100}")
        disk_used=$(df -h / | awk "NR==2 {print \$5}" | tr -d "%")
        echo "$hostname,$uptime,$mem_used,$disk_used"
    ' 2>/dev/null)
    if [ -n "$info" ]; then
        echo "$info"
    else
        echo "$server,FAILED,N/A,N/A"
    fi
}
export -f collect_server_info
# requires GNU parallel; export -f needs bash
cat "$SERVERS_FILE" | parallel -j 50 collect_server_info >> "$OUTPUT_FILE"

#!/usr/bin/env python3
# collect_info.py
import asyncio, asyncssh, csv
from dataclasses import dataclass
from typing import Optional
@dataclass
class ServerInfo:
    hostname: str
    uptime: str
    mem_used_pct: float
    disk_used_pct: float

async def collect_from_server(host: str, timeout: int = 10) -> Optional[ServerInfo]:
    try:
        async with asyncssh.connect(host, username='root', known_hosts=None,
                                    connect_timeout=timeout) as conn:
            # printf needs an explicit newline so each value lands on its own line
            result = await conn.run('''
                hostname
                uptime -p
                free | awk '/Mem:/ {printf "%.1f\\n", $3/$2*100}'
                df -h / | awk 'NR==2 {print $5}' | tr -d '%'
            ''', check=True)
            lines = result.stdout.strip().split('\n')
            return ServerInfo(lines[0], lines[1], float(lines[2]), float(lines[3]))
    except Exception as e:
        print(f"Failed to connect {host}: {e}")
        return None

async def collect_all(servers, concurrency=50):
    semaphore = asyncio.Semaphore(concurrency)

    async def limited(host):
        async with semaphore:
            return host, await collect_from_server(host)

    tasks = [limited(h) for h in servers]
    return await asyncio.gather(*tasks)

def main():
    with open('servers.txt') as f:
        servers = [line.strip() for line in f if line.strip()]
    results = asyncio.run(collect_all(servers))
    with open('report.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['hostname', 'uptime', 'mem_used_pct', 'disk_used_pct'])
        for host, info in results:
            if info:
                writer.writerow([info.hostname, info.uptime, info.mem_used_pct, info.disk_used_pct])
            else:
                writer.writerow([host, 'FAILED', 'N/A', 'N/A'])

if __name__ == '__main__':
    main()

Results: Shell serial 3 h 12 m; Shell+Parallel 12 m; Python asyncio 8 m.
Test 4 – Bulk Configuration Modification
Task: Disable PasswordAuthentication in sshd_config on 200 servers.
#!/bin/bash
# update_sshd.sh
SERVERS_FILE="servers.txt"
BACKUP_DIR="/tmp/sshd_backup"
mkdir -p "$BACKUP_DIR"
while read -r server; do
    echo "Updating $server..."
    # -n stops ssh from consuming the rest of servers.txt on stdin
    ssh -n "$server" '
        cp /etc/ssh/sshd_config /etc/ssh/sshd_config.bak
        if grep -q "^PasswordAuthentication" /etc/ssh/sshd_config; then
            sed -i "s/^PasswordAuthentication.*/PasswordAuthentication no/" /etc/ssh/sshd_config
        else
            echo "PasswordAuthentication no" >> /etc/ssh/sshd_config
        fi
        sshd -t && systemctl reload sshd
    '
    if [ $? -eq 0 ]; then
        echo "$server: SUCCESS"
    else
        echo "$server: FAILED"
    fi
done < "$SERVERS_FILE"

#!/usr/bin/env python3
# update_sshd.py
from fabric import Connection
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
SSHD_CONFIG = "/etc/ssh/sshd_config"
def update_server(host: str):
    try:
        conn = Connection(host, user='root', connect_timeout=10)
        conn.run(f"cp {SSHD_CONFIG} {SSHD_CONFIG}.bak")
        result = conn.run(f"cat {SSHD_CONFIG}", hide=True)
        config = result.stdout
        if "PasswordAuthentication" in config:
            conn.run(f"sed -i 's/^#*PasswordAuthentication.*/PasswordAuthentication no/' {SSHD_CONFIG}")
        else:
            conn.run(f"echo 'PasswordAuthentication no' >> {SSHD_CONFIG}")
        result = conn.run("sshd -t", warn=True)
        if result.failed:
            conn.run(f"cp {SSHD_CONFIG}.bak {SSHD_CONFIG}")
            return host, False, "Config validation failed"
        conn.run("systemctl reload sshd")
        result = conn.run(f"grep '^PasswordAuthentication' {SSHD_CONFIG}", hide=True)
        if "no" not in result.stdout.lower():
            return host, False, "Change not applied"
        return host, True, "Success"
    except Exception as e:
        return host, False, str(e)

def main():
    with open('servers.txt') as f:
        servers = [line.strip() for line in f if line.strip()]
    from concurrent.futures import ThreadPoolExecutor, as_completed
    results = {'success': [], 'failed': []}
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = {executor.submit(update_server, h): h for h in servers}
        for future in as_completed(futures):
            host, ok, msg = future.result()
            if ok:
                results['success'].append(host)
                logger.info(f"{host}: {msg}")
            else:
                results['failed'].append((host, msg))
                logger.error(f"{host}: {msg}")
    print("\n=== Summary ===")
    print(f"Success: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")
    if results['failed']:
        print("\nFailed servers:")
        for h, r in results['failed']:
            print(f"  {h}: {r}")

if __name__ == '__main__':
    main()

Both approaches work; Python (Fabric) provides richer logging and easier error handling, while the Shell version is shorter.
Scenario Analysis: When to Use Which
Suitable for Shell
Simple command composition (e.g., du -sh /* | sort -rh | head -10)
Quick system‑management tasks (batch killing processes, cleaning temp files, checking service status)
Pipeline processing of streaming data (real‑time log monitoring with tail | awk | uniq)
Suitable for Python
Complex data processing (JSON log aggregation, statistical analysis)
Concurrent bulk operations (asyncio, ThreadPoolExecutor, asyncssh)
Interacting with APIs or cloud services (boto3, requests)
Robust error handling and retry logic (tenacity, custom exceptions; see the sketch after this list)
Long‑term maintainable scripts (structured code, logging, argument parsing)
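As a hedged illustration of the last two points, the sketch below wraps a requests call in tenacity retries with exponential backoff; the health-check URL and endpoint are made-up examples:

#!/usr/bin/env python3
# check_health.py – hypothetical example of API access with retries
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def get_health(url: str) -> dict:
    # raise_for_status() turns HTTP errors into exceptions, which tenacity retries
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    return resp.json()

if __name__ == '__main__':
    print(get_health('http://localhost:8080/health'))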
Migration Strategy
Gradual Migration
Write new requirements directly in Python.
Prioritise rewriting complex scripts (>100 lines).
Migrate only when performance or maintainability becomes an issue.
Mixed Use
Python can invoke Shell commands when a one‑liner is sufficient:
import subprocess

result = subprocess.run('df -h', shell=True, capture_output=True, text=True)
code, out, err = result.returncode, result.stdout, result.stderr

Conversely, Shell can call a Python script for heavy processing:
# Pre‑process with Shell
find /var/log -name "*.log" -mtime -1 > /tmp/logs.txt
# Complex analysis with Python
python3 analyze_logs.py /tmp/logs.txt > report.json
# Post‑process with Shell
cat report.json | jq -r '.summary' | mail -s "Daily Report" [email protected]

Code Templates
A reusable Python ops‑script skeleton:
#!/usr/bin/env python3
"""Script description"""
import argparse, logging, sys
from pathlib import Path
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def parse_args():
    p = argparse.ArgumentParser(description='Script description')
    p.add_argument('-c', '--config', type=Path, help='Path to config file')
    p.add_argument('-v', '--verbose', action='store_true', help='Enable debug output')
    p.add_argument('--dry-run', action='store_true', help='Simulate execution')
    return p.parse_args()

def main():
    args = parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    logger.info('Starting execution...')
    try:
        # main logic here
        pass
    except KeyboardInterrupt:
        logger.warning('Interrupted by user')
        sys.exit(130)
    except Exception as e:
        logger.error(f'Execution failed: {e}')
        sys.exit(1)
    logger.info('Execution completed')

if __name__ == '__main__':
    main()

Pitfalls
Python Encoding Issues
# Wrong – may raise UnicodeDecodeError
with open('log.txt') as f:
    content = f.read()

# Correct – specify encoding and error handling
with open('log.txt', encoding='utf-8', errors='replace') as f:
    content = f.read()

# Or detect automatically with chardet
import chardet

with open('log.txt', 'rb') as f:
    raw = f.read()
encoding = chardet.detect(raw)['encoding'] or 'utf-8'
content = raw.decode(encoding, errors='replace')

SSH Connection Leaks
# Bad – client never closed
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host)
stdin, stdout, stderr = client.exec_command('hostname')

# Good – use context manager
with paramiko.SSHClient() as client:
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host)
    stdin, stdout, stderr = client.exec_command('hostname')
    hostname = stdout.read().decode().strip()

Unbounded Concurrency
# Bad – thousands of concurrent connections
tasks = [check_host(h) for h in hosts]
await asyncio.gather(*tasks)

# Good – limit with semaphore
semaphore = asyncio.Semaphore(50)

async def limited_check(host):
    async with semaphore:
        return await check_host(host)

tasks = [limited_check(h) for h in hosts]
await asyncio.gather(*tasks)

Zombie Subprocesses
# Bad – fire‑and‑forget
subprocess.Popen('some_cmd', shell=True)

# Good – track and clean up
import atexit, subprocess

background = []

def run_background(cmd):
    proc = subprocess.Popen(cmd, shell=True)
    background.append(proc)
    return proc

@atexit.register
def cleanup():
    for p in background:
        p.terminate()
        p.wait()

Conclusion
Shell is not obsolete and Python is not a silver bullet. The key is to select the right tool for the problem: use Shell for quick, line‑oriented tasks and Python for complex logic, concurrency, API interaction, and long‑term maintainability.