APScheduler Tutorial: Installation, Basic Usage, Triggers, and Advanced Features
This article introduces the Python APScheduler library, covering installation, a basic interval example, various trigger types such as date and cron, parameterized and timezone-aware jobs, retry handling, dynamic job management, SQL job stores, multithreading, parallel execution, external event triggers, priority settings, and how to run the examples.
APScheduler is a powerful Python library for scheduling timed tasks, supporting multiple trigger types (date, interval, cron) and capable of running in single‑process or multi‑threaded environments.
Installation pip install apscheduler Basic Example
Define a simple task that prints a message every 5 seconds using BlockingScheduler. This blocks the main thread until the scheduler is stopped.
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message():
print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# 每隔 5 秒钟执行一次 print_message 函数
scheduler.add_job(print_message, 'interval', seconds=5)
# 启动调度器
scheduler.start()If you prefer a non‑blocking scheduler, use BackgroundScheduler instead.
Using Different Triggers
1. Date Trigger (run once)
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
def print_message():
print("Hello, world! Current time:", datetime.now())
scheduler = BlockingScheduler()
# 在指定的时间点执行任务
scheduler.add_job(print_message, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()2. Cron Trigger (schedule like Linux cron)
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message():
print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# 每天的 10:30 执行一次任务
scheduler.add_job(print_message, 'cron', hour=10, minute=30)
scheduler.start()Note: The cron example assumes the script runs before 10:30 each day; otherwise it will fire the next day.
Parameterized Tasks
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message(message):
print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# 每隔 5 秒执行一次并传递参数
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
scheduler.start()Complex Cron Expressions
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message():
print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# 每天 10:30 到 11:30 之间每 5 分钟执行一次
scheduler.add_job(print_message, 'cron', hour='10-11', minute='*/5')
scheduler.start()Cron with Timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime, timezone, timedelta
import pytz
def print_message():
print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
job_defaults = {'coalesce': False, 'max_instances': 3}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
# 指定时区为美国东部时间
scheduler.add_job(print_message, 'cron', hour='10-11', minute='*/5', timezone=pytz.timezone('US/Eastern'))
scheduler.start()Job Retry Mechanism
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import time
def print_message():
print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
def handle_job_error(event):
print("Error occurred in job:", event.exception)
print("Traceback:", event.traceback)
scheduler = BlockingScheduler()
# 添加错误监听器
scheduler.add_listener(handle_job_error, EVENT_JOB_ERROR)
# 设置任务重试次数
scheduler.add_job(print_message, 'interval', seconds=5, max_instances=1, misfire_grace_time=10, coalesce=True)
scheduler.start()Dynamic Add/Remove Tasks
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message(message):
print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# 动态添加任务
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
# 运行一段时间后移除任务
time.sleep(20)
scheduler.remove_job('job1')
scheduler.start()Using SQL Database to Store Jobs
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import time
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = BlockingScheduler(jobstores=jobstores)
def print_message():
print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler.add_job(print_message, 'interval', seconds=5)
scheduler.start()Multithread/Multiprocess Execution
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import time
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
scheduler = BlockingScheduler(executors=executors)
def print_message(message):
print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
scheduler.start()Parallel Execution of Multiple Tasks
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message1():
print("Message 1: Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
def print_message2():
print("Message 2: Hello again! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
scheduler.add_job(print_message1, 'interval', seconds=5)
scheduler.add_job(print_message2, 'interval', seconds=10)
scheduler.start()External Event Triggered Task
from apscheduler.schedulers.blocking import BlockingScheduler
import threading, time
def print_message():
print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
def trigger_event():
event.set()
event = threading.Event()
scheduler = BlockingScheduler()
# 当外部事件被触发时执行任务
scheduler.add_job(print_message, 'interval', seconds=5, trigger='event', args=[event])
# 模拟外部事件触发
threading.Timer(10, trigger_event).start()
scheduler.start()Task Priority
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_message1():
print("Message 1: Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
def print_message2():
print("Message 2: Hello again! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# 设置任务优先级
scheduler.add_job(print_message1, 'interval', seconds=5, id='job1', replace_existing=True, misfire_grace_time=60, priority=10)
scheduler.add_job(print_message2, 'interval', seconds=10, id='job2', replace_existing=True, misfire_grace_time=60, priority=5)
scheduler.start()Running the Examples
Save any of the above snippets to a Python file (e.g., scheduler_example.py) and execute it from the command line:
python scheduler_example.pySigned-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
