Operations 30 min read

7 Ready‑to‑Use Python & Shell Scripts to Supercharge Your Ops

This article shares a curated collection of ready‑to‑run Python and Shell scripts—including Enterprise WeChat alerts, FTP and SSH clients, SaltStack and vCenter utilities, SSL certificate checks, weather notifications, SVN backups, Zabbix password monitoring, local YUM mirroring, and high‑load detection—complete with full source code and usage notes to help engineers automate routine tasks and boost operational efficiency.

Efficient Ops
Efficient Ops
Efficient Ops
7 Ready‑to‑Use Python & Shell Scripts to Supercharge Your Ops

Python Script Section

Enterprise WeChat alert

FTP client

SSH client

SaltStack client

vCenter client

SSL certificate expiration checker

Weather forecast and trend chart sender

1. Enterprise WeChat Alert

This script sends messages through the Enterprise WeChat API, making it easy to push Zabbix alerts.

# -*- coding: utf-8 -*-
import requests, json

class DLF:
    """Minimal Enterprise WeChat client for pushing text and image messages.

    The access token is requested once, when the object is created, and is
    reused for every later call.

    Error handling is best-effort, as in the original script: every method
    that talks to the API returns ``str(exception)`` instead of raising, so a
    failed alert never crashes the caller. Callers that need to know whether
    a send worked must inspect the returned value.
    """

    # Seconds to wait on each HTTP call. Without a timeout, requests waits
    # indefinitely and a stalled API would hang the alerting script.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        """Store the credentials and fetch an access token.

        Args:
            corpid: Enterprise (corp) ID.
            corpsecret: Secret of the application used to send messages.
        """
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()

    def _get_token(self):
        """Return the access token, or the error text if the request failed."""
        token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" % (self.corpid, self.corpsecret)
        try:
            res = requests.get(token_url, timeout=self.timeout).json()
            return res['access_token']
        except Exception as e:
            return str(e)

    def _get_media_id(self, file_obj):
        """Upload ``file_obj`` as temporary media and return its media_id.

        Returns the error text instead if the upload or the response
        parsing failed.
        """
        media_url = self.url + "/media/upload?access_token=%s&type=file" % self._token
        try:
            res = requests.post(url=media_url, files={"media": file_obj}, timeout=self.timeout)
            return res.json()['media_id']
        except Exception as e:
            return str(e)

    def _send(self, payload):
        """POST one message payload and return the decoded JSON reply.

        Shared by every ``send_*`` method so the request and the error
        handling live in a single place. Returns the error text on failure.
        """
        url = self.url + "/message/send?access_token=%s" % self._token
        try:
            return requests.post(url, data=json.dumps(payload), timeout=self.timeout).json()
        except Exception as e:
            return str(e)

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a plain-text message.

        Args:
            agentid: ID of the application that sends the message.
            content: Message text.
            touser: Recipient user ID(s), passed to the API unchanged.
            toparty: Recipient department ID(s), passed to the API unchanged.

        Returns:
            The decoded API response, or the error text on failure.
        """
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {"content": content}
        })

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload ``file_obj`` and send it as an image message.

        Args:
            agentid: ID of the application that sends the message.
            file_obj: Open binary file object holding the image.
            touser: Recipient user ID(s), passed to the API unchanged.
            toparty: Recipient department ID(s), passed to the API unchanged.

        Returns:
            The decoded API response, or the error text on failure.
        """
        media_id = self._get_media_id(file_obj)
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {"media_id": media_id}
        })

2. FTP Client

A lightweight wrapper around Python's ftplib that supports upload, download, directory listing, deletion, renaming and size queries.

# -*- coding: utf-8 -*-
from ftplib import FTP
import os, copy

class FTPClient:
    """Thin convenience wrapper around :class:`ftplib.FTP`.

    The connection is opened and authenticated in the constructor. Methods
    that change the server (``rmd``, ``mkd``, ``del_file``, ``rename``)
    return a dict ``{'status': bool, 'msg': str}`` instead of raising;
    ``upload``, ``download`` and ``nlst`` let ftplib errors propagate.
    """

    def __init__(self, host, user, passwd, port=21):
        """Remember the connection settings and log in.

        Args:
            host: FTP server host name or address.
            user: Login name.
            passwd: Login password.
            port: Server port, 21 by default.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        # Template for result dicts; each call works on its own copy.
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self._login()

    def _login(self):
        """Connect and authenticate; return the exception if that failed.

        The constructor ignores the return value, so a failed login only
        surfaces on the first real operation.
        """
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e

    def _run(self, method_name, *args):
        """Call ``self._ftp.<method_name>(*args)`` and wrap the outcome.

        Returns a fresh copy of ``self.res`` with ``msg`` set to the server
        reply, or with ``status`` False and ``msg`` set to the error text.
        """
        res = copy.deepcopy(self.res)
        try:
            res['msg'] = getattr(self._ftp, method_name)(*args)
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res

    def upload(self, localpath, remotepath=None):
        """Upload ``localpath``; the remote name defaults to its basename."""
        if not localpath:
            return 'Please select a local file.'
        if not remotepath:
            remotepath = os.path.basename(localpath)
        # The context manager closes the local file even if the transfer fails.
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        """Download ``remotepath``.

        ``localpath`` defaults to the remote basename in the current
        directory; if it names an existing directory the file is stored
        inside it under the remote basename.
        """
        if not remotepath:
            return 'Please select a remote file.'
        if not localpath:
            localpath = os.path.basename(remotepath)
        if os.path.isdir(localpath):
            localpath = os.path.join(localpath, os.path.basename(remotepath))
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        """Return the list of names in remote directory ``dir``."""
        return self._ftp.nlst(dir)

    def rmd(self, dir=None):
        """Remove remote directory ``dir``; return a status dict."""
        if not dir:
            return 'Please input dirname'
        return self._run('rmd', dir)

    def mkd(self, dir=None):
        """Create remote directory ``dir``; return a status dict."""
        if not dir:
            return 'Please input dirname'
        return self._run('mkd', dir)

    def del_file(self, filename=None):
        """Delete remote file ``filename``; return a status dict."""
        if not filename:
            return 'Please input filename'
        return self._run('delete', filename)

    def get_file_size(self, filenames=None):
        """Return size and type information for each name in ``filenames``.

        A name whose SIZE query fails is reported as a directory: type
        ``'d'``, size ``'-'`` and a trailing ``/`` appended to the name.

        Returns:
            A list of ``{'filename', 'size', 'type'}`` dicts, or
            ``{'msg': 'This is an empty directory'}`` when no names are given.
        """
        if not filenames:
            return {'msg': 'This is an empty directory'}
        result = []
        for name in filenames:
            try:
                size = self._ftp.size(name)
                typ = 'f'
            except Exception:
                size = '-'
                typ = 'd'
                name = name + '/'
            result.append({'filename': name, 'size': size, 'type': typ})
        return result

    def rename(self, old_name=None, new_name=None):
        """Rename ``old_name`` to ``new_name``; return a status dict."""
        if not old_name or not new_name:
            return 'Please input old_name and new_name'
        return self._run('rename', old_name, new_name)

    def close(self):
        """Send QUIT and always close the socket afterwards."""
        try:
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            self._ftp.close()

3. SSH Client

Uses Paramiko to connect to a remote host via an SSH key and execute commands.

# -*- coding: utf-8 -*-
import paramiko

class SSHClient:
    """Key-authenticated SSH session built on Paramiko.

    The connection is attempted as soon as the object is created, using an
    RSA private key loaded from a file.
    """

    def __init__(self, host, port, user, pkey):
        """Load the private key at path ``pkey`` and connect to ``host:port``."""
        self.ssh_host, self.ssh_port, self.ssh_user = host, port, user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self._connect()

    def _connect(self):
        """Open the session; return 'ssh connect fail' if connecting raised."""
        self.ssh = client = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(
                hostname=self.ssh_host,
                port=self.ssh_port,
                username=self.ssh_user,
                pkey=self.private_key,
                timeout=10,
            )
        except Exception:
            return 'ssh connect fail'

    def execute_command(self, command):
        """Run ``command`` remotely and return ``(stdout, stderr)`` contents."""
        _, out_stream, err_stream = self.ssh.exec_command(command)
        output = out_stream.read()
        errors = err_stream.read()
        return output, errors

    def close(self):
        """Close the underlying SSH session."""
        self.ssh.close()

4. SaltStack Client

Wraps SaltStack's REST API to obtain authentication tokens, list keys, query grains, and run commands.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests, json, copy

class SaltApi:
    """Client for the salt-api REST interface.

    Logs in on construction and sends the returned token in the
    ``X-Auth-Token`` header of every later request.
    """

    def __init__(self, url="http://172.85.10.21:8000/", username="saltapi",
                 password="saltapi", eauth="pam", timeout=30):
        """Log in to salt-api.

        The defaults reproduce the values that used to be hard-coded, so
        ``SaltApi()`` keeps working; pass real settings (ideally read from
        configuration, not source code) for any other environment.

        Args:
            url: Base URL of salt-api; a trailing slash is added if missing.
            username: Login name.
            password: Login password.
            eauth: External authentication backend name.
            timeout: Seconds to wait for each HTTP request.
        """
        # Normalise so that url + "login" is always well formed.
        self.url = url.rstrip('/') + '/'
        self.username = username
        self.password = password
        self.timeout = timeout
        self.headers = {"Content-type": "application/json"}
        # Template for 'local' client calls; methods work on a copy.
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': eauth}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params):
        """POST ``params`` as JSON to ``url`` and return the first 'return' item."""
        res = requests.post(url, data=json.dumps(params), headers=self.headers,
                            timeout=self.timeout).json()
        return res['return'][0]

    def get_auth_keys(self):
        """Return the accepted minion keys, or the error text on failure."""
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        try:
            return self.get_data(self.url, data)['data']['return']['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """Return grain ``arg`` for target ``tgt`` (all minions when falsy)."""
        data = copy.deepcopy(self.params)
        data['tgt'] = tgt if tgt else '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        return self.get_data(self.url, data)

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """Run execution function ``fun`` on ``tgt``.

        Args:
            tgt: Target minion(s); ``'*'`` is sent without a ``tgt_type``.
            fun: Execution function name.
            arg: Argument(s) for ``fun``; omitted from the request when falsy.
            tgt_type: Targeting type used for any target other than ``'*'``.
            salt_async: Use the ``local_async`` client instead of ``local``.

        Returns:
            The API result, or a ``{'status': False, ...}`` dict when no
            target was given.
        """
        if not tgt:
            return {'status': False, 'msg': 'target host not exist'}
        data = copy.deepcopy(self.params)
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async:
            data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        return self.get_data(self.url, data)

    def jobs(self, fun='detail', jid=None):
        """Query the job system through the runner client.

        Args:
            fun: ``'detail'`` looks up job ``jid`` (``jobs.lookup_jid``);
                ``'active'`` lists running jobs (``jobs.active``).
            jid: Job ID, required for ``'detail'``.

        Returns:
            The API result, or a ``{'success': False, ...}`` dict for a
            missing ``jid`` or an unsupported ``fun``.
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid:
                return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # The error message below has always advertised 'active'.
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        return self.get_data(self.url, data)

5. vCenter Client

Uses pyVmomi to connect to a vCenter server and enumerate ESXi hosts and their VMs, reporting each host's total CPU, memory and datastore capacity along with per‑VM CPU, memory and disk details.

from pyVim.connect import SmartConnectNoSSL, Disconnect
from pyVmomi import vim
import atexit

class Vmware:
    """Collects ESXi host and virtual machine inventory from one vCenter."""

    def __init__(self, ip, user, password, port, idc, vcenter_id):
        """Store connection settings and the IDs copied into every host record.

        Args:
            ip: vCenter address.
            user: Login name.
            password: Login password.
            port: vCenter port.
            idc: Identifier stored as ``idc_id`` in each host record.
            vcenter_id: Identifier stored as ``vcenter_id`` in each host record.
        """
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        """Return every managed object of the given types under the root folder.

        Args:
            content: Service content returned by ``RetrieveContent()``.
            vimtype: List of vim types to collect.
            name: Accepted for compatibility; not used to filter the result.
        """
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        try:
            return list(container.view)
        finally:
            # Release the server-side view so repeated calls do not pile up.
            container.Destroy()

    @staticmethod
    def _vm_info(vm):
        """Build the record for one VM: name, power state, CPU, memory, OS, disks."""
        disks = [
            f"{d.deviceInfo.label}: {d.capacityInKB/1024**2:.2f} GB"
            for d in vm.config.hardware.device
            if isinstance(d, vim.vm.device.VirtualDisk)
        ]
        return {
            'vm_name': vm.name,
            'power_status': vm.runtime.powerState,
            'cpu_total_kernel': f"{vm.config.hardware.numCPU}核",
            'memory_total': f"{vm.config.hardware.memoryMB}MB",
            'system_info': vm.config.guestFullName,
            'disk_info': ','.join(disks)
        }

    def get_esxi_info(self):
        """Connect to vCenter and describe every ESXi host with its VMs.

        Returns:
            On success, a dict keyed by host name whose values are host
            records (hardware totals plus a ``vm_host`` list), with an extra
            ``'connect_status': True`` entry. If the connection fails,
            ``{'connect_status': False, 'msg': <error text>}``.
        """
        res = {'connect_status': True, 'msg': None}
        try:
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            res['msg'] = str(e)
            return res
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        esxi_host = {}
        for esxi in self.get_obj(content, [vim.HostSystem]):
            esxi_host[esxi.name] = {
                'idc_id': self.idc_id,
                'vcenter_id': self.vcenter_id,
                'server_ip': esxi.name,
                'manufacturer': esxi.summary.hardware.vendor,
                'server_model': esxi.summary.hardware.model,
                'cpu_total': esxi.summary.hardware.numCpuThreads,
                'memory_total_gb': esxi.summary.hardware.memorySize / 1024**3,
                'disk_total_gb': sum(ds.summary.capacity for ds in esxi.datastore) / 1024**3,
                'vm_host': [self._vm_info(vm) for vm in esxi.vm]
            }
        esxi_host['connect_status'] = True
        return esxi_host

Shell Script Section

1. SVN Full Backup

Creates a daily hot‑copy of all SVN repositories, writes a dated log for each run, and deletes the backup directory created seven days earlier.

#!/bin/bash
# Filename   : svn_backup_repos.sh
# Date       : 2020/12/14
#
# Hot-copies every repository directory under SRC_PATH (except 'httpd' and
# 'bak') into DST_PATH/<today>, logs success or failure per repository,
# copies the authz/access.conf files, and removes the backup directory that
# was created seven days earlier.
set -e
SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')
cd "$SRC_PATH"
mkdir -p "$DST_PATH/logs" "$DST_PATH/$TODAY"
# -mindepth 1 skips SRC_PATH itself and -printf '%f\n' prints the bare
# directory name, so repository names containing dots are kept intact.
while IFS= read -r repo; do
    # $SVN_BACKUP_C and $SVN_LOOK_C are deliberately unquoted: each holds a
    # command plus its subcommand. Running hotcopy inside the 'if' keeps
    # 'set -e' from aborting the whole run when one repository fails.
    if $SVN_BACKUP_C "$SRC_PATH/$repo" "$DST_PATH/$TODAY/$repo" &&
       $SVN_LOOK_C "$DST_PATH/$TODAY/$repo"; then
        echo "$TODAY: $repo Backup Success" >> "$LOG_FILE"
    else
        echo "$TODAY: $repo Backup Fail" >> "$LOG_FILE"
    fi
done < <(find . -mindepth 1 -maxdepth 1 -type d ! -name 'httpd' ! -name 'bak' -printf '%f\n')
cp -p authz access.conf "$DST_PATH/$TODAY"
# The log file only exists when at least one repository was processed.
if [ -f "$LOG_FILE" ]; then
    mv "$LOG_FILE" "$LOG_FILE-$TODAY"
fi
seven_days_ago=$(date -d "7 days ago" +'%F')
# ${DST_PATH:?} aborts instead of running 'rm -rf /<date>' if DST_PATH is empty.
rm -rf "${DST_PATH:?}/$seven_days_ago"

2. Zabbix User Password Expiration Monitor

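This section contains two scripts. The first prints a JSON payload for Zabbix low‑level discovery listing every user that has a login shell; the second takes a user name and prints 1 or 0, and is meant to flag users whose passwords will expire within seven days.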

#!/bin/bash
# Zabbix low-level discovery: print a JSON document whose "data" array holds
# one {#USER_NAME} entry per /etc/passwd account whose shell field matches
# /bin/bash or /bin/sh.
login_users=($(awk -F':' '$NF ~ /\/bin\/bash|\/bin\/sh/ {print $1}' /etc/passwd))
printf '{\n"data":[\n'
# The separator is empty before the first entry and a comma before every
# later one, so no trailing comma is produced.
sep=''
for name in "${login_users[@]}"; do
    printf '%s\t{\"{#USER_NAME}\":\"%s\"}' "$sep" "$name"
    sep=','
done
printf '\n]\n}\n'
# Check password expiration for a given user.
# Usage: pass the user name as $1.
# Prints "1" when the password expires within the next seven days (or has
# already expired), otherwise "0". Also prints "0" when the password never
# expires, the user is unknown, or the expiry date cannot be parsed.
export LANG=en_US.UTF-8
# Threshold is seven days in the FUTURE: anything expiring at or before it
# needs attention. (Comparing against seven days ago would only fire once the
# password had already been expired for a week.)
SEVEN_DAYS_LATER=$(date -d '+7 day' +%s)
user="$1"
expires=$(chage -l "$user" | awk -F':' '/Password expires/ {print $2}' | sed 's/^ //')
if [[ -n "$expires" && "$expires" != "never" ]] &&
   expires_ts=$(date -d "$expires" +%s 2>/dev/null); then
    if [ "$expires_ts" -le "$SEVEN_DAYS_LATER" ]; then
        echo "1"
    else
        echo "0"
    fi
else
    echo "0"
fi

3. Local YUM Repository Builder

Synchronizes multiple CentOS 6/7 repositories (Base, EPEL, SaltStack, Docker, MySQL) from a remote mirror using rsync and logs each operation.

#!/bin/bash
# Mirror CentOS repositories from a remote rsync server into a local tree and
# append a success/fail line to the per-repository log after each sync.
RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
# ... (other repo paths omitted for brevity)
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"

# Create each directory given as an argument unless it already exists.
check_dir() {
    for d in "$@"; do
        [ -d $d ] || mkdir -p $d
    done
}

# Append "rsync success" or "rsync fail" to the log file given as $1.
# Must be called immediately after the rsync command: $? on entry is still
# the exit status of the command that ran just before this function.
check_rsync_status() {
    case $? in
        0) echo "rsync success" >> $1 ;;
        *) echo "rsync fail" >> $1 ;;
    esac
}

check_dir $DIR $LogDir $Centos6Base $Centos7Base
$RsyncCommand "$MirrorDomain/repo/centos/7/os/x86_64/" $Centos7Base >> "$LogDir/centos7Base.log" 2>&1
check_rsync_status "$LogDir/centos7Base.log"
# (similar commands for EPEL, Docker, MySQL, etc.)

4. High‑Load Detection Script

Monitors system load averages against CPU core thresholds and logs the top CPU‑ and memory‑consuming processes, as well as disk I/O, when a threshold is exceeded.

#!/bin/bash
# Compare the 1/5/15-minute load averages with thresholds derived from the
# number of physical CPU cores (100%, 80% and 70% of the core count). When any
# threshold is reached, snapshot the top CPU and memory consumers and ten
# seconds of disk I/O statistics into ./cpu_high_script_log.
physical_cpu_count=$(grep 'physical id' /proc/cpuinfo | sort -u | wc -l)
# sort -u plus NR == 1 guarantees a single number even if sockets differ.
physical_cpu_cores=$(grep 'cpu cores' /proc/cpuinfo | sort -u | awk 'NR == 1 {print $NF}')
total_cpu_cores=$((physical_cpu_count * ${physical_cpu_cores:-0}))
# Some systems expose no 'physical id' / 'cpu cores' lines in /proc/cpuinfo;
# a zero threshold would make every check fire, so fall back to the number of
# logical processors.
if [ "$total_cpu_cores" -le 0 ]; then
    total_cpu_cores=$(grep -c '^processor' /proc/cpuinfo)
fi
one_min_load_threshold=$total_cpu_cores
five_min_load_threshold=$(awk "BEGIN {print $total_cpu_cores * 0.8}")
fifteen_min_load_threshold=$(awk "BEGIN {print $total_cpu_cores * 0.7}")
# /proc/loadavg gives all three values in one consistent, locale-independent
# read (uptime output uses a comma as decimal separator in some locales).
read -r one_min_load five_min_load fifteen_min_load _ < /proc/loadavg

# Succeed when load ($1) is greater than or equal to limit ($2).
# awk handles the floating-point comparison, so bc is not required.
load_exceeds() {
    awk -v load="$1" -v limit="$2" 'BEGIN { exit !(load >= limit) }'
}

# Write the diagnostic snapshots; takes about ten seconds because of iostat.
get_info(){
    log_dir="cpu_high_script_log"
    mkdir -p $log_dir
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > $log_dir/cpu_top10.log
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > $log_dir/mem_top10.log
    iostat -dx 1 10 > $log_dir/disk_io_10.log
}
export -f get_info
if load_exceeds "$one_min_load" "$one_min_load_threshold" ||
   load_exceeds "$five_min_load" "$five_min_load_threshold" ||
   load_exceeds "$fifteen_min_load" "$fifteen_min_load_threshold"; then
    get_info
fi

The article concludes with a reminder that readers can adapt these scripts to their own environments, share additional examples, and support the author through likes, comments, and shares.

Tags: Monitoring · Python · automation · ops · shell · Scripts
Efficient Ops
Written by

Efficient Ops

This public account is maintained by Xiaotianguo and friends, regularly publishing widely-read original technical articles. We focus on operations transformation and accompany you throughout your operations career, growing together happily.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.