Дизайн Kafka, семантика доставки сообщений

Здесь мы обсудим семантические гарантии, которые Kafka предоставляет между производителем и потребителем. Очевидно, что существует несколько возможных гарантий доставки сообщений, которые могут быть предоставлены:

  • Не более одного раза - сообщения могут быть потеряны, но никогда не будут доставлены повторно.
  • По крайней мере один раз - сообщения никогда не теряются, но могут быть доставлены повторно.
  • Ровно один раз - это то, чего на самом деле хотят люди, каждое сообщение доставляется только один раз.

Стоит отметить, что здесь можно выделить две проблемы: гарантии долговечности при публикации сообщения и гарантии при использовании сообщения.

Многие системы заявляют, что предоставляют семантику доставки «ровно один раз», но важно читать мелкий шрифт, большинство этих утверждений вводят в заблуждение (т.е. они не соответствуют случаю, когда потребители или производители могут потерпеть неудачу, случаям, когда есть несколько процессов потребителей или случаи, когда данные, записанные на диск, могут быть потеряны).

Семантика Kafka проста. При публикации сообщения оно становится «зафиксировано» в журнале. Как только опубликованное сообщение зафиксировано, оно не будет потеряно, пока один брокер, который реплицирует раздел, в который было записано это сообщение, остается «живым». Определение зафиксированного сообщения, активного раздела, а также описание типов сбоев, для которых будет попытка обработки, будут более подробно описаны в следующем посте. А пока давайте предположим, что это идеальный брокер без потерь, и попытаемся понять гарантии, предоставляемые производителю и потребителю. Если производитель пытается опубликовать сообщение и обнаруживает сетевую ошибку, он не может быть уверен, произошла ли эта ошибка до или после того, как сообщение было зафиксировано. Это похоже на семантику вставки в таблицу базы данных с автоматически сгенерированным ключом.

До 0.11.0.0, если производитель не смог получить ответ, указывающий, что сообщение было зафиксировано, у него не было другого выбора, кроме как повторно отправить сообщение. Это обеспечивает семантику доставки хотя бы один раз, поскольку сообщение может быть снова записано в журнал во время повторной отправки, если исходный запрос действительно был успешным. Начиная с версии 0.11.0.0, производитель Kafka также поддерживает параметр идемпотентной доставки, который гарантирует, что повторная отправка не приведет к дублированию записей в журнале. Для этого брокер присваивает каждому производителю идентификатор и дедуплицирует сообщения, используя порядковый номер, который отправляется производителем вместе с каждым сообщением. Также, начиная с версии 0.11.0.0, производитель поддерживает возможность отправки сообщений в несколько разделов темы с использованием семантики, подобной транзакции: т.е. либо все сообщения успешно записаны, либо ни одно из них не написано. Основным вариантом использования для этого является однократная обработка между темами Kafka (описано ниже).

Не все варианты использования требуют столь строгих гарантий. Для использования, чувствительного к задержке, производителю разрешается указать желаемый уровень долговечности. Если производитель указывает, что он хочет дождаться фиксации сообщения, это может занять порядка 10 мс. Однако производитель также может указать, что он хочет выполнить отправку полностью асинхронно или что он хочет ждать только до тех пор, пока лидер (но не обязательно последователи) получит сообщение.

Теперь опишем семантику с точки зрения потребителя. Все реплики имеют одинаковый журнал с одинаковыми смещениями. Потребитель контролирует свое положение в этом журнале. Если бы потребитель никогда аварийно не завершал работу, он мог бы просто сохранить эту позицию в памяти, но если потребитель отказывает, и мы хотим, чтобы этот раздел темы был передан другому процессу, новый процесс должен будет выбрать подходящую позицию, из которой следует начать обработку. Допустим, потребитель читает какие-то сообщения - у него есть несколько вариантов обработки сообщений и обновления своей позиции.

Он может читать сообщения, затем сохранять свою позицию в журнале и, наконец, обрабатывать сообщения. В этом случае существует вероятность того, что процесс-потребитель выйдет из строя после сохранения своей позиции, но до сохранения вывода обработки сообщения. В этом случае процесс, который взял на себя обработку, начнется с сохраненной позиции, даже если несколько сообщений до этой позиции не были обработаны. Это соответствует семантике «не более одного раза», поскольку в случае отказа потребителя сообщения могут не обрабатываться.

Он может читать сообщения, обрабатывать сообщения и, наконец, сохранять свою позицию. В этом случае существует вероятность того, что процесс-потребитель выйдет из строя после обработки сообщений, но до сохранения своей позиции. В этом случае, когда новый процесс берет на себя первые несколько сообщений, которые он получает, уже будут обработаны. Это соответствует семантике «хотя бы один раз» в случае отказа потребителя. Во многих случаях сообщения имеют первичный ключ, поэтому обновления идемпотентны (получение одного и того же сообщения дважды просто перезаписывает запись другой собственной копией).

Так что насчет семантики ровно один раз (то есть того, что вам действительно нужно)? При потреблении из темы Kafka и производстве для другой темы (как в приложении Kafka Streams) мы можем использовать новые возможности транзакционного производителя в 0.11.0.0. Позиция потребителя сохраняется в виде сообщения в теме, поэтому мы можем записать смещение в Kafka в той же транзакции, что и темы вывода, получающие обработанные данные. Если транзакция прервана, позиция потребителя вернется к своему старому значению, и полученные данные по темам вывода не будут видны другим потребителям, в зависимости от их «уровня изоляции». На уровне изоляции по умолчанию «read_uncommitted» все сообщения видны потребителям, даже если они были частью прерванной транзакции, но в «read_committed» потребитель будет возвращать только сообщения от транзакций, которые были зафиксированы (и любые сообщения, которые не были частью транзакции).

При записи во внешнюю систему ограничение заключается в необходимости согласовывать позицию потребителя с тем, что фактически сохраняется в качестве вывода. Классическим способом достижения этого было бы введение двухфазной фиксации между сохранением позиции потребителя и хранением вывода потребителей. Но с этим можно справиться проще и в целом, позволив потребителю хранить свое смещение в том же месте, что и его вывод. Это лучше, потому что многие системы вывода, в которые потребитель может захотеть писать, не будут поддерживать двухфазную фиксацию. В качестве примера рассмотрим коннектор Kafka Connect, который заполняет данные в HDFS вместе со смещениями данных, которые он считывает, так что гарантируется, что либо данные, либо смещения обновляются, либо нет.

Таким образом, Kafka поддерживает единовременную доставку в Kafka Streams, а транзакционный производитель / потребитель может обычно использоваться для обеспечения единовременной доставки при передаче и обработке данных между темами Kafka. Ровно-однократная доставка для других систем назначения обычно требует сотрудничества с такими системами, но Kafka предоставляет компенсацию, которая делает реализацию этого возможной. В противном случае Kafka гарантирует доставку как минимум один раз по умолчанию и позволяет пользователю реализовать доставку не более одного раза, отключив повторные попытки на производителе и зафиксировав смещения в потребителе перед обработкой пакета сообщений.


Читайте также:

Комментарии

Популярные сообщения из этого блога

Язык поисковых запросов в Graylog

Хэш-таблица: разрешение коллизий

Нормальные формы, пример нормализации в базе данных