Python中使用Celery和Redis构建任务队列 | blutv

21-07-03 banq

您拥有的微服务越多,您就越需要在微服务或计划作业或后台任务之间进行异步消息传递。

后台任务的好处有很多;解除特定微服务的繁重负载,运行无状态作业和计划任务,异步消息传递等……

但是,如果您没有设置适当的监控或高度可用的系统,失败的后台任务 - 名副其实 - 可能会被忽视。因此,您需要这些任务(简而言之)快速、高可用且易于管理。

因此,您需要在通信时具有低延迟的分布式、快节奏的开发环境。

因此,我建议;

  • Python,由于它的简单性。
  • Celery:一个在 github 上积极维护的分布式任务队列(有很多贡献者),快速且高可用。
  • Redis:具有极低延迟的内存键值存储。它可以用作 Celery 的(消息)代理和(结果)后端。

可以在我的Github 帐户中查看完整代码。

 

我将在这个例子中解释计划任务和触发任务,我将使用python 3.8.5和celery==5.1.1。

让我们考虑一下我们有活跃用户通过付费订阅使用我们的服务。我们必须续订过期的订阅,并通过电子邮件(计划任务)向用户发送发票,发送有关注册、密码恢复等事件的自定义电子邮件……(触发任务)。

 

计划任务

何时续订过期的订阅基于当前时间和存储在数据库中的用户的过期日期。显然,逐个检查用户是否过期不是一个好习惯。但是,我们可以定期检查数据库中已过到期日期的用户。

因此,我们必须设置一个 cron 作业X second来获取过期的订阅,尝试更新它们,如果更新成功,最后将发票发送给用户。

import logging
import datetime
import smtplib
import ssl
from typing import Optional

from celery import Celery
from celery.schedules import crontab

REDIS_BASE_URL = 'redis://localhost:6379'
SMTP_SERVER = "your.smtp.server"
SENDER_EMAIL = "your@email.com"
EMAIL_PASSWORD = "Secret_Email_Password"
app = Celery(
    'runner',
    broker=f"{REDIS_BASE_URL}/0",
    backend=f"{REDIS_BASE_URL}/1"
)

app.conf.beat_schedule = {
    'renew-expired-subscriptions': {
        'task': 'runner.renew_expired_subscriptions',
        'schedule': 5.0, # Runs in every 5 seconds
        #'schedule': crontab(minute=5) # Runs in every 5 minutes
        #'args': (arg_1, arg_2, ...), # Run the function with arguments
    },
}

@app.task(name='renew-expired-subscriptions')
def renew_expired_subscriptions():
    # Get expired user informations from your db
    # Try to renew subscription
    _test_user_info = {
        'name': 'Test User',
        'subscription_price': 14.99,
        'renewal_date': datetime.datetime.strftime(
            datetime.datetime.utcnow(),
            '%Y-%m-%d %H:%M:%S'
        )
    }
    _renewed = True # Very optimistic assumption
    _test_user_email = 'test@user.com'
    if _renewed:
        _sent = send_email(
            _test_user_email,
            replace_fields_with_values(
                'invoice_email_template',
                _test_user_info
            )
        )
    else:
        _sent = send_email(
            _test_user_email,
            replace_fields_with_values(
                'failed_to_renew_subscription_template',
                _test_user_info
            )
        )

    if _sent:
        logging.info(f"Invoice sent to user {_test_user_email}")

    return {
        "email": _test_user_email,
        "subscription_renewed": _renewed,
        "email_status": _sent
    }

@app.task(
    name='send-email-to-user',
    default_retry_delay=300,
    max_retry=5,
    soft_time_limit=30
)
def send_email_to_user(
        user_email: str,
        email_template: str,
        extra_info: dict
    ):
    _message = replace_fields_with_values(email_template, extra_info)
    _sent = send_email(user_email, _message)
    logging.info(f"Sent {email_template} email to user <{user_email}>")
    return {
        "email": user_email,
        "email_type": email_template,
        "email_status": _sent
    }

def send_email(to: str, message: str):
    try:
        context = ssl.create_default_context()
        with smtplib.SMTP(SMTP_SERVER, 587) as server:
            server.ehlo()
            server.starttls(context=context)
            server.ehlo()
            server.login(SENDER_EMAIL, EMAIL_PASSWORD)
            server.sendmail(SENDER_EMAIL, to, message)
            return True
    except Exception as _ex:
        logging.error(str(_ex))
        logging.critical(
            "Error occured while trying to"\
            f"send invoice to: <{to}>"
        )
        return False

def replace_fields_with_values(
        email_template: str,
        extra_info: dict
    ):
    try:
        with open(f"{email_template}.txt", 'r') as _template:
            _template_text = _template.read()
            _email = _template_text.format(**extra_info)
        return _email
    except FileNotFoundError:
        logging.critical(f"Email template not found: <{email_template}.txt>")

我们的 cron 作业renew_expired_subscriptions已定义并具有一些(虚拟)功能,包括辅助函数replace_fields_with_values和send_email.

从基础开始;首先,我们需要定义我们的 celery worker 应用程序:

REDIS_BASE_URL = 'redis://localhost:6379' 
app = Celery( 
    'runner', 
    broker=f"{REDIS_BASE_URL}/0", 
    backend=f"{REDIS_BASE_URL}/1" 
)

'runner'是我们应用程序的主要名称。有关更多详细信息,请参阅主要名称

我们有一些辅助函数;replace_fields_with_values函数用给定的值替换给定电子邮件模板中的占位符并输出自定义电子邮件消息。send_email函数正如它的名字所承诺的那样。

最后,您可以看到我们还定义了 cron 任务renew_expired_subscriptions,其中包含一些虚拟的订阅续订逻辑。每个 celery 任务都在一个@app.task装饰器下定义。

我们还需要为计划任务定义配置:

app.conf.beat_schedule = {
    'renew-expired-subscriptions': {
        'task': 'runner.renew_expired_subscriptions',
        'schedule': 5.0, # Runs in every 5 seconds
        #'schedule': crontab(minute=5) # Runs in every 5 minutes
        #'args': (arg_1, arg_2, ...), # Run the function with arguments
    },
}

此配置对象可以扩展为添加的 cron 任务。我们将订阅续订任务的 crontab 设置为 5 秒。

现在,我们可以运行工人worker:

shell 
# window 1 docker 
run -p 6379:6379 redis:alpine
# window 2 
celery --app=runner worker --loglevel=info
# window 3 
celery --app=runner beat --loglevel=info

 

触发任务

除了计划任务外,可能还需要触发任务。这些任务在触发事件时触发。

当事件被触发时,一些消息(任务数据)被推送到消息代理(又名bus)。然后工作人员从消息代理获取队列中的任务并处理它们。

在我们的例子中,我们需要在发生某些事件时向用户发送电子邮件,例如注册、新订阅、密码恢复等。

使用重试设置和软超时限制,我们的电子邮件发送任务应该类似于:

@app.task(
    name='send-email-to-user',
    default_retry_delay=300,
    max_retry=5,
    soft_time_limit=30
)
def send_email_to_user(
        user_email: str,
        email_template: str,
        extra_info: dict
    ):
    _message = replace_fields_with_values(email_template, extra_info)
    _sent = send_email(user_email, _message)
    logging.info(f"Sent {email_template} email to user <{user_email}>")
    return {
        "email": user_email,
        "email_type": email_template,
        "email_status": _sent
    }

要手动触发此任务,请启动您的worker、本地 redis 并在项目目录中运行 Python 命令:

shell 
# window 1 docker 
run -p 6379:6379 redis:alpine
# window 2 
celery --app=runner worker --loglevel=info
# window 3 
python3 -c "import runner; runner.send_email_to_user.delay('a@b.c', 'register', {'a': 'b'})"

 

结论

通过使用任务队列,您可能会受益于并发/并行计算、工作量较少的微服务、弹性工作者。

现在,您可以同时运行两个计划任务,同时手动触发其他任务并监控您的工作人员吗?你绝对应该试试这个。它将让您基本了解任务队列机制的工作原理。

任务队列的最佳实践:

  • 为任务设置时间限制。
  • 尽可能使用最少的参数。无状态任务更好。
  • 不要相信你的经纪人的安全。如果您的任务数据中有秘密,您就必须认真对待安全问题。
  • 请注意连接池限制的限制。
  • 您必须为不同的经纪人或后端进行不同的设置。没有经纪人或后端与其他人相同。
  • 指数重试间隔优于线性重试间隔。
  • 对您的队列进行分类和优先排序。
  • 监控您的工作人员并正确记录。

 

猜你喜欢