Дизайн Kafka, потребитель (Consumer), позиция потребителя

Позиция потребителя

Отслеживание того, что было израсходовано, на удивление, является одной из ключевых точек производительности системы обмена сообщениями.

Большинство систем обмена сообщениями хранят метаданные о том, какие сообщения были получены брокером. То есть, когда сообщение передается потребителю, брокер либо немедленно записывает этот факт локально, либо может ждать подтверждения от потребителя. Это довольно интуитивный выбор, и действительно, для сервера с одной машиной не ясно, куда еще может перейти это состояние. Поскольку структуры данных, используемые для хранения во многих системах обмена сообщениями, плохо масштабируются, это также прагматичный выбор - поскольку брокер знает, что потребляется, он может немедленно удалить это, сохраняя небольшой размер данных.

Что, возможно, не очевидно, так это то, что заставить брокера и потребителя прийти к соглашению о том, что было потреблено, - нетривиальная проблема. Если брокер записывает сообщение как потребленное немедленно каждый раз, когда оно передается по сети, тогда, если потребитель не сможет обработать сообщение (скажем, из-за сбоя, истечения времени ожидания запроса или чего-то еще), это сообщение будет потеряно. Чтобы решить эту проблему, многие системы обмена сообщениями добавляют функцию подтверждения, что означает, что сообщения помечаются только как отправленные, а не использованные, когда они отправлены; брокер ожидает конкретного подтверждения от потребителя, чтобы записать сообщение как использованное. Эта стратегия решает проблему потери сообщений, но создает новые проблемы. Прежде всего, если потребитель обрабатывает сообщение, но не может отправить подтверждение, сообщение будет использовано дважды. Вторая проблема связана с производительностью, теперь брокер должен сохранять несколько состояний для каждого отдельного сообщения (сначала заблокировать его, чтобы оно не выдавалось во второй раз, а затем пометить его как окончательно потребленное, чтобы его можно было удалить). Необходимо решать сложные проблемы, например, что делать с сообщениями, которые отправляются, но никогда не подтверждаются.

Kafka поступает иначе. Тема разделена на набор полностью упорядоченных разделов, каждый из которых используется ровно одним потребителем в каждой группе подписанных потребителей в любой момент времени. Это означает, что позиция потребителя в каждом разделе - это всего лишь одно целое число, смещение следующего сообщения, которое нужно потребить. Это делает состояние потребленного очень маленьким, всего одно число для каждого раздела. Это состояние можно периодически проверять. Это делает эквивалент подтверждений сообщения очень дешевым.

У этого решения есть побочная выгода. Потребитель может намеренно вернуться к старому смещению и повторно использовать данные. Это нарушает общий контракт очереди, но оказывается важной функцией для многих потребителей. Например, если код потребителя содержит ошибку и обнаруживается после использования некоторых сообщений, потребитель может повторно использовать эти сообщения, как только ошибка будет исправлена.

Автономная загрузка данных

Масштабируемая персистентность позволяет потребителям, которые только периодически используют данные, например, пакетные загрузки данных, которые периодически загружают данные в автономную систему, такую как Hadoop или реляционное хранилище данных.

В случае Hadoop загрузка данных распараллеливается, разделяя нагрузку на отдельные задачи карты, по одной для каждой комбинации узел/тема/раздел, обеспечивая полный параллелизм в загрузке. Hadoop обеспечивает управление задачами, и задачи, которые завершились неудачей, могут быть перезапущены без опасности дублирования данных - они просто перезапускаются с исходного положения.

Статическое членство

Статическое членство направлено на повышение доступности потоковых приложений, групп потребителей и других приложений, построенных на основе протокола ребалансировки групп. Протокол перебалансировки полагается на координатора группы для распределения идентификаторов объектов среди членов группы. Эти сгенерированные идентификаторы эфемерны и изменятся, когда участники перезапустятся и снова присоединятся. Для потребительских приложений это «динамическое членство» может привести к переназначению большого процента задач другим экземплярам во время административных операций, таких как развертывание кода, обновления конфигурации и периодические перезапуски. Для приложений с большим состоянием перемешанным задачам требуется много времени, чтобы восстановить свое локальное состояние перед обработкой и привести к тому, что приложения будут частично или полностью недоступны. Обоснованный этим наблюдением протокол управления группами Kafka позволяет членам группы предоставлять постоянные идентификаторы объектов. Членство в группе остается неизменным на основе этих идентификаторов, поэтому перебалансировка не выполняется.

Если вы хотите использовать статическое членство,

  • Обновите кластер брокера и клиентские приложения до 2.3 или более поздней версии, а также убедитесь, что обновленные брокеры также используют inter.broker.protocol.version 2.3 или более поздней версии.
  • Задайте для конфигурации ConsumerConfig#GROUP_INSTANCE_ID_CONFIG уникальное значение для каждого экземпляра-потребителя в одной группе.
  • Для приложений Kafka Streams достаточно установить уникальный ConsumerConfig#GROUP_INSTANCE_ID_CONFIG для каждого экземпляра KafkaStreams, независимо от количества используемых потоков для экземпляра.

Если ваш брокер использует более старую версию, чем 2.3, но вы решили установить ConsumerConfig#GROUP_INSTANCE_ID_CONFIG на стороне клиента, приложение определит версию брокера и затем выдаст исключение UnsupportedException. Если вы случайно настроите повторяющиеся идентификаторы для разных экземпляров, механизм ограждения на стороне брокера сообщит вашему дублированному клиенту о немедленном завершении работы, вызвав исключение org.apache.kafka.common.errors.FencedInstanceIdException.


Читайте также:

Комментарии

Популярные сообщения из этого блога

Язык поисковых запросов в Graylog

Хэш-таблица: разрешение коллизий

Нормальные формы, пример нормализации в базе данных