В прошлый раз мы говорили про фиксацию смещений в Apache Kafka. Сегодня поговорим про способы работы с потребителями в Kafka. Читайте далее про особенности и способы работы с Kafka-потребителями в распределенной среде.
Какие существуют способы работы с потребителем в Kafka
Kafka-потребитель (consumer) — это сервис (или группа брокеров), который отвечает за получение сообщений с данными, созданных и опубликованных продюсером. Получение сообщений, также, как и создание идет в распределенной среде в реальном времени. В Kafka существует 2 наиболее популярных метода работы с потребителем [1]:
- в исходном коде Java-приложения;
- из командной строки операционной системы.
Каждый из этих методов мы подробнее рассмотрим на практических примерах далее.
Способы создания потребителей в брокере Kafka: несколько практических примеров
Для того, чтобы начать создание потребителя в исходном коде приложения, необходимо задать базовую конфигурацию через коллекцию свойств Properties:
Properties props = new Properties();
//брокеры-потребители
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
//группа потребителей
props.put("group.id", "CountryCounter");
//десериализация ключей и значений
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
За создание Kafka-потребителя отвечает Java-класс KafkaConsumer, который принимает в качестве параметра своего конструктора созданный и вышеописанный экземпляр класса Properties, а затем с помощью метода subscribe() происходит подписка на топик [1]:
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList("customerCountries"));
Для получения сообщений из топика используется метод poll(), который на вход принимает время ожидания блокировки буфера в случае недоступности данных. Это делается для того, чтобы при опросе потребителями сервера Kafka избежать потери или утечки данных (например, в другие разделы). По истечении времени ожидания потребитель «засыпает» и возвращает все полученные данные (записи) [1]:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.debug("topic = %s, partition = %d, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4)); }}
} finally {
consumer.close(); }
Второй способ предполагает создание Kafka-потребителя из командной строки (консольный потребитель). Это делается через специальную утилиту kafka-comsole-consumer.sh (kafka-comsole-consumer.bat для систем Windows) [1]:
kafka-console-consumer.sh --zookeeper zoo1.example.com:2181/kafka-cluster --topic my-topic
В вышеприведенной команде параметр —zookeeper отвечает за подключение к определенному кластеру, а --topic — для подключение к указанному топику. Для того, чтобы подключится к нескольким топикам одновременно, можно указать параметр --whitelist, указывающий на то, что будут использоваться все топики, соответствующие регулярному выражению [1]:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist my-topic|world-topic|your-topic'
Аналогично вышеприведенному примеру используется параметр --blacklist, отвечающий за использование всех топиков, кроме тех, которые соответствуют регулярному выражению.
Таким образом, благодаря широкому спектру возможностей работы с потребителем, брокер Kafka может обеспечивает разработчика удобным и безопасным интерфейсом для создания приложений, работающих с большими объемами данных. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про виды отправок сообщений в Kafka.
Курс Apache Kafka: администрирование кластера
Код курса
KAFKA
Ближайшая дата курса
22 декабря, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных



