admin 管理员组

文章数量: 1103806

python 中的 schedule 库是一个轻量级的定时任务调度库,可以让你轻松地管理周期性任务。你可以通过这个库定时运行任务,比如每隔一定时间执行一次某个函数,或每天定时执行某个任务。下面将详细介绍 schedule 模块的用法。

1. 安装 schedule 库

首先,你需要安装 schedule 库。可以通过 pip 安装:

pip install schedule

2. 基本使用

python-schedule 通过 schedule 模块提供了一个简单的 API 来安排任务。核心思想是将函数与定时规则(比如每隔多少分钟、每小时、每天等)结合。

示例:每隔 10 秒执行一次任务
import schedule
import time


# 定义要执行的任务
def job():
    print("任务正在执行...")

# 安排任务
schedule.every(10).seconds.do(job)

# 持续运行调度器
while True:
    schedule.run_pending()  # 检查是否有待执行的任务
    time.sleep(1)  # 每秒检查一次

在上面的代码中,schedule.every(10).seconds.do(job) 会安排 job 函数每隔 10 秒执行一次。schedule.run_pending() 检查是否有任务需要执行,time.sleep(1) 控制检查的频率。

3. 常见时间间隔的设置

python-schedule 支持多种常见的时间间隔设置,包括:秒、分钟、小时、天、周等。

示例:设置任务的不同时间间隔
import schedule
import time

def job():
    print("任务正在执行...")

# 每分钟执行一次
schedule.every(1).minute.do(job)

# 每小时执行一次
schedule.every().hour.do(job)

# 每天 9:00 执行一次
schedule.every().day.at("09:00").do(job)

# 每周一早上 8:30 执行一次
schedule.every().monday.at("08:30").do(job)

# 每月 1 号执行一次
schedule.every(1).month.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

在这段代码中,任务将按照以下规则定时执行:

  • 每分钟一次
  • 每小时一次
  • 每天 9:00 执行一次
  • 每周一 8:30 执行一次
  • 每月 1 号执行一次

4. 取消任务

你可以通过 schedule.cancel_job() 来取消已安排的任务。

示例:取消任务
import schedule
import time


def job():
    print("任务正在执行...")

# 安排一个任务
job1 = schedule.every(5).seconds.do(job)

# 取消任务
schedule.cancel_job(job1)

while True:
    schedule.run_pending()
    time.sleep(1)

在上面的代码中,job1 任务被安排每隔 5 秒执行一次,但在之后通过 schedule.cancel_job(job1) 被取消了。即使调度器还在运行,该任务也不会再被执行。

5. 任务间隔的复杂设置

你可以设置更复杂的定时任务间隔,比如每隔一段时间执行某个任务,但只能在特定的时间段内执行。

示例:设置特定的任务间隔
import schedule
import time


def job():
    print("任务正在执行...")

# 每小时的第 15 分钟执行任务
schedule.every().hour.at(":15").do(job)

# 每周的周五执行任务
schedule.every().friday.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

在上面的代码中,任务将安排在以下时刻执行:

  • 每小时的第 15 分钟执行
  • 每周五执行

6. 多任务调度

你可以安排多个任务在不同的时间间隔下执行。

示例:多个任务调度
import schedule
import time


def job1():
    print("任务 1 正在执行...")

def job2():
    print("任务 2 正在执行...")


# 安排任务
schedule.every(3).seconds.do(job1)

schedule.every(5).seconds.do(job2)

while True:
    schedule.run_pending()
    time.sleep(1)

在这段代码中,job1 每 3 秒执行一次,而 job2 每 5 秒执行一次,两个任务会同时运行。

7. 限制任务执行的次数

有时你可能希望限制任务执行的次数,可以通过 job.tag() 来实现。

示例:限制任务执行次数
import schedule
import time


def job():
    print("任务正在执行...")

# 安排任务,每 2 秒执行一次
job1 = schedule.every(2).seconds.do(job)

# 只执行 3 次
for i in range(3):
    schedule.run_pending()
    time.sleep(1)

在此示例中,任务只执行 3 次。你也可以通过其他方式来设置次数,具体取决于实际需求。

8. 调度的高级功能

使用 Job.tag 添加标记

你可以给任务添加标签,以便在以后可以通过标签来取消或修改任务。

import schedule
import time


def job():
    print("任务正在执行...")

# 安排任务并添加标签
job1 = schedule.every(3).seconds.do(job)

job1.tag('important')

# 根据标签取消任务
for job in schedule.get_jobs():
    if 'important' in job.tags:
        schedule.cancel_job(job)

while True:
    schedule.run_pending()
    time.sleep(1)

9. 处理异常和错误

你可以使用 try...except 语句来捕捉任务执行中的异常。

import schedule
import time


def job():
    try:
        print("任务开始执行...")

        # 模拟可能会抛出异常的操作
        1 / 0  # 故意制造一个异常

    except Exception as e:
        print(f"任务发生错误: {e}")

# 安排任务
schedule.every(5).seconds.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

在这个例子中,任务中故意抛出了异常,但程序会继续运行并捕获错误。

总结

  • schedule 是一个非常轻量且易用的 python 定时任务调度库。
  • 你可以用它来安排简单的定时任务,或通过灵活的时间间隔设定来处理复杂的调度需求。
  • 可以安排任务、取消任务、限制任务的执行次数,并且支持任务标签和错误处理。

本文标签: 使用指南 模块 python schedule