Способы работы с потребителем в распределенном брокере Apache Kafka

курс kafka spark, курс kafka spark, курсы администрирования kafka, курс kafka spark, apache kafka для начинающих, kafka это, ksql, kafka streams, обучение kafka, курсы потоковой обработки kafka, курс kafka spark, Big Data, apache kafka, курсы администраторов spark, apache kafka для начинающих, Big Data, Data Science, kafka streaming, Kafka, брокер kafka, avro, курс kafka spark, курс kafka spark, курсы администрирования kafka, курс kafka spark, apache kafka для начинающих, kafka это, ksql, kafka streams, обучение kafka, курсы потоковой обработки kafka, курс kafka spark, Big Data, курсы kafka rest, apache kafka для начинающих, kafka это, big data курсы, kafka streams, курс kafka spark, курсы по kafka, курсы big data москва, курс kafka spark, apache kafka для начинающих, apache kafka, курсы администраторов spark, apache kafka для начинающих, Big Data, Data Science, kafka streaming, Kafka, брокер kafka, avro

В прошлый раз мы говорили про фиксацию смещений в Apache Kafka. Сегодня поговорим про способы работы с потребителями в Kafka. Читайте далее про особенности и способы работы с Kafka-потребителями в распределенной среде.

Какие существуют способы работы с потребителем в Kafka

Kafka-потребитель (consumer) — это сервис (или группа брокеров), который отвечает за получение сообщений с данными, созданных и опубликованных продюсером. Получение сообщений, также, как и создание идет в распределенной среде в реальном времени. В Kafka существует 2 наиболее популярных метода работы с потребителем [1]:

  • в исходном коде Java-приложения;
  • из командной строки операционной системы.

Каждый из этих методов мы подробнее рассмотрим на практических примерах далее.

Способы создания потребителей в брокере Kafka: несколько практических примеров

Для того, чтобы начать создание потребителя в исходном коде приложения, необходимо задать базовую конфигурацию через коллекцию свойств Properties:

Properties props = new Properties();
//брокеры-потребители
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
//группа потребителей
props.put("group.id", "CountryCounter");
//десериализация ключей и значений
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

За создание Kafka-потребителя отвечает Java-класс KafkaConsumer, который принимает в качестве параметра своего конструктора созданный и вышеописанный экземпляр класса Properties, а затем с помощью метода subscribe() происходит подписка на топик [1]:

KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList("customerCountries"));

Для получения сообщений из топика используется метод poll(), который на вход принимает время ожидания блокировки буфера в случае недоступности данных. Это делается для того, чтобы при опросе потребителями сервера Kafka избежать потери или утечки данных (например, в другие разделы). По истечении времени ожидания потребитель «засыпает» и возвращает все полученные данные (записи) [1]:

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.debug("topic = %s, partition = %d, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4)); }}
} finally {
consumer.close(); }

Второй способ предполагает создание Kafka-потребителя из командной строки (консольный потребитель). Это делается через специальную утилиту kafka-comsole-consumer.sh (kafka-comsole-consumer.bat для систем Windows) [1]:

kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --topic my-topic

В вышеприведенной команде параметр —zookeeper отвечает за подключение к определенному кластеру, а --topic — для подключение к указанному топику. Для того, чтобы подключится к нескольким топикам одновременно, можно указать параметр --whitelist, указывающий на то, что будут использоваться все топики, соответствующие регулярному выражению [1]:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist my-topic|world-topic|your-topic'

Аналогично вышеприведенному примеру используется параметр --blacklist, отвечающий за использование всех топиков, кроме тех, которые соответствуют регулярному выражению.

Таким образом, благодаря широкому спектру возможностей работы с потребителем, брокер Kafka может обеспечивает разработчика удобным и безопасным интерфейсом для создания приложений, работающих с большими объемами данных. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про виды отправок сообщений в Kafka.

Администрирование кластера Kafka

Код курса
KAFKA
Ближайшая дата курса
12 февраля, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Записаться на курс

Смотреть раcписание

Источники

  1. Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных

Добавить комментарий

Поиск по сайту