В прошлый раз мы говорили про фиксацию смещений в Apache Kafka. Сегодня поговорим про способы работы с потребителями в Kafka. Читайте далее про особенности и способы работы с Kafka-потребителями в распределенной среде.
Какие существуют способы работы с потребителем в Kafka
Kafka-потребитель (consumer) — это сервис (или группа брокеров), который отвечает за получение сообщений с данными, созданных и опубликованных продюсером. Получение сообщений, также, как и создание идет в распределенной среде в реальном времени. В Kafka существует 2 наиболее популярных метода работы с потребителем [1]:
- в исходном коде Java-приложения;
- из командной строки операционной системы.
Каждый из этих методов мы подробнее рассмотрим на практических примерах далее.
Способы создания потребителей в брокере Kafka: несколько практических примеров
Для того, чтобы начать создание потребителя в исходном коде приложения, необходимо задать базовую конфигурацию через коллекцию свойств Properties
:
Properties props = new Properties(); //брокеры-потребители props.put("bootstrap.servers", "broker1:9092,broker2:9092"); //группа потребителей props.put("group.id", "CountryCounter"); //десериализация ключей и значений props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
За создание Kafka-потребителя отвечает Java-класс KafkaConsumer, который принимает в качестве параметра своего конструктора созданный и вышеописанный экземпляр класса Properties
, а затем с помощью метода subscribe()
происходит подписка на топик [1]:
KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList("customerCountries"));
Для получения сообщений из топика используется метод poll()
, который на вход принимает время ожидания блокировки буфера в случае недоступности данных. Это делается для того, чтобы при опросе потребителями сервера Kafka избежать потери или утечки данных (например, в другие разделы). По истечении времени ожидания потребитель «засыпает» и возвращает все полученные данные (записи) [1]:
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { log.debug("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1;} custCountryMap.put(record.value(), updatedCount) JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString(4)); }} } finally { consumer.close(); }
Второй способ предполагает создание Kafka-потребителя из командной строки (консольный потребитель). Это делается через специальную утилиту kafka-comsole-consumer.sh
(kafka-comsole-consumer.bat
для систем Windows) [1]:
kafka-console-consumer.sh --zookeeper zoo1.example.com:2181/kafka-cluster --topic my-topic
В вышеприведенной команде параметр —zookeeper отвечает за подключение к определенному кластеру, а --topic
— для подключение к указанному топику. Для того, чтобы подключится к нескольким топикам одновременно, можно указать параметр --whitelist
, указывающий на то, что будут использоваться все топики, соответствующие регулярному выражению [1]:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist my-topic|world-topic|your-topic'
Аналогично вышеприведенному примеру используется параметр --blacklist
, отвечающий за использование всех топиков, кроме тех, которые соответствуют регулярному выражению.
Таким образом, благодаря широкому спектру возможностей работы с потребителем, брокер Kafka может обеспечивает разработчика удобным и безопасным интерфейсом для создания приложений, работающих с большими объемами данных. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про виды отправок сообщений в Kafka.
Администрирование кластера Kafka
Код курса
KAFKA
Ближайшая дата курса
12 февраля, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных