Operations 9 min read

Python Scheduled Tasks: Automation Examples for Greeting, Backup, Monitoring, and More

This article presents a collection of Python code examples that demonstrate how to schedule recurring tasks such as printing greetings, backing up files, scraping data, cleaning caches, sending emails, monitoring servers, updating databases, uploading to cloud storage, executing scripts, and analyzing logs.

Test Development Learning Exchange
Test Development Learning Exchange
Test Development Learning Exchange
Python Scheduled Tasks: Automation Examples for Greeting, Backup, Monitoring, and More

The following Python snippets illustrate how to implement common scheduled automation tasks using simple loops and the time module, suitable for backend operations and system maintenance.

Timed greeting

import time

def greet():
    """Print the daily reminder and return the message text."""
    message = "Hello, it's time for your daily reminder!"
    print(message)
    return message


if __name__ == "__main__":
    # Print the greeting every 5 seconds.  The guard keeps the infinite
    # loop from running when this module is imported.
    while True:
        greet()
        time.sleep(5)

Timed file backup

import os
import shutil
import time

def backup_file(source, destination):
    """Copy *source* to *destination*, preserving metadata (shutil.copy2)."""
    shutil.copy2(source, destination)
    print(f"File {source} has been backed up to {destination}")


def _seconds_until_hour(target_hour):
    """Return seconds from now until the next *target_hour*:00 local time."""
    now = time.localtime()
    elapsed = now.tm_hour * 3600 + now.tm_min * 60 + now.tm_sec
    return (target_hour * 3600 - elapsed) % (24 * 3600)


if __name__ == "__main__":
    # Back up the file at 2 AM every day.  Sleeping until the next 02:00
    # fixes two bugs in the original poll-and-sleep loop: an immediate
    # backup when the script starts any time after 2 AM, and per-day
    # drift from the flat 24 h sleep.
    while True:
        time.sleep(_seconds_until_hour(2))
        backup_file("/path/to/source", "/path/to/destination")
        time.sleep(1)  # step past the trigger second

Timed web data fetch

import requests
import time
from bs4 import BeautifulSoup

def fetch_data(url):
    """Fetch *url*, print and return the text of the first matching element.

    Returns None when the page has no matching element.
    NOTE(review): find('some', 'tag') is a placeholder selector — replace
    it with the real tag name and class for the target page.
    """
    # timeout prevents the scraper hanging forever on a dead server;
    # raise_for_status surfaces HTTP errors instead of parsing an error page.
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    data = soup.find('some', 'tag')
    if data is None:
        # The original crashed with AttributeError here when nothing matched.
        print(f"No matching element found at {url}")
        return None
    text = data.text.strip()
    print(text)
    return text


if __name__ == "__main__":
    # Scrape once an hour; a failed fetch is logged, not fatal, so the
    # loop survives transient network errors.  The guard keeps the loop
    # from running on import.
    while True:
        try:
            fetch_data("http://example.com/data")
        except requests.exceptions.RequestException as e:
            print(f"Fetch failed: {e}")
        time.sleep(3600)  # Sleep for one hour

Timed cache cleaning

import os
import time

def clean_cache(directory):
    """Delete every regular file directly inside *directory*.

    Subdirectories (and their contents) are left untouched; only
    top-level regular files are removed.
    """
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            os.remove(file_path)
            print(f"Removed {file_path}")


if __name__ == "__main__":
    # Clean the cache once a week.  The original woke up every minute
    # just to check whether 7 days had elapsed; running and then
    # sleeping the full interval is equivalent (the first clean still
    # happens at startup, matching last_clean = 0) without the polling.
    CLEAN_INTERVAL = 7 * 24 * 3600  # 7 days
    while True:
        clean_cache("/path/to/cache/directory")
        time.sleep(CLEAN_INTERVAL)

Timed email reminder

import smtplib
import time
from email.mime.text import MIMEText

def send_email(subject, body, to_addr):
    """Send a plain-text email with *subject*/*body* to *to_addr* via
    SMTP with STARTTLS.

    NOTE(review): host, sender address and credentials are hard-coded;
    they belong in configuration / a secrets store, not in source.
    """
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = '[email protected]'
    msg['To'] = to_addr
    # Context manager closes the connection even when login/sendmail
    # raises; the original leaked the socket on any error before quit().
    with smtplib.SMTP('smtp.example.com', 587) as server:
        server.starttls()
        server.login("[email protected]", "your-password")
        server.sendmail('[email protected]', [to_addr], msg.as_string())
    print(f"Email sent to {to_addr}")


def _seconds_until_hour(target_hour):
    """Return seconds from now until the next *target_hour*:00 local time."""
    now = time.localtime()
    elapsed = now.tm_hour * 3600 + now.tm_min * 60 + now.tm_sec
    return (target_hour * 3600 - elapsed) % (24 * 3600)


if __name__ == "__main__":
    # Send at 9 AM every day.  Sleeping until the next 09:00 fixes the
    # original's immediate send when started after 9 AM and the daily
    # drift from the flat 24 h sleep.
    while True:
        time.sleep(_seconds_until_hour(9))
        send_email("Daily Reminder", "This is your daily reminder!", "[email protected]")
        time.sleep(1)  # step past the trigger second

Timed server status monitoring

import requests
import time

def check_server_status(url):
    """Probe *url* with a GET and print whether the server looks up.

    A 200 response counts as UP; any other status code or a connection
    failure is reported but never raised, so a monitoring loop survives
    transient outages.
    """
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            print(f"Server {url} is UP")
        else:
            print(f"Server {url} responded with status code {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to connect to {url}: {e}")


if __name__ == "__main__":
    # Check every 10 minutes.  The guard keeps the infinite loop from
    # running when this module is imported.
    while True:
        check_server_status("http://example.com/status")
        time.sleep(600)  # Sleep for 10 minutes

Timed database record update

import pymysql
import time

def update_record(db, table, column, value, condition):
    """Set *column* = *value* on the rows of *table* matching *condition*.

    WARNING(security): table, column and condition are interpolated
    straight into the SQL text — they must come from trusted code, never
    from user input (SQL injection).  Only *value* is parameterized.
    """
    conn = pymysql.connect(host='localhost', user='root', password='', db=db)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"UPDATE {table} SET {column} = %s WHERE {condition}", (value,))
        conn.commit()
        print(f"Updated record in {table}")
    finally:
        # The original skipped close() when execute/commit raised,
        # leaking the connection.
        conn.close()


def _seconds_until_hour(target_hour):
    """Return seconds from now until the next *target_hour*:00 local time."""
    now = time.localtime()
    elapsed = now.tm_hour * 3600 + now.tm_min * 60 + now.tm_sec
    return (target_hour * 3600 - elapsed) % (24 * 3600)


if __name__ == "__main__":
    # Update the record at 11 PM every day.  Sleeping until the next
    # 23:00 fixes the original's immediate run when started after 11 PM
    # and the daily drift from the flat 24 h sleep.
    while True:
        time.sleep(_seconds_until_hour(23))
        update_record("mydb", "users", "status", "active", "id = 1")
        time.sleep(1)  # step past the trigger second

Timed upload to cloud storage (AWS S3)

import boto3
import time

def upload_to_s3(file_path, bucket_name, object_name):
    """Upload *file_path* to S3 bucket *bucket_name* under key *object_name*.

    Credentials are resolved by boto3's default chain (env vars,
    ~/.aws/credentials, instance role).
    """
    s3 = boto3.client('s3')
    s3.upload_file(file_path, bucket_name, object_name)
    print(f"File {file_path} uploaded to S3 bucket {bucket_name}")


def _seconds_until_hour(target_hour):
    """Return seconds from now until the next *target_hour*:00 local time."""
    now = time.localtime()
    elapsed = now.tm_hour * 3600 + now.tm_min * 60 + now.tm_sec
    return (target_hour * 3600 - elapsed) % (24 * 3600)


if __name__ == "__main__":
    # Upload at 3 PM every day.  Sleeping until the next 15:00 fixes
    # the original's immediate upload when started after 3 PM and the
    # daily drift from the flat 24 h sleep.
    while True:
        time.sleep(_seconds_until_hour(15))
        upload_to_s3("/path/to/local/file", "my-bucket", "remote/path/to/file")
        time.sleep(1)  # step past the trigger second

Timed script execution

import subprocess
import time

def run_script(script_path):
    """Run *script_path* with the current interpreter; return its exit code.

    sys.executable is used instead of a bare "python" so the child runs
    under the same interpreter as this scheduler even when "python" is
    not on PATH or points at a different version.
    """
    import sys  # local import keeps this snippet self-contained
    result = subprocess.run([sys.executable, script_path])
    print(f"Script {script_path} has been executed")
    return result.returncode


def _seconds_until_hour(target_hour):
    """Return seconds from now until the next *target_hour*:00 local time."""
    now = time.localtime()
    elapsed = now.tm_hour * 3600 + now.tm_min * 60 + now.tm_sec
    return (target_hour * 3600 - elapsed) % (24 * 3600)


if __name__ == "__main__":
    # Run the script at noon every day.  Sleeping until the next 12:00
    # fixes the original's immediate run when started after noon and the
    # daily drift from the flat 24 h sleep.
    while True:
        time.sleep(_seconds_until_hour(12))
        run_script("/path/to/script.py")
        time.sleep(1)  # step past the trigger second

Timed log file analysis

import time
import re

# Apache/Nginx-style access-log line, e.g.:
#   1.2.3.4 - [01/Jan/2024:00:00:00] "GET /index.html HTTP/1.1" 200 1234
# Compiled once at module level instead of on every call.
_LOG_PATTERN = re.compile(
    r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - \[(.*?)\] '
    r'"(GET|POST) (.*?) HTTP.*?" (\d{3}) (\d+)'
)


def analyze_log(log_file):
    """Parse *log_file*, print one line per request, and return the
    list of (ip, timestamp, method, path, status, size) string tuples.

    The original printed and returned None; returning the matches lets
    callers aggregate without re-parsing.
    """
    with open(log_file, 'r') as file:
        content = file.read()
    matches = _LOG_PATTERN.findall(content)
    for ip, timestamp, method, path, status, size in matches:
        print(f"{ip} accessed {path} at {timestamp} with status {status}")
    return matches


if __name__ == "__main__":
    # Analyze once an hour.  The guard keeps the infinite loop from
    # running when this module is imported.
    while True:
        analyze_log("/path/to/logfile.log")
        time.sleep(3600)  # Sleep for one hour

These examples can be adapted to run as background services, cron jobs, or integrated into larger automation frameworks.

Tags: backend, operations, scheduling
Test Development Learning Exchange
Written by

Test Development Learning Exchange

Test Development Learning Exchange

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.