Как устроена конфигурация Kafka-потребителя

kafka это, курс kafka spark, apache kafka для начинающих, apache kafka, курсы администраторов spark, apache kafka для начинающих, Big Data, Data Science, kafka streaming, Kafka, брокер kafka, avro

В прошлый раз мы говорили про механизм смещений и особенности его работы в Kafka. Сегодня поговорим про особенности настройки конфигурации Kafka-потребителя. Читайте далее про настройку потребителя в распределенной среде Apache Kafka.

Из чего состоит конфигурация Kafka-потребителя: основные свойства для настройки

Kafka-потребитель (Kafka consumer) — это брокер (или группа брокеров), который отвечает за получение сообщений Big Data, созданных (или опубликованных) продюсером. Потребитель, так же, как и продюсер работает в распределенной среде Kafka. В качестве примера рассмотрим настройку базовой конфигурации Kafka-потребителя:

private fun kafkaConsumer(): KafkaConsumer<String, String> {
    val properties = Properties()
    properties["bootstrap.servers"] = "localhost:9092"
    properties["group.id"] = "kafka-example"
    properties["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    properties["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"

    return KafkaConsumer(properties)
}

Как видно из вышерассмотренного фрагмента кода, для настройки базовой конфигурации потребителя в Kafka используется коллекция Properties, которая включает в себя следующие элементы:

  • bootstrap.servers — список URL-адресов серверов, с которыми должен связаться потребитель для получения конфигурации Kafka-кластера;
  • group.id — идентификатор группы потребителей, который отвечает за создание группы для объединения нескольких потребителей. Рекомендуется указывать одинаковое значение для нескольких (около 5) потребителей, чтобы сбалансировать рабочую нагрузку между ними;
  • key.deserializer — каждая запись, полученная от брокера Kafka представляет собой набор байтов, поэтому их необходимо десериализолвать (превратить из байтов в объект). Для десериализации ключей используется класс StringDeserializer, так как все объекты при десериализации принимают строковый вид (в кодитровке UTF-8);
  • value.deserializer — класс-десериализатор, использующийся для десериализации значений записей, генерируемых продюсером для отправки брокерам в распределенном кластере. Значения, также, как и ключи принимают строковый тип данных;

Особенности настройки потребителя в Kafka: несколько практических примеров

Для своевременного получения сообщений потребитель использует специальный координатор опроса группы на предмет обновлений, который отвечает за уведомление об обновлениях (новых сообщениях) в кластере. Для того, чтобы координатор предоставлял сведения об обновлениях, необходимо делегировать ему данные, имеющиеся у потребителей в группе для того, чтобы координатор сравнивал их с имеющимися данными в кластере и оповещал о новых данных (которых нет у потребителей в группе):

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    if (coordinator != null && !coordinator.poll(timer)) {
        return false;
    }

    return updateFetchPositions(timer);
}

Метод updateFetchPositions() отвечает за обновление позиции смещений полученных данных. В качестве параметра он принимает объект класса Timer. Следовательно, метод updateFetchPositions(), также как и метод poll() ожидает заданное количество времени, и если в течение этого времени произойдёт какое-либо изменение в смещениях, их позиция обновится.

Для получения Big Data сообщений потребителю также необходимо знать, что группа активна и не произошло разрыва соединения. Для этого необходимо реализовать метод, который будет проверять активность группы, и, если соединение потеряно, восстанавливать его и присоединяться к группе заново:

boolean ensureActiveGroup(final Timer timer) {
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }


    startHeartbeatThreadIfNeeded();
    return joinGroupIfNeeded(timer);
}

Из вышерассмотренного фрагмента кода видно, что при разрыве соединения с группой, потребитель запускает метод startHeartbeatThreadIfNeeded() для того, чтобы послать уведомление в кластер о готовности (о том, что он активен), а затем с помощью метода joinGroupIfNeeded() ждать повторного присоединения к группе.

Таким образом, благодаря надежной архитектуре консьюмера, брокер Kafka может настраивать механизм получения сообщений с высокой степенью безопасности и с минимальными рисками. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений.

Администрирование кластера Kafka

Код курса
KAFKA
Ближайшая дата курса
11 июля, 2022
Длительность обучения
24 ак.часов
Стоимость обучения
60 000 руб.

Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Записаться на курс

Смотреть раcписание

Источники

  1. https://chrzaszcz.dev/2019/06/16/kafka-consumer-poll/

Добавить комментарий

Поиск по сайту