Двойная проверка блокировки с Django ORM

Spread the love

Оригинальная статья: Luke PlantDouble-checked locking with Django ORM

Шаблон блокировки с двойной проверкой может быть полезен, когда:

  1. Вам необходимо ограничить доступ к определенному ресурсу, чтобы параллельные процессы не могли работать с ним одновременно.
  2. Другие доступные вам схемы блокировки сложны или медленные.

Этот пост о том, как можно реализовать этот шаблон в Django, используя функции ORM и блокировки на уровне базы данных. Шаблон может использоваться с любой другой ORM, но я проверил его только с Django, и подтвердил, что он работает, как и ожидается, используя PostgreSQL.

Задача

Допустим у вас есть некоторые записи в базе данных, которые требуют какой-то «обработки», но вы должны быть уверены, что они обрабатываются только один раз. Например, в вашей системе электронной коммерции вам нужно отправлять электронные письма своим пользователям, когда их заказ отправлен, но вам нужно что бы письмо отправлялось бы только один раз. Для этого в модели у вас есть что-то вроде этого:

class Order(models.Model):
    shipped_at = models.DateTimeField(null=True)
    shipped_email_sent = models.BooleanField(default=False)


Отправка электронной почты может занять некоторое время или потерпеть неудачу, поэтому у вас есть какой-то фоновый процесс для этого. Вы можете использовать очередь заданий, такую как Celery, чтобы реализовать это и гарантировать, что только один процесс за раз пытается выполнить отправку, но, возможно, у вас нет такой очереди, или, возможно, вы не доверяете ее реализации и что то вроде этого и вам нужно самостоятельно реализовать эту логику так что бы различные параллельные задачи, которые могут блокировать одни и те же строки, не мешали друг другу.

Мы используем shipped_email_sent для отслеживания того, отправили ли мы электронные письма или нет, но даже если мы отфильтруем их, параллельные процессы, запущенные в один и тот же момент, могут выполнить отправку почты дважды, из-за задержки между запросом и обновлением записей. Мы могли бы использовать select_for_update(), но хотим избежать блокировки этой важной таблицы больше, чем это абсолютно необходимо. Что нам делать?

Решение

Я представлю свое решение, а потом поясню как оно работает.

def send_pending_order_shipped_emails():
    orders_to_email = Order.objects.filter(
        shipped_at__isnull=False,
        shipped_email_sent=False,
    )

    for order in orders_to_email:
        with transaction.atomic():
            for order in (orders_to_email
                          .filter(id=order.id)
                          .select_for_update(of='self', skip_locked=True)):
                send_shipped_email(order)
                order.shipped_email_sent = True
                order.save()


Объяснение

  1. Весь этот блок кода должен выполняться вне атомарного блока.
  2. Обратите внимание, что внешний блок orders_to_email находится вне транзакции и не использует блокировку. Таким образом, если этот запрос не возвращает результатов, весь процесс выполняется только один запрос на чтение без блокировки и затем завершается. Это хорошо с точки зрения производительности и позволяет избежать конфликтов в БД.
  3. Если есть элементы для обработки, мы запускаем атомарный блок транзакции with transaction.atomic().
  4. Поскольку первый запрос был за пределами транзакции, другой процесс мог опередить нас, поэтому мы должны сделать еще один запрос, чтобы убедиться, что запись все еще соответствует критериям – так называемый внутренний запрос.
  5. На этот раз мы используем select_for_update, чтобы никакой другой процесс не смог одновременно использовать этот блок с этой строкой.
  6. Мы используем skip_locked, так что если какой-то другой процесс попытаться использовать это, мы просто пропустим запись и попробуем следующую.
  7. И в конце мы устанавливаем флаг, который гарантирует, что эта запись больше не будет найдена запросом orders_to_email.

В результате мы гарантируем, что каждая запись обрабатывается не более одного раза.

Примечание

  • Я проверил это только с PostgreSQL и уровнем изоляции по умолчанию READ COMMITTED.
  • Обратите внимание на использование Django QuerySets: мы определяем правильно отфильтрованный запрос один раз, а затем повторно используем цепочку для его многократного выполнения. Мы полагаемся на тот факт, что дополнительный фильтр и т. п. создают новый QuerySet, который выполняет новый запрос, когда мы зациклим его вторым циклом for.
  • Убедитесь, что вы читали примечания для select_for_update и используете соответствующий параметр.
  • Мы гарантируем «максимум один раз», но это дает возможность нулевого времени (zero times). Если у вас есть другие процессы, которые также блокируют эти строки в таблице (а не только несколько копий этого кода, выполняющего этот же код), то флаг skip_locked = True означает, что этот процесс может завершиться без обработки всех строк и без обработки любых ошибок. Другими словами, этот код предполагает, что «все остальное важнее меня»(everyone else is more important than me). Я думаю, что вы могли бы изменить это, используя взамен select_for_update (nowait = True), в сочетании с соответствующим try/except/looping. Для проверки при многократных попытках вы могли бы:
    1. Оставить это для следующей попытки вашего фонового процесса, или
    2. Сделать некоторый подсчет внутри двух циклов, и если внутренний цикл окажется коротким, мы знаем, что по какой-то причине мы пропустили некоторые строки (это могло быть потому, что какой-то другой процесс уже обработал строку или какой-то другой заблокировал строки по другой причине). Если это так, рекурсивно вызывайте send_pending_order_shipped_emails. Эта рекурсия обязательно прекратится, когда запрос orders_to_email окажется пустым, или когда нам удастся обработать все в нем.
  • Примечание по производительности: мы выполняем N + 1 запросов на чтение, чтобы обработать все ожидающие записи. Возможно, вам нужно знать об этом, по сравнению с выполнением 1 read и 1 write, если мы сделали их все вместе и использовали какой-то другой механизм, чтобы убедиться, что у нас не было нескольких конкурирующих процессов.
  • Если у вас есть несколько процессов, участвующих в обработке ожидающих записей, приведенный выше код, естественно, распределит работу между ними примерно поровну – вы получите бесплатное распределение работы.
  • Я пытался найти способы инкапсулировать этот шаблон более аккуратно в Django / Python, например, с помощью double_checked_locking (queryset), но до сих пор не было удачи в создании чего-то существенно лучшего (например, в django-mailer, который работает нормально, но имеет неудобный вида шаблон использования). Я думаю, что лучше использовать его как есть каждый раз, особенно учитывая некоторые из вышеперечисленных моментов.
  • Если ваша обработка идемпотентна, или вы можете организовать это, тогда вам, возможно, удастся обойтись без какой-либо блокировки, и вам может не понадобиться этот шаблон. Возможно, вам нужно быть осторожным, чтобы использовать QuerySet.update вместо Model.save(), чтобы избежать условий гонки с перезаписью данных и т. п. (Спасибо Haki)

Что-нибудь еще? Мое понимание того, как работают уровни изоляции и транзакции PostgreSQL, а также мои эксперименты (с использованием хорошего IL ‘time.sleep ()), кажется, подтверждают это работает в соответствии с описанием и примечаниями выше, но если я что-то пропустил, пожалуйста, добавьте комментарий !

Обновление

  • Adam Johnson предпринял попытку рефакторинга этой идеи. Это не проверено, но похоже, что это будет работать. Я бы по-прежнему рекомендовал прочитать все предостережения, приведенные выше, и адаптировать их для собственного использования, а не просто использовать их – здесь есть много тонкостей, как уже отмечалось.
  • Другие обновления в Twitter.
Была ли вам полезна эта статья?
[0 / 0]

Spread the love
Подписаться
Уведомление о
guest
0 Комментарий
Inline Feedbacks
View all comments