7 Ready‑to‑Use Python & Shell Scripts to Supercharge Your Ops
This article shares a curated collection of ready‑to‑run Python and Shell scripts—including Enterprise WeChat alerts, FTP and SSH clients, SaltStack and vCenter utilities, SSL certificate checks, weather notifications, SVN backups, Zabbix password monitoring, local YUM mirroring, and high‑load detection—complete with full source code and usage notes to help engineers automate routine tasks and boost operational efficiency.
Python Script Section
Enterprise WeChat alert
FTP client
SSH client
SaltStack client
vCenter client
SSL certificate expiration checker
Weather forecast and trend chart sender
1. Enterprise WeChat Alert
This script sends messages through the Enterprise WeChat API, making it easy to push Zabbix alerts.
# -*- coding: utf-8 -*-
import requests, json
class DLF:
    """Small Enterprise WeChat client for pushing text and image messages.

    The access token is requested once, in ``__init__``, and reused by every
    later call made through the same object.
    """

    def __init__(self, corpid, corpsecret, timeout=10):
        """Store the credentials and request an access token.

        :param corpid: enterprise ID passed to the ``gettoken`` endpoint.
        :param corpsecret: application secret passed to ``gettoken``.
        :param timeout: seconds to wait on each HTTP request. Without a
            timeout a stalled connection would block the caller forever.
        """
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self.timeout = timeout
        self._token = self._get_token()

    def _get_token(self):
        """Request an access token.

        :return: the ``access_token`` field of the response. On any failure
            (request error, non-JSON body, missing field) the error text is
            returned instead of raising; this legacy contract is kept for
            backward compatibility.
        """
        try:
            res = requests.get(
                self.url + "/gettoken",
                # Let requests build and URL-encode the query string.
                params={"corpid": self.corpid, "corpsecret": self.corpsecret},
                timeout=self.timeout,
            ).json()
            return res['access_token']
        except Exception as e:
            return str(e)

    def _upload_media(self, file_obj):
        """Upload ``file_obj`` as temporary media and return its ``media_id``.

        Unlike ``_get_media_id`` this raises on failure, so callers can tell
        a real media id from an error message.
        """
        media_url = self.url + "/media/upload?access_token=%s&type=file" % self._token
        res = requests.post(url=media_url, files={"media": file_obj}, timeout=self.timeout)
        return res.json()['media_id']

    def _get_media_id(self, file_obj):
        """Upload ``file_obj``; return its ``media_id`` or the error text."""
        try:
            return self._upload_media(file_obj)
        except Exception as e:
            return str(e)

    @staticmethod
    def _build_payload(agentid, msgtype, body, touser=None, toparty=None):
        """Return the request body for a ``message/send`` call.

        :param msgtype: message type; also the key under which ``body`` is
            stored (``"text"`` or ``"image"``).
        :param body: type-specific content dict.
        """
        return {
            "touser": touser,
            "toparty": toparty,
            "msgtype": msgtype,
            "agentid": agentid,
            msgtype: body,
        }

    def _send(self, payload):
        """POST ``payload`` to ``message/send``.

        :return: the decoded JSON response, or the error text on failure.
        """
        url = self.url + "/message/send?access_token=%s" % self._token
        try:
            return requests.post(url, data=json.dumps(payload), timeout=self.timeout).json()
        except Exception as e:
            return str(e)

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a plain-text message.

        :param agentid: ID of the application the message is sent from.
        :param content: message text.
        :param touser: recipient user IDs, passed through unchanged.
        :param toparty: recipient department IDs, passed through unchanged.
        :return: decoded JSON response, or the error text on failure.
        """
        return self._send(
            self._build_payload(agentid, "text", {"content": content}, touser, toparty)
        )

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload ``file_obj`` and send it as an image message.

        :param agentid: ID of the application the message is sent from.
        :param file_obj: open binary file object to upload.
        :param touser: recipient user IDs, passed through unchanged.
        :param toparty: recipient department IDs, passed through unchanged.
        :return: decoded JSON response, or the error text on failure.
        """
        try:
            media_id = self._upload_media(file_obj)
        except Exception as e:
            # Stop here: sending on would put the error text in "media_id".
            return str(e)
        return self._send(
            self._build_payload(agentid, "image", {"media_id": media_id}, touser, toparty)
        )


# 2. FTP Client
A lightweight wrapper around Python's ftplib that supports upload, download, directory listing, deletion, renaming and size queries.
# -*- coding: utf-8 -*-
from ftplib import FTP
import os, copy
class FTPClient:
    """Convenience wrapper around ``ftplib.FTP``.

    The constructor connects and logs in. The directory/file operations
    ``rmd``, ``mkd``, ``del_file`` and ``rename`` return a dict
    ``{'status': bool, 'msg': str}`` instead of raising.
    """

    def __init__(self, host, user, passwd, port=21):
        """Connect to ``host:port`` and log in as ``user``.

        The constructor never raises on a failed login; check
        ``login_error`` (``None`` on success) before using the client.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        # Template for result dicts; always copied, never handed out.
        self.res = {'status': True, 'msg': None}
        self.login_error = None
        self._ftp = None
        self._login()

    def _login(self):
        """Open the control connection and authenticate.

        :return: ``None`` on success, the caught exception on failure. The
            exception is also stored in ``login_error`` because ``__init__``
            discards this return value.
        """
        self.login_error = None
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            self.login_error = e
            return e

    def _run(self, method, *args):
        """Call ``self._ftp.<method>(*args)`` and wrap the outcome in a result dict."""
        res = copy.deepcopy(self.res)
        try:
            res['msg'] = getattr(self._ftp, method)(*args)
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res

    def upload(self, localpath, remotepath=None):
        """Upload ``localpath``; the remote name defaults to its basename.

        :return: a message string when ``localpath`` is empty, else ``None``.
        """
        if not localpath:
            return 'Please select a local file.'
        if not remotepath:
            remotepath = os.path.basename(localpath)
        # Context manager closes the local file even if the transfer fails.
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        """Download ``remotepath``.

        ``localpath`` defaults to the remote basename in the current
        directory; when it names a directory the file is stored inside it.

        :return: a message string when ``remotepath`` is empty, else ``None``.
        """
        if not remotepath:
            return 'Please select a remote file.'
        if not localpath:
            localpath = os.path.basename(remotepath)
        if os.path.isdir(localpath):
            localpath = os.path.join(localpath, os.path.basename(remotepath))
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        """Return the names listed in ``dir``."""
        return self._ftp.nlst(dir)

    def rmd(self, dir=None):
        """Remove directory ``dir``; returns a result dict."""
        if not dir:
            return 'Please input dirname'
        return self._run('rmd', dir)

    def mkd(self, dir=None):
        """Create directory ``dir``; returns a result dict."""
        if not dir:
            return 'Please input dirname'
        return self._run('mkd', dir)

    def del_file(self, filename=None):
        """Delete ``filename``; returns a result dict."""
        if not filename:
            return 'Please input filename'
        return self._run('delete', filename)

    def get_file_size(self, filenames=None):
        """Return size and type information for each name in ``filenames``.

        An entry whose ``SIZE`` request fails is reported as a directory:
        type ``'d'``, size ``'-'`` and a trailing ``/`` on its name.

        :param filenames: iterable of remote names; ``None`` or empty yields
            ``{'msg': 'This is an empty directory'}``.
        :return: list of ``{'filename', 'size', 'type'}`` dicts.
        """
        if not filenames:
            return {'msg': 'This is an empty directory'}
        result = []
        for name in filenames:
            try:
                size = self._ftp.size(name)
                typ = 'f'
            except Exception:
                size = '-'
                typ = 'd'
                name = name + '/'
            result.append({'filename': name, 'size': size, 'type': typ})
        return result

    def rename(self, old_name=None, new_name=None):
        """Rename ``old_name`` to ``new_name``; returns a result dict."""
        if not old_name or not new_name:
            return 'Please input old_name and new_name'
        return self._run('rename', old_name, new_name)

    def close(self):
        """Send QUIT, then close the connection whether or not QUIT succeeded."""
        try:
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            self._ftp.close()


# 3. SSH Client
Uses Paramiko to connect to a remote host via an SSH key and execute commands.
# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
    """Run commands on a remote host over SSH, authenticating with an RSA key."""

    def __init__(self, host, port, user, pkey):
        """Load the private key and connect.

        :param host: remote host name or address.
        :param port: SSH port.
        :param user: login name.
        :param pkey: path to an RSA private key file.

        A failed connection does not raise; check ``connect_error``
        (``None`` on success) before running commands.
        """
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.connect_error = None
        self.ssh = None
        self._connect()

    def _connect(self):
        """Open the SSH connection.

        :return: ``None`` on success, ``'ssh connect fail'`` on failure. The
            caught exception is kept in ``connect_error`` because
            ``__init__`` discards this return value.
        """
        self.connect_error = None
        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key, so a
        # man-in-the-middle is not detected on first contact. Consider
        # loading known_hosts and using RejectPolicy for production use.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port,
                             username=self.ssh_user, pkey=self.private_key, timeout=10)
        except Exception as e:
            self.connect_error = e
            return 'ssh connect fail'

    def execute_command(self, command, timeout=None):
        """Run ``command`` on the remote host.

        :param command: shell command line to execute.
        :param timeout: optional timeout in seconds, passed to
            ``exec_command``; ``None`` (the default) waits indefinitely, as
            before.
        :return: ``(stdout, stderr)`` as read from the two streams.
        """
        stdin, stdout, stderr = self.ssh.exec_command(command, timeout=timeout)
        return stdout.read(), stderr.read()

    def close(self):
        """Close the SSH connection."""
        self.ssh.close()


# 4. SaltStack Client
Wraps SaltStack's REST API to obtain authentication tokens, list keys, query grains, and run commands.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests, json, copy
class SaltApi:
    """Client for the salt-api REST interface.

    Logs in on construction and sends the returned token in the
    ``X-Auth-Token`` header of every later request.
    """

    def __init__(self, url="http://172.85.10.21:8000/", username="saltapi",
                 password="saltapi", eauth="pam", timeout=None):
        """Log in and store the auth token.

        All parameters default to the values that used to be hard-coded, so
        ``SaltApi()`` behaves as before.

        :param url: base URL of salt-api; a trailing slash is added if missing.
        :param username: salt-api user.
        :param password: salt-api password.
        :param eauth: external auth backend sent with the login request.
        :param timeout: seconds to wait on each HTTP request; ``None`` waits
            indefinitely (the previous behaviour).
        """
        self.url = url.rstrip('/') + '/'
        self.username = username
        self.password = password
        self.timeout = timeout
        self.headers = {"Content-type": "application/json"}
        # Template request body; methods work on a deep copy of it.
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': eauth}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params):
        """POST ``params`` as JSON to ``url`` and return ``response['return'][0]``."""
        res = requests.post(url, data=json.dumps(params), headers=self.headers,
                            timeout=self.timeout).json()
        return res['return'][0]

    def get_auth_keys(self):
        """Return the ``minions`` list from the ``key.list_all`` wheel call.

        :return: that list, or the error text if the call or lookup fails.
        """
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        try:
            return self.get_data(self.url, data)['data']['return']['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """Run ``grains.item`` for ``arg`` on ``tgt`` (``'*'`` when ``tgt`` is empty)."""
        data = copy.deepcopy(self.params)
        data['tgt'] = tgt if tgt else '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        return self.get_data(self.url, data)

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """Run ``fun`` on ``tgt``.

        :param tgt: target expression; an empty value returns an error dict.
        :param fun: execution function name.
        :param arg: argument(s) for ``fun``; the key is omitted from the
            request when empty.
        :param tgt_type: targeting type, sent unless ``tgt`` is ``'*'``.
        :param salt_async: use the ``local_async`` client instead of ``local``.
        :return: ``get_data`` result, or ``{'status': False, 'msg': ...}``.
        """
        if not tgt:
            return {'status': False, 'msg': 'target host not exist'}
        data = copy.deepcopy(self.params)
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async:
            data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        return self.get_data(self.url, data)

    def jobs(self, fun='detail', jid=None):
        """Query the jobs runner.

        :param fun: ``'detail'`` looks up job ``jid`` (``jobs.lookup_jid``);
            ``'active'`` lists running jobs (``jobs.active``). The error
            message already advertised ``'active'``, but it was rejected.
        :param jid: job ID, required for ``'detail'``.
        :return: ``get_data`` result, or ``{'success': False, 'msg': ...}``.
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid:
                return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        return self.get_data(self.url, data)


# 5. vCenter Client
Uses pyVmomi to connect to a vCenter server, enumerate the ESXi hosts and the VMs running on them, and report each one's CPU, memory and disk totals.
from pyVim.connect import SmartConnectNoSSL, Disconnect
from pyVmomi import vim
import atexit
class Vmware:
    """Collect ESXi host and VM inventory from one vCenter server."""

    def __init__(self, ip, user, password, port, idc, vcenter_id):
        """Store connection details and the IDs copied into every host record.

        :param ip: vCenter address.
        :param user: login name.
        :param password: login password.
        :param port: vCenter port.
        :param idc: value reported as ``idc_id`` in each host record.
        :param vcenter_id: value reported as ``vcenter_id`` in each host record.
        """
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        """Return every object of the given types under the root folder.

        :param content: service content returned by ``RetrieveContent()``.
        :param vimtype: list of managed-object types to collect.
        :param name: unused; kept for backward compatibility.
        :return: list of matching managed objects.
        """
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        try:
            return list(container.view)
        finally:
            # The view was never released before; destroy it once it has
            # been copied into a list.
            container.Destroy()

    def _vm_summary(self, vm):
        """Return the record for one VM."""
        config = vm.config
        if config is None:
            # NOTE(review): pyVmomi can report config as None (presumably
            # for inaccessible or orphaned VMs - confirm). Such a VM used to
            # abort the whole scan with AttributeError; list it with empty
            # details instead.
            return {
                'vm_name': vm.name,
                'power_status': vm.runtime.powerState,
                'cpu_total_kernel': '',
                'memory_total': '',
                'system_info': '',
                'disk_info': ''
            }
        hardware = config.hardware
        disks = [
            f"{d.deviceInfo.label}: {d.capacityInKB/1024**2:.2f} GB"
            for d in hardware.device
            if isinstance(d, vim.vm.device.VirtualDisk)
        ]
        return {
            'vm_name': vm.name,
            'power_status': vm.runtime.powerState,
            'cpu_total_kernel': f"{hardware.numCPU}核",
            'memory_total': f"{hardware.memoryMB}MB",
            'system_info': config.guestFullName,
            'disk_info': ','.join(disks)
        }

    def _host_summary(self, esxi):
        """Return the record for one ESXi host, including its VMs."""
        hardware = esxi.summary.hardware
        return {
            'idc_id': self.idc_id,
            'vcenter_id': self.vcenter_id,
            'server_ip': esxi.name,
            'manufacturer': hardware.vendor,
            'server_model': hardware.model,
            'cpu_total': hardware.numCpuThreads,
            'memory_total_gb': hardware.memorySize / 1024**3,
            'disk_total_gb': sum(ds.summary.capacity for ds in esxi.datastore) / 1024**3,
            'vm_host': [self._vm_summary(vm) for vm in esxi.vm]
        }

    def get_esxi_info(self):
        """Connect to vCenter and describe every ESXi host and its VMs.

        :return: on connection failure,
            ``{'connect_status': False, 'msg': <error text>}``; otherwise a
            dict mapping each host name to its record, plus a
            ``'connect_status': True`` entry.
        """
        res = {'connect_status': True, 'msg': None}
        try:
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password,
                                   port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            res['msg'] = str(e)
            return res
        # Disconnect when the scan ends instead of registering an atexit
        # hook per call, which kept every session open until interpreter
        # exit. Everything returned is plain data copied out of the session.
        try:
            content = si.RetrieveContent()
            esxi_host = {}
            for esxi in self.get_obj(content, [vim.HostSystem]):
                esxi_host[esxi.name] = self._host_summary(esxi)
            esxi_host['connect_status'] = True
            return esxi_host
        finally:
            Disconnect(si)


# Shell Script Section
1. SVN Full Backup
Creates a daily hot-copy of all SVN repositories, logs a success or failure line for each one, and deletes the backup taken seven days earlier.
#!/bin/bash
# Filename : svn_backup_repos.sh
# Date : 2020/12/14
#
# Hot-copies every repository directory under SRC_PATH (except httpd and
# bak) into DST_PATH/<today>, writes one Success/Fail line per repository
# to the log, copies the authz/access.conf files, and removes the backup
# directory dated seven days ago.
set -e

SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')

cd "$SRC_PATH"
mkdir -p "$DST_PATH/logs" "$DST_PATH/$TODAY"

# -printf '%f' yields the bare directory name. The previous `tr -d './'`
# removed every '.' and '/' from the name, so a repository called
# "web.site" was looked up as "website". -mindepth 1 skips "." itself.
while IFS= read -r repo; do
    # Both commands sit inside the `if` so that, under `set -e`, a failed
    # hotcopy is logged as a failure instead of aborting the whole run.
    if $SVN_BACKUP_C "$SRC_PATH/$repo" "$DST_PATH/$TODAY/$repo" &&
       $SVN_LOOK_C "$DST_PATH/$TODAY/$repo"; then
        echo "$TODAY: $repo Backup Success" >> "$LOG_FILE"
    else
        echo "$TODAY: $repo Backup Fail" >> "$LOG_FILE"
    fi
done < <(find . -mindepth 1 -maxdepth 1 -type d ! -name 'httpd' ! -name 'bak' -printf '%f\n')

cp -p authz access.conf "$DST_PATH/$TODAY"

# With no repositories there is no log to rotate; do not let `mv` abort
# the script before the old backup is cleaned up.
if [ -f "$LOG_FILE" ]; then
    mv "$LOG_FILE" "$LOG_FILE-$TODAY"
fi

seven_days_ago=$(date -d "7 days ago" +'%F')
# :? aborts rather than running `rm -rf "$DST_PATH/"` if the date is empty.
rm -rf "$DST_PATH/${seven_days_ago:?}"

# 2. Zabbix User Password Expiration Monitor
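Provides two pieces: a Zabbix low-level discovery script that emits a JSON payload listing every user with a login shell, and a per-user check that reports whether that user's password expires within seven days.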
#!/bin/bash
# Zabbix low-level discovery: emit one {#USER_NAME} entry for every
# /etc/passwd account whose last field matches /bin/bash or /bin/sh.
login_users=($(awk -F':' '$NF ~ /\/bin\/bash|\/bin\/sh/ {print $1}' /etc/passwd))

printf '{\n"data":[\n'
# The separator is empty before the first entry and a comma afterwards,
# so entries are comma-joined with no trailing comma.
separator=''
for name in "${login_users[@]}"; do
    printf '%s\t{"{#USER_NAME}":"%s"}' "$separator" "$name"
    separator=','
done
printf '\n]\n}\n'
# Check password expiration for a given user
# Usage: pass the user name as the first argument.
# Prints 1 when the password has expired or expires within the next seven
# days, otherwise 0 (also for "never" and when no expiry date can be read).
export LANG=en_US.UTF-8
user="$1"

# Look seven days AHEAD. The previous version compared against
# `date -d '-7 day'`, so it only reported 1 once a password had already
# been expired for a week.
SEVEN_DAYS_LATER=$(date -d '+7 day' +%s)

# LC_ALL=C pins the English "Password expires" label that awk matches.
expires=$(LC_ALL=C chage -l "$user" | awk -F':' '/Password expires/ {print $2}' | sed 's/^[[:space:]]*//')

# An empty value (unknown user, chage failure) must not reach `date -d`,
# which would silently parse "" as today.
if [[ -n "$expires" && "$expires" != "never" ]]; then
    expires_ts=$(date -d "$expires" +%s)
    if [ "$expires_ts" -le "$SEVEN_DAYS_LATER" ]; then
        echo "1"
    else
        echo "0"
    fi
else
    echo "0"
fi

# 3. Local YUM Repository Builder
Synchronizes multiple CentOS 6/7 repositories (Base, EPEL, SaltStack, Docker, MySQL) from a remote mirror using rsync and logs each operation.
#!/bin/bash
# Mirror upstream package repositories into $DIR with rsync and append a
# success/fail line to the matching log after each run.
RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
# ... (other repo paths omitted for brevity)
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"

# check_dir DIR...: create each directory (and parents) if missing.
check_dir() {
    local d
    for d in "$@"; do
        mkdir -p "$d"
    done
}

# check_rsync_status LOGFILE [STATUS]: append "rsync success" or
# "rsync fail" to LOGFILE. STATUS defaults to the exit code of the command
# that ran just before the call, which is how existing callers use it.
check_rsync_status() {
    local status=${2:-$?}
    if [ "$status" -eq 0 ]; then
        echo "rsync success" >> "$1"
    else
        echo "rsync fail" >> "$1"
    fi
}

# sync_repo SRC DST LOGFILE: mirror SRC into DST, logging output and result.
# The exit code is passed explicitly so a command added between the rsync
# and the status check cannot overwrite it.
sync_repo() {
    local src=$1 dst=$2 log=$3
    $RsyncCommand "$src" "$dst" >> "$log" 2>&1
    check_rsync_status "$log" $?
}

check_dir "$DIR" "$LogDir" "$Centos6Base" "$Centos7Base"
sync_repo "$MirrorDomain/repo/centos/7/os/x86_64/" "$Centos7Base" "$LogDir/centos7Base.log"
# (similar commands for EPEL, Docker, MySQL, etc.)

# 4. High-Load Detection Script
Monitors system load averages against CPU core thresholds and logs the top CPU‑ and memory‑consuming processes, as well as disk I/O, when a threshold is exceeded.
#!/bin/bash
# When the 1-, 5- or 15-minute load average reaches its threshold (100%,
# 80% and 70% of the core count respectively), save the top CPU and memory
# consumers and a disk I/O sample under ./cpu_high_script_log.

# Physical cores = sockets * cores per socket.
physical_cpu_count=$(grep -E 'physical id' /proc/cpuinfo | sort -u | wc -l)
physical_cpu_cores=$(grep -E 'cpu cores' /proc/cpuinfo | sort -u | head -n 1 | awk '{print $NF}')
total_cpu_cores=$((physical_cpu_count * ${physical_cpu_cores:-0}))
# When /proc/cpuinfo has no "physical id"/"cpu cores" lines the product is
# 0, which made every threshold 0 and the script fire on every run. Fall
# back to the number of available processors.
if [ "$total_cpu_cores" -le 0 ]; then
    total_cpu_cores=$(nproc)
fi

one_min_load_threshold=$total_cpu_cores
five_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.8}')
fifteen_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.7}')

# One read of /proc/loadavg gives all three values from the same instant
# and avoids parsing the locale-dependent `uptime` output.
read -r one_min_load five_min_load fifteen_min_load _ < /proc/loadavg

# load_reached LOAD THRESHOLD: succeed when LOAD >= THRESHOLD.
# awk performs the floating-point comparison, so bc is not required.
load_reached() {
    awk -v load="$1" -v limit="$2" 'BEGIN { exit !(load + 0 >= limit + 0) }'
}

# Write the diagnostic snapshot; each run overwrites the previous files.
get_info(){
    log_dir="cpu_high_script_log"
    mkdir -p "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir/cpu_top10.log"
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir/mem_top10.log"
    iostat -dx 1 10 > "$log_dir/disk_io_10.log"
}
# NOTE(review): nothing in this script starts a child shell that calls
# get_info; the export is kept only for compatibility.
export -f get_info

if load_reached "$one_min_load" "$one_min_load_threshold" ||
   load_reached "$five_min_load" "$five_min_load_threshold" ||
   load_reached "$fifteen_min_load" "$fifteen_min_load_threshold"; then
    get_info
fi

# The article concludes with a reminder that readers can adapt these scripts to their own environments, share additional examples, and support the author through likes, comments, and shares.
Efficient Ops
This public account is maintained by Xiaotianguo and friends, regularly publishing widely-read original technical articles. We focus on operations transformation and accompany you throughout your operations career, growing together happily.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.