В прошлый раз мы говорил про виды отправок сообщений в Apache Kafka. Сегодня поговорим про особенности создания потребителя в Kafka. Читайте далее про особенности структуры Kafka-потребителей, благодаря которым брокер Kafka имеет весьма мощный механизм получения Big Data сообщений в режиме реального времени.
Из чего состоит потребитель Kafka: основные элементы архитектуры
Kafka-потребитель (consumer) — это сервис (или группа брокеров), который отвечает за получение Big Data сообщений, созданных продюсером. Получение сообщений, также, как и создание идет в распределенной среде в реальном времени. Потребитель, как и продюсер, имеет 3 базовых элемента, составляющих его архитектуру:
- servers — список брокеров для соединения с кластером Kafka. Этот элемент также является базовой составляющей архитектуры Kafka-продюсеров;
- deserializer — класс-десериализатор, применяемый для десериализации (восстановление структуры объекта из байтового массива) ключей сообщений. В качестве десериализатора используется класс StringDeserializer, так как все при десериализации объекты автоматически принимают стрококвый тип (даже если до этого они были другого типа);
- deserializer — класс-десериализатор, использующийся для десериализации значений записей, генерируемых продюсером для отправки брокерам в распределенном кластере. Значения, также, как и ключи принимают строковый тип данных.
- group.id — свойство, отвечающее за создание группы потребителей для одного из экземпляров класса потребителя
Особенности работы Kafka-потребителя: несколько практических примеров
Как и в случае с продюсером, для того, чтобы начать работу с потребителем, необходимо настроить базовую конфигурацию. За это отвечает класс Properties
, который позволяет создать коллекцию свойств, в которой задаются значения вышеописанных элементов архитектуры [1]:
Properties props = new Properties(); //брокеры-потребители props.put("bootstrap.servers", "broker1:9092,broker2:9092"); //группа потребителей props.put("group.id", "CountryCounter"); //десериализация ключей и значений props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
За создание Kafka-потребителя отвечает класс KafkaConsumer
, который принимает в качестве параметра своего конструктора созданный экземпляр класса Properties
[1]:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Для того, чтобы начать получать сообщения, консьюмер должен быть подписан на топик, в который эти сообщения будут поступать от продюсера. За подписку на топик отвечает метод subscribe()
[1]:
consumer.subscribe(Collections.singletonList("customerCountries"));
Для получения сообщений из топика используется метод poll()
, который в качестве параметра принимает время ожидания блокировки буфера в случае недоступности данных. Это делается для того, чтобы при опросе потребителями сервера Kafka избежать потери или утечки данных (например, в другие разделы). По истечении времени ожидания потребитель «засыпает» и возвращает все полученные данные (записи). Следующий код на языке Java отвечает за создание бесконечного цикла опроса для получения сообщений потребителем [1]:
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { log.debug("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1;} custCountryMap.put(record.value(), updatedCount) JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString(4)); }} } finally { consumer.close(); }
Таким образом, благодаря надежной архитектуре консьюмера, брокер Kafka может настраивать механизм получения сообщений с высокой степенью безопасности и с минимальными рисками. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про работу с Avro в Kafka.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Kafka Streams для разработчиков
- Интеграция Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных