FastScheduler:Python 中简单又强大的任务调度工具详解

在实际项目里,你是不是经常需要定时执行某些任务?比如每隔 10 秒检查一次服务器状态、每天早上 9 点生成日报、周一固定时间开晨会提醒、每个月 1 号自动跑对账程序……

传统的解决方案可能是用 time.sleep() 硬写循环、借助 schedule 库、用 APScheduler、甚至直接上系统 crontab。但这些方案要么功能太简单、要么配置繁琐、要么对异步支持不好、要么重启后任务状态容易丢失。

今天要介绍的 FastScheduler 是一个 2024–2025 年间出现的轻量级 Python 任务调度库,它试图在“极简 API + 实用功能 + 美观监控”三者之间找到很好的平衡点。

下面我们从实际使用者的视角,一步步来看它到底好用在哪里、怎么用、以及适合哪些场景。(全文基于官方 README 内容整理,不添加任何外部假设。)

它最吸引人的地方是什么?

用一句话概括:“写起来像装饰器,功能却接近企业级”

核心亮点列表:

  • 一行装饰器就能定义任务,基本不用学新语法
  • 原生支持 async/await 函数
  • 支持几乎所有常见定时方式:间隔、每天/每周固定时间、标准 cron 表达式
  • 任务可以设置时区(非常友好给跨国团队)
  • 重启后自动恢复 + 可选择是否追赶错过的执行
  • 内置超时、自动重试、失败进入死信队列
  • 提供一个实时刷新的 FastAPI 可视化仪表盘(带 SSE 推送)
  • 支持 JSON 文件持久化,也支持 SQLite / PostgreSQL / MySQL

如果你之前用过 APScheduler 觉得配置太啰嗦,或者用 schedule 觉得功能太基础,那么 FastScheduler 很可能是目前“性价比”最高的中间选项之一。

快速上手:3 分钟跑通第一个任务

# 基础版(推荐先用这个)
pip install fastscheduler

# 如果想要仪表盘
pip install fastscheduler[fastapi]

# 如果要用 cron 表达式
pip install fastscheduler[cron]

# 全家桶(开发阶段最方便)
pip install fastscheduler[all]

最简代码示例:

from fastscheduler import FastScheduler

scheduler = FastScheduler(quiet=True)   # quiet=True 减少日志干扰

@scheduler.every(10).seconds
def say_hello():
    print("Hello, 世界!我在被定时调用~")

scheduler.start()  # 启动调度器(阻塞式)

想更优雅一点?可以把启动放到线程或进程里,或者用在 FastAPI 的 lifespan 里(后面会讲)。

再看几个常见写法对比:

# 每 5 分钟
@scheduler.every(5).minutes
def check_price(): ...

# 每天 14:30 执行
@scheduler.daily.at("14:30")
async def send_report(): ...

# 每个工作日(周一到周五)上午 9 点
@scheduler.cron("0 9 * * MON-FRI")
def market_open(): ...

# 下周一 10:00 只执行一次
@scheduler.weekly.monday.at("10:00")
def one_time_meeting(): ...

# 程序启动后 60 秒执行一次,然后不再执行
@scheduler.once(60)
def delayed_init(): ...

是不是比很多传统库写起来舒服很多?

这里插入一个官方的动态演示(gif)效果,大家可以直观感受代码 → 运行 → 仪表盘的流畅感:

时区问题终于不头疼了

很多团队分布在不同国家,定时任务必须考虑时区。

FastScheduler 支持两种写法:

  1. 直接在调度器方法里传 tz 参数
@scheduler.daily.at("09:00", tz="America/New_York")
def nyc_task(): ...
  1. 使用链式调用(更推荐,阅读性更好)
@scheduler.weekly.monday.tz("Asia/Tokyo").at("09:00")
def tokyo_standup(): ...

常用时区直接写字符串即可,例如:

  • UTC
  • Asia/Shanghai
  • Asia/Tokyo
  • America/Los_Angeles
  • Europe/London
  • Australia/Sydney

生产环境最关心的几个“硬核”功能

1. 任务超时自动杀死

@scheduler.every(1).minutes.timeout(30)   # 最多跑 30 秒,超时直接杀掉
def maybe_slow_api(): ...

2. 失败自动重试 + 指数退避

@scheduler.every(5).minutes.retries(5)    # 最多重试 5 次
def call_flaky_service(): ...

重试间隔:2s → 4s → 8s → 16s …… 非常经典的指数退避策略。

3. 重启后是否追赶错过的任务?

默认会追赶。你可以明确关闭:

@scheduler.every(1).hours.no_catch_up()
def collect_metrics(): ...

4. 死信队列(Dead Letter Queue)

任务连续失败多次后,会进入死信队列保存,便于事后排查。

你可以随时查看:

dead = scheduler.get_dead_letters(limit=50)
for item in dead:
    print(item.error, item.timestamp, item.attempts)

仪表盘里也有专门的“Failed”标签页。

这里是一张官方仪表盘截图示例(实时状态、倒计时、最近 5 次执行结果、快速操作按钮一应俱全):

如何把仪表盘集成到现有 FastAPI 项目?

大多数现代后端项目都用 FastAPI,集成非常简单。

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes

scheduler = FastScheduler(quiet=True)

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    yield
    await scheduler.stop(wait=True)   # 优雅停止,等待正在执行的任务完成

app = FastAPI(lifespan=lifespan)

# 挂载仪表盘路由,默认路径 /scheduler/
app.include_router(create_scheduler_routes(scheduler))

# 你的其他路由...
@app.get("/")
def read_root():
    return {"message": "Hello World"}

访问 http://localhost:8000/scheduler/ 就能看到完整的监控页面,支持:

  • 实时 SSE 更新
  • 任务列表 + 状态灯 + 下次执行倒计时
  • 一键立即执行 / 暂停 / 恢复 / 取消
  • 执行历史查询
  • 失败任务查看与清空
  • 整体统计(成功率、累计运行次数、调度器运行时长等)

持久化存储:JSON vs 数据库怎么选?

存储方式 适用场景 优点 缺点 配置示例
JSON 文件(默认) 单机、小型项目、开发阶段 零依赖、轻量、开箱即用 并发写可能有竞争、多实例不安全 FastScheduler()
SQLite 单机生产、中小型项目 事务安全、单文件、易备份 高并发写入性能一般 storage="sqlmodel", database_url="sqlite:///scheduler.db"
PostgreSQL 中大型项目、多实例部署 高并发、强一致性、生态好 需要维护数据库 database_url="postgresql://..."
MySQL 已使用 MySQL 的团队 与现有系统兼容好 相对 PostgreSQL 稍弱 database_url="mysql://..."

推荐生产环境优先 PostgreSQL + SQLModel 存储,数据更可靠,也方便后续做报表分析。

常见问题解答(FAQ)

Q1:重启程序后,任务会丢失吗?
不会。状态会自动保存到 JSON 或数据库里,重启后会恢复所有任务定义,并根据规则决定是否补跑错过的执行。

Q2:如何手动触发某个任务立即执行?
有三种方式:

  1. 仪表盘点“Run now”按钮
  2. 调用 scheduler.run_job_now("job_0")
  3. 通过 API POST /scheduler/api/jobs/{job_id}/run

Q3:我想暂停某个任务,但不想删除它,怎么办?
scheduler.pause_job("job_0") 暂停,scheduler.resume_job("job_0") 恢复。暂停期间任务保留在调度表里,只是不会被触发。

Q4:仪表盘的实时更新是怎么实现的?
使用 Server-Sent Events (SSE),前端通过 /scheduler/events 持续接收推送,基本做到了“秒级”刷新。

Q5:支持同一个函数绑定多个不同的调度规则吗?
支持。你可以对同一个函数多次使用装饰器,每次会生成不同的 job_id。

Q6:任务执行出错会怎么样?
默认记录到历史,超过重试次数后进入死信队列,不会阻塞其他任务。可以在仪表盘或代码里查看详细错误栈。

Q7:如何优雅停止调度器?
scheduler.stop(wait=True) 会等待所有正在运行的任务完成后再退出(有 30 秒默认超时,可配置)。

什么时候选 FastScheduler,而不是其他库?

  • 你想要极简的装饰器写法 → 选它
  • 你需要美观且实时的 Web 监控面板 → 选它
  • 项目里有 FastAPI,想复用同一个服务 → 选它
  • 需要跨时区调度且希望配置简单 → 选它
  • 希望重启后状态不丢,又不想自己写持久化逻辑 → 选它

反之,如果你只需要非常简单的间隔定时,schedule 库仍然够用;如果项目规模很大、对分布式一致性要求极高,可能还是要考虑 Celery + Redis / RQ 那样的方案。

总结

FastScheduler 把“简单”和“实用”做到了一个不错的平衡点:

  • 写任务像写普通函数 + 装饰器
  • 功能覆盖了 80% 日常调度需求
  • 自带生产级特性(超时、重试、死信、持久化、仪表盘)
  • 对新手友好,对老手也省心

如果你正在寻找一个“够用、不臃肿、带好看监控”的 Python 定时任务解决方案,不妨 clone 仓库试用一下:

https://github.com/MichielMe/fastscheduler

实际跑起来后,你很可能就会把它加入项目的“标配依赖”列表里。

有任何使用中的疑问,欢迎留言,我们一起讨论。祝你的后台任务永远准时、稳定地运行~