在Python中实现调度计划作业的五种方法


今天构建的大多数应用程序都需要某种方式的调度机制。常见的例子是轮询 API 或数据库、频繁检查系统健康状况、将日志转储到存档等。KubernetesApache Mesos等自动扩展软件需要检查已部署的应用程序的状态,为此它们使用定期运行的活跃度探针. 调度任务需要与业务逻辑解耦,因此可以使用解耦的执行队列,例如Redis队列。
Python 有几种方法可以安排作业,这就是我们将在本文中学习的内容。我将使用以下方式讨论调度任务:

  1. 简单循环
  2. 简单的循环但线程化
  3. 日程库
  4. Python Crontab
  5. RQ 调度器作为解耦队列

 
简单循环
这是不费吹灰之力的。使用无限运行的 while 循环定期调用函数可用于调度作业,这不是最好的方法,但它有效。可以使用内置时间模块的睡眠功能给出时间延迟。这并不是大多数作业的调度方式,因为首先,它看起来很难看,其次,与其他方法相比,它的可读性较差。
import time

def task():
    print("Job Completed!")

 while 1:
    task()
    time.sleep(10)

 
简单的循环但线程化
线程是计算机科学中的一个概念,其中线程、具有自己指令的小程序由进程执行并独立管理。这可以解决我们第一种方法的阻塞性质,让我们看看如何。
import time
import threading

def task():
    print("Job Completed!")

def schedule():
    while 1:
        task()
        time.sleep(10)

# makes our logic non blocking
thread = threading.Thread(target=schedule)
thread.start()

线程启动后,其底层逻辑无法被主线程修改,因此我们可能需要添加资源,程序通过这些资源可以检查特定场景并根据它们执行逻辑。
 

日程库
早些时候,我说使用 while 循环进行调度看起来很丑陋,调度库可以解决这个问题。

import schedule
import time

def task():
    print("Job Executing!")

# for every n minutes
schedule.every(10).minutes.do(task)

# every hour
schedule.every().hour.do(task)

# every daya at specific time
schedule.every().day.at(
"10:30").do(task)

# schedule by name of day
schedule.every().monday.do(task)

# name of day with time
schedule.every().wednesday.at(
"13:15").do(task)

while True:
    schedule.run_pending()
    time.sleep(1)

如您所见,可以毫不费力地创建多个计划。我特别喜欢创建作业的方式,方法链,另一方面,这个片段有一个 while 循环,这意味着代码被阻塞。
 
Python Crontab
Linux 中的 crontab 实用程序是一种易于使用且被广泛接受的调度解决方案。Python 库python-crontab提供了一个 API 来使用 Python 中的 CLI 工具。在crontab中,一个时间表使用UNIX的描述的cron字符串格式(* * * * *),它是一组五个值的一条线,这表明当作业应该被执行。python-crontab 将在文件中写入 crontab 计划转换为编程方法。

from crontab import CronTab

cron = CronTab(user='root')

job = cron.new(command='my_script.sh')

job.hour.every(1)
cron.write()

python-crontab 不会自动保存计划,需要执行 write() 方法来保存计划。还有更多功能,我强烈建议您查看他们的文档。
 

RQ调度器
有些任务不能立即执行,因此我们需要根据 LIFO 或 FIFO 等队列系统创建任务队列并弹出任务。python-rq允许我们做到这一点,使用 Redis 作为代理来排队作业。新作业的条目存储为带有信息的哈希映射,例如created_at, enqueued_at, origin, data, description.
排队作业由名为 worker 的程序执行。worker 在 Redis 缓存中也有一个条目,负责将作业出列以及更新 Redis 中的作业状态。作业可以在需要时排队,但要安排这些worker ,我们需要rq-scheduler

from rq_scheduler import Scheduler

queue = Queue('circle', connection=Redis())
scheduler = Scheduler(queue=queue)

scheduler.schedule(
    scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone
    func=func,                     # Function to be queued
    args=[arg1, arg2],             # Arguments passed into function when executed
    kwargs={'foo': 'bar'},         # Keyword arguments passed into function when executed
    interval=60,                   # Time before the function is called again, in seconds
    repeat=None,                     # Repeat this number of times (None means repeat forever)
    meta={'foo': 'bar'}            # Arbitrary pickleable data on the job itself
)

RQ 工作器必须在终端中单独启动或通过 python-rq 工作器实用程序启动。一旦作业被触发,就可以在工作终端中看到,在成功和失败场景中可以使用单独的函数回调。

 
结论
还有一些用于调度的库,但在这里,我已经讨论了最常见的库。值得一提的是Celery,celery 的另一个优点是用户可以在多个经纪人之间进行选择。