Python Celery

Spread the love

Все в сообществе Python слышали о Celery хотя бы один раз, и, возможно, уже работали с ним. По сути, это удобный инструмент, который помогает запускать отложенный или выделенный код в отдельном процессе или даже на отдельном компьютере или сервере. Это экономит время и усилия на многих уровнях.

Введение в Celery Python

Celery позволяет снижать нагрузку на процессор, выполняя часть функциональности в виде отложенных задач либо на том же сервере, что и другие задачи, либо на другом сервере. Чаще всего разработчики используют его для отправки электронных писем. Тем не менее, Celery может предложить гораздо больше. В этой статье я расскажу о основах Celery, а также приведу пару лучших практик Python-Celery.

Основы Celery

Если вы уже работали с Celery, можете пропустить эту главу. Но если Celery для вас в новинку, здесь вы можете узнаеть, как включить Celery в свой проект, и почитать отдельное руководство по использованию Celery с Django.

Запуск Celery с Django

Чтобы использовать Celery с вашим проектом Django, вы должны сначала определить экземпляр библиотеки Celery (в нашем случае называемый «app»). Для этого создайте файл celery.py

Например:

- proj/
  - manage.py
  - proj/
    - celery.py
    - __init__.py
    - settings.py
    - urls.py

Содержимое файла celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Давайте разберем, что здесь происходит, сначала мы импортируем absolute_import из __future__, чтобы наш модуль celery.py не конфликтовал с библиотекой (это пример код для версии python 2.x, для версии 3.x этого делать не нужно):

from __future__ import absolute_import

Затем мы устанавливаем переменную окружения DJANGO_SETTINGS_MODULE по умолчанию для запуска через командную строку celery:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

Эта строка не обязательна, но она избавляет вас от необходимости всегда передавать файл настроек в celery. Это должно всегда происходить перед созданием экземпляров приложения, что мы и делаем дальше:

app = Celery('proj')

Это наш экземпляр библиотеки, у вас может быть несколько экземпляров, но, вероятно, не будет причин для этого при использовании Django.

Мы также добавили модуль настроек Django в качестве источника конфигурации для Celery. Это означает, что вам не нужно использовать несколько файлов конфигурации, а вместо этого вы будете настраивать Celery непосредственно из настроек Django; но вы также можете разделить их, если захотите.

app.config_from_object('django.conf:settings', namespace='CELERY')

Пространство имен namespace в верхнем регистре означает, что все параметры конфигурации Celery должны быть указаны в верхнем регистре, а не в нижнем регистре, и начинаются с CELERY_, поэтому, например, параметр task_always_eager будет CELERY_TASK_ALWAYS_EAGER, а параметр broker_url становится CELERY_BROKER_URL. namespace CELERY_ также является необязательным, но рекомендуется (для предотвращения наложения с другими настройками Django).

Далее, обычной практикой является определение всех задач в отдельном модуле tasks.py, и у Celery есть способ автоматического обнаружения этих модулей:

app.autodiscover_tasks()

С помощью строки выше Celery будет автоматически обнаруживать задачи из всех установленных приложений в соответствии с соглашением tasks.py:

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

Запуск рабочего процесса

В производственной среде вы захотите запускать worker в фоновом режиме в качестве демона – см. Daemonization – но для тестирования и разработки полезно иметь возможность запускать экземпляр worker с помощью команды управления celery worker. Например:

celery -A proj worker -l info

Лучше создать экземпляр (то есть запускать Celery) через отдельный файл, так как будет необходимо запустить/перезапускать отдельно Celery при внесение изменений.

Базовые примеры Python Celery

Как я уже упоминал ранее, классическим примером использования Celery является отправка электронной почты. Я использую этот пример, чтобы показать вам основы использования Celery. Для начало создадим вьюху и задачу:

from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context

from myproject.celery import app


def render_template(template, context):
    engine = Engine.get_default()

    tmpl = engine.get_template(template)

    return tmpl.render(Context(context))
    

@celery_app.task
def send_mail_task(recipients, subject, template, context):
    send_mail(
        subject=subject,
        message=render_template(f'{template}.txt', context),
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=recipients,
        fail_silently=False,
        html_message=render_template(f'{template}.html', context)
)

Используя Celery, мы сокращаем время ответа клиенту, поскольку отделяем процесс отправки от основного кода, отвечающего за возврат ответа.

Самый простой способ выполнить эту задачу – вызвать метод delay, предоставляемый декоратором app.task.

send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})

Celery так же позволяет настроить повторные попытки после сбоя.

@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
    message = render_template(f'{template}.txt', context)
    html_message = render_template(f'{template}.html', context)
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=recipients,
            fail_silently=False,
            html_message=html_message
        )
    except smtplib.SMTPException as ex:
        self.retry(exc=ex)

Теперь задача будет перезапущена через десять минут, в случае если отправка не будет удачной. Кроме того, вы сможете установить количество повторных попыток.

Некоторые из вас могут удивиться, почему я переместил рендеринг шаблона за пределы вызова send_mail. Это потому, что мы заключаем вызов send_mail в try / except, и лучше иметь как можно меньше кода в try / except.

Celery для продвинутых пользователей

Celery Django Запланированные задачи

Celery позволяет запускать задачи с помощью планировщиков, таких как crontab в Linux.

Прежде всего, если вы хотите использовать периодические задачи, вы должны запустить worker Celery с флагом –beat, иначе Celery будет игнорировать планировщик. Следующим шагом будет создание конфигурации, в которой будет указано, какую задачу следует выполнить и когда. Вот пример:

from celery.schedules import crontab


CELERY_BEAT_SCHEDULE = {
    'monday-statistics-email': {
        'task': 'myproject.apps.statistics.tasks.monday_email',
        'schedule': crontab(day_of_week=1, hour=7),
    },
}
  • если вы не используете Django, вы должны использовать celery_app.conf.beat_schedule вместо CELERY_BEAT_SCHEDULE

В этой конфигурации у нас есть только одна задача, которая будет выполняться каждый понедельник в 7 часов утра.

Вы можете добавить аргументы к задачам и выбрать, что должно быть сделано в случае, если одна и та же задача должна выполняться в разное время с разными аргументами. Метод crontab поддерживает синтаксис системного crontab, например crontab (minute = ‘*/15’), для запуска задачи каждые 15 минут.

Отложенное выполнение задачи в Celery

eta – выполнить задачу в точное время
countdown – выполнить задачу через N секунд

Вы также можете установить задачи в очереди Python Celery с тайм-аутом перед выполнением. (Например, когда вам нужно отправить уведомление после действия.) Для этого используйте метод apply_async с аргументом eta или countdown.

Давайте посмотрим, как это может выглядеть в коде:

from datetime import datetime


send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    countdown=15 * 60
)

send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    eta=datetime(2019, 5, 20, 7, 0)
)

В первом примере электронное письмо будет отправлено через 15 минут, а во втором – в 7 часов утра 20 мая.

Настройка Python Celery очереди

Celery может быть распределен, когда у вас есть несколько worker на разных серверах, которые используют одну очередь сообщений для планирования задач. Вы можете настроить дополнительную очередь для вашей задачи/worker. Например, если отправка электронных писем является важной частью вашей системы, и вы не хотите, чтобы какие-либо другие задачи влияли на отправку. Затем вы можете добавить новую очередь, назовем ее mail и использовать эту очередь для отправки электронных писем.

CELERY_TASK_ROUTES = {
    'myproject.apps.mail.tasks.send_mail_task': {'queue': 'mail', },
}
  • если вы не используете Django, используйте <code> celery_app.conf.task_routes </code> вместо CELERY_TASK_ROUTES

Запустите двух отдельных workers celery для очереди по умолчанию и новой очереди:

celery -A myproject worker -l info -Q celery
celery -A myproject worker -l info -Q mail

Первая строка будет запускать worker для очереди по умолчанию, называемой celery, а вторая строка – worker для очереди mail. Вы можете использовать первый worker без аргумента -Q, тогда этот worker будет использовать все настроенные очереди.

Долгосрочные задачи

Иногда мне приходится иметь дело с заданиями, написанными для просмотра записей в базе данных и выполнения некоторых операций. Довольно часто разработчики забывают о росте данных, что может привести к очень длительному времени выполнения задачи. Всегда лучше писать такие задачи, которые позволяют работать с блоками данных. Самый простой способ – добавить смещение и ограничить параметры задачи. Это позволит вам указать размер фрагмента и курсор для получения нового фрагмента данных.

@celery_app.task
def send_good_morning_mail_task(offset=0, limit=100):
    users = User.objects.filter(is_active=True).order_by('id')
    for user in users:
        send_good_morning_mail(user)

    if len(users) >= limit:
        send_good_morning_mail_task.delay(offset + limit, limit)

Это очень простой пример того, как такая задача может быть реализована. В конце задачи мы проверяем, сколько пользователей мы нашли в базе данных. Если число равно пределу, возможно, у нас есть новые пользователи для обработки. Поэтому мы снова запускаем задачу с новым смещением. Если количество пользователей меньше определенного лимита, это означает, что это последний кусок, и нам не нужно продолжать. Но будьте осторожны: для реализации этой задачи каждый раз должен быть одинаковый порядок записей.

Celery: получение результатов задачи

Большинство разработчиков не записывают результаты, полученные после выполнения задачи. Представьте, что вы можете взять часть кода, назначить его для задачи и выполнить эту задачу независимо, как только вы получите запрос пользователя. Когда нам нужны результаты задания, мы либо сразу получаем результаты (если задание выполнено), либо ждем его завершения. Затем мы включаем результат в общий ответ. Используя этот подход, вы можете уменьшить время отклика, что очень хорошо для ваших пользователей и рейтинга сайта.

Мы используем эту функцию для запуска одновременных операций. В одном из наших проектов у нас было много пользовательских данных и много поставщиков услуг. Чтобы найти лучшего поставщика услуг, нам нужно произвести тяжелые расчеты и проверки. Чтобы сделать это быстрее, мы создали задачи для пользователя с каждым поставщиком услуг, запускаем их и собираем результаты, чтобы показать их пользователю. Это очень легко сделать с целевыми группами Celery.

from celery import group

@celery_app.task
def calculate_service_provider_task(user_id, provider_id):
    user = User.objects.get(pk=user_id)
    provider = ServiceProvider.objects.get(pk=provider_id)

    return calculate_service_provider(user, provider)


@celery_app.task
def find_best_service_provider_for_user(user_id):
    user = User.objects.get(pk=user_id)
    providers = ServiceProvider.objects.related_to_user(user)

    calc_group = group([
        calculate_service_provider_task.s(user.pk, provider.pk)
        for provider in providers
    ]).apply_async()

    return calc_group

Во-первых, почему мы запускаем две задачи? Мы используем второе задание для формирования групп задач расчета, их запуска и возврата. Кроме того, вторая задача заключается в том, где вы можете назначить фильтрацию проекта – например, поставщиков услуг, которые необходимо рассчитать для данного пользователя. Все это можно сделать, пока Celery выполняет другую работу. Когда группа задач возвращается, результатом первой задачи является фактически интересующий нас расчет.

Вот пример того, как использовать этот подход в коде:

def view(request):
    find_job = find_best_service_provider_for_user.delay(request.user.pk)

    # do other stuff

    calculations_results = find_job.get().join()

    # process calculations_results and send response

Здесь мы выполняем вычисления как можно скорее, ожидаем результатов в конце метода, затем готовим ответ и отправляем его пользователю.

Полезные советы

Используйте ID записей БД а не сами данные

Я, наверное, уже упоминал, что я использую идентификаторы записей базы данных в качестве аргументов задачи вместо полных объектов. Это хороший способ уменьшить размер очереди сообщений. Но более важно то, что при выполнении задачи данные в базе данных могут быть изменены. И когда у вас есть только идентификаторы, вы получите свежие данные, а не устаревшие данные, которые вы получаете при передаче объектов.

Заключение

Как видите, с Celery можно делать гораздо больше, чем просто отправлять электронные письма. Вы можете запускать различные задачи одновременно, используя основной процесс, и пока вы выполняете свою работу, Celery будет выполнять другие задачи. Вы можете настроить очереди, работать с блоками данных для долгосрочных задач и устанавливать время для выполнения ваших задач. Это позволит вам лучше планировать прогресс в своей работе, более эффективно планировать время разработки и тратить свое драгоценное время на работу над более важными задачами, пока целевые группы Celery творят свое волшебство.

Эта статья написана Vadym Zakovinko Python Celery Guide

Была ли вам полезна эта статья?
[14 / 4.2]

Spread the love
Подписаться
Уведомление о
guest
0 Комментарий
Inline Feedbacks
View all comments