Backend Development 10 min read

Using schedule and APScheduler Libraries for Python Task Scheduling

This article demonstrates how to use the simple schedule library and the more powerful APScheduler library—including blocking, background, and async schedulers—to create, run, log, handle errors, and manage concurrency and dependencies for Python scheduled tasks.

Test Development Learning Exchange
Test Development Learning Exchange
Test Development Learning Exchange
Using schedule and APScheduler Libraries for Python Task Scheduling

Using the schedule library

import schedule
import time
def job():
print("Job executed at: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
# Execute every day at 09:00
schedule.every().day.at("09:00").do(job)
# Execute every 5 seconds
schedule.every(5).seconds.do(job)
# Main loop
while True:
schedule.run_pending()
time.sleep(1)  # Sleep 1 second to reduce CPU usage

Using APScheduler (advanced Python scheduler)

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def scheduled_job():
print("Job executed at: " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
scheduler = BlockingScheduler()
# Add a daily job at 09:00
scheduler.add_job(scheduled_job, 'cron', hour=9)
# Add a job that runs every 5 seconds
scheduler.add_job(scheduled_job, 'interval', seconds=5)
# Start the scheduler (blocks the main thread)
scheduler.start()
# In production, combine with a web framework or long‑running service

Using APScheduler for more complex non‑blocking tasks (BackgroundScheduler)

When you need the main thread to continue doing other work, you can use AsyncIOScheduler together with an asyncio event loop.

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
import datetime
async def scheduled_job():
print("Job executed at: " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
async def main():
scheduler = AsyncIOScheduler()
# Add a daily job at 09:00
scheduler.add_job(scheduled_job, 'cron', hour=9)
# Add a job that runs every 5 seconds
scheduler.add_job(scheduled_job, 'interval', seconds=5)
scheduler.start()
# Let the program run for an hour
await asyncio.sleep(3600)
# Shut down before exiting
scheduler.shutdown()
if __name__ == "__main__":
asyncio.run(main())

APScheduler also supports advanced options such as limiting execution counts, defining dependencies, callbacks on success/failure, and persisting jobs so they survive process restarts.

Recording task results and handling exceptions with APScheduler

from apscheduler.schedulers.blocking import BlockingScheduler
import logging
import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scheduled_job():
try:
result = do_something_complex()  # May raise an exception
logger.info(f"Job executed at: {datetime.datetime.now()} - Result: {result}")
except Exception as e:
logger.error(f"Job execution failed at: {datetime.datetime.now()} - Error: {e}")
def start_scheduler():
scheduler = BlockingScheduler()
scheduler.add_job(scheduled_job, 'cron', hour=9)
scheduler.add_job(scheduled_job, 'interval', seconds=5)
# Add listeners for job execution events
scheduler.add_listener(
lambda event: logger.info(f"Event triggered: {event.job_id} finished with status {event.retval}"),
apscheduler.events.EVENT_JOB_EXECUTED | apscheduler.events.EVENT_JOB_ERROR
)
scheduler.start()
if __name__ == "__main__":
start_scheduler()

This example adds logging, exception handling, and event listeners so that both successful completions and errors are recorded.

Concurrent execution and task dependencies with APScheduler

For scenarios requiring parallel jobs or simple dependency logic, you can configure a ThreadPoolExecutor for the scheduler and use custom checks inside job functions.

from apscheduler.schedulers.blocking import BlockingScheduler
from concurrent.futures import ThreadPoolExecutor
import logging
import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def task_a():
logger.info("Task A started.")
# ... task A logic ...
logger.info("Task A finished.")
def task_b():
logger.info("Task B started.")
# ... task B logic ...
logger.info("Task B finished.")
def dependent_task():
logger.info("Dependent task started.")
# ... logic that depends on A and B ...
logger.info("Dependent task finished.")
def start_scheduler():
scheduler = BlockingScheduler(executor=ThreadPoolExecutor(max_workers=2))  # Enable concurrency
scheduler.add_job(task_a, 'interval', seconds=5)
scheduler.add_job(task_b, 'interval', seconds=5)
# APScheduler does not provide explicit dependency support; you must implement it yourself.
scheduler.add_job(dependent_task, 'interval', seconds=10,
next_run_time=lambda prev: prev + datetime.timedelta(seconds=10) if getattr(task_a, 'finished', False) and getattr(task_b, 'finished', False) else None)
scheduler.start()
if __name__ == "__main__":
start_scheduler()

The example shows how to run two jobs concurrently using a thread pool and discusses that true task dependencies must be managed manually (e.g., via shared state or a database).

Note

schedule library is suitable for simple timing needs; it runs in the main thread and does not support multithreading or multiprocessing.

APScheduler offers multiple scheduler types—including BlockingScheduler , BackgroundScheduler , and AsyncIOScheduler —and supports complex cron‑style schedules, asynchronous execution, and persistence, making it appropriate for production‑grade task scheduling.

PythonBackend Developmenttask schedulingscheduleapscheduler
Test Development Learning Exchange
Written by

Test Development Learning Exchange

Test Development Learning Exchange

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.