В прошлый раз мы говорили про механизм перебалансировки данных в брокере Apache Kafka. Сегодня поговорим про цикл опроса в Kafka. Читайте далее про особенности работы цикла опроса, благодаря которому активные брокеры Kafka могут получать актуальные (новые) данные из топиков в реальном времени (по мере их поступления).
Как работает цикл опроса в Apache Kafka: особенности получения записей Big Data из топиков
Цикл опроса — это механизм, представляющий собой бесконечный цикл (цикл, выполнение которого не завершится до тех пор, пока не произойдет полная остановка программы), который осуществляет координацию (распределение подписчиков по топикам) и перебалансировку разделов (передачу раздела от неактивного подписчика активному), а также отвечает за извлечение данных из топиков. Структура цикла опроса на языке Java выглядит следующим образом [1]:
try { // бесконечный цикл while (true){ // получаем извлекаем все данные ConsumerRecords <String, String> records = consumer.poll(100); // обрабатываем каждую запись в отдельности for (ConsumerRecord<String, String> record: records){ } } } finally { consumer.close(); }
Извлечение и обработка записей Kafka: несколько практических примеров
Для получения данных используется метод poll()
, который в качестве параметра принимает время ожидания появления новой записи. Следующий код на языке Java отвечает за извлечение Kafka-записей [1]:
ConsumerRecords <String, String> records = consumer.poll (300);
Метод poll()
отвечает за получение записей, каждая из которых содержит топик, раздел (partition), из которого она поступила, а также ключ (название записи) и значение. Этот метод не только получает записи, но и отвечает за поиск координатора группы (для распределения подписчиков по разделам), а также за присоединение подписчиков (consumer) к группам. Все это обычно происходит при первом вызове метода poll()
. При получении каждая запись обрабатывается отдельно с помощью специального цикла. Следующий код на языке Java отвечает за обработку каждой из полученных записей [1]:
for (ConsumerRecord <String, String> record : records) { // обработка каждой полученной записи log.debug("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; }
Как видно из кода, обработка каждой записи включает в себя получение ключа этой записи, ее значения, смещения (порядкового номера), и топика, из которого она была получена. Однако, вышерассмотренный метод не позволяет получать поступающие записи в реальном времени (необходимо постоянно перезапускать программу для получения новой записи, что весьма неудобно). Для получения новых записей в реальном времени необходимо реализовать бесконечный цикл опроса [1]:
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { log.debug("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(), updatedCount) JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString(4)); } } } finally { consumer.close(); }
Таким образом, благодаря механизму бесконечного цикла опроса, Kafka-брокеры могут получать всю актуальную Big Data информацию в реальном времени без каких-либо задержек. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про получение Kafka-записей с заданными смещениями.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Kafka Streams для разработчиков
- Интеграция Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных