Сегодня поговорим про механизм, отвечающий за извлечение сообщений в брокере Kafka. Читайте далее про особенности получения сообщений в распределенном брокере Apache Kafka в реальном времени
Какие элементы отвечают за извлечение сообщений в Kafka
Извлечение сообщений в Kafka — это процесс получения содержимого опубликованных продюсером сообщений в заданном топике. За получение сообщений отвечает продюсер, однако за сам процесс извлечения отвечают такие элементы, как:
- смещение (offset) — это порядковый номер записи (индекс), который указывается для получения конкретной записи в данный момент времени независимо от того, когда она была создана. Заданные пользователем смещения позволяют переходить к конкретной записи минуя все остальные. Брокер Kafka устроен таким образом, что каждой записи при создании присваивается порядковый номер, на который указывает курсор (метка, которая перемещается при обращении или создании записи). Как только пользователь обращается к конкретной записи, указывая соответствующий индекс, курсор перемещается в данном направлении и фиксируется на заданном элементе (записи);
- цикл опроса — это механизм, представляющий собой бесконечный цикл (цикл, выполнение которого не завершится до тех пор, пока не произойдет полная остановка программы), который осуществляет координацию (распределение подписчиков по топикам) и перебалансировку разделов (передачу раздела от неактивного подписчика активному), а также отвечает за извлечение данных из топиков [1].
Извлечение сообщений в Kafka: несколько практических примеров
Для того, чтобы обеспечить возможность получения конкретных записей, можно записывать все смещения и соответствующие им записи в отдельную коллекцию (массив). Для этого необходимо каждый раз при обращении к серверу Kafka создавать экземпляр класса TopicPartition, помещая в него значение (value) записи, а также всю информацию о ее хранении (топик, раздел, смещение). Следующий код на языке Java отвечает за обращение к серверу и заполнение коллекции информацией о каждой полученной от сервера записи [1]:
while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()); processRecord(record); storeRecordInDB(record); consumer.commitAsync(currentOffsets); } }
Как видно из вышеприведенного кода, для получения данных используется метод poll(), который в качестве параметра принимает время ожидания появления новой записи. Метод poll() отвечает за получение записей, каждая из которых содержит топик, раздел (partition), из которого она поступила, а также ключ (название записи) и значение. Этот метод не только получает записи, но и отвечает за поиск координатора группы (для распределения потребителей по разделам), а также за присоединение потребителей (consumer) к группам. Все это обычно происходит при первом вызове метода poll(). При получении каждая запись обрабатывается отдельно с помощью специального цикла опроса.
Однако стоит отметить, что данный способ не гарантирует корректной работы при аварийном сбое приложения, так как при каждом сбое все обращения к серверу будут инициироваться заново, и при повторном заполнении коллекции в ней появятся дубликаты. Для того, чтобы гарантировать корректное обращение к записям по заданным смещениям во время сбоя, необходимо предусмотреть возможность перебалансировки (передача раздела от неактивного подписчика активному) и каждый раз, когда она происходит, заново выполнять поиск заданного смещения с помощью метода seek() [1]:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener { public void onPartitionsRevoked (Collection<TopicPartition> partitions) { commitDBTransaction(); } public void onPartitionsAssigned( Collection<TopicPartition> partitions) { for(TopicPartition partition: partitions) consumer.seek(partition, getOffsetFromDB(partition)); } } }
Таким образом, брокер Kafka гарантирует надежное извлечение и обработку каждой записи в распределенном кластере. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
21 октября, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве: