В прошлый раз мы говорили про механизм перебалансировки данных в брокере Apache Kafka. Сегодня поговорим про цикл опроса в Kafka. Читайте далее про особенности работы цикла опроса, благодаря которому активные брокеры Kafka могут получать актуальные (новые) данные из топиков в реальном времени (по мере их поступления).
Как работает цикл опроса в Apache Kafka: особенности получения записей Big Data из топиков
Цикл опроса — это механизм, представляющий собой бесконечный цикл (цикл, выполнение которого не завершится до тех пор, пока не произойдет полная остановка программы), который осуществляет координацию (распределение подписчиков по топикам) и перебалансировку разделов (передачу раздела от неактивного подписчика активному), а также отвечает за извлечение данных из топиков. Структура цикла опроса на языке Java выглядит следующим образом [1]:
try {
// бесконечный цикл
while (true){
// получаем извлекаем все данные
ConsumerRecords <String, String> records = consumer.poll(100);
// обрабатываем каждую запись в отдельности
for (ConsumerRecord<String, String> record: records){
}
}
} finally {
consumer.close();
}
Извлечение и обработка записей Kafka: несколько практических примеров
Для получения данных используется метод poll(), который в качестве параметра принимает время ожидания появления новой записи. Следующий код на языке Java отвечает за извлечение Kafka-записей [1]:
ConsumerRecords <String, String> records = consumer.poll (300);
Метод poll() отвечает за получение записей, каждая из которых содержит топик, раздел (partition), из которого она поступила, а также ключ (название записи) и значение. Этот метод не только получает записи, но и отвечает за поиск координатора группы (для распределения подписчиков по разделам), а также за присоединение подписчиков (consumer) к группам. Все это обычно происходит при первом вызове метода poll(). При получении каждая запись обрабатывается отдельно с помощью специального цикла. Следующий код на языке Java отвечает за обработку каждой из полученных записей [1]:
for (ConsumerRecord <String, String> record : records)
{
// обработка каждой полученной записи
log.debug("topic = %s, partition = %d, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
Как видно из кода, обработка каждой записи включает в себя получение ключа этой записи, ее значения, смещения (порядкового номера), и топика, из которого она была получена. Однако, вышерассмотренный метод не позволяет получать поступающие записи в реальном времени (необходимо постоянно перезапускать программу для получения новой записи, что весьма неудобно). Для получения новых записей в реальном времени необходимо реализовать бесконечный цикл опроса [1]:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
log.debug("topic = %s, partition = %d, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
consumer.close();
}
Таким образом, благодаря механизму бесконечного цикла опроса, Kafka-брокеры могут получать всю актуальную Big Data информацию в реальном времени без каких-либо задержек. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про получение Kafka-записей с заданными смещениями.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Kafka Streams для разработчиков
- Интеграция Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных



