В прошлый раз мы говорили про механизм смещений и особенности его работы в Kafka. Сегодня поговорим про особенности настройки конфигурации Kafka-потребителя. Читайте далее про настройку потребителя в распределенной среде Apache Kafka.
Из чего состоит конфигурация Kafka-потребителя: основные свойства для настройки
Kafka-потребитель (Kafka consumer) — это брокер (или группа брокеров), который отвечает за получение сообщений Big Data, созданных (или опубликованных) продюсером. Потребитель, так же, как и продюсер работает в распределенной среде Kafka. В качестве примера рассмотрим настройку базовой конфигурации Kafka-потребителя:
private fun kafkaConsumer(): KafkaConsumer<String, String> { val properties = Properties() properties["bootstrap.servers"] = "localhost:9092" properties["group.id"] = "kafka-example" properties["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer" properties["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer" return KafkaConsumer(properties) }
Как видно из вышерассмотренного фрагмента кода, для настройки базовой конфигурации потребителя в Kafka используется коллекция Properties
, которая включает в себя следующие элементы:
bootstrap.servers
— список URL-адресов серверов, с которыми должен связаться потребитель для получения конфигурации Kafka-кластера;group.id
— идентификатор группы потребителей, который отвечает за создание группы для объединения нескольких потребителей. Рекомендуется указывать одинаковое значение для нескольких (около 5) потребителей, чтобы сбалансировать рабочую нагрузку между ними;key.deserializer
— каждая запись, полученная от брокера Kafka представляет собой набор байтов, поэтому их необходимо десериализолвать (превратить из байтов в объект). Для десериализации ключей используется класс StringDeserializer, так как все объекты при десериализации принимают строковый вид (в кодитровке UTF-8);value.deserializer
— класс-десериализатор, использующийся для десериализации значений записей, генерируемых продюсером для отправки брокерам в распределенном кластере. Значения, также, как и ключи принимают строковый тип данных;
Особенности настройки потребителя в Kafka: несколько практических примеров
Для своевременного получения сообщений потребитель использует специальный координатор опроса группы на предмет обновлений, который отвечает за уведомление об обновлениях (новых сообщениях) в кластере. Для того, чтобы координатор предоставлял сведения об обновлениях, необходимо делегировать ему данные, имеющиеся у потребителей в группе для того, чтобы координатор сравнивал их с имеющимися данными в кластере и оповещал о новых данных (которых нет у потребителей в группе):
boolean updateAssignmentMetadataIfNeeded(final Timer timer) { if (coordinator != null && !coordinator.poll(timer)) { return false; } return updateFetchPositions(timer); }
Метод updateFetchPositions()
отвечает за обновление позиции смещений полученных данных. В качестве параметра он принимает объект класса Timer. Следовательно, метод updateFetchPositions()
, также как и метод poll()
ожидает заданное количество времени, и если в течение этого времени произойдёт какое-либо изменение в смещениях, их позиция обновится.
Для получения Big Data сообщений потребителю также необходимо знать, что группа активна и не произошло разрыва соединения. Для этого необходимо реализовать метод, который будет проверять активность группы, и, если соединение потеряно, восстанавливать его и присоединяться к группе заново:
boolean ensureActiveGroup(final Timer timer) { if (!ensureCoordinatorReady(timer)) { return false; } startHeartbeatThreadIfNeeded(); return joinGroupIfNeeded(timer); }
Из вышерассмотренного фрагмента кода видно, что при разрыве соединения с группой, потребитель запускает метод startHeartbeatThreadIfNeeded()
для того, чтобы послать уведомление в кластер о готовности (о том, что он активен), а затем с помощью метода joinGroupIfNeeded()
ждать повторного присоединения к группе.
Таким образом, благодаря надежной архитектуре консьюмера, брокер Kafka может настраивать механизм получения сообщений с высокой степенью безопасности и с минимальными рисками. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений.
Администрирование кластера Kafka
Код курса
KAFKA
Ближайшая дата курса
9 декабря, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники