Operations 20 min read

Python Server Resource Monitoring Scripts with Email and WeChat Alerting

This article describes a Python‑based solution that monitors CPU, memory, disk usage and network traffic on a server and multiple client machines, stores alerts in MySQL, and sends notifications via email and an Enterprise WeChat robot when predefined thresholds are exceeded.

Sohu Tech Products
Sohu Tech Products
Sohu Tech Products
Python Server Resource Monitoring Scripts with Email and WeChat Alerting

This article presents a Python‑based monitoring solution that continuously checks CPU, memory, disk usage and network traffic on a server and multiple client machines, generates warning messages when thresholds are exceeded, stores the alerts in MySQL, and sends notifications via email and an Enterprise WeChat robot.

Server‑side script (run on a fixed‑IP host) implements logging, a TCP socket handler, analysis of received metrics, threshold checks, MySQL insertion functions and alert‑sending helpers.

# -*- coding:utf-8 -*-
import io
import os
import sys
import logging
from logging import handlers
import MySQLdb
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
import requests, json
import datetime
import time
import shutil,re
import uuid
import socket
import SocketServer

# Python 2 only: switch the interpreter-wide default string encoding to UTF-8
# when it is something else, so the non-ASCII log and alert text used below can
# be concatenated without implicit-ASCII conversion errors.
if sys.getdefaultencoding() != 'utf-8':
    # sys is reloaded first; setdefaultencoding is called on the reloaded module.
    reload(sys)
    sys.setdefaultencoding('utf-8')

class Logger(object):
    """Wrap a named ``logging`` logger that writes to a time-rotated file.

    The underlying logger is exposed as ``self.logger``. Loggers are keyed by
    the absolute log-file path, so constructing ``Logger`` twice with the same
    ``logname`` yields the same logger and does not add handlers twice.
    """

    # Maps the short level names accepted by __init__ to logging constants.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }

    def __init__(self,logname, level='info', when='D', backCount=10, fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', console=False):
        """Create (or reuse) the logger for ``logname``.

        Args:
            logname: log file name, resolved relative to this script's directory.
            level: one of the keys of ``level_relations``.
            when: rotation interval unit passed to TimedRotatingFileHandler.
            backCount: number of rotated files to keep.
            fmt: ``logging.Formatter`` format string.
            console: when True, also attach a StreamHandler. Defaults to False,
                which keeps the file-only output this class has always had.
        """
        log_dir = os.path.dirname(__file__)
        log_file = os.path.abspath(os.path.join(log_dir, logname))
        self.logger = logging.getLogger(log_file)
        self.logger.setLevel(self.level_relations.get(level))

        # Handlers are created only for a fresh logger. Building the file
        # handler first and discarding it afterwards would open the log file
        # again on every construction and leave that handle unclosed.
        if self.logger.handlers:
            return

        formatter = logging.Formatter(fmt)
        file_handler = handlers.TimedRotatingFileHandler(
            filename=log_file, when=when, backupCount=backCount, encoding='utf-8')
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)

        if console:
            stream_handler = logging.StreamHandler()
            stream_handler.setFormatter(formatter)
            self.logger.addHandler(stream_handler)

class Analysis(object):
    """Persist one client metrics record to MySQL and send threshold alerts.

    A record is the list obtained by splitting the client's pipe-separated
    message. The methods read these positions: 2 (label printed before the
    IP), 3 (IP), 4 (CPU %), 5 (memory %), 6 (root disk %), 7 (business disk %),
    8 (received traffic), 9 (sent traffic) and 10 (stored as-is).

    NOTE(review): positions 8 and 9 are treated as traffic counters here;
    confirm that every sender uses this field order, since a sender that puts
    an extra field before the counters would shift them.

    The methods rely on the module-level globals ``log``, ``alarmName``,
    ``mailconf`` and ``wxconf``.
    """

    # (record index, limit, text before the value, text after the value).
    # The limits are the ones the alert texts quote.
    _RULES = (
        (4, 90, "> CPU预警:使用率高于90%,使用", "% \n"),
        (5, 90, "> 内存预警:使用率高于90%,使用", "% \n"),
        (6, 85, "> 磁盘根目录预警:使用率高于85%,使用", "% \n"),
        (7, 85, "> 业务磁盘预警:使用率高于85%,使用", "% \n"),
        # The text used to say 4000M although the check has always been 3000.
        (8, 3000, "> 网卡10秒内接收数据预警:接收数据大于3000M,接收", "M \n"),
        (9, 3000, "> 网卡10秒内发送数据预警:发送数据大于3000M,发送", "M \n"),
    )

    def _collect_warnings(self, msg):
        """Return the alert lines for every metric in ``msg`` above its limit.

        Raises IndexError / ValueError when a field is missing or not numeric.
        """
        return [
            prefix + str(msg[index]) + suffix
            for index, limit, prefix, suffix in self._RULES
            if float(msg[index]) > limit
        ]

    def buildMsg(self,msg):
        """Build the alert text for ``msg``, log it and send it when needed.

        The text is always logged; e-mail and WeChat are only used when at
        least one metric is over its limit and the matching switch
        (``mailconf`` / ``wxconf``) equals 1.
        """
        print('构造预警信息'+str(msg))
        warnings = self._collect_warnings(msg)
        s = alarmName + "\n" + msg[2] + ":" + msg[3] + "\n" + "".join(warnings)
        log.logger.info('预警信息:'+s)
        if warnings:
            if mailconf == 1:
                self.send_mail(s, msg[3])
            if wxconf == 1:
                self.send_WX(s)

    def send_mail(self,content,ip):
        """E-mail ``content`` to the recipient list; ``ip`` goes in the subject.

        Server, account and recipients are hard-coded placeholder values that
        must be replaced before use. Errors from smtplib propagate to the
        caller.
        """
        smtpserver = 'smtp.163.com'
        mail_user="[email protected]"
        mail_pass="passwordxxx"
        mail_res=["[email protected]","[email protected]","[email protected]","[email protected]","[email protected]","[email protected]","[email protected]"]
        sub = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        msg = MIMEText( sub +"\n"+content, _subtype='plain',_charset='utf-8')
        msg['Subject'] = Header(alarmName+':'+ip, 'utf-8' )
        msg['From'] = formataddr(pair=('设备预警', mail_user))
        msg['To'] = ', '.join(mail_res)
        smtp = smtplib.SMTP()
        try:
            smtp.connect(smtpserver)
            smtp.starttls()
            smtp.login(mail_user, mail_pass)
            smtp.sendmail(mail_user, mail_res, msg.as_string())
            smtp.quit()
        finally:
            # Releases the connection when any step above failed; harmless
            # after a successful quit().
            smtp.close()

    def send_WX(self,msg):
        """Post ``msg`` as a text message to the Enterprise WeChat robot webhook.

        The URL is a placeholder that must be replaced with the real robot
        address.
        """
        # The body is JSON (json=data), so it is labelled as JSON.
        headers = {"Content-Type": "application/json"}
        data = {
            "msgtype": "text",
            "text": {
                "content":  msg,
            }
        }
        # The timeout keeps an unreachable webhook from blocking the calling
        # handler thread indefinitely.
        r = requests.post(
            url='企业微信机器人地址(需要根据实际机器人地址配置)',
            headers=headers, json=data, timeout=10)
        print(r.text)

    def _connect(self):
        """Open the MySQL connection; log and return None when that fails."""
        try:
            db = MySQLdb.connect("xxx", "xxx", "xxx", "xxx", charset='utf8' )
        except Exception:
            log.logger.info("数据库连接失败")
            return None
        log.logger.info("数据库连接成功")
        return db

    def _row(self, valuelist):
        """Build the 14-column row inserted by both tables for ``valuelist``."""
        return (str(uuid.uuid1()),valuelist[2], valuelist[3], valuelist[4], valuelist[5],valuelist[6], valuelist[7],'',valuelist[8], valuelist[9],valuelist[10],'','','')

    def Write_to_Mysql_alarm(self,valuelist):
        """Insert ``valuelist`` into the alarm table, then send notifications.

        Returns 1 when the row was committed, 0 otherwise (including when the
        database connection could not be opened).
        """
        db = self._connect()
        if db is None:
            return 0
        result = 0
        try:
            cursor = db.cursor()
            sql = 'insert into test_serverresourcealarm values (%s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s,%s)'
            cursor.execute(sql, self._row(valuelist))
            db.commit()
            result = 1
            # Kept at error level as before; presumably so that alarms stand
            # out in level-filtered logs -- confirm.
            log.logger.error('设备预警信息已入库!')
        except Exception:
            into = sys.exc_info()
            log.logger.error('设备预警信息入库失败!'+str(into))
        finally:
            db.close()
        if result == 1:
            # Notifications run outside the database try-block so that a mail
            # or webhook failure is not reported as a failed insert.
            try:
                self.buildMsg(valuelist)
            except Exception:
                log.logger.error('设备预警通知发送失败!'+str(sys.exc_info()))
        return result

    def Write_to_Mysql_temp(self,valuelist):
        """Insert ``valuelist`` into the temp table.

        Returns 1 when the row was committed, 0 otherwise (including when the
        database connection could not be opened).
        """
        db = self._connect()
        if db is None:
            return 0
        result = 0
        try:
            cursor = db.cursor()
            sql = 'insert into test_serverresourcetemp values (%s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s,%s)'
            cursor.execute(sql, self._row(valuelist))
            db.commit()
            result = 1
            log.logger.info("临时表sql执行状态:"+str(result))
        except Exception:
            into = sys.exc_info()
            print(into)
            log.logger.info('临时表sql执行失败: '+str(into))
        finally:
            db.close()
        return result

class MyServer(SocketServer.BaseRequestHandler):
    """Per-connection handler: store each received record and raise alarms.

    A message longer than 10 bytes is treated as a pipe-separated metrics
    record; the message ``exit`` ends the session.

    NOTE(review): positions 8 and 9 of the record are compared against the
    traffic limit; confirm this matches the field order of every sender.
    """

    # (record index, limit) pairs that trigger an alarm row when exceeded.
    _LIMITS = ((4, 90), (5, 90), (6, 85), (7, 85), (8, 3000), (9, 3000))

    def handle(self):
        """Serve one client until it sends ``exit`` or closes the connection."""
        conn = self.request
        log.logger.info('... connected from {}'.format(self.client_address))
        analysis = Analysis()
        while True:
            data = conn.recv(1024)
            # recv() returns an empty string once the peer has closed the
            # socket; without this check the loop would spin forever.
            if not data or data == 'exit':
                log.logger.info('... connecte end ...')
                break
            if len(data) > 10:
                self._handle_record(conn, analysis, data)

    def _handle_record(self, conn, analysis, data):
        """Acknowledge, store and threshold-check one metrics record."""
        log.logger.info('接收到的客户端数据:'+data)
        conn.sendall('1')
        fields = data.strip('\n').split('|')
        result = analysis.Write_to_Mysql_temp(fields)
        try:
            over_limit = any(
                float(fields[index]) > limit for index, limit in self._LIMITS)
        except (IndexError, ValueError):
            # A short or non-numeric record must not take the handler down.
            log.logger.error('客户端数据格式错误:'+data)
            over_limit = False
        if over_limit:
            analysis.Write_to_Mysql_alarm(fields)
        if result == 0:
            log.logger.info('预警信息入库失败!')
        else:
            log.logger.info('预警信息入库完成!')

if __name__ == "__main__":
    # The names assigned here are module-level globals that Analysis and
    # MyServer read when their methods run.
    log = Logger('socketservice.logs')
    log.logger.info('----start----')
    # Heading of every alert text; also used in the e-mail subject.
    alarmName ='服务器资源预警'
    # 1 enables e-mail alerts in Analysis.buildMsg.
    mailconf =1
    # 1 enables Enterprise WeChat robot alerts in Analysis.buildMsg.
    wxconf =0
    # 'IP' and port are placeholders: port is not defined in this script and
    # must be replaced, together with 'IP', by the real listen address.
    server = SocketServer.ThreadingTCPServer(('IP',port),MyServer)
    server.serve_forever()

Client‑side script (run on each monitored host) collects CPU, memory, disk and network metrics using psutil and system calls, checks them against its own local thresholds (80% for CPU, memory and disk usage; 3000 MB for network traffic), logs the data, formats a pipe‑separated message and transmits it to the server over a TCP socket.

# -*- coding:utf-8 -*-
import io
import os
import sys
import time
import datetime
import socket
import commands
import logging
from logging import handlers
import psutil
import struct
import fcntl

# Python 2 only: switch the interpreter-wide default string encoding to UTF-8
# when it is something else, so the non-ASCII log text used below can be
# concatenated without implicit-ASCII conversion errors.
if sys.getdefaultencoding() != 'utf-8':
    # sys is reloaded first; setdefaultencoding is called on the reloaded module.
    reload(sys)
    sys.setdefaultencoding('utf-8')

class Logger(object):
    """Wrap a named ``logging`` logger that writes to a time-rotated file.

    The underlying logger is exposed as ``self.logger``. Loggers are keyed by
    the absolute log-file path, so constructing ``Logger`` twice with the same
    ``logname`` yields the same logger and does not add handlers twice.
    """

    # Maps the short level names accepted by __init__ to logging constants.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }

    def __init__(self,logname, level='info', when='D', backCount=10, fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', console=False):
        """Create (or reuse) the logger for ``logname``.

        Args:
            logname: log file name, resolved relative to this script's directory.
            level: one of the keys of ``level_relations``.
            when: rotation interval unit passed to TimedRotatingFileHandler.
            backCount: number of rotated files to keep.
            fmt: ``logging.Formatter`` format string.
            console: when True, also attach a StreamHandler. Defaults to False,
                which keeps the file-only output this class has always had.
        """
        log_dir = os.path.dirname(__file__)
        log_file = os.path.abspath(os.path.join(log_dir, logname))
        self.logger = logging.getLogger(log_file)
        self.logger.setLevel(self.level_relations.get(level))

        # Handlers are created only for a fresh logger. Building the file
        # handler first and discarding it afterwards would open the log file
        # again on every construction and leave that handle unclosed.
        if self.logger.handlers:
            return

        formatter = logging.Formatter(fmt)
        file_handler = handlers.TimedRotatingFileHandler(
            filename=log_file, when=when, backupCount=backCount, encoding='utf-8')
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)

        if console:
            stream_handler = logging.StreamHandler()
            stream_handler.setFormatter(formatter)
            self.logger.addHandler(stream_handler)

class clientMonitor(object):
    """Collect address, network, disk and process information on this host."""

    def getIpAddress(self,dev):
        """Return the IPv4 address of interface ``dev`` as a dotted string."""
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # 0x8915 is presumably SIOCGIFADDR (Linux "get interface address")
            # -- confirm against <linux/sockios.h>. Bytes 20-24 of the reply
            # are the packed address handed to inet_ntoa.
            request = struct.pack('256s', dev[:15])
            packed = fcntl.ioctl(s.fileno(), 0x8915, request)[20:24]
        finally:
            # The socket only serves as an ioctl handle; always release it.
            s.close()
        return socket.inet_ntoa(packed)

    def readNetInfo(self,dev):
        """Read the byte counters of ``dev`` from /proc/net/dev.

        Returns ``{'in': received_bytes, 'out': sent_bytes}``; both are 0 when
        the device is not listed.
        """
        res = {'in':0, 'out':0}
        with open('/proc/net/dev') as f:
            for line in f:
                name, sep, counters = line.partition(':')
                # Exact name comparison: a prefix test would let "eth1" also
                # pick up the counters of "eth10".
                if not sep or name.strip() != dev:
                    continue
                items = counters.split()
                res['in'] = int(items[0])
                # The first half of the columns is receive, the second half
                # transmit; the first column of each half is the byte count.
                res['out'] = int(items[len(items) // 2])
        return res

    def readNetInfo_new(self,dev):
        """Read the byte counters of ``dev`` through psutil.

        Returns ``{'in': bytes_recv, 'out': bytes_sent}`` taken from a single
        snapshot. Raises ValueError when psutil does not report ``dev``.
        """
        counters = psutil.net_io_counters(pernic=True).get(dev)
        if counters is None:
            raise ValueError('network interface not found: ' + str(dev))
        return {'in': counters.bytes_recv, 'out': counters.bytes_sent}

    def disk_stat(self,path):
        """Return the usage percentage of the filesystem holding ``path``.

        The value is ``used * 100 // (used + available) + 1``; the ``+ 1``
        presumably rounds up the way ``df`` does -- confirm. Returns 0 for a
        filesystem that reports no used and no available blocks. Raises
        OSError when ``path`` does not exist.
        """
        disk = os.statvfs(path)
        used = disk.f_blocks - disk.f_bfree
        total = used + disk.f_bavail
        if total == 0:
            return 0
        return used * 100 // total + 1

    def net_loop(self,dev,interval=2):
        """Measure traffic on ``dev`` over ``interval`` seconds.

        Returns ``(recv_data, send_data)`` in whole megabytes.
        """
        before = self.readNetInfo_new(dev)
        time.sleep(interval)
        after = self.readNetInfo_new(dev)
        recv_data = (after['in'] - before['in']) // 1024 // 1024
        send_data = (after['out'] - before['out']) // 1024 // 1024
        print ("recv_data: %s M, send_data: %s M"%(recv_data, send_data))
        return recv_data,send_data

    def processcheck(self,cmd):
        """Run shell command ``cmd`` and write its output to the global log."""
        (_status, output) = commands.getstatusoutput(cmd)
        log.logger.info('资源占用top:\n'+output)

if __name__ == "__main__":
    # Identification fields sent with every record.
    custom = 'test'
    deviceType = '客户端服务器'
    # Interface whose address and traffic are reported.
    netName = 'ens3f0'
    # Module-level logger; clientMonitor.processcheck reads this global.
    log = Logger('socketclient.logs')
    log.logger.info("----start----")

    info = clientMonitor()
    locatIp = info.getIpAddress(netName)
    recv_data, send_data = info.net_loop(netName)
    cpuinfo = psutil.cpu_percent(1)
    svmem = psutil.virtual_memory()
    meminfo = svmem[2]
    disk_root = info.disk_stat('/')
    disk_appslog = info.disk_stat('/appslog')
    disk_bigdata = info.disk_stat('/bigdata')

    # issendmsg is 1 on both paths below, so every run reports to the server.
    issendmsg = 1
    if(cpuinfo>80 or meminfo>80 or disk_root>80 or disk_appslog>80 or disk_bigdata>80 or recv_data>3000 or send_data>3000):
        sendmsg = locatIp +' 服务器资源占用高!请检查!\n'
        sendmsg += "CPU占用:"+str(cpuinfo)+'\n'
        sendmsg += "内存占用:"+str(meminfo)+'\n'
        sendmsg += "/目录占用:"+str(disk_root)+'\n'
        sendmsg += "/appslog目录占用:"+str(disk_appslog)+'\n'
        sendmsg += "/bigdata目录占用:"+str(disk_bigdata)+'\n'
        sendmsg += "网卡接收流量:"+str(recv_data)+"M,发送流量 "+str(send_data)+"M \n"
        log.logger.info(sendmsg)
        # Record the top consumers to help diagnose the high usage.
        if cpuinfo>80 :
            info.processcheck('ps -aux | sort -k3nr | head -10')
        if meminfo>80 :
            info.processcheck('ps -aux | sort -k4nr | head -10')
        issendmsg = 1
    else:
        log.logger.info("CPU使用率:"+str(cpuinfo))
        log.logger.info("内存使用率:"+str(meminfo))
        log.logger.info("/目录使用率:"+str(disk_root))
        log.logger.info("/appslog使用率:"+str(disk_appslog))
        log.logger.info("/bigdata使用率:"+str(disk_bigdata))
        log.logger.info("网卡接收和发送情况:接收"+str(recv_data) +"M, 发送 "+str(send_data)+"M")

    # Pipe-separated record sent to the server.
    msg = '1'+'|'+custom+'|'+deviceType+'|'+locatIp+'|'+str(cpuinfo)+'|'+str(meminfo)+'|'+str(disk_root)+'|'+str(disk_appslog)+'|'+str(disk_bigdata)+'|'+str(recv_data)+'|'+str(send_data)+'|'+time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    if issendmsg ==1:
        # 'IP' and port are placeholders: port is not defined in this script
        # and must be replaced, together with 'IP', by the server address.
        ip_port = ('IP',port)
        sk = socket.socket()
        # Without a timeout an unresponsive server would block this run forever.
        sk.settimeout(10)
        try:
            sk.connect(ip_port)
            sk.sendall(msg)
            data = sk.recv(1024)
            if data=='1':
                log.logger.info("本地预警信息传输成功!")
            else:
                log.logger.info("本地预警信息传输失败!")
            sk.sendall('exit')
        except socket.error:
            # Covers refused connections and timeouts; log instead of dying
            # with an unlogged traceback.
            log.logger.error("本地预警信息传输失败!"+str(sys.exc_info()))
        finally:
            sk.close()

After deploying both scripts, install the required psutil package (e.g., via the provided RPM), and schedule the client script to run every two hours with a crontab entry.

crontab -e
0 */2 * * * cd /opt/jiaoben;python test_socket_resourcemonitor.py

Sample log output and screenshots illustrate successful monitoring and alert transmission.

Tags: Python, operations, server monitoring, crontab, email-alert, resource-alert, wechat bot
Sohu Tech Products
Written by

Sohu Tech Products

A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.