Двойная проверка блокировки с Django ORM
Оригинальная статья: Luke Plant — Double-checked locking with Django ORM
Шаблон блокировки с двойной проверкой может быть полезен, когда:
- Вам необходимо ограничить доступ к определенному ресурсу, чтобы параллельные процессы не могли работать с ним одновременно.
- Другие доступные вам схемы блокировки сложны или медленные.
Этот пост о том, как можно реализовать этот шаблон в Django, используя функции ORM и блокировки на уровне базы данных. Шаблон может использоваться с любой другой ORM, но я проверил его только с Django, и подтвердил, что он работает, как и ожидается, используя PostgreSQL.
Задача
Допустим у вас есть некоторые записи в базе данных, которые требуют какой-то «обработки», но вы должны быть уверены, что они обрабатываются только один раз. Например, в вашей системе электронной коммерции вам нужно отправлять электронные письма своим пользователям, когда их заказ отправлен, но вам нужно что бы письмо отправлялось бы только один раз. Для этого в модели у вас есть что-то вроде этого:
class Order(models.Model): shipped_at = models.DateTimeField(null=True) shipped_email_sent = models.BooleanField(default=False)
Отправка электронной почты может занять некоторое время или потерпеть неудачу, поэтому у вас есть какой-то фоновый процесс для этого. Вы можете использовать очередь заданий, такую как Celery, чтобы реализовать это и гарантировать, что только один процесс за раз пытается выполнить отправку, но, возможно, у вас нет такой очереди, или, возможно, вы не доверяете ее реализации и что то вроде этого и вам нужно самостоятельно реализовать эту логику так что бы различные параллельные задачи, которые могут блокировать одни и те же строки, не мешали друг другу.
Мы используем shipped_email_sent для отслеживания того, отправили ли мы электронные письма или нет, но даже если мы отфильтруем их, параллельные процессы, запущенные в один и тот же момент, могут выполнить отправку почты дважды, из-за задержки между запросом и обновлением записей. Мы могли бы использовать select_for_update(), но хотим избежать блокировки этой важной таблицы больше, чем это абсолютно необходимо. Что нам делать?
Решение
Я представлю свое решение, а потом поясню как оно работает.
def send_pending_order_shipped_emails(): orders_to_email = Order.objects.filter( shipped_at__isnull=False, shipped_email_sent=False, ) for order in orders_to_email: with transaction.atomic(): for order in (orders_to_email .filter(id=order.id) .select_for_update(of='self', skip_locked=True)): send_shipped_email(order) order.shipped_email_sent = True order.save()
Объяснение
- Весь этот блок кода должен выполняться вне атомарного блока.
- Обратите внимание, что внешний блок orders_to_email находится вне транзакции и не использует блокировку. Таким образом, если этот запрос не возвращает результатов, весь процесс выполняется только один запрос на чтение без блокировки и затем завершается. Это хорошо с точки зрения производительности и позволяет избежать конфликтов в БД.
- Если есть элементы для обработки, мы запускаем атомарный блок транзакции with transaction.atomic().
- Поскольку первый запрос был за пределами транзакции, другой процесс мог опередить нас, поэтому мы должны сделать еще один запрос, чтобы убедиться, что запись все еще соответствует критериям — так называемый внутренний запрос.
- На этот раз мы используем select_for_update, чтобы никакой другой процесс не смог одновременно использовать этот блок с этой строкой.
- Мы используем skip_locked, так что если какой-то другой процесс попытаться использовать это, мы просто пропустим запись и попробуем следующую.
- И в конце мы устанавливаем флаг, который гарантирует, что эта запись больше не будет найдена запросом orders_to_email.
В результате мы гарантируем, что каждая запись обрабатывается не более одного раза.
Примечание
- Я проверил это только с PostgreSQL и уровнем изоляции по умолчанию READ COMMITTED.
- Обратите внимание на использование Django QuerySets: мы определяем правильно отфильтрованный запрос один раз, а затем повторно используем цепочку для его многократного выполнения. Мы полагаемся на тот факт, что дополнительный фильтр и т. п. создают новый QuerySet, который выполняет новый запрос, когда мы зациклим его вторым циклом for.
- Убедитесь, что вы читали примечания для select_for_update и используете соответствующий параметр.
- Мы гарантируем «максимум один раз», но это дает возможность нулевого времени (zero times). Если у вас есть другие процессы, которые также блокируют эти строки в таблице (а не только несколько копий этого кода, выполняющего этот же код), то флаг skip_locked = True означает, что этот процесс может завершиться без обработки всех строк и без обработки любых ошибок. Другими словами, этот код предполагает, что «все остальное важнее меня»(everyone else is more important than me). Я думаю, что вы могли бы изменить это, используя взамен select_for_update (nowait = True), в сочетании с соответствующим try/except/looping. Для проверки при многократных попытках вы могли бы:
- Оставить это для следующей попытки вашего фонового процесса, или
- Сделать некоторый подсчет внутри двух циклов, и если внутренний цикл окажется коротким, мы знаем, что по какой-то причине мы пропустили некоторые строки (это могло быть потому, что какой-то другой процесс уже обработал строку или какой-то другой заблокировал строки по другой причине). Если это так, рекурсивно вызывайте send_pending_order_shipped_emails. Эта рекурсия обязательно прекратится, когда запрос orders_to_email окажется пустым, или когда нам удастся обработать все в нем.
- Примечание по производительности: мы выполняем N + 1 запросов на чтение, чтобы обработать все ожидающие записи. Возможно, вам нужно знать об этом, по сравнению с выполнением 1 read и 1 write, если мы сделали их все вместе и использовали какой-то другой механизм, чтобы убедиться, что у нас не было нескольких конкурирующих процессов.
- Если у вас есть несколько процессов, участвующих в обработке ожидающих записей, приведенный выше код, естественно, распределит работу между ними примерно поровну — вы получите бесплатное распределение работы.
- Я пытался найти способы инкапсулировать этот шаблон более аккуратно в Django / Python, например, с помощью double_checked_locking (queryset), но до сих пор не было удачи в создании чего-то существенно лучшего (например, в django-mailer, который работает нормально, но имеет неудобный вида шаблон использования). Я думаю, что лучше использовать его как есть каждый раз, особенно учитывая некоторые из вышеперечисленных моментов.
- Если ваша обработка идемпотентна, или вы можете организовать это, тогда вам, возможно, удастся обойтись без какой-либо блокировки, и вам может не понадобиться этот шаблон. Возможно, вам нужно быть осторожным, чтобы использовать QuerySet.update вместо Model.save(), чтобы избежать условий гонки с перезаписью данных и т. п. (Спасибо Haki)
Что-нибудь еще? Мое понимание того, как работают уровни изоляции и транзакции PostgreSQL, а также мои эксперименты (с использованием хорошего IL ‘time.sleep ()), кажется, подтверждают это работает в соответствии с описанием и примечаниями выше, но если я что-то пропустил, пожалуйста, добавьте комментарий !
Обновление
- Adam Johnson предпринял попытку рефакторинга этой идеи. Это не проверено, но похоже, что это будет работать. Я бы по-прежнему рекомендовал прочитать все предостережения, приведенные выше, и адаптировать их для собственного использования, а не просто использовать их — здесь есть много тонкостей, как уже отмечалось.
- Другие обновления в Twitter.