APScheduler Tutorial: Installation, Basic Usage, Triggers, and Advanced Features
This article introduces the Python APScheduler library, covering installation, a basic interval example, various trigger types such as date and cron, parameterized and timezone-aware jobs, retry handling, dynamic job management, SQL job stores, multithreading, parallel execution, external event triggers, priority settings, and how to run the examples.
APScheduler is a powerful Python library for scheduling timed tasks, supporting multiple trigger types (date, interval, cron) and capable of running in single‑process or multi‑threaded environments.
Installation
pip install apscheduler

Basic Example
Define a simple task that prints a message every 5 seconds using BlockingScheduler. Its start() call blocks the calling thread until the scheduler is shut down.
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message():
    """Print a greeting followed by the current local time."""
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


scheduler = BlockingScheduler()
# Run print_message every 5 seconds
scheduler.add_job(print_message, 'interval', seconds=5)
# Start the scheduler
scheduler.start()

If you prefer a non‑blocking scheduler, use BackgroundScheduler instead.
Using Different Triggers
1. Date Trigger (run once)
from datetime import datetime, timedelta

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message():
    """Print a greeting followed by the current date and time."""
    print("Hello, world! Current time:", datetime.now())


scheduler = BlockingScheduler()
# Run the job exactly once, 10 seconds from now
scheduler.add_job(print_message, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()

2. Cron Trigger (schedule like Linux cron)
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message():
    """Print a greeting followed by the current local time."""
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


scheduler = BlockingScheduler()
# Run the job once a day at 10:30
scheduler.add_job(print_message, 'cron', hour=10, minute=30)
scheduler.start()

Note: The job fires at 10:30 every day in the scheduler's time zone. If the script is started after 10:30, the first run happens at 10:30 on the following day.
Parameterized Tasks
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message(message):
    """Print *message* followed by the current local time."""
    print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


scheduler = BlockingScheduler()
# Run every 5 seconds; the items of `args` are passed to the job function
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
scheduler.start()

Complex Cron Expressions
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message():
    """Print a greeting followed by the current local time."""
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


scheduler = BlockingScheduler()
# Run every 5 minutes during hours 10 and 11, i.e. from 10:00 to 11:55 each day.
# The hour and minute fields are matched independently, so this expression
# cannot describe a 10:30-11:30 window; that would need two jobs
# (hour=10, minute='30-59/5' and hour=11, minute='0-30/5').
scheduler.add_job(print_message, 'cron', hour='10-11', minute='*/5')
scheduler.start()

Cron with Timezone
from datetime import datetime, timezone, timedelta
import time

import pytz
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


def print_message():
    """Print a greeting followed by the current time.

    time.strftime() reports the process's local time, not the time zone
    the job was scheduled in.
    """
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


# Persist jobs in a local SQLite file so they survive restarts
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
# Jobs use the 'default' thread pool unless add_job() names another executor
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
# Do not merge missed runs; allow up to 3 concurrent instances of each job
job_defaults = {'coalesce': False, 'max_instances': 3}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
# Evaluate the cron fields in US Eastern time.
# Because the job store is persistent, give the job an explicit id and
# replace_existing=True; otherwise every restart adds another copy of it.
scheduler.add_job(
    print_message,
    'cron',
    hour='10-11',
    minute='*/5',
    timezone=pytz.timezone('US/Eastern'),
    id='eastern_cron_job',
    replace_existing=True,
)
scheduler.start()

Job Retry Mechanism
import time

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR


def print_message():
    """Print a greeting followed by the current local time."""
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


def handle_job_error(event):
    """Report the exception and traceback carried by a job-error event."""
    print("Error occurred in job:", event.exception)
    print("Traceback:", event.traceback)


scheduler = BlockingScheduler()
# Register the listener for job-error events only
scheduler.add_listener(handle_job_error, EVENT_JOB_ERROR)
# NOTE: none of these options retries a failed run. They only control
# scheduling behaviour: at most one concurrent instance, a run may start up
# to 10 seconds late, and several missed runs are merged into one.
# A failed interval job simply runs again at its next scheduled time.
scheduler.add_job(print_message, 'interval', seconds=5, max_instances=1, misfire_grace_time=10, coalesce=True)
scheduler.start()

Dynamic Add/Remove Tasks
from datetime import datetime, timedelta
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message(message):
    """Print *message* followed by the current local time."""
    print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


def remove_print_job():
    """Remove the recurring job registered under the id 'job1'."""
    scheduler.remove_job('job1')


scheduler = BlockingScheduler()
# Add the recurring job with an explicit id so it can be removed by that id
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'], id='job1')
# BlockingScheduler.start() never returns, so the removal cannot happen after
# a sleep in this thread; schedule it as a one-shot job 20 seconds from now.
scheduler.add_job(remove_print_job, 'date', run_date=datetime.now() + timedelta(seconds=20))
scheduler.start()

Using SQL Database to Store Jobs
import time

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Persist jobs in a local SQLite file so they survive restarts
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = BlockingScheduler(jobstores=jobstores)


def print_message():
    """Print a greeting followed by the current local time."""
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


# With a persistent job store, give the job an explicit id and pass
# replace_existing=True; otherwise each run of this script stores one more
# copy of the job.
scheduler.add_job(print_message, 'interval', seconds=5, id='print_message_job', replace_existing=True)
scheduler.start()

Multithread/Multiprocess Execution
import time

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# 'default' is a pool of 20 threads; 'processpool' is a pool of 5 processes
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
scheduler = BlockingScheduler(executors=executors)


def print_message(message):
    """Print *message* followed by the current local time."""
    print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


# No executor is named here, so the job runs on the 'default' thread pool;
# pass executor='processpool' to add_job() to run it in a worker process.
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
scheduler.start()

Parallel Execution of Multiple Tasks
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message1():
    """Print the first message followed by the current local time."""
    print("Message 1: Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


def print_message2():
    """Print the second message followed by the current local time."""
    print("Message 2: Hello again! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


scheduler = BlockingScheduler()
# Two independent jobs on the same scheduler: one every 5 s, one every 10 s
scheduler.add_job(print_message1, 'interval', seconds=5)
scheduler.add_job(print_message2, 'interval', seconds=10)
scheduler.start()

External Event Triggered Task
import threading
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message():
    """Print a greeting followed by the current local time."""
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


def trigger_event():
    """Simulate the external event: set the flag and queue the job to run once."""
    event.set()
    # add_job() without a trigger schedules a single, immediate run
    scheduler.add_job(print_message)


event = threading.Event()
scheduler = BlockingScheduler()
# APScheduler has no 'event' trigger, and a trigger cannot be given both
# positionally and as trigger=...; instead the job is added from the
# callback that handles the external event.
# Simulate the external event firing after 10 seconds
threading.Timer(10, trigger_event).start()
scheduler.start()

Task Priority
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def print_message1():
    """Print the first message followed by the current local time."""
    print("Message 1: Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


def print_message2():
    """Print the second message followed by the current local time."""
    print("Message 2: Hello again! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))


scheduler = BlockingScheduler()
# add_job() has no priority option: unrecognised keyword arguments are passed
# on to the trigger, so priority=... makes the interval trigger raise
# TypeError. Jobs are identified by id; replace_existing=True overwrites a
# job with the same id, and misfire_grace_time=60 lets a run start up to
# 60 seconds late.
scheduler.add_job(print_message1, 'interval', seconds=5, id='job1', replace_existing=True, misfire_grace_time=60)
scheduler.add_job(print_message2, 'interval', seconds=10, id='job2', replace_existing=True, misfire_grace_time=60)
scheduler.start()

Running the Examples
Save any of the above snippets to a Python file (e.g., scheduler_example.py) and execute it from the command line:
python scheduler_example.py

Test Development Learning Exchange
Test Development Learning Exchange
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.